1use std::sync::OnceLock;
25use serde::{Deserialize, Serialize};
26use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
27use crate::submit_options::SubmitOptions;
28
/// How a [`SyncItem`]'s `content` bytes should be interpreted.
///
/// Serialized in lowercase (`"json"` / `"binary"`). `Binary` is the
/// `Default`, which is also what `SyncItem` falls back to when the
/// `content_type` field is missing from serialized input.
///
/// NOTE: variant declaration order is part of the serde encoding for
/// index-based formats — do not reorder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ContentType {
    /// JSON-encoded content.
    Json,
    /// Opaque bytes — anything not recognised as JSON.
    #[default]
    Binary,
}
45
46impl ContentType {
47 #[must_use]
52 pub fn detect(content: &[u8]) -> Self {
53 if content.is_empty() {
55 return ContentType::Binary;
56 }
57
58 let first = content.iter().find(|b| !b.is_ascii_whitespace());
60 match first {
61 Some(b'{') | Some(b'[') | Some(b'"') => {
62 if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
64 ContentType::Json
65 } else {
66 ContentType::Binary
67 }
68 }
69 _ => ContentType::Binary
70 }
71 }
72
73 #[inline]
75 #[must_use]
76 pub fn is_json(&self) -> bool {
77 matches!(self, ContentType::Json)
78 }
79
80 #[inline]
82 #[must_use]
83 pub fn is_binary(&self) -> bool {
84 matches!(self, ContentType::Binary)
85 }
86}
87
/// A single object to be synchronised, together with its versioning,
/// tracing, batching and access-tracking metadata.
///
/// `submit_options` and the cached size are runtime-only and are skipped by
/// serde. NOTE: field declaration order determines serialized field order and
/// `Debug` output — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncItem {
    /// Identifier of the object; also returned as the batching id.
    pub object_id: String,
    /// Object version; `SyncItem::new` starts it at 1.
    pub version: u64,
    /// Last-update timestamp; `SyncItem::new` sets it to milliseconds since the Unix epoch.
    pub updated_at: i64,
    /// How `content` should be interpreted; defaults to `Binary` when absent
    /// from serialized input.
    #[serde(default)]
    pub content_type: ContentType,
    /// Batch this item belongs to, if any; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    /// Trace-parent string, e.g. `00-<trace_id>-<span_id>-<flags>` as written by
    /// `with_current_trace_context`. Serialized as `null` when `None`.
    pub trace_parent: Option<String>,
    /// Companion trace state; not populated by any constructor in this file —
    /// TODO confirm where it is set.
    pub trace_state: Option<String>,
    /// Internal score, initialised to 0.0; its meaning is defined by callers
    /// elsewhere — confirm before relying on it.
    #[doc(hidden)]
    pub priority_score: f64,
    /// Merkle root string; empty on a freshly created item.
    pub merkle_root: String,
    /// Access-tracking value, 0 on a freshly created item (units not
    /// established here — TODO confirm).
    pub last_accessed: u64,
    /// Access counter, 0 on a freshly created item.
    pub access_count: u64,
    /// Raw payload bytes, (de)serialized through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    pub content: Vec<u8>,
    /// Presumably the id of the instance that owns this object — verify against callers.
    pub home_instance_id: Option<String>,
    /// Free-form state label; `"default"` when absent from serialized input.
    #[serde(default = "default_state")]
    pub state: String,

    /// Per-item submission options; runtime-only, never serialized.
    #[serde(skip)]
    pub(crate) submit_options: Option<SubmitOptions>,

    /// Lazily computed result of `size_bytes`. Not serialized, copied by
    /// `Clone`, and never invalidated when public fields change afterwards.
    #[serde(skip)]
    cached_size: OnceLock<usize>,
}
161
/// Serde fallback for `SyncItem::state` when the field is missing from input.
fn default_state() -> String {
    String::from("default")
}
166
167impl SyncItem {
168 pub fn new(object_id: String, content: Vec<u8>) -> Self {
189 let content_type = ContentType::detect(&content);
190 Self {
191 object_id,
192 version: 1,
193 updated_at: std::time::SystemTime::now()
194 .duration_since(std::time::UNIX_EPOCH)
195 .unwrap_or_default()
196 .as_millis() as i64,
197 content_type,
198 batch_id: None,
199 trace_parent: None,
200 trace_state: None,
201 priority_score: 0.0,
202 merkle_root: String::new(),
203 last_accessed: 0,
204 access_count: 0,
205 content,
206 home_instance_id: None,
207 state: "default".to_string(),
208 submit_options: None, cached_size: OnceLock::new(),
210 }
211 }
212
213 pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
218 let content = serde_json::to_vec(&value).unwrap_or_default();
219 let mut item = Self::new(object_id, content);
220 item.content_type = ContentType::Json; item
222 }
223
224 pub fn from_serializable<T: Serialize>(object_id: String, value: &T) -> Result<Self, serde_json::Error> {
229 let content = serde_json::to_vec(value)?;
230 let mut item = Self::new(object_id, content);
231 item.content_type = ContentType::Json;
232 Ok(item)
233 }
234
235 #[doc(hidden)]
240 #[allow(clippy::too_many_arguments)]
241 pub fn reconstruct(
242 object_id: String,
243 version: u64,
244 updated_at: i64,
245 content_type: ContentType,
246 content: Vec<u8>,
247 batch_id: Option<String>,
248 trace_parent: Option<String>,
249 merkle_root: String,
250 home_instance_id: Option<String>,
251 state: String,
252 ) -> Self {
253 Self {
254 object_id,
255 version,
256 updated_at,
257 content_type,
258 batch_id,
259 trace_parent,
260 trace_state: None,
261 priority_score: 0.0,
262 merkle_root,
263 last_accessed: 0,
264 access_count: 0,
265 content,
266 home_instance_id,
267 state,
268 submit_options: None,
269 cached_size: OnceLock::new(),
270 }
271 }
272
273 #[must_use]
288 pub fn with_options(mut self, options: SubmitOptions) -> Self {
289 self.submit_options = Some(options);
290 self
291 }
292
293 #[must_use]
307 pub fn with_state(mut self, state: impl Into<String>) -> Self {
308 self.state = state.into();
309 self
310 }
311
312 #[must_use]
314 pub fn effective_options(&self) -> SubmitOptions {
315 self.submit_options.clone().unwrap_or_default()
316 }
317
318 #[must_use]
322 pub fn content_as_json(&self) -> Option<serde_json::Value> {
323 serde_json::from_slice(&self.content).ok()
324 }
325
326 #[cfg(feature = "otel")]
328 pub fn with_current_trace_context(mut self) -> Self {
329 use opentelemetry::trace::TraceContextExt;
330 use tracing_opentelemetry::OpenTelemetrySpanExt;
331
332 let cx = tracing::Span::current().context();
333 let span_ref = cx.span();
334 let sc = span_ref.span_context();
335 if sc.is_valid() {
336 self.trace_parent = Some(format!(
337 "00-{}-{}-{:02x}",
338 sc.trace_id(),
339 sc.span_id(),
340 sc.trace_flags().to_u8()
341 ));
342 }
343 self
344 }
345}
346
347impl SizedItem for SyncItem {
348 fn size_bytes(&self) -> usize {
349 *self.cached_size.get_or_init(|| {
350 std::mem::size_of::<Self>()
352 + self.object_id.len()
353 + self.trace_parent.as_ref().map_or(0, String::len)
354 + self.trace_state.as_ref().map_or(0, String::len)
355 + self.merkle_root.len()
356 + self.content.len()
357 + self.home_instance_id.as_ref().map_or(0, String::len)
358 })
359 }
360}
361
362impl BatchableItem for SyncItem {
363 fn id(&self) -> &str {
364 &self.object_id
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_new_sync_item() {
        let item = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert!(item.updated_at > 0);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        assert!(item.merkle_root.is_empty());
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.home_instance_id.is_none());
        assert_eq!(item.content, b"hello");
        assert_eq!(item.content_type, ContentType::Binary);
    }

    #[test]
    fn test_from_json() {
        let item = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(item.object_id, "test-id");
        assert!(item.content_type.is_json());
        let parsed: serde_json::Value = serde_json::from_slice(&item.content).unwrap();
        assert_eq!(parsed, json!({"key": "value"}));
    }

    #[test]
    fn test_content_as_json() {
        let item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let parsed = item.content_as_json().unwrap();
        assert_eq!(parsed["nested"]["key"], 42);

        let binary_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(binary_item.content_as_json().is_none());
    }

    #[test]
    fn test_size_bytes_calculation() {
        let item = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]})
        );

        let size = item.size_bytes();

        assert!(size > 0);

        assert!(size > std::mem::size_of::<SyncItem>());
    }

    #[test]
    fn test_size_bytes_cached() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());

        let size1 = item.size_bytes();
        let size2 = item.size_bytes();

        assert_eq!(size1, size2);
    }

    #[test]
    fn test_size_includes_optional_fields() {
        let mut item = SyncItem::new("test".to_string(), vec![]);

        item.trace_parent = Some("00-abc123-def456-01".to_string());
        item.trace_state = Some("vendor=data".to_string());
        item.home_instance_id = Some("instance-1".to_string());

        let size = item.size_bytes();

        assert!(size > std::mem::size_of::<SyncItem>() + "test".len());
    }

    #[test]
    fn test_serialize_deserialize() {
        let item = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]})
        );

        let json_str = serde_json::to_string(&item).unwrap();
        let deserialized: SyncItem = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.object_id, item.object_id);
        assert_eq!(deserialized.version, item.version);
        assert_eq!(deserialized.content, item.content);
        assert_eq!(deserialized.content_type, item.content_type);
    }

    #[test]
    fn test_serialize_skips_none_batch_id() {
        let item = SyncItem::new("test".to_string(), vec![]);

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(!json_str.contains("batch_id"));
    }

    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.batch_id = Some("batch-123".to_string());

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(json_str.contains("batch_id"));
        assert!(json_str.contains("batch-123"));
    }

    #[test]
    fn test_clone() {
        let item = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let cloned = item.clone();

        assert_eq!(cloned.object_id, item.object_id);
        assert_eq!(cloned.content, item.content);
    }

    #[test]
    fn test_debug_format() {
        let item = SyncItem::new("test".to_string(), vec![]);
        let debug_str = format!("{:?}", item);

        assert!(debug_str.contains("SyncItem"));
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_updated_at_is_recent() {
        let before = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        let item = SyncItem::new("test".to_string(), vec![]);

        let after = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        assert!(item.updated_at >= before);
        assert!(item.updated_at <= after);
    }

    #[test]
    fn test_large_content_size() {
        let large_data: Vec<u8> = (0..10000u32).flat_map(|i| i.to_le_bytes()).collect();
        let item = SyncItem::new("large".to_string(), large_data);

        let size = item.size_bytes();

        assert!(size > 10000, "Large content should result in large size");
    }

    #[test]
    fn test_state_default() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_state_with_state_builder() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec())
            .with_state("delta");
        assert_eq!(item.state, "delta");
    }

    #[test]
    fn test_state_with_state_chaining() {
        let item = SyncItem::from_json("test".into(), json!({"key": "value"}))
            .with_state("pending");

        assert_eq!(item.state, "pending");
        assert_eq!(item.object_id, "test");
    }

    #[test]
    fn test_state_serialization() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec())
            .with_state("custom_state");

        let json = serde_json::to_string(&item).unwrap();
        assert!(json.contains("\"state\":\"custom_state\""));

        let parsed: SyncItem = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.state, "custom_state");
    }

    #[test]
    fn test_state_deserialization_default() {
        let json = r#"{
            "object_id": "test",
            "version": 1,
            "updated_at": 12345,
            "priority_score": 0.0,
            "merkle_root": "",
            "last_accessed": 0,
            "access_count": 0,
            "content": [100, 97, 116, 97]
        }"#;

        let item: SyncItem = serde_json::from_str(json).unwrap();
        assert_eq!(item.state, "default");
        // Other defaulted fields fall back as declared on the struct.
        assert_eq!(item.content_type, ContentType::Binary);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
    }

    #[test]
    fn test_detect_empty_and_whitespace_are_binary() {
        assert_eq!(ContentType::detect(b""), ContentType::Binary);
        assert_eq!(ContentType::detect(b"   \n\t"), ContentType::Binary);
    }

    #[test]
    fn test_detect_json_objects_arrays_and_strings() {
        assert_eq!(ContentType::detect(br#"{"a": 1}"#), ContentType::Json);
        assert_eq!(ContentType::detect(b"[1, 2, 3]"), ContentType::Json);
        assert_eq!(ContentType::detect(br#""text""#), ContentType::Json);
        assert_eq!(ContentType::detect(b"  \n\t{}"), ContentType::Json);
    }

    #[test]
    fn test_detect_scalars_and_malformed_are_binary() {
        assert_eq!(ContentType::detect(b"42"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"true"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"{not json"), ContentType::Binary);
        assert_eq!(ContentType::detect(&[0xFF, 0xFE, 0x00]), ContentType::Binary);
    }

    #[test]
    fn test_content_type_predicates_and_default() {
        assert!(ContentType::Json.is_json());
        assert!(!ContentType::Json.is_binary());
        assert!(ContentType::Binary.is_binary());
        assert!(!ContentType::Binary.is_json());
        assert_eq!(ContentType::default(), ContentType::Binary);
    }

    #[test]
    fn test_content_type_serializes_lowercase() {
        assert_eq!(serde_json::to_string(&ContentType::Json).unwrap(), "\"json\"");
        assert_eq!(serde_json::to_string(&ContentType::Binary).unwrap(), "\"binary\"");
    }

    #[test]
    fn test_constructors_set_content_type() {
        assert!(SyncItem::new("a".into(), br#"{"k": 1}"#.to_vec()).content_type.is_json());
        assert!(SyncItem::new("b".into(), b"hello".to_vec()).content_type.is_binary());

        // The JSON constructors always tag content as JSON, even for scalars.
        assert!(SyncItem::from_json("c".into(), json!(42)).content_type.is_json());
        let item = SyncItem::from_serializable("d".into(), &[1u8, 2, 3]).unwrap();
        assert!(item.content_type.is_json());
        assert_eq!(item.content, b"[1,2,3]");
    }

    #[test]
    fn test_reconstruct_preserves_fields() {
        let item = SyncItem::reconstruct(
            "obj".to_string(),
            7,
            1_700_000_000_000,
            ContentType::Binary,
            b"raw".to_vec(),
            Some("batch-1".to_string()),
            Some("00-abc-def-01".to_string()),
            "root".to_string(),
            Some("home".to_string()),
            "pending".to_string(),
        );

        assert_eq!(item.object_id, "obj");
        assert_eq!(item.version, 7);
        assert_eq!(item.updated_at, 1_700_000_000_000);
        assert!(item.content_type.is_binary());
        assert_eq!(item.content, b"raw");
        assert_eq!(item.batch_id.as_deref(), Some("batch-1"));
        assert_eq!(item.trace_parent.as_deref(), Some("00-abc-def-01"));
        assert!(item.trace_state.is_none());
        assert_eq!(item.merkle_root, "root");
        assert_eq!(item.home_instance_id.as_deref(), Some("home"));
        assert_eq!(item.state, "pending");
        assert_eq!(item.access_count, 0);
        assert_eq!(item.last_accessed, 0);
    }
}