sync_engine/
sync_item.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Sync item data structure.
5//!
6//! The [`SyncItem`] is the core data unit that flows through the sync engine.
7//! Each item has a hierarchical ID (reverse DNS style), version, and binary content.
8//!
9//! # Binary Content
10//!
//! The `content` field is `Vec<u8>` - raw bytes that sync-engine treats as opaque, apart from
//! classifying them as JSON or binary on construction (see [`ContentType::detect`]).
//! The caller is responsible for serialization (JSON, MessagePack, Cap'n Proto, etc.).
13//!
14//! ```rust
15//! use sync_engine::SyncItem;
16//! use serde_json::json;
17//!
18//! // Store JSON as bytes
19//! let json_bytes = serde_json::to_vec(&json!({"name": "Alice"})).unwrap();
20//! let item = SyncItem::new("user.123".into(), json_bytes);
21//!
22//! // Or store any binary format
23//! let binary_data = vec![0x01, 0x02, 0x03];
24//! let item = SyncItem::new("binary.456".into(), binary_data);
25//! ```
26
27use std::sync::OnceLock;
28use serde::{Deserialize, Serialize};
29use sha2::{Sha256, Digest};
30use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
31use crate::submit_options::SubmitOptions;
32
/// Content type classification for storage routing.
///
/// This enables intelligent storage: JSON content can be stored in Redis as
/// searchable hashes (HSET) and in SQL as queryable JSON columns, while binary
/// content uses efficient blob storage.
///
/// The routing itself is not implemented in this file; this enum only carries
/// the classification. On the wire the variants are written in lowercase
/// (`"json"` / `"binary"`), and [`ContentType::Binary`] is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum ContentType {
    /// JSON content - stored as Redis HASH, SQL JSON column
    /// Enables: RedisSearch FT.SEARCH, SQL JSON path queries
    Json,
    /// Binary/opaque content - stored as Redis STRING, SQL BLOB
    /// Fast path for non-structured data
    ///
    /// This is the default variant, so a value with no explicit content type
    /// is treated as binary.
    #[default]
    Binary,
}
49
50impl ContentType {
51    /// Detect content type from raw bytes.
52    /// 
53    /// Fast heuristic: checks first non-whitespace byte for JSON indicators,
54    /// then validates with a full parse only for likely-JSON content.
55    #[must_use]
56    pub fn detect(content: &[u8]) -> Self {
57        // Empty content is binary
58        if content.is_empty() {
59            return ContentType::Binary;
60        }
61        
62        // Fast path: check first non-whitespace byte
63        let first = content.iter().find(|b| !b.is_ascii_whitespace());
64        match first {
65            Some(b'{') | Some(b'[') | Some(b'"') => {
66                // Likely JSON - validate with parse
67                if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
68                    ContentType::Json
69                } else {
70                    ContentType::Binary
71                }
72            }
73            _ => ContentType::Binary
74        }
75    }
76    
77    /// Check if this is JSON content
78    #[inline]
79    #[must_use]
80    pub fn is_json(&self) -> bool {
81        matches!(self, ContentType::Json)
82    }
83    
84    /// Check if this is binary content
85    #[inline]
86    #[must_use]
87    pub fn is_binary(&self) -> bool {
88        matches!(self, ContentType::Binary)
89    }
90    
91    /// Return the string representation for serialization
92    #[inline]
93    #[must_use]
94    pub fn as_str(&self) -> &'static str {
95        match self {
96            ContentType::Json => "json",
97            ContentType::Binary => "binary",
98        }
99    }
100}
101
/// A wrapper struct that separates metadata from content.
///
/// # Binary-First Design
///
/// sync-engine is a **dumb storage layer** - it stores your bytes and routes
/// them to L1/L2/L3 based on [`SubmitOptions`]. The
/// `content` field is opaque `Vec<u8>`; the only inspection done here is the
/// JSON-vs-binary classification performed by [`SyncItem::new`] via
/// [`ContentType::detect`].
///
/// # Example
///
/// ```rust
/// use sync_engine::SyncItem;
/// use serde_json::json;
///
/// // JSON content (serialize to bytes yourself)
/// let json_bytes = serde_json::to_vec(&json!({"name": "John Doe"})).unwrap();
/// let item = SyncItem::new("uk.nhs.patient.12345".into(), json_bytes);
///
/// assert_eq!(item.object_id, "uk.nhs.patient.12345");
/// assert_eq!(item.version, 1);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncItem {
    /// Reverse DNS style ID (e.g., `uk.nhs.patient.record.1234567890`)
    pub object_id: String,
    /// Version number (monotonically increasing within this item).
    /// Freshly constructed items start at 1.
    pub version: u64,
    /// Last update timestamp (epoch millis).
    /// Set to the current wall-clock time when the item is constructed.
    pub updated_at: i64,
    /// Content type (json or binary) - determines storage format
    /// JSON → Redis HSET (searchable), SQL JSON column
    /// Binary → Redis SET, SQL BLOB column
    ///
    /// When absent from serialized input this falls back to the enum default
    /// (`Binary`).
    #[serde(default)]
    pub content_type: ContentType,
    /// Batch ID for tracking batch writes (UUID, set during batch flush).
    /// Omitted from serialized output while it is `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_id: Option<String>,
    /// W3C Trace Context traceparent header (for cross-item trace linking)
    /// Format: "00-{trace_id}-{span_id}-{flags}"
    /// This is NOT for in-process tracing (that flows via Span::current()),
    /// but for linking related operations across items/time.
    pub trace_parent: Option<String>,
    /// W3C Trace Context tracestate header (optional vendor-specific data).
    /// Note: `reconstruct` does not take this value and always leaves it `None`.
    pub trace_state: Option<String>,
    /// Reserved for future use. Currently unused.
    /// Every constructor in this file initializes it to `0.0`.
    #[doc(hidden)]
    pub priority_score: f64,
    /// SHA256 hash of the content (hex-encoded).
    /// Computed eagerly on creation for CDC dedup and integrity checks.
    /// Also accepted under the legacy field name `merkle_root` when deserializing.
    #[serde(alias = "merkle_root")]  // Wire compat with v0.2.x
    pub content_hash: String,
    /// Timestamp of last access (epoch millis); 0 on a freshly constructed item.
    pub last_accessed: u64,
    /// Number of times accessed; 0 on a freshly constructed item.
    pub access_count: u64,
    /// The actual payload (opaque binary, caller handles serialization)
    #[serde(with = "serde_bytes")]
    pub content: Vec<u8>,
    /// Optional guest data owner ID (for routing engine)
    pub home_instance_id: Option<String>,
    /// Arbitrary state tag for caller-defined grouping (e.g., "delta", "base", "pending").
    /// Indexed in SQL and tracked via Redis SETs for fast state-based queries.
    /// Default: "default"
    #[serde(default = "default_state")]
    pub state: String,
    
    /// Transient submit options (travels with item through pipeline, not serialized)
    /// Set via `submit_with()`, defaults to `SubmitOptions::default()` if None.
    #[serde(skip)]
    pub(crate) submit_options: Option<SubmitOptions>,
    
    /// Cached computed size in bytes (lazily computed, not serialized).
    /// Filled on the first `size_bytes()` call and not refreshed afterwards,
    /// so fields mutated after that call are not reflected in the cached value.
    #[serde(skip)]
    cached_size: OnceLock<usize>,
}
177
/// Fallback `state` tag (`"default"`) used by serde when the field is missing.
fn default_state() -> String {
    String::from("default")
}
182
183impl SyncItem {
184    /// Create a new SyncItem with binary content.
185    ///
186    /// The content type is auto-detected: if the bytes are valid JSON,
187    /// `content_type` will be `Json`, otherwise `Binary`. This enables
188    /// intelligent storage routing (HSET vs SET in Redis, JSON vs BLOB in SQL).
189    ///
190    /// # Example
191    ///
192    /// ```rust
193    /// use sync_engine::{SyncItem, ContentType};
194    ///
195    /// // From raw bytes (detected as Binary)
196    /// let item = SyncItem::new("id".into(), vec![1, 2, 3]);
197    /// assert_eq!(item.content_type, ContentType::Binary);
198    ///
199    /// // From JSON bytes (detected as Json)
200    /// let json = serde_json::to_vec(&serde_json::json!({"key": "value"})).unwrap();
201    /// let item = SyncItem::new("id".into(), json);
202    /// assert_eq!(item.content_type, ContentType::Json);
203    /// ```
204    pub fn new(object_id: String, content: Vec<u8>) -> Self {
205        let content_type = ContentType::detect(&content);
206        // Compute content hash eagerly for CDC dedup
207        let content_hash = hex::encode(Sha256::digest(&content));
208        Self {
209            object_id,
210            version: 1,
211            updated_at: std::time::SystemTime::now()
212                .duration_since(std::time::UNIX_EPOCH)
213                .unwrap_or_default()
214                .as_millis() as i64,
215            content_type,
216            batch_id: None,
217            trace_parent: None,
218            trace_state: None,
219            priority_score: 0.0,
220            content_hash,
221            last_accessed: 0,
222            access_count: 0,
223            content,
224            home_instance_id: None,
225            state: "default".to_string(),
226            submit_options: None,  // Set via submit_with() if needed
227            cached_size: OnceLock::new(),
228        }
229    }
230
231    /// Create a new SyncItem from a JSON value (convenience method).
232    ///
233    /// This serializes the JSON to bytes and sets `content_type` to `Json`.
234    /// For binary formats (MessagePack, Cap'n Proto), use [`new`](Self::new).
235    pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
236        let content = serde_json::to_vec(&value).unwrap_or_default();
237        let mut item = Self::new(object_id, content);
238        item.content_type = ContentType::Json; // Explicit, since we know it's JSON
239        item
240    }
241
242    /// Create a new SyncItem from any serializable type.
243    ///
244    /// This avoids creating an intermediate `serde_json::Value` if you have a struct.
245    /// This is more efficient than `from_json` if you already have a typed object.
246    pub fn from_serializable<T: Serialize>(object_id: String, value: &T) -> Result<Self, serde_json::Error> {
247        let content = serde_json::to_vec(value)?;
248        let mut item = Self::new(object_id, content);
249        item.content_type = ContentType::Json;
250        Ok(item)
251    }
252    
253    /// Reconstruct a SyncItem from stored components (used by storage backends).
254    /// 
255    /// This allows storage backends to rebuild a SyncItem from flattened data
256    /// (e.g., Redis HGETALL, SQL column reads) without accessing private fields.
257    #[doc(hidden)]
258    #[allow(clippy::too_many_arguments)]
259    pub fn reconstruct(
260        object_id: String,
261        version: u64,
262        updated_at: i64,
263        content_type: ContentType,
264        content: Vec<u8>,
265        batch_id: Option<String>,
266        trace_parent: Option<String>,
267        content_hash: String,
268        home_instance_id: Option<String>,
269        state: String,
270        access_count: u64,
271        last_accessed: u64,
272    ) -> Self {
273        Self {
274            object_id,
275            version,
276            updated_at,
277            content_type,
278            batch_id,
279            trace_parent,
280            trace_state: None,
281            priority_score: 0.0,
282            content_hash,
283            last_accessed,
284            access_count,
285            content,
286            home_instance_id,
287            state,
288            submit_options: None,
289            cached_size: OnceLock::new(),
290        }
291    }
292
293    /// Set submit options for this item (builder pattern).
294    ///
295    /// These options control where the item is stored (Redis, SQL) and
296    /// how it's compressed. Options travel with the item through the
297    /// batch pipeline.
298    ///
299    /// # Example
300    ///
301    /// ```rust
302    /// use sync_engine::{SyncItem, SubmitOptions, CacheTtl};
303    ///
304    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec())
305    ///     .with_options(SubmitOptions::cache(CacheTtl::Minute));
306    /// ```
307    #[must_use]
308    pub fn with_options(mut self, options: SubmitOptions) -> Self {
309        self.submit_options = Some(options);
310        self
311    }
312    
313    /// Set state tag for this item (builder pattern).
314    ///
315    /// State is an arbitrary string for caller-defined grouping.
316    /// Common uses: "delta"/"base" for CRDTs, "pending"/"approved" for workflows.
317    ///
318    /// # Example
319    ///
320    /// ```rust
321    /// use sync_engine::SyncItem;
322    ///
323    /// let item = SyncItem::new("crdt.123".into(), b"data".to_vec())
324    ///     .with_state("delta");
325    /// ```
326    #[must_use]
327    pub fn with_state(mut self, state: impl Into<String>) -> Self {
328        self.state = state.into();
329        self
330    }
331
332    /// Get the effective submit options (returns default if not set).
333    #[must_use]
334    pub fn effective_options(&self) -> SubmitOptions {
335        self.submit_options.clone().unwrap_or_default()
336    }
337
338    /// Try to parse content as JSON.
339    ///
340    /// Returns `None` if content is not valid JSON.
341    #[must_use]
342    pub fn content_as_json(&self) -> Option<serde_json::Value> {
343        serde_json::from_slice(&self.content).ok()
344    }
345
346    /// Attach trace context from current span (for distributed trace linking)
347    #[cfg(feature = "otel")]
348    pub fn with_current_trace_context(mut self) -> Self {
349        use opentelemetry::trace::TraceContextExt;
350        use tracing_opentelemetry::OpenTelemetrySpanExt;
351        
352        let cx = tracing::Span::current().context();
353        let span_ref = cx.span();
354        let sc = span_ref.span_context();
355        if sc.is_valid() {
356            self.trace_parent = Some(format!(
357                "00-{}-{}-{:02x}",
358                sc.trace_id(),
359                sc.span_id(),
360                sc.trace_flags().to_u8()
361            ));
362        }
363        self
364    }
365}
366
367impl SizedItem for SyncItem {
368    fn size_bytes(&self) -> usize {
369        *self.cached_size.get_or_init(|| {
370            // Approximate size: struct overhead + string lengths + content bytes
371            std::mem::size_of::<Self>()
372                + self.object_id.len()
373                + self.trace_parent.as_ref().map_or(0, String::len)
374                + self.trace_state.as_ref().map_or(0, String::len)
375                + self.content_hash.len()
376                + self.content.len()
377                + self.home_instance_id.as_ref().map_or(0, String::len)
378        })
379    }
380}
381
382impl BatchableItem for SyncItem {
383    fn id(&self) -> &str {
384        &self.object_id
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_new_sync_item() {
        let item = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert!(item.updated_at > 0);
        // "hello" does not start with a JSON indicator byte
        assert_eq!(item.content_type, ContentType::Binary);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        // Content hash is computed eagerly for CDC dedup
        // SHA256("hello") = 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
        assert_eq!(item.content_hash, "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824");
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.home_instance_id.is_none());
        assert_eq!(item.state, "default");
        assert_eq!(item.content, b"hello");
    }

    #[test]
    fn test_content_type_detect() {
        // Empty and whitespace-only input is binary
        assert_eq!(ContentType::detect(b""), ContentType::Binary);
        assert_eq!(ContentType::detect(b"   \n"), ContentType::Binary);

        // Objects, arrays and strings that parse are JSON (leading whitespace allowed)
        assert_eq!(ContentType::detect(b"{\"a\":1}"), ContentType::Json);
        assert_eq!(ContentType::detect(b"  [1,2]"), ContentType::Json);
        assert_eq!(ContentType::detect(b"\"str\""), ContentType::Json);

        // Looks like JSON but does not parse
        assert_eq!(ContentType::detect(b"{not json"), ContentType::Binary);

        // Bare scalars do not start with an indicator byte, so they stay binary
        assert_eq!(ContentType::detect(b"42"), ContentType::Binary);
        assert_eq!(ContentType::detect(&[0xFF, 0xFE, 0x00]), ContentType::Binary);
    }

    #[test]
    fn test_content_type_helpers() {
        assert!(ContentType::Json.is_json());
        assert!(!ContentType::Json.is_binary());
        assert!(ContentType::Binary.is_binary());
        assert!(!ContentType::Binary.is_json());
        assert_eq!(ContentType::Json.as_str(), "json");
        assert_eq!(ContentType::Binary.as_str(), "binary");
        assert_eq!(ContentType::default(), ContentType::Binary);
    }

    #[test]
    fn test_from_json() {
        let item = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.content_type, ContentType::Json);
        // Content should be serialized JSON bytes
        let parsed: serde_json::Value = serde_json::from_slice(&item.content).unwrap();
        assert_eq!(parsed, json!({"key": "value"}));
    }

    #[test]
    fn test_from_json_scalar_is_tagged_json() {
        // A bare number would be detected as Binary by `new`, but `from_json`
        // knows the payload is JSON and tags it accordingly.
        let item = SyncItem::from_json("scalar".into(), json!(42));

        assert_eq!(item.content, b"42");
        assert_eq!(item.content_type, ContentType::Json);
    }

    #[test]
    fn test_from_serializable() {
        let item = SyncItem::from_serializable("list".into(), &vec![1, 2, 3]).unwrap();

        assert_eq!(item.object_id, "list");
        assert_eq!(item.content, b"[1,2,3]");
        assert_eq!(item.content_type, ContentType::Json);
    }

    #[test]
    fn test_content_as_json() {
        let item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let parsed = item.content_as_json().unwrap();
        assert_eq!(parsed["nested"]["key"], 42);

        // Binary content should return None
        let binary_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(binary_item.content_as_json().is_none());
    }

    #[test]
    fn test_size_bytes_calculation() {
        let item = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]})
        );

        let size = item.size_bytes();

        // Should be non-zero
        assert!(size > 0);

        // Should include struct overhead + content
        assert!(size > std::mem::size_of::<SyncItem>() + item.content.len());
    }

    #[test]
    fn test_size_bytes_cached() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());

        let size1 = item.size_bytes();
        let size2 = item.size_bytes();

        // Same value (cached)
        assert_eq!(size1, size2);
    }

    #[test]
    fn test_size_includes_optional_fields() {
        let trace_parent = "00-abc123-def456-01";
        let trace_state = "vendor=data";
        let home_instance_id = "instance-1";

        // Identical item without the optional fields, used as the baseline
        let baseline = SyncItem::new("test".to_string(), vec![]);

        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.trace_parent = Some(trace_parent.to_string());
        item.trace_state = Some(trace_state.to_string());
        item.home_instance_id = Some(home_instance_id.to_string());

        // The size is computed lazily on the first size_bytes() call, so the
        // fields set above are part of the measurement.
        assert_eq!(
            item.size_bytes(),
            baseline.size_bytes() + trace_parent.len() + trace_state.len() + home_instance_id.len()
        );
    }

    #[test]
    fn test_serialize_deserialize() {
        let item = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]})
        );

        let json_str = serde_json::to_string(&item).unwrap();
        let deserialized: SyncItem = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.object_id, item.object_id);
        assert_eq!(deserialized.version, item.version);
        assert_eq!(deserialized.content_type, item.content_type);
        assert_eq!(deserialized.content_hash, item.content_hash);
        assert_eq!(deserialized.content, item.content);
    }

    #[test]
    fn test_serialize_skips_none_batch_id() {
        let item = SyncItem::new("test".to_string(), vec![]);

        let json_str = serde_json::to_string(&item).unwrap();

        // batch_id should not appear in JSON when None
        assert!(!json_str.contains("batch_id"));
    }

    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.batch_id = Some("batch-123".to_string());

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(json_str.contains("batch_id"));
        assert!(json_str.contains("batch-123"));
    }

    #[test]
    fn test_clone() {
        let item = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let cloned = item.clone();

        assert_eq!(cloned.object_id, item.object_id);
        assert_eq!(cloned.content, item.content);
    }

    #[test]
    fn test_debug_format() {
        let item = SyncItem::new("test".to_string(), vec![]);
        let debug_str = format!("{:?}", item);

        assert!(debug_str.contains("SyncItem"));
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_updated_at_is_recent() {
        let before = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        let item = SyncItem::new("test".to_string(), vec![]);

        let after = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        assert!(item.updated_at >= before);
        assert!(item.updated_at <= after);
    }

    #[test]
    fn test_large_content_size() {
        // Create item with large binary content
        let large_data: Vec<u8> = (0..10000u32).flat_map(|i| i.to_le_bytes()).collect();
        let item = SyncItem::new("large".to_string(), large_data);

        let size = item.size_bytes();

        // Content alone is 10000 * 4 bytes = 40000, so the total must be at least that
        assert!(size >= 40_000, "Large content should result in large size");
    }

    #[test]
    fn test_state_default() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_state_with_state_builder() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec())
            .with_state("delta");
        assert_eq!(item.state, "delta");
    }

    #[test]
    fn test_state_with_state_chaining() {
        let item = SyncItem::from_json("test".into(), json!({"key": "value"}))
            .with_state("pending");

        assert_eq!(item.state, "pending");
        assert_eq!(item.object_id, "test");
    }

    #[test]
    fn test_state_serialization() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec())
            .with_state("custom_state");

        let json = serde_json::to_string(&item).unwrap();
        assert!(json.contains("\"state\":\"custom_state\""));

        // Deserialize back
        let parsed: SyncItem = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.state, "custom_state");
    }

    #[test]
    fn test_state_deserialization_default() {
        // JSON without state field should default to "default"
        // Uses merkle_root in JSON to test serde alias backward compat
        let json = r#"{
            "object_id": "test",
            "version": 1,
            "updated_at": 12345,
            "priority_score": 0.0,
            "merkle_root": "",
            "last_accessed": 0,
            "access_count": 0,
            "content": [100, 97, 116, 97]
        }"#;

        let item: SyncItem = serde_json::from_str(json).unwrap();
        assert_eq!(item.state, "default");
        // Missing content_type falls back to the enum default
        assert_eq!(item.content_type, ContentType::Binary);
        assert_eq!(item.content, b"data");
    }
}