1use std::sync::OnceLock;
25use serde::{Deserialize, Serialize};
26use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
27use crate::submit_options::SubmitOptions;
28
/// Coarse classification of a sync item's payload bytes.
///
/// Serialized in lowercase (`"json"` / `"binary"`). `Binary` is the
/// `Default` variant.
///
/// NOTE(review): variant order is significant for serialization formats that
/// encode enum variants by index; append new variants rather than reordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ContentType {
    /// Payload identified as JSON text (see [`ContentType::detect`]).
    Json,
    /// Any other payload; also the default when nothing better is known.
    #[default]
    Binary,
}
45
46impl ContentType {
47 #[must_use]
52 pub fn detect(content: &[u8]) -> Self {
53 if content.is_empty() {
55 return ContentType::Binary;
56 }
57
58 let first = content.iter().find(|b| !b.is_ascii_whitespace());
60 match first {
61 Some(b'{') | Some(b'[') | Some(b'"') => {
62 if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
64 ContentType::Json
65 } else {
66 ContentType::Binary
67 }
68 }
69 _ => ContentType::Binary
70 }
71 }
72
73 #[inline]
75 #[must_use]
76 pub fn is_json(&self) -> bool {
77 matches!(self, ContentType::Json)
78 }
79
80 #[inline]
82 #[must_use]
83 pub fn is_binary(&self) -> bool {
84 matches!(self, ContentType::Binary)
85 }
86}
87
/// A unit of data to be synchronised, together with its metadata.
///
/// Fields marked `#[serde(skip)]` are process-local: they are never written
/// out and come back as their `Default` after deserialization.
///
/// NOTE(review): field order is left untouched on purpose, because formats
/// that encode structs positionally depend on it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncItem {
    /// Identifier of the object; also used as the batching id (`BatchableItem::id`).
    pub object_id: String,
    /// Version number; `SyncItem::new` starts it at 1.
    pub version: u64,
    /// Last-update timestamp; `SyncItem::new` fills it with milliseconds since the Unix epoch.
    pub updated_at: i64,
    /// Classification of `content`. Falls back to `ContentType::default()`
    /// (`Binary`) when absent from serialized input.
    #[serde(default)]
    pub content_type: ContentType,
    /// Optional batch identifier (presumably the batch this item was grouped
    /// into — confirm). Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    /// Trace-propagation header. `with_current_trace_context` writes it as
    /// `00-<trace-id>-<span-id>-<flags>` (W3C `traceparent` layout).
    pub trace_parent: Option<String>,
    /// Companion trace-state string; the constructors shown here leave it `None`.
    /// NOTE(review): presumably a W3C `tracestate` value — confirm.
    pub trace_state: Option<String>,
    /// Internal scoring value; the constructors initialise it to 0.0.
    #[doc(hidden)]
    pub priority_score: f64,
    /// Merkle root associated with the item; empty after `SyncItem::new`.
    pub merkle_root: String,
    /// Last-access marker; the constructors set it to 0.
    pub last_accessed: u64,
    /// Access counter; the constructors set it to 0.
    pub access_count: u64,
    /// Raw payload. Routed through `serde_bytes` so formats with a native
    /// byte-string type store it as bytes rather than as a sequence of numbers.
    #[serde(with = "serde_bytes")]
    pub content: Vec<u8>,
    /// Optional id of the item's home instance; `None` after `SyncItem::new`.
    pub home_instance_id: Option<String>,
    /// Free-form state label. Defaults to `"default"` both in `SyncItem::new`
    /// and when missing from serialized input.
    #[serde(default = "default_state")]
    pub state: String,

    /// Per-item submit options; never serialized. `None` means
    /// `effective_options` falls back to `SubmitOptions::default()`.
    #[serde(skip)]
    pub(crate) submit_options: Option<SubmitOptions>,

    /// Memoized result of `SizedItem::size_bytes`. It is filled on the first
    /// call and never invalidated, so mutating public fields afterwards leaves
    /// it stale. `Clone` copies the cached value.
    #[serde(skip)]
    cached_size: OnceLock<usize>,
}
161
/// Value used for `SyncItem::state` when serialized input omits the field.
fn default_state() -> String {
    String::from("default")
}
166
167impl SyncItem {
168 pub fn new(object_id: String, content: Vec<u8>) -> Self {
189 let content_type = ContentType::detect(&content);
190 Self {
191 object_id,
192 version: 1,
193 updated_at: std::time::SystemTime::now()
194 .duration_since(std::time::UNIX_EPOCH)
195 .unwrap_or_default()
196 .as_millis() as i64,
197 content_type,
198 batch_id: None,
199 trace_parent: None,
200 trace_state: None,
201 priority_score: 0.0,
202 merkle_root: String::new(),
203 last_accessed: 0,
204 access_count: 0,
205 content,
206 home_instance_id: None,
207 state: "default".to_string(),
208 submit_options: None, cached_size: OnceLock::new(),
210 }
211 }
212
213 pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
218 let content = serde_json::to_vec(&value).unwrap_or_default();
219 let mut item = Self::new(object_id, content);
220 item.content_type = ContentType::Json; item
222 }
223
224 #[doc(hidden)]
229 #[allow(clippy::too_many_arguments)]
230 pub fn reconstruct(
231 object_id: String,
232 version: u64,
233 updated_at: i64,
234 content_type: ContentType,
235 content: Vec<u8>,
236 batch_id: Option<String>,
237 trace_parent: Option<String>,
238 merkle_root: String,
239 home_instance_id: Option<String>,
240 state: String,
241 ) -> Self {
242 Self {
243 object_id,
244 version,
245 updated_at,
246 content_type,
247 batch_id,
248 trace_parent,
249 trace_state: None,
250 priority_score: 0.0,
251 merkle_root,
252 last_accessed: 0,
253 access_count: 0,
254 content,
255 home_instance_id,
256 state,
257 submit_options: None,
258 cached_size: OnceLock::new(),
259 }
260 }
261
262 #[must_use]
277 pub fn with_options(mut self, options: SubmitOptions) -> Self {
278 self.submit_options = Some(options);
279 self
280 }
281
282 #[must_use]
296 pub fn with_state(mut self, state: impl Into<String>) -> Self {
297 self.state = state.into();
298 self
299 }
300
301 #[must_use]
303 pub fn effective_options(&self) -> SubmitOptions {
304 self.submit_options.clone().unwrap_or_default()
305 }
306
307 #[must_use]
311 pub fn content_as_json(&self) -> Option<serde_json::Value> {
312 serde_json::from_slice(&self.content).ok()
313 }
314
315 #[cfg(feature = "otel")]
317 pub fn with_current_trace_context(mut self) -> Self {
318 use opentelemetry::trace::TraceContextExt;
319 use tracing_opentelemetry::OpenTelemetrySpanExt;
320
321 let cx = tracing::Span::current().context();
322 let span_ref = cx.span();
323 let sc = span_ref.span_context();
324 if sc.is_valid() {
325 self.trace_parent = Some(format!(
326 "00-{}-{}-{:02x}",
327 sc.trace_id(),
328 sc.span_id(),
329 sc.trace_flags().to_u8()
330 ));
331 }
332 self
333 }
334}
335
336impl SizedItem for SyncItem {
337 fn size_bytes(&self) -> usize {
338 *self.cached_size.get_or_init(|| {
339 std::mem::size_of::<Self>()
341 + self.object_id.len()
342 + self.trace_parent.as_ref().map_or(0, String::len)
343 + self.trace_state.as_ref().map_or(0, String::len)
344 + self.merkle_root.len()
345 + self.content.len()
346 + self.home_instance_id.as_ref().map_or(0, String::len)
347 })
348 }
349}
350
351impl BatchableItem for SyncItem {
352 fn id(&self) -> &str {
353 &self.object_id
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_new_sync_item() {
        let item = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert!(item.updated_at > 0);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        assert!(item.merkle_root.is_empty());
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.home_instance_id.is_none());
        assert_eq!(item.content, b"hello");
        assert_eq!(item.content_type, ContentType::Binary);
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_new_detects_json_content() {
        let item = SyncItem::new("j".to_string(), br#"{"a": 1}"#.to_vec());
        assert_eq!(item.content_type, ContentType::Json);
    }

    #[test]
    fn test_from_json() {
        let item = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.content_type, ContentType::Json);
        let parsed: serde_json::Value = serde_json::from_slice(&item.content).unwrap();
        assert_eq!(parsed, json!({"key": "value"}));
    }

    #[test]
    fn test_from_json_scalar_is_still_json() {
        // `detect` alone would call `42` binary; `from_json` knows better.
        let item = SyncItem::from_json("n".to_string(), json!(42));
        assert_eq!(item.content, b"42");
        assert_eq!(item.content_type, ContentType::Json);
    }

    #[test]
    fn test_detect_json_payloads() {
        assert_eq!(ContentType::detect(br#"{"key": "value"}"#), ContentType::Json);
        assert_eq!(ContentType::detect(b"  \n[1, 2, 3]"), ContentType::Json);
        assert_eq!(ContentType::detect(br#""just a string""#), ContentType::Json);
    }

    #[test]
    fn test_detect_binary_payloads() {
        assert_eq!(ContentType::detect(b""), ContentType::Binary);
        assert_eq!(ContentType::detect(b"   \t\n"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"42"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"true"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"{not json"), ContentType::Binary);
        assert_eq!(ContentType::detect(&[0xFF, 0xFE, 0x00]), ContentType::Binary);
    }

    #[test]
    fn test_content_type_predicates_and_default() {
        assert!(ContentType::Json.is_json());
        assert!(!ContentType::Json.is_binary());
        assert!(ContentType::Binary.is_binary());
        assert!(!ContentType::Binary.is_json());
        assert_eq!(ContentType::default(), ContentType::Binary);
    }

    #[test]
    fn test_content_as_json() {
        let item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let parsed = item.content_as_json().unwrap();
        assert_eq!(parsed["nested"]["key"], 42);

        let binary_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(binary_item.content_as_json().is_none());
    }

    #[test]
    fn test_reconstruct_preserves_fields() {
        let item = SyncItem::reconstruct(
            "id".to_string(),
            7,
            1234,
            ContentType::Json,
            b"{}".to_vec(),
            Some("batch-1".to_string()),
            Some("tp".to_string()),
            "root".to_string(),
            Some("home".to_string()),
            "pending".to_string(),
        );

        assert_eq!(item.object_id, "id");
        assert_eq!(item.version, 7);
        assert_eq!(item.updated_at, 1234);
        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content, b"{}");
        assert_eq!(item.batch_id.as_deref(), Some("batch-1"));
        assert_eq!(item.trace_parent.as_deref(), Some("tp"));
        assert_eq!(item.merkle_root, "root");
        assert_eq!(item.home_instance_id.as_deref(), Some("home"));
        assert_eq!(item.state, "pending");
        assert!(item.trace_state.is_none());
        assert_eq!(item.access_count, 0);
        assert_eq!(item.last_accessed, 0);
    }

    #[test]
    fn test_size_bytes_calculation() {
        let item = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]}),
        );

        let size = item.size_bytes();

        assert!(size > 0);
        assert!(size >= std::mem::size_of::<SyncItem>() + item.object_id.len() + item.content.len());
    }

    #[test]
    fn test_size_bytes_cached() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());

        let size1 = item.size_bytes();
        let size2 = item.size_bytes();

        assert_eq!(size1, size2);
    }

    #[test]
    fn test_size_includes_optional_fields() {
        let bare = SyncItem::new("test".to_string(), vec![]);
        let mut item = SyncItem::new("test".to_string(), vec![]);

        item.trace_parent = Some("00-abc123-def456-01".to_string());
        item.trace_state = Some("vendor=data".to_string());
        item.home_instance_id = Some("instance-1".to_string());

        let size = item.size_bytes();
        let extra = "00-abc123-def456-01".len() + "vendor=data".len() + "instance-1".len();

        assert!(size > std::mem::size_of::<SyncItem>() + "test".len());
        assert_eq!(size, bare.size_bytes() + extra);
    }

    #[test]
    fn test_serialize_deserialize() {
        let item = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]}),
        );

        let json_str = serde_json::to_string(&item).unwrap();
        let deserialized: SyncItem = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.object_id, item.object_id);
        assert_eq!(deserialized.version, item.version);
        assert_eq!(deserialized.updated_at, item.updated_at);
        assert_eq!(deserialized.content, item.content);
        assert_eq!(deserialized.content_type, item.content_type);
        assert_eq!(deserialized.state, item.state);
    }

    #[test]
    fn test_serialize_skips_none_batch_id() {
        let item = SyncItem::new("test".to_string(), vec![]);

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(!json_str.contains("batch_id"));
    }

    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.batch_id = Some("batch-123".to_string());

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(json_str.contains("batch_id"));
        assert!(json_str.contains("batch-123"));
    }

    #[test]
    fn test_clone() {
        let item = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let cloned = item.clone();

        assert_eq!(cloned.object_id, item.object_id);
        assert_eq!(cloned.content, item.content);
        assert_eq!(cloned.content_type, item.content_type);
    }

    #[test]
    fn test_debug_format() {
        let item = SyncItem::new("test".to_string(), vec![]);
        let debug_str = format!("{:?}", item);

        assert!(debug_str.contains("SyncItem"));
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_updated_at_is_recent() {
        let before = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        let item = SyncItem::new("test".to_string(), vec![]);

        let after = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        assert!(item.updated_at >= before);
        assert!(item.updated_at <= after);
    }

    #[test]
    fn test_large_content_size() {
        let large_data: Vec<u8> = (0..10000u32).flat_map(|i| i.to_le_bytes()).collect();
        let content_len = large_data.len();
        let item = SyncItem::new("large".to_string(), large_data);

        let size = item.size_bytes();

        assert_eq!(content_len, 40_000);
        assert!(
            size >= content_len + "large".len(),
            "Large content should result in large size"
        );
    }

    #[test]
    fn test_state_default() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_state_with_state_builder() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec()).with_state("delta");
        assert_eq!(item.state, "delta");
    }

    #[test]
    fn test_state_with_state_chaining() {
        let item = SyncItem::from_json("test".into(), json!({"key": "value"})).with_state("pending");

        assert_eq!(item.state, "pending");
        assert_eq!(item.object_id, "test");
    }

    #[test]
    fn test_state_serialization() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec()).with_state("custom_state");

        let json = serde_json::to_string(&item).unwrap();
        assert!(json.contains("\"state\":\"custom_state\""));

        let parsed: SyncItem = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.state, "custom_state");
    }

    #[test]
    fn test_state_deserialization_default() {
        let json = r#"{
            "object_id": "test",
            "version": 1,
            "updated_at": 12345,
            "priority_score": 0.0,
            "merkle_root": "",
            "last_accessed": 0,
            "access_count": 0,
            "content": [100, 97, 116, 97]
        }"#;

        let item: SyncItem = serde_json::from_str(json).unwrap();
        assert_eq!(item.state, "default");
        assert_eq!(item.content_type, ContentType::Binary);
        assert!(item.trace_parent.is_none());
        assert!(item.batch_id.is_none());
        assert_eq!(item.content, b"data");
    }
}