1use std::sync::OnceLock;
25use serde::{Deserialize, Serialize};
26use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
27use crate::submit_options::SubmitOptions;
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
35#[serde(rename_all = "lowercase")]
36pub enum ContentType {
37 Json,
40 #[default]
43 Binary,
44}
45
46impl ContentType {
47 #[must_use]
52 pub fn detect(content: &[u8]) -> Self {
53 if content.is_empty() {
55 return ContentType::Binary;
56 }
57
58 let first = content.iter().find(|b| !b.is_ascii_whitespace());
60 match first {
61 Some(b'{') | Some(b'[') | Some(b'"') => {
62 if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
64 ContentType::Json
65 } else {
66 ContentType::Binary
67 }
68 }
69 _ => ContentType::Binary
70 }
71 }
72
73 #[inline]
75 #[must_use]
76 pub fn is_json(&self) -> bool {
77 matches!(self, ContentType::Json)
78 }
79
80 #[inline]
82 #[must_use]
83 pub fn is_binary(&self) -> bool {
84 matches!(self, ContentType::Binary)
85 }
86}
87
/// One object handled by the sync/batching pipeline: a byte payload plus
/// versioning, tracing and bookkeeping metadata.
///
/// Field order is part of the serialized form (it fixes key order, and the
/// field sequence in non-self-describing formats), so do not reorder fields.
///
/// NOTE(review): `size_bytes` memoizes its result in `cached_size` on first
/// call while all data fields are `pub`; mutating a field after the size has
/// been computed leaves the cached value stale.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncItem {
    /// Object identifier; also the key returned by `BatchableItem::id`.
    pub object_id: String,
    /// Version counter; `SyncItem::new` starts it at 1.
    pub version: u64,
    /// Last-update time; `SyncItem::new` fills it with milliseconds since the Unix epoch.
    pub updated_at: i64,
    /// Payload classification; falls back to `ContentType::default()` when
    /// missing from serialized input.
    #[serde(default)]
    pub content_type: ContentType,
    /// Optional batch identifier; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    /// Trace-context parent; `with_current_trace_context` (feature `otel`)
    /// writes it as `00-<trace-id>-<span-id>-<flags>`.
    pub trace_parent: Option<String>,
    /// Trace-context state. NOTE(review): nothing in this file populates it — confirm the producer.
    pub trace_state: Option<String>,
    /// Internal score, hidden from docs; constructors set it to 0.0.
    /// NOTE(review): presumably an ordering/priority hint — confirm with its consumers.
    #[doc(hidden)]
    pub priority_score: f64,
    /// Merkle root string; empty from `SyncItem::new`, supplied to `SyncItem::reconstruct`.
    pub merkle_root: String,
    /// Access bookkeeping; constructors start it at 0.
    /// NOTE(review): unit (timestamp vs. tick) is not visible here — confirm.
    pub last_accessed: u64,
    /// Access counter; constructors start it at 0.
    pub access_count: u64,
    /// The payload, (de)serialized through `serde_bytes` so formats with a
    /// native bytes type can encode it as bytes rather than a list of integers.
    #[serde(with = "serde_bytes")]
    pub content: Vec<u8>,
    /// Optional home-instance identifier; `None` from `SyncItem::new`.
    pub home_instance_id: Option<String>,

    /// Per-item submit options; never serialized, so deserialized items get `None`.
    #[serde(skip)]
    pub(crate) submit_options: Option<SubmitOptions>,

    /// Memoized result of `size_bytes`; never serialized.
    #[serde(skip)]
    cached_size: OnceLock<usize>,
}
156
157impl SyncItem {
158 pub fn new(object_id: String, content: Vec<u8>) -> Self {
179 let content_type = ContentType::detect(&content);
180 Self {
181 object_id,
182 version: 1,
183 updated_at: std::time::SystemTime::now()
184 .duration_since(std::time::UNIX_EPOCH)
185 .unwrap_or_default()
186 .as_millis() as i64,
187 content_type,
188 batch_id: None,
189 trace_parent: None,
190 trace_state: None,
191 priority_score: 0.0,
192 merkle_root: String::new(),
193 last_accessed: 0,
194 access_count: 0,
195 content,
196 home_instance_id: None,
197 submit_options: None, cached_size: OnceLock::new(),
199 }
200 }
201
202 pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
207 let content = serde_json::to_vec(&value).unwrap_or_default();
208 let mut item = Self::new(object_id, content);
209 item.content_type = ContentType::Json; item
211 }
212
213 #[doc(hidden)]
218 #[allow(clippy::too_many_arguments)]
219 pub fn reconstruct(
220 object_id: String,
221 version: u64,
222 updated_at: i64,
223 content_type: ContentType,
224 content: Vec<u8>,
225 batch_id: Option<String>,
226 trace_parent: Option<String>,
227 merkle_root: String,
228 home_instance_id: Option<String>,
229 ) -> Self {
230 Self {
231 object_id,
232 version,
233 updated_at,
234 content_type,
235 batch_id,
236 trace_parent,
237 trace_state: None,
238 priority_score: 0.0,
239 merkle_root,
240 last_accessed: 0,
241 access_count: 0,
242 content,
243 home_instance_id,
244 submit_options: None,
245 cached_size: OnceLock::new(),
246 }
247 }
248
249 #[must_use]
264 pub fn with_options(mut self, options: SubmitOptions) -> Self {
265 self.submit_options = Some(options);
266 self
267 }
268
269 #[must_use]
271 pub fn effective_options(&self) -> SubmitOptions {
272 self.submit_options.clone().unwrap_or_default()
273 }
274
275 #[must_use]
279 pub fn content_as_json(&self) -> Option<serde_json::Value> {
280 serde_json::from_slice(&self.content).ok()
281 }
282
283 #[cfg(feature = "otel")]
285 pub fn with_current_trace_context(mut self) -> Self {
286 use opentelemetry::trace::TraceContextExt;
287 use tracing_opentelemetry::OpenTelemetrySpanExt;
288
289 let cx = tracing::Span::current().context();
290 let span_ref = cx.span();
291 let sc = span_ref.span_context();
292 if sc.is_valid() {
293 self.trace_parent = Some(format!(
294 "00-{}-{}-{:02x}",
295 sc.trace_id(),
296 sc.span_id(),
297 sc.trace_flags().to_u8()
298 ));
299 }
300 self
301 }
302}
303
304impl SizedItem for SyncItem {
305 fn size_bytes(&self) -> usize {
306 *self.cached_size.get_or_init(|| {
307 std::mem::size_of::<Self>()
309 + self.object_id.len()
310 + self.trace_parent.as_ref().map_or(0, String::len)
311 + self.trace_state.as_ref().map_or(0, String::len)
312 + self.merkle_root.len()
313 + self.content.len()
314 + self.home_instance_id.as_ref().map_or(0, String::len)
315 })
316 }
317}
318
319impl BatchableItem for SyncItem {
320 fn id(&self) -> &str {
321 &self.object_id
322 }
323}
324
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_new_sync_item() {
        let item = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert!(item.updated_at > 0);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        assert!(item.merkle_root.is_empty());
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.home_instance_id.is_none());
        assert!(item.submit_options.is_none());
        // "hello" does not start with `{`, `[` or `"`, so it is classified as binary.
        assert_eq!(item.content_type, ContentType::Binary);
        assert_eq!(item.content, b"hello");
    }

    #[test]
    fn test_from_json() {
        let item = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert_eq!(item.content_type, ContentType::Json);
        let parsed: serde_json::Value = serde_json::from_slice(&item.content).unwrap();
        assert_eq!(parsed, json!({"key": "value"}));
    }

    #[test]
    fn test_from_json_scalar_is_tagged_json() {
        // `detect` alone would call "42" binary; `from_json` always tags JSON.
        let item = SyncItem::from_json("scalar".to_string(), json!(42));

        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content, b"42");
    }

    #[test]
    fn test_detect_json_containers_and_strings() {
        assert_eq!(ContentType::detect(br#"{"a": 1}"#), ContentType::Json);
        assert_eq!(ContentType::detect(b"[1, 2, 3]"), ContentType::Json);
        assert_eq!(ContentType::detect(br#""text""#), ContentType::Json);
        assert_eq!(ContentType::detect(b" \n\t{\"a\": 1}"), ContentType::Json);
    }

    #[test]
    fn test_detect_binary_cases() {
        assert_eq!(ContentType::detect(b""), ContentType::Binary);
        assert_eq!(ContentType::detect(b"  \n "), ContentType::Binary);
        assert_eq!(ContentType::detect(b"{not json"), ContentType::Binary);
        // Bare scalars never reach the parser: only `{`, `[` and `"` leads are considered.
        assert_eq!(ContentType::detect(b"42"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"null"), ContentType::Binary);
        assert_eq!(ContentType::detect(&[0xFF, 0xFE, 0x00]), ContentType::Binary);
    }

    #[test]
    fn test_content_type_predicates_and_default() {
        assert!(ContentType::Json.is_json());
        assert!(!ContentType::Json.is_binary());
        assert!(ContentType::Binary.is_binary());
        assert!(!ContentType::Binary.is_json());
        assert_eq!(ContentType::default(), ContentType::Binary);
    }

    #[test]
    fn test_content_type_serde_lowercase() {
        assert_eq!(serde_json::to_string(&ContentType::Json).unwrap(), "\"json\"");
        assert_eq!(serde_json::to_string(&ContentType::Binary).unwrap(), "\"binary\"");
        let parsed: ContentType = serde_json::from_str("\"json\"").unwrap();
        assert_eq!(parsed, ContentType::Json);
    }

    #[test]
    fn test_content_as_json() {
        let item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let parsed = item.content_as_json().unwrap();
        assert_eq!(parsed["nested"]["key"], 42);

        let binary_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(binary_item.content_type.is_binary());
        assert!(binary_item.content_as_json().is_none());
    }

    #[test]
    fn test_reconstruct_keeps_supplied_fields_and_resets_the_rest() {
        let item = SyncItem::reconstruct(
            "stored".to_string(),
            7,
            1_700_000_000_000,
            ContentType::Json,
            b"{}".to_vec(),
            Some("batch-1".to_string()),
            Some("00-abc-def-01".to_string()),
            "root".to_string(),
            Some("instance-2".to_string()),
        );

        assert_eq!(item.object_id, "stored");
        assert_eq!(item.version, 7);
        assert_eq!(item.updated_at, 1_700_000_000_000);
        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content, b"{}");
        assert_eq!(item.batch_id.as_deref(), Some("batch-1"));
        assert_eq!(item.trace_parent.as_deref(), Some("00-abc-def-01"));
        assert_eq!(item.merkle_root, "root");
        assert_eq!(item.home_instance_id.as_deref(), Some("instance-2"));
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.submit_options.is_none());
    }

    #[test]
    fn test_size_bytes_calculation() {
        let item = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]}),
        );

        let size = item.size_bytes();

        assert!(size > 0);
        assert!(size > std::mem::size_of::<SyncItem>());
        // At minimum the inline struct, the id and the payload are accounted for.
        assert!(
            size >= std::mem::size_of::<SyncItem>() + item.object_id.len() + item.content.len()
        );
    }

    #[test]
    fn test_size_bytes_cached() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());

        // The size is computed lazily, on the first call only.
        assert!(item.cached_size.get().is_none());
        let size1 = item.size_bytes();
        assert_eq!(item.cached_size.get(), Some(&size1));

        let size2 = item.size_bytes();
        assert_eq!(size1, size2);
    }

    #[test]
    fn test_size_includes_optional_fields() {
        let mut item = SyncItem::new("test".to_string(), vec![]);

        let trace_parent = "00-abc123-def456-01";
        let trace_state = "vendor=data";
        let home_instance = "instance-1";
        item.trace_parent = Some(trace_parent.to_string());
        item.trace_state = Some(trace_state.to_string());
        item.home_instance_id = Some(home_instance.to_string());

        let size = item.size_bytes();

        // Content and merkle root are empty, so the optional strings are the only extras.
        assert_eq!(
            size,
            std::mem::size_of::<SyncItem>()
                + "test".len()
                + trace_parent.len()
                + trace_state.len()
                + home_instance.len()
        );
    }

    #[test]
    fn test_serialize_deserialize() {
        let item = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]}),
        );

        let json_str = serde_json::to_string(&item).unwrap();
        let deserialized: SyncItem = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.object_id, item.object_id);
        assert_eq!(deserialized.version, item.version);
        assert_eq!(deserialized.content_type, item.content_type);
        assert_eq!(deserialized.content, item.content);
    }

    #[test]
    fn test_missing_content_type_defaults_to_binary() {
        let item = SyncItem::from_json("id".to_string(), json!({"k": "v"}));
        let mut value = serde_json::to_value(&item).unwrap();
        value.as_object_mut().unwrap().remove("content_type");

        let restored: SyncItem = serde_json::from_value(value).unwrap();

        assert_eq!(restored.content_type, ContentType::Binary);
    }

    #[test]
    fn test_serialize_skips_none_batch_id() {
        let item = SyncItem::new("test".to_string(), vec![]);

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(!json_str.contains("batch_id"));
    }

    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.batch_id = Some("batch-123".to_string());

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(json_str.contains("batch_id"));
        assert!(json_str.contains("batch-123"));
    }

    #[test]
    fn test_clone() {
        let item = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let cloned = item.clone();

        assert_eq!(cloned.object_id, item.object_id);
        assert_eq!(cloned.content_type, item.content_type);
        assert_eq!(cloned.content, item.content);
    }

    #[test]
    fn test_debug_format() {
        let item = SyncItem::new("test".to_string(), vec![]);
        let debug_str = format!("{:?}", item);

        assert!(debug_str.contains("SyncItem"));
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_updated_at_is_recent() {
        let before = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        let item = SyncItem::new("test".to_string(), vec![]);

        let after = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        assert!(item.updated_at >= before);
        assert!(item.updated_at <= after);
    }

    #[test]
    fn test_large_content_size() {
        // 10_000 little-endian u32 values => 40_000 bytes of payload.
        let large_data: Vec<u8> = (0..10000u32).flat_map(|i| i.to_le_bytes()).collect();
        assert_eq!(large_data.len(), 40_000);
        let item = SyncItem::new("large".to_string(), large_data);

        let size = item.size_bytes();

        assert!(
            size >= std::mem::size_of::<SyncItem>() + "large".len() + 40_000,
            "Large content should be fully accounted for in the size"
        );
    }
}