sync_engine/
sync_item.rs

//! Sync item data structure.
//!
//! The [`SyncItem`] is the core data unit that flows through the sync engine.
//! Each item has a hierarchical ID (reverse-DNS style), a version, and binary content.
//!
//! # Binary Content
//!
//! The `content` field is a `Vec<u8>` of raw bytes that sync-engine treats as an opaque
//! payload. The caller is responsible for serialization (JSON, MessagePack, Cap'n Proto,
//! etc.). The one exception is that `SyncItem::new` sniffs the bytes once to classify them
//! as JSON or binary for storage routing (see [`ContentType::detect`]).
//!
//! ```rust
//! use sync_engine::SyncItem;
//! use serde_json::json;
//!
//! // Store JSON as bytes
//! let json_bytes = serde_json::to_vec(&json!({"name": "Alice"})).unwrap();
//! let item = SyncItem::new("user.123".into(), json_bytes);
//!
//! // Or store any binary format
//! let binary_data = vec![0x01, 0x02, 0x03];
//! let item = SyncItem::new("binary.456".into(), binary_data);
//! ```

24use std::sync::OnceLock;
25use serde::{Deserialize, Serialize};
26use sha2::{Sha256, Digest};
27use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
28use crate::submit_options::SubmitOptions;
29
30/// Content type classification for storage routing.
31///
32/// This enables intelligent storage: JSON content can be stored in Redis as
33/// searchable hashes (HSET) and in SQL as queryable JSON columns, while binary
34/// content uses efficient blob storage.
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
36#[serde(rename_all = "lowercase")]
37pub enum ContentType {
38    /// JSON content - stored as Redis HASH, SQL JSON column
39    /// Enables: RedisSearch FT.SEARCH, SQL JSON path queries
40    Json,
41    /// Binary/opaque content - stored as Redis STRING, SQL BLOB
42    /// Fast path for non-structured data
43    #[default]
44    Binary,
45}
46
47impl ContentType {
48    /// Detect content type from raw bytes.
49    /// 
50    /// Fast heuristic: checks first non-whitespace byte for JSON indicators,
51    /// then validates with a full parse only for likely-JSON content.
52    #[must_use]
53    pub fn detect(content: &[u8]) -> Self {
54        // Empty content is binary
55        if content.is_empty() {
56            return ContentType::Binary;
57        }
58        
59        // Fast path: check first non-whitespace byte
60        let first = content.iter().find(|b| !b.is_ascii_whitespace());
61        match first {
62            Some(b'{') | Some(b'[') | Some(b'"') => {
63                // Likely JSON - validate with parse
64                if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
65                    ContentType::Json
66                } else {
67                    ContentType::Binary
68                }
69            }
70            _ => ContentType::Binary
71        }
72    }
73    
74    /// Check if this is JSON content
75    #[inline]
76    #[must_use]
77    pub fn is_json(&self) -> bool {
78        matches!(self, ContentType::Json)
79    }
80    
81    /// Check if this is binary content
82    #[inline]
83    #[must_use]
84    pub fn is_binary(&self) -> bool {
85        matches!(self, ContentType::Binary)
86    }
87    
88    /// Return the string representation for serialization
89    #[inline]
90    #[must_use]
91    pub fn as_str(&self) -> &'static str {
92        match self {
93            ContentType::Json => "json",
94            ContentType::Binary => "binary",
95        }
96    }
97}
98
99/// A wrapper struct that separates metadata from content.
100///
101/// # Binary-First Design
102///
103/// sync-engine is a **dumb storage layer** - it stores your bytes and routes
104/// them to L1/L2/L3 based on [`SubmitOptions`]. The
105/// `content` field is opaque `Vec<u8>` that we never interpret.
106///
107/// # Example
108///
109/// ```rust
110/// use sync_engine::SyncItem;
111/// use serde_json::json;
112///
113/// // JSON content (serialize to bytes yourself)
114/// let json_bytes = serde_json::to_vec(&json!({"name": "John Doe"})).unwrap();
115/// let item = SyncItem::new("uk.nhs.patient.12345".into(), json_bytes);
116///
117/// assert_eq!(item.object_id, "uk.nhs.patient.12345");
118/// assert_eq!(item.version, 1);
119/// ```
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct SyncItem {
122    /// Reverse DNS style ID (e.g., `uk.nhs.patient.record.1234567890`)
123    pub object_id: String,
124    /// Version number (monotonically increasing within this item)
125    pub version: u64,
126    /// Last update timestamp (epoch millis)
127    pub updated_at: i64,
128    /// Content type (json or binary) - determines storage format
129    /// JSON → Redis HSET (searchable), SQL JSON column
130    /// Binary → Redis SET, SQL BLOB column
131    #[serde(default)]
132    pub content_type: ContentType,
133    /// Batch ID for tracking batch writes (UUID, set during batch flush)
134    #[serde(default, skip_serializing_if = "Option::is_none")]
135    pub batch_id: Option<String>,
136    /// W3C Trace Context traceparent header (for cross-item trace linking)
137    /// Format: "00-{trace_id}-{span_id}-{flags}"
138    /// This is NOT for in-process tracing (that flows via Span::current()),
139    /// but for linking related operations across items/time.
140    pub trace_parent: Option<String>,
141    /// W3C Trace Context tracestate header (optional vendor-specific data)
142    pub trace_state: Option<String>,
143    /// Reserved for future use. Currently unused.
144    #[doc(hidden)]
145    pub priority_score: f64,
146    /// SHA256 hash of the content (hex-encoded).
147    /// Computed eagerly on creation for CDC dedup and integrity checks.
148    #[serde(alias = "merkle_root")]  // Wire compat with v0.2.x
149    pub content_hash: String,
150    /// Timestamp of last access (epoch millis)
151    pub last_accessed: u64,
152    /// Number of times accessed
153    pub access_count: u64,
154    /// The actual payload (opaque binary, caller handles serialization)
155    #[serde(with = "serde_bytes")]
156    pub content: Vec<u8>,
157    /// Optional guest data owner ID (for routing engine)
158    pub home_instance_id: Option<String>,
159    /// Arbitrary state tag for caller-defined grouping (e.g., "delta", "base", "pending").
160    /// Indexed in SQL and tracked via Redis SETs for fast state-based queries.
161    /// Default: "default"
162    #[serde(default = "default_state")]
163    pub state: String,
164    
165    /// Transient submit options (travels with item through pipeline, not serialized)
166    /// Set via `submit_with()`, defaults to `SubmitOptions::default()` if None.
167    #[serde(skip)]
168    pub(crate) submit_options: Option<SubmitOptions>,
169    
170    /// Cached computed size in bytes (lazily computed, not serialized)
171    #[serde(skip)]
172    cached_size: OnceLock<usize>,
173}
174
/// Serde fallback for [`SyncItem::state`]: the literal tag `"default"`.
fn default_state() -> String {
    String::from("default")
}
179
180impl SyncItem {
181    /// Create a new SyncItem with binary content.
182    ///
183    /// The content type is auto-detected: if the bytes are valid JSON,
184    /// `content_type` will be `Json`, otherwise `Binary`. This enables
185    /// intelligent storage routing (HSET vs SET in Redis, JSON vs BLOB in SQL).
186    ///
187    /// # Example
188    ///
189    /// ```rust
190    /// use sync_engine::{SyncItem, ContentType};
191    ///
192    /// // From raw bytes (detected as Binary)
193    /// let item = SyncItem::new("id".into(), vec![1, 2, 3]);
194    /// assert_eq!(item.content_type, ContentType::Binary);
195    ///
196    /// // From JSON bytes (detected as Json)
197    /// let json = serde_json::to_vec(&serde_json::json!({"key": "value"})).unwrap();
198    /// let item = SyncItem::new("id".into(), json);
199    /// assert_eq!(item.content_type, ContentType::Json);
200    /// ```
201    pub fn new(object_id: String, content: Vec<u8>) -> Self {
202        let content_type = ContentType::detect(&content);
203        // Compute content hash eagerly for CDC dedup
204        let content_hash = hex::encode(Sha256::digest(&content));
205        Self {
206            object_id,
207            version: 1,
208            updated_at: std::time::SystemTime::now()
209                .duration_since(std::time::UNIX_EPOCH)
210                .unwrap_or_default()
211                .as_millis() as i64,
212            content_type,
213            batch_id: None,
214            trace_parent: None,
215            trace_state: None,
216            priority_score: 0.0,
217            content_hash,
218            last_accessed: 0,
219            access_count: 0,
220            content,
221            home_instance_id: None,
222            state: "default".to_string(),
223            submit_options: None,  // Set via submit_with() if needed
224            cached_size: OnceLock::new(),
225        }
226    }
227
228    /// Create a new SyncItem from a JSON value (convenience method).
229    ///
230    /// This serializes the JSON to bytes and sets `content_type` to `Json`.
231    /// For binary formats (MessagePack, Cap'n Proto), use [`new`](Self::new).
232    pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
233        let content = serde_json::to_vec(&value).unwrap_or_default();
234        let mut item = Self::new(object_id, content);
235        item.content_type = ContentType::Json; // Explicit, since we know it's JSON
236        item
237    }
238
239    /// Create a new SyncItem from any serializable type.
240    ///
241    /// This avoids creating an intermediate `serde_json::Value` if you have a struct.
242    /// This is more efficient than `from_json` if you already have a typed object.
243    pub fn from_serializable<T: Serialize>(object_id: String, value: &T) -> Result<Self, serde_json::Error> {
244        let content = serde_json::to_vec(value)?;
245        let mut item = Self::new(object_id, content);
246        item.content_type = ContentType::Json;
247        Ok(item)
248    }
249    
250    /// Reconstruct a SyncItem from stored components (used by storage backends).
251    /// 
252    /// This allows storage backends to rebuild a SyncItem from flattened data
253    /// (e.g., Redis HGETALL, SQL column reads) without accessing private fields.
254    #[doc(hidden)]
255    #[allow(clippy::too_many_arguments)]
256    pub fn reconstruct(
257        object_id: String,
258        version: u64,
259        updated_at: i64,
260        content_type: ContentType,
261        content: Vec<u8>,
262        batch_id: Option<String>,
263        trace_parent: Option<String>,
264        content_hash: String,
265        home_instance_id: Option<String>,
266        state: String,
267    ) -> Self {
268        Self {
269            object_id,
270            version,
271            updated_at,
272            content_type,
273            batch_id,
274            trace_parent,
275            trace_state: None,
276            priority_score: 0.0,
277            content_hash,
278            last_accessed: 0,
279            access_count: 0,
280            content,
281            home_instance_id,
282            state,
283            submit_options: None,
284            cached_size: OnceLock::new(),
285        }
286    }
287
288    /// Set submit options for this item (builder pattern).
289    ///
290    /// These options control where the item is stored (Redis, SQL) and
291    /// how it's compressed. Options travel with the item through the
292    /// batch pipeline.
293    ///
294    /// # Example
295    ///
296    /// ```rust
297    /// use sync_engine::{SyncItem, SubmitOptions, CacheTtl};
298    ///
299    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec())
300    ///     .with_options(SubmitOptions::cache(CacheTtl::Minute));
301    /// ```
302    #[must_use]
303    pub fn with_options(mut self, options: SubmitOptions) -> Self {
304        self.submit_options = Some(options);
305        self
306    }
307    
308    /// Set state tag for this item (builder pattern).
309    ///
310    /// State is an arbitrary string for caller-defined grouping.
311    /// Common uses: "delta"/"base" for CRDTs, "pending"/"approved" for workflows.
312    ///
313    /// # Example
314    ///
315    /// ```rust
316    /// use sync_engine::SyncItem;
317    ///
318    /// let item = SyncItem::new("crdt.123".into(), b"data".to_vec())
319    ///     .with_state("delta");
320    /// ```
321    #[must_use]
322    pub fn with_state(mut self, state: impl Into<String>) -> Self {
323        self.state = state.into();
324        self
325    }
326
327    /// Get the effective submit options (returns default if not set).
328    #[must_use]
329    pub fn effective_options(&self) -> SubmitOptions {
330        self.submit_options.clone().unwrap_or_default()
331    }
332
333    /// Try to parse content as JSON.
334    ///
335    /// Returns `None` if content is not valid JSON.
336    #[must_use]
337    pub fn content_as_json(&self) -> Option<serde_json::Value> {
338        serde_json::from_slice(&self.content).ok()
339    }
340
341    /// Attach trace context from current span (for distributed trace linking)
342    #[cfg(feature = "otel")]
343    pub fn with_current_trace_context(mut self) -> Self {
344        use opentelemetry::trace::TraceContextExt;
345        use tracing_opentelemetry::OpenTelemetrySpanExt;
346        
347        let cx = tracing::Span::current().context();
348        let span_ref = cx.span();
349        let sc = span_ref.span_context();
350        if sc.is_valid() {
351            self.trace_parent = Some(format!(
352                "00-{}-{}-{:02x}",
353                sc.trace_id(),
354                sc.span_id(),
355                sc.trace_flags().to_u8()
356            ));
357        }
358        self
359    }
360}
361
362impl SizedItem for SyncItem {
363    fn size_bytes(&self) -> usize {
364        *self.cached_size.get_or_init(|| {
365            // Approximate size: struct overhead + string lengths + content bytes
366            std::mem::size_of::<Self>()
367                + self.object_id.len()
368                + self.trace_parent.as_ref().map_or(0, String::len)
369                + self.trace_state.as_ref().map_or(0, String::len)
370                + self.content_hash.len()
371                + self.content.len()
372                + self.home_instance_id.as_ref().map_or(0, String::len)
373        })
374    }
375}
376
377impl BatchableItem for SyncItem {
378    fn id(&self) -> &str {
379        &self.object_id
380    }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_new_sync_item() {
        let item = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert!(item.updated_at > 0);
        // "hello" does not start with `{`, `[` or `"`, so it is binary.
        assert_eq!(item.content_type, ContentType::Binary);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        // Content hash is computed eagerly for CDC dedup:
        // SHA256("hello") = 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
        assert_eq!(
            item.content_hash,
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.home_instance_id.is_none());
        assert_eq!(item.state, "default");
        assert_eq!(item.content, b"hello");
    }

    #[test]
    fn test_detect_content_type() {
        // Empty / whitespace-only input has no leading byte to inspect.
        assert_eq!(ContentType::detect(b""), ContentType::Binary);
        assert_eq!(ContentType::detect(b" \n\t "), ContentType::Binary);

        // Objects, arrays and strings are parsed and accepted when valid.
        assert_eq!(ContentType::detect(b"  {\"a\": 1}"), ContentType::Json);
        assert_eq!(ContentType::detect(b"[1, 2, 3]"), ContentType::Json);
        assert_eq!(ContentType::detect(b"\"text\""), ContentType::Json);

        // Bare scalars never reach the parser and are classified as binary.
        assert_eq!(ContentType::detect(b"42"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"null"), ContentType::Binary);

        // Likely-JSON prefixes that fail to parse fall back to binary.
        assert_eq!(ContentType::detect(b"{not json"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"{\"a\": 1} trailing"), ContentType::Binary);
        assert_eq!(ContentType::detect(&[0xFF, 0xFE, 0x00]), ContentType::Binary);
    }

    #[test]
    fn test_content_type_helpers() {
        assert!(ContentType::Json.is_json());
        assert!(!ContentType::Json.is_binary());
        assert!(ContentType::Binary.is_binary());
        assert!(!ContentType::Binary.is_json());
        assert_eq!(ContentType::Json.as_str(), "json");
        assert_eq!(ContentType::Binary.as_str(), "binary");
        assert_eq!(ContentType::default(), ContentType::Binary);
    }

    #[test]
    fn test_from_json() {
        let item = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.content_type, ContentType::Json);
        // Content should be serialized JSON bytes
        let parsed: serde_json::Value = serde_json::from_slice(&item.content).unwrap();
        assert_eq!(parsed, json!({"key": "value"}));
    }

    #[test]
    fn test_from_json_forces_json_for_scalars() {
        // `detect` alone classifies a bare number as binary...
        let raw = SyncItem::new("n".into(), b"42".to_vec());
        assert_eq!(raw.content_type, ContentType::Binary);
        // ...but `from_json` knows the payload is JSON.
        let typed = SyncItem::from_json("n".into(), json!(42));
        assert_eq!(typed.content_type, ContentType::Json);
        assert_eq!(typed.content, b"42");
    }

    #[test]
    fn test_from_serializable() {
        let mut record = std::collections::BTreeMap::new();
        record.insert("age", 42);

        let item = SyncItem::from_serializable("p".into(), &record).unwrap();

        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content_as_json().unwrap(), json!({"age": 42}));
    }

    #[test]
    fn test_content_as_json() {
        let item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let parsed = item.content_as_json().unwrap();
        assert_eq!(parsed["nested"]["key"], 42);

        // Binary content should return None
        let binary_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(binary_item.content_as_json().is_none());
    }

    #[test]
    fn test_size_bytes_calculation() {
        let item = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]}),
        );

        let size = item.size_bytes();

        // Must include struct overhead plus the variable-length data.
        assert!(size > std::mem::size_of::<SyncItem>());
        assert!(size >= std::mem::size_of::<SyncItem>() + item.content.len());
    }

    #[test]
    fn test_size_bytes_cached() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());

        let size1 = item.size_bytes();
        let size2 = item.size_bytes();

        // Same value (cached)
        assert_eq!(size1, size2);
    }

    #[test]
    fn test_size_includes_optional_fields() {
        let baseline = SyncItem::new("test".to_string(), vec![]).size_bytes();

        let mut item = SyncItem::new("test".to_string(), vec![]);
        // Populate the optional fields *before* the first `size_bytes()` call:
        // the size is memoized on first use, so this order is what lets the
        // new lengths be counted.
        item.trace_parent = Some("00-abc123-def456-01".to_string());
        item.trace_state = Some("vendor=data".to_string());
        item.home_instance_id = Some("instance-1".to_string());

        let expected_extra =
            "00-abc123-def456-01".len() + "vendor=data".len() + "instance-1".len();
        assert_eq!(item.size_bytes(), baseline + expected_extra);
    }

    #[test]
    fn test_serialize_deserialize() {
        let item = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]}),
        );

        let json_str = serde_json::to_string(&item).unwrap();
        // ContentType is serialized in lowercase.
        assert!(json_str.contains("\"content_type\":\"json\""));

        let deserialized: SyncItem = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.object_id, item.object_id);
        assert_eq!(deserialized.version, item.version);
        assert_eq!(deserialized.content_type, ContentType::Json);
        assert_eq!(deserialized.content_hash, item.content_hash);
        assert_eq!(deserialized.content, item.content);
    }

    #[test]
    fn test_serialize_skips_none_batch_id() {
        let item = SyncItem::new("test".to_string(), vec![]);

        let json_str = serde_json::to_string(&item).unwrap();

        // batch_id should not appear in JSON when None
        assert!(!json_str.contains("batch_id"));
    }

    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.batch_id = Some("batch-123".to_string());

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(json_str.contains("batch_id"));
        assert!(json_str.contains("batch-123"));
    }

    #[test]
    fn test_clone() {
        let item = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let cloned = item.clone();

        assert_eq!(cloned.object_id, item.object_id);
        assert_eq!(cloned.content, item.content);
        assert_eq!(cloned.content_hash, item.content_hash);
    }

    #[test]
    fn test_debug_format() {
        let item = SyncItem::new("test".to_string(), vec![]);
        let debug_str = format!("{:?}", item);

        assert!(debug_str.contains("SyncItem"));
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_updated_at_is_recent() {
        let before = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        let item = SyncItem::new("test".to_string(), vec![]);

        let after = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        assert!(item.updated_at >= before);
        assert!(item.updated_at <= after);
    }

    #[test]
    fn test_large_content_size() {
        // 10_000 u32 values, 4 bytes each = 40_000 bytes of payload.
        let large_data: Vec<u8> = (0..10000u32).flat_map(|i| i.to_le_bytes()).collect();
        let item = SyncItem::new("large".to_string(), large_data);

        assert_eq!(item.content.len(), 40_000);
        assert!(
            item.size_bytes() >= 40_000,
            "size must account for the whole payload"
        );
    }

    #[test]
    fn test_state_default() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_state_with_state_builder() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec()).with_state("delta");
        assert_eq!(item.state, "delta");
    }

    #[test]
    fn test_state_with_state_chaining() {
        let item = SyncItem::from_json("test".into(), json!({"key": "value"})).with_state("pending");

        assert_eq!(item.state, "pending");
        assert_eq!(item.object_id, "test");
    }

    #[test]
    fn test_state_serialization() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec()).with_state("custom_state");

        let json = serde_json::to_string(&item).unwrap();
        assert!(json.contains("\"state\":\"custom_state\""));

        // Deserialize back
        let parsed: SyncItem = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.state, "custom_state");
    }

    #[test]
    fn test_state_deserialization_default() {
        // JSON without `state` / `content_type` falls back to the serde defaults.
        // It also uses the legacy `merkle_root` key to exercise the alias for
        // `content_hash` (wire compat with v0.2.x).
        let json = r#"{
            "object_id": "test",
            "version": 1,
            "updated_at": 12345,
            "priority_score": 0.0,
            "merkle_root": "abc123",
            "last_accessed": 0,
            "access_count": 0,
            "content": [100, 97, 116, 97]
        }"#;

        let item: SyncItem = serde_json::from_str(json).unwrap();
        assert_eq!(item.state, "default");
        assert_eq!(item.content_type, ContentType::Binary);
        assert_eq!(item.content_hash, "abc123");
        assert_eq!(item.content, b"data");
    }
}