1use std::sync::OnceLock;
28use serde::{Deserialize, Serialize};
29use sha2::{Sha256, Digest};
30use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
31use crate::submit_options::SubmitOptions;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
39#[serde(rename_all = "lowercase")]
40pub enum ContentType {
41 Json,
44 #[default]
47 Binary,
48}
49
50impl ContentType {
51 #[must_use]
56 pub fn detect(content: &[u8]) -> Self {
57 if content.is_empty() {
59 return ContentType::Binary;
60 }
61
62 let first = content.iter().find(|b| !b.is_ascii_whitespace());
64 match first {
65 Some(b'{') | Some(b'[') | Some(b'"') => {
66 if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
68 ContentType::Json
69 } else {
70 ContentType::Binary
71 }
72 }
73 _ => ContentType::Binary
74 }
75 }
76
77 #[inline]
79 #[must_use]
80 pub fn is_json(&self) -> bool {
81 matches!(self, ContentType::Json)
82 }
83
84 #[inline]
86 #[must_use]
87 pub fn is_binary(&self) -> bool {
88 matches!(self, ContentType::Binary)
89 }
90
91 #[inline]
93 #[must_use]
94 pub fn as_str(&self) -> &'static str {
95 match self {
96 ContentType::Json => "json",
97 ContentType::Binary => "binary",
98 }
99 }
100}
101
/// A single object to be synchronised, together with its bookkeeping metadata.
///
/// Items are normally built with [`SyncItem::new`], [`SyncItem::from_json`] or
/// [`SyncItem::from_serializable`]. `submit_options` and the internal size
/// cache are `#[serde(skip)]`, so they are never serialized, and a
/// deserialized item starts with `None` / an empty cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncItem {
    /// Identifier of the object; also used as the batching id (`BatchableItem::id`).
    pub object_id: String,
    /// Version counter; `SyncItem::new` starts it at 1.
    pub version: u64,
    /// Last-update timestamp; `SyncItem::new` stores milliseconds since the UNIX epoch.
    pub updated_at: i64,
    /// How `content` should be interpreted; defaults to `Binary` when absent
    /// from serialized input.
    #[serde(default)]
    pub content_type: ContentType,
    /// Batch this item was assigned to, if any; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    /// Trace context string of the form `00-{trace_id}-{span_id}-{flags}` when
    /// set by `with_current_trace_context` (otel feature only).
    pub trace_parent: Option<String>,
    /// Accompanying trace state; no constructor in this file populates it.
    pub trace_state: Option<String>,
    /// Internal scheduling score; every constructor here initialises it to `0.0`.
    #[doc(hidden)]
    pub priority_score: f64,
    /// Hex SHA-256 of `content` when built via `SyncItem::new`; the legacy
    /// key `merkle_root` is accepted as an alias when deserializing.
    #[serde(alias = "merkle_root")]
    pub content_hash: String,
    /// Starts at 0 in `SyncItem::new`; presumably an access timestamp — TODO confirm units.
    pub last_accessed: u64,
    /// Starts at 0 in `SyncItem::new`; presumably incremented on reads elsewhere — verify.
    pub access_count: u64,
    /// Raw payload bytes, (de)serialized through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub content: Vec<u8>,
    /// Optional instance identifier; `None` from `SyncItem::new`. NOTE(review):
    /// meaning ("owning instance"?) is not visible here — confirm.
    pub home_instance_id: Option<String>,
    /// Free-form state label; `"default"` when absent from serialized input.
    #[serde(default = "default_state")]
    pub state: String,

    /// Per-item submission overrides; not serialized.
    /// `effective_options` falls back to `SubmitOptions::default()` when `None`.
    #[serde(skip)]
    pub(crate) submit_options: Option<SubmitOptions>,

    /// Memoised result of `size_bytes`; not serialized. Writing to the public
    /// fields directly after the first `size_bytes` call does not refresh it.
    #[serde(skip)]
    cached_size: OnceLock<usize>,
}
177
/// Fallback value for [`SyncItem::state`], used by serde when the field is
/// missing from serialized input.
fn default_state() -> String {
    String::from("default")
}
182
183impl SyncItem {
184 pub fn new(object_id: String, content: Vec<u8>) -> Self {
205 let content_type = ContentType::detect(&content);
206 let content_hash = hex::encode(Sha256::digest(&content));
208 Self {
209 object_id,
210 version: 1,
211 updated_at: std::time::SystemTime::now()
212 .duration_since(std::time::UNIX_EPOCH)
213 .unwrap_or_default()
214 .as_millis() as i64,
215 content_type,
216 batch_id: None,
217 trace_parent: None,
218 trace_state: None,
219 priority_score: 0.0,
220 content_hash,
221 last_accessed: 0,
222 access_count: 0,
223 content,
224 home_instance_id: None,
225 state: "default".to_string(),
226 submit_options: None, cached_size: OnceLock::new(),
228 }
229 }
230
231 pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
236 let content = serde_json::to_vec(&value).unwrap_or_default();
237 let mut item = Self::new(object_id, content);
238 item.content_type = ContentType::Json; item
240 }
241
242 pub fn from_serializable<T: Serialize>(object_id: String, value: &T) -> Result<Self, serde_json::Error> {
247 let content = serde_json::to_vec(value)?;
248 let mut item = Self::new(object_id, content);
249 item.content_type = ContentType::Json;
250 Ok(item)
251 }
252
253 #[doc(hidden)]
258 #[allow(clippy::too_many_arguments)]
259 pub fn reconstruct(
260 object_id: String,
261 version: u64,
262 updated_at: i64,
263 content_type: ContentType,
264 content: Vec<u8>,
265 batch_id: Option<String>,
266 trace_parent: Option<String>,
267 content_hash: String,
268 home_instance_id: Option<String>,
269 state: String,
270 access_count: u64,
271 last_accessed: u64,
272 ) -> Self {
273 Self {
274 object_id,
275 version,
276 updated_at,
277 content_type,
278 batch_id,
279 trace_parent,
280 trace_state: None,
281 priority_score: 0.0,
282 content_hash,
283 last_accessed,
284 access_count,
285 content,
286 home_instance_id,
287 state,
288 submit_options: None,
289 cached_size: OnceLock::new(),
290 }
291 }
292
293 #[must_use]
308 pub fn with_options(mut self, options: SubmitOptions) -> Self {
309 self.submit_options = Some(options);
310 self
311 }
312
313 #[must_use]
327 pub fn with_state(mut self, state: impl Into<String>) -> Self {
328 self.state = state.into();
329 self
330 }
331
332 #[must_use]
334 pub fn effective_options(&self) -> SubmitOptions {
335 self.submit_options.clone().unwrap_or_default()
336 }
337
338 #[must_use]
342 pub fn content_as_json(&self) -> Option<serde_json::Value> {
343 serde_json::from_slice(&self.content).ok()
344 }
345
346 #[cfg(feature = "otel")]
348 pub fn with_current_trace_context(mut self) -> Self {
349 use opentelemetry::trace::TraceContextExt;
350 use tracing_opentelemetry::OpenTelemetrySpanExt;
351
352 let cx = tracing::Span::current().context();
353 let span_ref = cx.span();
354 let sc = span_ref.span_context();
355 if sc.is_valid() {
356 self.trace_parent = Some(format!(
357 "00-{}-{}-{:02x}",
358 sc.trace_id(),
359 sc.span_id(),
360 sc.trace_flags().to_u8()
361 ));
362 }
363 self
364 }
365}
366
367impl SizedItem for SyncItem {
368 fn size_bytes(&self) -> usize {
369 *self.cached_size.get_or_init(|| {
370 std::mem::size_of::<Self>()
372 + self.object_id.len()
373 + self.trace_parent.as_ref().map_or(0, String::len)
374 + self.trace_state.as_ref().map_or(0, String::len)
375 + self.content_hash.len()
376 + self.content.len()
377 + self.home_instance_id.as_ref().map_or(0, String::len)
378 })
379 }
380}
381
382impl BatchableItem for SyncItem {
383 fn id(&self) -> &str {
384 &self.object_id
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_new_sync_item() {
        let item = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert!(item.updated_at > 0);
        assert_eq!(item.content_type, ContentType::Binary);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        assert_eq!(
            item.content_hash,
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.home_instance_id.is_none());
        assert_eq!(item.content, b"hello");
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_detect_json_documents() {
        assert_eq!(ContentType::detect(br#"{"a": 1}"#), ContentType::Json);
        assert_eq!(ContentType::detect(b"  \n[1, 2, 3]"), ContentType::Json);
        assert_eq!(ContentType::detect(br#""just a string""#), ContentType::Json);
    }

    #[test]
    fn test_detect_binary_fallbacks() {
        assert_eq!(ContentType::detect(b""), ContentType::Binary);
        assert_eq!(ContentType::detect(b"   \t\n"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"{not json"), ContentType::Binary);
        // Bare scalars are valid JSON but are deliberately not sniffed as such.
        assert_eq!(ContentType::detect(b"42"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"true"), ContentType::Binary);
        assert_eq!(ContentType::detect(&[0xFF, 0xFE, 0x00]), ContentType::Binary);
    }

    #[test]
    fn test_content_type_helpers() {
        assert!(ContentType::Json.is_json());
        assert!(!ContentType::Json.is_binary());
        assert!(ContentType::Binary.is_binary());
        assert!(!ContentType::Binary.is_json());
        assert_eq!(ContentType::Json.as_str(), "json");
        assert_eq!(ContentType::Binary.as_str(), "binary");
        assert_eq!(ContentType::default(), ContentType::Binary);
    }

    #[test]
    fn test_content_type_serde_lowercase() {
        assert_eq!(serde_json::to_string(&ContentType::Json).unwrap(), "\"json\"");
        assert_eq!(serde_json::to_string(&ContentType::Binary).unwrap(), "\"binary\"");
        let parsed: ContentType = serde_json::from_str("\"binary\"").unwrap();
        assert_eq!(parsed, ContentType::Binary);
    }

    #[test]
    fn test_from_json() {
        let item = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.content_type, ContentType::Json);
        let parsed: serde_json::Value = serde_json::from_slice(&item.content).unwrap();
        assert_eq!(parsed, json!({"key": "value"}));
    }

    #[test]
    fn test_from_json_marks_scalar_as_json() {
        // `detect` alone would say Binary for this payload; from_json must not.
        let item = SyncItem::from_json("n".to_string(), json!(42));
        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content, b"42");
    }

    #[test]
    fn test_from_serializable() {
        #[derive(serde::Serialize)]
        struct Payload {
            id: u32,
            tags: Vec<&'static str>,
        }

        let payload = Payload { id: 7, tags: vec!["a"] };
        let item = SyncItem::from_serializable("p".to_string(), &payload).unwrap();

        assert_eq!(item.object_id, "p");
        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content_as_json().unwrap(), json!({"id": 7, "tags": ["a"]}));
    }

    #[test]
    fn test_reconstruct_preserves_fields() {
        let item = SyncItem::reconstruct(
            "obj".to_string(),
            7,
            1_234,
            ContentType::Json,
            b"{}".to_vec(),
            Some("batch-1".to_string()),
            Some("00-tp".to_string()),
            "hash".to_string(),
            Some("home-1".to_string()),
            "archived".to_string(),
            3,
            99,
        );

        assert_eq!(item.object_id, "obj");
        assert_eq!(item.version, 7);
        assert_eq!(item.updated_at, 1_234);
        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content, b"{}");
        assert_eq!(item.batch_id.as_deref(), Some("batch-1"));
        assert_eq!(item.trace_parent.as_deref(), Some("00-tp"));
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        assert_eq!(item.content_hash, "hash");
        assert_eq!(item.home_instance_id.as_deref(), Some("home-1"));
        assert_eq!(item.state, "archived");
        // access_count precedes last_accessed in the argument list.
        assert_eq!(item.access_count, 3);
        assert_eq!(item.last_accessed, 99);
        assert!(item.submit_options.is_none());
    }

    #[test]
    fn test_content_as_json() {
        let item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let parsed = item.content_as_json().unwrap();
        assert_eq!(parsed["nested"]["key"], 42);

        let binary_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(binary_item.content_as_json().is_none());
    }

    #[test]
    fn test_size_bytes_calculation() {
        let item = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]}),
        );

        let size = item.size_bytes();

        assert!(size > 0);
        assert!(size > std::mem::size_of::<SyncItem>());
    }

    #[test]
    fn test_size_bytes_cached() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());

        let size1 = item.size_bytes();
        let size2 = item.size_bytes();

        assert_eq!(size1, size2);
    }

    #[test]
    fn test_size_includes_optional_fields() {
        let bare = SyncItem::new("test".to_string(), vec![]);
        let mut item = SyncItem::new("test".to_string(), vec![]);

        let trace_parent = "00-abc123-def456-01";
        let trace_state = "vendor=data";
        let home = "instance-1";
        item.trace_parent = Some(trace_parent.to_string());
        item.trace_state = Some(trace_state.to_string());
        item.home_instance_id = Some(home.to_string());

        let size = item.size_bytes();

        assert!(size > std::mem::size_of::<SyncItem>() + "test".len());
        // Each optional string contributes exactly its byte length.
        assert_eq!(
            size,
            bare.size_bytes() + trace_parent.len() + trace_state.len() + home.len()
        );
    }

    #[test]
    fn test_serialize_deserialize() {
        let item = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]}),
        );

        let json_str = serde_json::to_string(&item).unwrap();
        let deserialized: SyncItem = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.object_id, item.object_id);
        assert_eq!(deserialized.version, item.version);
        assert_eq!(deserialized.content, item.content);
        assert_eq!(deserialized.content_type, ContentType::Json);
        assert_eq!(deserialized.content_hash, item.content_hash);
    }

    #[test]
    fn test_serialize_skips_none_batch_id() {
        let item = SyncItem::new("test".to_string(), vec![]);

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(!json_str.contains("batch_id"));
    }

    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.batch_id = Some("batch-123".to_string());

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(json_str.contains("batch_id"));
        assert!(json_str.contains("batch-123"));
    }

    #[test]
    fn test_clone() {
        let item = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let cloned = item.clone();

        assert_eq!(cloned.object_id, item.object_id);
        assert_eq!(cloned.content, item.content);
    }

    #[test]
    fn test_debug_format() {
        let item = SyncItem::new("test".to_string(), vec![]);
        let debug_str = format!("{:?}", item);

        assert!(debug_str.contains("SyncItem"));
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_updated_at_is_recent() {
        let before = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        let item = SyncItem::new("test".to_string(), vec![]);

        let after = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        assert!(item.updated_at >= before);
        assert!(item.updated_at <= after);
    }

    #[test]
    fn test_large_content_size() {
        let large_data: Vec<u8> = (0..10000u32).flat_map(|i| i.to_le_bytes()).collect();
        let item = SyncItem::new("large".to_string(), large_data);

        let size = item.size_bytes();

        assert!(size > 10000, "Large content should result in large size");
    }

    #[test]
    fn test_state_default() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_state_with_state_builder() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec()).with_state("delta");
        assert_eq!(item.state, "delta");
    }

    #[test]
    fn test_state_with_state_chaining() {
        let item = SyncItem::from_json("test".into(), json!({"key": "value"})).with_state("pending");

        assert_eq!(item.state, "pending");
        assert_eq!(item.object_id, "test");
    }

    #[test]
    fn test_state_serialization() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec()).with_state("custom_state");

        let json = serde_json::to_string(&item).unwrap();
        assert!(json.contains("\"state\":\"custom_state\""));

        let parsed: SyncItem = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.state, "custom_state");
    }

    #[test]
    fn test_state_deserialization_default() {
        let json = r#"{
            "object_id": "test",
            "version": 1,
            "updated_at": 12345,
            "priority_score": 0.0,
            "merkle_root": "",
            "last_accessed": 0,
            "access_count": 0,
            "content": [100, 97, 116, 97]
        }"#;

        let item: SyncItem = serde_json::from_str(json).unwrap();
        assert_eq!(item.state, "default");
        // Missing content_type falls back to the enum default.
        assert_eq!(item.content_type, ContentType::Binary);
        // Legacy `merkle_root` key feeds content_hash.
        assert_eq!(item.content_hash, "");
        assert!(item.batch_id.is_none());
        assert_eq!(item.content, b"data");
    }
}
616}