1use std::sync::OnceLock;
25use serde::{Deserialize, Serialize};
26use sha2::{Sha256, Digest};
27use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
28use crate::submit_options::SubmitOptions;
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
36#[serde(rename_all = "lowercase")]
37pub enum ContentType {
38 Json,
41 #[default]
44 Binary,
45}
46
47impl ContentType {
48 #[must_use]
53 pub fn detect(content: &[u8]) -> Self {
54 if content.is_empty() {
56 return ContentType::Binary;
57 }
58
59 let first = content.iter().find(|b| !b.is_ascii_whitespace());
61 match first {
62 Some(b'{') | Some(b'[') | Some(b'"') => {
63 if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
65 ContentType::Json
66 } else {
67 ContentType::Binary
68 }
69 }
70 _ => ContentType::Binary
71 }
72 }
73
74 #[inline]
76 #[must_use]
77 pub fn is_json(&self) -> bool {
78 matches!(self, ContentType::Json)
79 }
80
81 #[inline]
83 #[must_use]
84 pub fn is_binary(&self) -> bool {
85 matches!(self, ContentType::Binary)
86 }
87
88 #[inline]
90 #[must_use]
91 pub fn as_str(&self) -> &'static str {
92 match self {
93 ContentType::Json => "json",
94 ContentType::Binary => "binary",
95 }
96 }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct SyncItem {
122 pub object_id: String,
124 pub version: u64,
126 pub updated_at: i64,
128 #[serde(default)]
132 pub content_type: ContentType,
133 #[serde(default, skip_serializing_if = "Option::is_none")]
135 pub batch_id: Option<String>,
136 pub trace_parent: Option<String>,
141 pub trace_state: Option<String>,
143 #[doc(hidden)]
145 pub priority_score: f64,
146 #[serde(alias = "merkle_root")] pub content_hash: String,
150 pub last_accessed: u64,
152 pub access_count: u64,
154 #[serde(with = "serde_bytes")]
156 pub content: Vec<u8>,
157 pub home_instance_id: Option<String>,
159 #[serde(default = "default_state")]
163 pub state: String,
164
165 #[serde(skip)]
168 pub(crate) submit_options: Option<SubmitOptions>,
169
170 #[serde(skip)]
172 cached_size: OnceLock<usize>,
173}
174
/// Serde fallback for `SyncItem::state`: yields the label `"default"`.
fn default_state() -> String {
    String::from("default")
}
179
180impl SyncItem {
181 pub fn new(object_id: String, content: Vec<u8>) -> Self {
202 let content_type = ContentType::detect(&content);
203 let content_hash = hex::encode(Sha256::digest(&content));
205 Self {
206 object_id,
207 version: 1,
208 updated_at: std::time::SystemTime::now()
209 .duration_since(std::time::UNIX_EPOCH)
210 .unwrap_or_default()
211 .as_millis() as i64,
212 content_type,
213 batch_id: None,
214 trace_parent: None,
215 trace_state: None,
216 priority_score: 0.0,
217 content_hash,
218 last_accessed: 0,
219 access_count: 0,
220 content,
221 home_instance_id: None,
222 state: "default".to_string(),
223 submit_options: None, cached_size: OnceLock::new(),
225 }
226 }
227
228 pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
233 let content = serde_json::to_vec(&value).unwrap_or_default();
234 let mut item = Self::new(object_id, content);
235 item.content_type = ContentType::Json; item
237 }
238
239 pub fn from_serializable<T: Serialize>(object_id: String, value: &T) -> Result<Self, serde_json::Error> {
244 let content = serde_json::to_vec(value)?;
245 let mut item = Self::new(object_id, content);
246 item.content_type = ContentType::Json;
247 Ok(item)
248 }
249
250 #[doc(hidden)]
255 #[allow(clippy::too_many_arguments)]
256 pub fn reconstruct(
257 object_id: String,
258 version: u64,
259 updated_at: i64,
260 content_type: ContentType,
261 content: Vec<u8>,
262 batch_id: Option<String>,
263 trace_parent: Option<String>,
264 content_hash: String,
265 home_instance_id: Option<String>,
266 state: String,
267 ) -> Self {
268 Self {
269 object_id,
270 version,
271 updated_at,
272 content_type,
273 batch_id,
274 trace_parent,
275 trace_state: None,
276 priority_score: 0.0,
277 content_hash,
278 last_accessed: 0,
279 access_count: 0,
280 content,
281 home_instance_id,
282 state,
283 submit_options: None,
284 cached_size: OnceLock::new(),
285 }
286 }
287
288 #[must_use]
303 pub fn with_options(mut self, options: SubmitOptions) -> Self {
304 self.submit_options = Some(options);
305 self
306 }
307
308 #[must_use]
322 pub fn with_state(mut self, state: impl Into<String>) -> Self {
323 self.state = state.into();
324 self
325 }
326
327 #[must_use]
329 pub fn effective_options(&self) -> SubmitOptions {
330 self.submit_options.clone().unwrap_or_default()
331 }
332
333 #[must_use]
337 pub fn content_as_json(&self) -> Option<serde_json::Value> {
338 serde_json::from_slice(&self.content).ok()
339 }
340
341 #[cfg(feature = "otel")]
343 pub fn with_current_trace_context(mut self) -> Self {
344 use opentelemetry::trace::TraceContextExt;
345 use tracing_opentelemetry::OpenTelemetrySpanExt;
346
347 let cx = tracing::Span::current().context();
348 let span_ref = cx.span();
349 let sc = span_ref.span_context();
350 if sc.is_valid() {
351 self.trace_parent = Some(format!(
352 "00-{}-{}-{:02x}",
353 sc.trace_id(),
354 sc.span_id(),
355 sc.trace_flags().to_u8()
356 ));
357 }
358 self
359 }
360}
361
362impl SizedItem for SyncItem {
363 fn size_bytes(&self) -> usize {
364 *self.cached_size.get_or_init(|| {
365 std::mem::size_of::<Self>()
367 + self.object_id.len()
368 + self.trace_parent.as_ref().map_or(0, String::len)
369 + self.trace_state.as_ref().map_or(0, String::len)
370 + self.content_hash.len()
371 + self.content.len()
372 + self.home_instance_id.as_ref().map_or(0, String::len)
373 })
374 }
375}
376
377impl BatchableItem for SyncItem {
378 fn id(&self) -> &str {
379 &self.object_id
380 }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `new` fills every field with its documented starting value.
    #[test]
    fn test_new_sync_item() {
        let created = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(created.object_id, "test-id");
        assert_eq!(created.version, 1);
        assert!(created.updated_at > 0);
        assert!(created.batch_id.is_none());
        assert!(created.trace_parent.is_none());
        assert!(created.trace_state.is_none());
        assert_eq!(created.priority_score, 0.0);
        // SHA-256 of "hello", hex encoded.
        assert_eq!(
            created.content_hash,
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
        assert_eq!(created.last_accessed, 0);
        assert_eq!(created.access_count, 0);
        assert!(created.home_instance_id.is_none());
        assert_eq!(created.content, b"hello");
    }

    /// `from_json` stores the serialized value as content.
    #[test]
    fn test_from_json() {
        let created = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(created.object_id, "test-id");
        let round_tripped: serde_json::Value = serde_json::from_slice(&created.content).unwrap();
        assert_eq!(round_tripped, json!({"key": "value"}));
    }

    /// `content_as_json` parses JSON content and rejects non-JSON bytes.
    #[test]
    fn test_content_as_json() {
        let json_item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let value = json_item.content_as_json().unwrap();
        assert_eq!(value["nested"]["key"], 42);

        let raw_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(raw_item.content_as_json().is_none());
    }

    /// The reported size exceeds the bare struct size once heap data exists.
    #[test]
    fn test_size_bytes_calculation() {
        let record = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]}),
        );

        let reported = record.size_bytes();

        assert!(reported > 0);

        assert!(reported > std::mem::size_of::<SyncItem>());
    }

    /// Repeated calls return the same size.
    #[test]
    fn test_size_bytes_cached() {
        let created = SyncItem::new("test".to_string(), b"data".to_vec());

        let first_call = created.size_bytes();
        let second_call = created.size_bytes();

        assert_eq!(first_call, second_call);
    }

    /// Optional string fields contribute to the reported size.
    #[test]
    fn test_size_includes_optional_fields() {
        let mut created = SyncItem::new("test".to_string(), vec![]);

        created.trace_parent = Some("00-abc123-def456-01".to_string());
        created.trace_state = Some("vendor=data".to_string());
        created.home_instance_id = Some("instance-1".to_string());

        let reported = created.size_bytes();

        assert!(reported > std::mem::size_of::<SyncItem>() + "test".len());
    }

    /// A serde JSON round trip preserves identity, version and content.
    #[test]
    fn test_serialize_deserialize() {
        let source = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]}),
        );

        let encoded = serde_json::to_string(&source).unwrap();
        let decoded: SyncItem = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.object_id, source.object_id);
        assert_eq!(decoded.version, source.version);
        assert_eq!(decoded.content, source.content);
    }

    /// A `None` batch id is omitted from the serialized output.
    #[test]
    fn test_serialize_skips_none_batch_id() {
        let created = SyncItem::new("test".to_string(), vec![]);

        let encoded = serde_json::to_string(&created).unwrap();

        assert!(!encoded.contains("batch_id"));
    }

    /// A present batch id is written to the serialized output.
    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut created = SyncItem::new("test".to_string(), vec![]);
        created.batch_id = Some("batch-123".to_string());

        let encoded = serde_json::to_string(&created).unwrap();

        assert!(encoded.contains("batch_id"));
        assert!(encoded.contains("batch-123"));
    }

    /// `Clone` copies identity and content.
    #[test]
    fn test_clone() {
        let source = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let duplicate = source.clone();

        assert_eq!(duplicate.object_id, source.object_id);
        assert_eq!(duplicate.content, source.content);
    }

    /// `Debug` output names the type and includes the object id.
    #[test]
    fn test_debug_format() {
        let created = SyncItem::new("test".to_string(), vec![]);
        let rendered = format!("{:?}", created);

        assert!(rendered.contains("SyncItem"));
        assert!(rendered.contains("test"));
    }

    /// `updated_at` falls between timestamps taken just before and after `new`.
    #[test]
    fn test_updated_at_is_recent() {
        let now_millis = || {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_millis() as i64
        };

        let lower_bound = now_millis();

        let created = SyncItem::new("test".to_string(), vec![]);

        let upper_bound = now_millis();

        assert!(created.updated_at >= lower_bound);
        assert!(created.updated_at <= upper_bound);
    }

    /// Large payloads dominate the reported size.
    #[test]
    fn test_large_content_size() {
        let payload: Vec<u8> = (0..10000u32).flat_map(|n| n.to_le_bytes()).collect();
        let created = SyncItem::new("large".to_string(), payload);

        let reported = created.size_bytes();

        assert!(reported > 10000, "Large content should result in large size");
    }

    /// `new` assigns the default state label.
    #[test]
    fn test_state_default() {
        let created = SyncItem::new("test".to_string(), b"data".to_vec());
        assert_eq!(created.state, "default");
    }

    /// `with_state` overrides the state label.
    #[test]
    fn test_state_with_state_builder() {
        let created = SyncItem::new("test".to_string(), b"data".to_vec()).with_state("delta");
        assert_eq!(created.state, "delta");
    }

    /// `with_state` chains after `from_json` without disturbing other fields.
    #[test]
    fn test_state_with_state_chaining() {
        let created =
            SyncItem::from_json("test".into(), json!({"key": "value"})).with_state("pending");

        assert_eq!(created.state, "pending");
        assert_eq!(created.object_id, "test");
    }

    /// A custom state survives a serde JSON round trip.
    #[test]
    fn test_state_serialization() {
        let created =
            SyncItem::new("test".to_string(), b"data".to_vec()).with_state("custom_state");

        let encoded = serde_json::to_string(&created).unwrap();
        assert!(encoded.contains("\"state\":\"custom_state\""));

        let decoded: SyncItem = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.state, "custom_state");
    }

    /// Input without a `state` key (and using the legacy `merkle_root` key)
    /// deserializes with the default state label.
    #[test]
    fn test_state_deserialization_default() {
        let legacy_json = r#"{
            "object_id": "test",
            "version": 1,
            "updated_at": 12345,
            "priority_score": 0.0,
            "merkle_root": "",
            "last_accessed": 0,
            "access_count": 0,
            "content": [100, 97, 116, 97]
        }"#;

        let decoded: SyncItem = serde_json::from_str(legacy_json).unwrap();
        assert_eq!(decoded.state, "default");
    }
}