sync_engine/
sync_item.rs

1//! Sync item data structure.
2//!
3//! The [`SyncItem`] is the core data unit that flows through the sync engine.
4//! Each item has a hierarchical ID (reverse DNS style), version, and binary content.
5//!
6//! # Binary Content
7//!
8//! The `content` field is `Vec<u8>` - raw bytes that sync-engine treats as opaque.
9//! The caller is responsible for serialization (JSON, MessagePack, Cap'n Proto, etc.).
10//!
11//! ```rust
12//! use sync_engine::SyncItem;
13//! use serde_json::json;
14//!
15//! // Store JSON as bytes
16//! let json_bytes = serde_json::to_vec(&json!({"name": "Alice"})).unwrap();
17//! let item = SyncItem::new("user.123".into(), json_bytes);
18//!
19//! // Or store any binary format
20//! let binary_data = vec![0x01, 0x02, 0x03];
21//! let item = SyncItem::new("binary.456".into(), binary_data);
22//! ```
23
24use std::sync::OnceLock;
25use serde::{Deserialize, Serialize};
26use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
27use crate::submit_options::SubmitOptions;
28
29/// Content type classification for storage routing.
30///
31/// This enables intelligent storage: JSON content can be stored in Redis as
32/// searchable hashes (HSET) and in SQL as queryable JSON columns, while binary
33/// content uses efficient blob storage.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
35#[serde(rename_all = "lowercase")]
36pub enum ContentType {
37    /// JSON content - stored as Redis HASH, SQL JSON column
38    /// Enables: RedisSearch FT.SEARCH, SQL JSON path queries
39    Json,
40    /// Binary/opaque content - stored as Redis STRING, SQL BLOB
41    /// Fast path for non-structured data
42    #[default]
43    Binary,
44}
45
46impl ContentType {
47    /// Detect content type from raw bytes.
48    /// 
49    /// Fast heuristic: checks first non-whitespace byte for JSON indicators,
50    /// then validates with a full parse only for likely-JSON content.
51    #[must_use]
52    pub fn detect(content: &[u8]) -> Self {
53        // Empty content is binary
54        if content.is_empty() {
55            return ContentType::Binary;
56        }
57        
58        // Fast path: check first non-whitespace byte
59        let first = content.iter().find(|b| !b.is_ascii_whitespace());
60        match first {
61            Some(b'{') | Some(b'[') | Some(b'"') => {
62                // Likely JSON - validate with parse
63                if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
64                    ContentType::Json
65                } else {
66                    ContentType::Binary
67                }
68            }
69            _ => ContentType::Binary
70        }
71    }
72    
73    /// Check if this is JSON content
74    #[inline]
75    #[must_use]
76    pub fn is_json(&self) -> bool {
77        matches!(self, ContentType::Json)
78    }
79    
80    /// Check if this is binary content
81    #[inline]
82    #[must_use]
83    pub fn is_binary(&self) -> bool {
84        matches!(self, ContentType::Binary)
85    }
86}
87
88/// A wrapper struct that separates metadata from content.
89///
90/// # Binary-First Design
91///
92/// sync-engine is a **dumb storage layer** - it stores your bytes and routes
93/// them to L1/L2/L3 based on [`SubmitOptions`]. The
94/// `content` field is opaque `Vec<u8>` that we never interpret.
95///
96/// # Example
97///
98/// ```rust
99/// use sync_engine::SyncItem;
100/// use serde_json::json;
101///
102/// // JSON content (serialize to bytes yourself)
103/// let json_bytes = serde_json::to_vec(&json!({"name": "John Doe"})).unwrap();
104/// let item = SyncItem::new("uk.nhs.patient.12345".into(), json_bytes);
105///
106/// assert_eq!(item.object_id, "uk.nhs.patient.12345");
107/// assert_eq!(item.version, 1);
108/// ```
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct SyncItem {
111    /// Reverse DNS style ID (e.g., `uk.nhs.patient.record.1234567890`)
112    pub object_id: String,
113    /// Version number (monotonically increasing within this item)
114    pub version: u64,
115    /// Last update timestamp (epoch millis)
116    pub updated_at: i64,
117    /// Content type (json or binary) - determines storage format
118    /// JSON → Redis HSET (searchable), SQL JSON column
119    /// Binary → Redis SET, SQL BLOB column
120    #[serde(default)]
121    pub content_type: ContentType,
122    /// Batch ID for tracking batch writes (UUID, set during batch flush)
123    #[serde(default, skip_serializing_if = "Option::is_none")]
124    pub batch_id: Option<String>,
125    /// W3C Trace Context traceparent header (for cross-item trace linking)
126    /// Format: "00-{trace_id}-{span_id}-{flags}"
127    /// This is NOT for in-process tracing (that flows via Span::current()),
128    /// but for linking related operations across items/time.
129    pub trace_parent: Option<String>,
130    /// W3C Trace Context tracestate header (optional vendor-specific data)
131    pub trace_state: Option<String>,
132    /// Reserved for future use. Currently unused.
133    #[doc(hidden)]
134    pub priority_score: f64,
135    /// Hash of the content for quick integrity checks
136    pub merkle_root: String,
137    /// Timestamp of last access (epoch millis)
138    pub last_accessed: u64,
139    /// Number of times accessed
140    pub access_count: u64,
141    /// The actual payload (opaque binary, caller handles serialization)
142    #[serde(with = "serde_bytes")]
143    pub content: Vec<u8>,
144    /// Optional guest data owner ID (for routing engine)
145    pub home_instance_id: Option<String>,
146    
147    /// Transient submit options (travels with item through pipeline, not serialized)
148    /// Set via `submit_with()`, defaults to `SubmitOptions::default()` if None.
149    #[serde(skip)]
150    pub(crate) submit_options: Option<SubmitOptions>,
151    
152    /// Cached computed size in bytes (lazily computed, not serialized)
153    #[serde(skip)]
154    cached_size: OnceLock<usize>,
155}
156
157impl SyncItem {
158    /// Create a new SyncItem with binary content.
159    ///
160    /// The content type is auto-detected: if the bytes are valid JSON,
161    /// `content_type` will be `Json`, otherwise `Binary`. This enables
162    /// intelligent storage routing (HSET vs SET in Redis, JSON vs BLOB in SQL).
163    ///
164    /// # Example
165    ///
166    /// ```rust
167    /// use sync_engine::{SyncItem, ContentType};
168    ///
169    /// // From raw bytes (detected as Binary)
170    /// let item = SyncItem::new("id".into(), vec![1, 2, 3]);
171    /// assert_eq!(item.content_type, ContentType::Binary);
172    ///
173    /// // From JSON bytes (detected as Json)
174    /// let json = serde_json::to_vec(&serde_json::json!({"key": "value"})).unwrap();
175    /// let item = SyncItem::new("id".into(), json);
176    /// assert_eq!(item.content_type, ContentType::Json);
177    /// ```
178    pub fn new(object_id: String, content: Vec<u8>) -> Self {
179        let content_type = ContentType::detect(&content);
180        Self {
181            object_id,
182            version: 1,
183            updated_at: std::time::SystemTime::now()
184                .duration_since(std::time::UNIX_EPOCH)
185                .unwrap_or_default()
186                .as_millis() as i64,
187            content_type,
188            batch_id: None,
189            trace_parent: None,
190            trace_state: None,
191            priority_score: 0.0,
192            merkle_root: String::new(),
193            last_accessed: 0,
194            access_count: 0,
195            content,
196            home_instance_id: None,
197            submit_options: None,  // Set via submit_with() if needed
198            cached_size: OnceLock::new(),
199        }
200    }
201
202    /// Create a new SyncItem from a JSON value (convenience method).
203    ///
204    /// This serializes the JSON to bytes and sets `content_type` to `Json`.
205    /// For binary formats (MessagePack, Cap'n Proto), use [`new`](Self::new).
206    pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
207        let content = serde_json::to_vec(&value).unwrap_or_default();
208        let mut item = Self::new(object_id, content);
209        item.content_type = ContentType::Json; // Explicit, since we know it's JSON
210        item
211    }
212    
213    /// Reconstruct a SyncItem from stored components (used by storage backends).
214    /// 
215    /// This allows storage backends to rebuild a SyncItem from flattened data
216    /// (e.g., Redis HGETALL, SQL column reads) without accessing private fields.
217    #[doc(hidden)]
218    #[allow(clippy::too_many_arguments)]
219    pub fn reconstruct(
220        object_id: String,
221        version: u64,
222        updated_at: i64,
223        content_type: ContentType,
224        content: Vec<u8>,
225        batch_id: Option<String>,
226        trace_parent: Option<String>,
227        merkle_root: String,
228        home_instance_id: Option<String>,
229    ) -> Self {
230        Self {
231            object_id,
232            version,
233            updated_at,
234            content_type,
235            batch_id,
236            trace_parent,
237            trace_state: None,
238            priority_score: 0.0,
239            merkle_root,
240            last_accessed: 0,
241            access_count: 0,
242            content,
243            home_instance_id,
244            submit_options: None,
245            cached_size: OnceLock::new(),
246        }
247    }
248
249    /// Set submit options for this item (builder pattern).
250    ///
251    /// These options control where the item is stored (Redis, SQL) and
252    /// how it's compressed. Options travel with the item through the
253    /// batch pipeline.
254    ///
255    /// # Example
256    ///
257    /// ```rust
258    /// use sync_engine::{SyncItem, SubmitOptions, CacheTtl};
259    ///
260    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec())
261    ///     .with_options(SubmitOptions::cache(CacheTtl::Minute));
262    /// ```
263    #[must_use]
264    pub fn with_options(mut self, options: SubmitOptions) -> Self {
265        self.submit_options = Some(options);
266        self
267    }
268
269    /// Get the effective submit options (returns default if not set).
270    #[must_use]
271    pub fn effective_options(&self) -> SubmitOptions {
272        self.submit_options.clone().unwrap_or_default()
273    }
274
275    /// Try to parse content as JSON.
276    ///
277    /// Returns `None` if content is not valid JSON.
278    #[must_use]
279    pub fn content_as_json(&self) -> Option<serde_json::Value> {
280        serde_json::from_slice(&self.content).ok()
281    }
282
283    /// Attach trace context from current span (for distributed trace linking)
284    #[cfg(feature = "otel")]
285    pub fn with_current_trace_context(mut self) -> Self {
286        use opentelemetry::trace::TraceContextExt;
287        use tracing_opentelemetry::OpenTelemetrySpanExt;
288        
289        let cx = tracing::Span::current().context();
290        let span_ref = cx.span();
291        let sc = span_ref.span_context();
292        if sc.is_valid() {
293            self.trace_parent = Some(format!(
294                "00-{}-{}-{:02x}",
295                sc.trace_id(),
296                sc.span_id(),
297                sc.trace_flags().to_u8()
298            ));
299        }
300        self
301    }
302}
303
304impl SizedItem for SyncItem {
305    fn size_bytes(&self) -> usize {
306        *self.cached_size.get_or_init(|| {
307            // Approximate size: struct overhead + string lengths + content bytes
308            std::mem::size_of::<Self>()
309                + self.object_id.len()
310                + self.trace_parent.as_ref().map_or(0, String::len)
311                + self.trace_state.as_ref().map_or(0, String::len)
312                + self.merkle_root.len()
313                + self.content.len()
314                + self.home_instance_id.as_ref().map_or(0, String::len)
315        })
316    }
317}
318
319impl BatchableItem for SyncItem {
320    fn id(&self) -> &str {
321        &self.object_id
322    }
323}
324
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_new_sync_item() {
        let item = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert!(item.updated_at > 0);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        assert!(item.merkle_root.is_empty());
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.home_instance_id.is_none());
        assert!(item.submit_options.is_none());
        assert_eq!(item.content, b"hello");
        assert_eq!(item.content_type, ContentType::Binary);
    }

    #[test]
    fn test_new_detects_json_content() {
        let item = SyncItem::new("j".to_string(), br#"{"key": "value"}"#.to_vec());
        assert_eq!(item.content_type, ContentType::Json);
    }

    #[test]
    fn test_detect_heuristic() {
        // Empty / whitespace-only input is binary
        assert_eq!(ContentType::detect(b""), ContentType::Binary);
        assert_eq!(ContentType::detect(b"  \n\t "), ContentType::Binary);

        // Objects, arrays and strings (with surrounding whitespace) are JSON
        assert_eq!(ContentType::detect(br#"{"a": 1}"#), ContentType::Json);
        assert_eq!(ContentType::detect(b"  [1, 2, 3]  "), ContentType::Json);
        assert_eq!(ContentType::detect(br#""text""#), ContentType::Json);

        // JSON-looking but malformed input falls back to binary
        assert_eq!(ContentType::detect(b"{not json"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"[1, 2"), ContentType::Binary);

        // Bare scalars are not JSON candidates for the first-byte heuristic
        assert_eq!(ContentType::detect(b"42"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"null"), ContentType::Binary);
    }

    #[test]
    fn test_content_type_predicates_and_serde() {
        assert!(ContentType::Json.is_json());
        assert!(!ContentType::Json.is_binary());
        assert!(ContentType::Binary.is_binary());
        assert!(!ContentType::Binary.is_json());
        assert_eq!(ContentType::default(), ContentType::Binary);

        assert_eq!(serde_json::to_string(&ContentType::Json).unwrap(), "\"json\"");
        assert_eq!(serde_json::to_string(&ContentType::Binary).unwrap(), "\"binary\"");
        assert_eq!(
            serde_json::from_str::<ContentType>("\"json\"").unwrap(),
            ContentType::Json
        );
    }

    #[test]
    fn test_from_json() {
        let item = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert_eq!(item.content_type, ContentType::Json);
        // Content should be serialized JSON bytes
        let parsed: serde_json::Value = serde_json::from_slice(&item.content).unwrap();
        assert_eq!(parsed, json!({"key": "value"}));
    }

    #[test]
    fn test_from_json_scalar_is_tagged_json() {
        // from_json tags explicitly, unlike new()'s first-byte heuristic
        let item = SyncItem::from_json("n".to_string(), json!(42));
        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content, b"42");
    }

    #[test]
    fn test_content_as_json() {
        let item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let parsed = item.content_as_json().unwrap();
        assert_eq!(parsed["nested"]["key"], 42);

        // Binary content should return None
        let binary_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(binary_item.content_as_json().is_none());
    }

    #[test]
    fn test_reconstruct_preserves_components() {
        let item = SyncItem::reconstruct(
            "obj".to_string(),
            7,
            1234,
            ContentType::Json,
            b"{}".to_vec(),
            Some("batch-1".to_string()),
            Some("00-abc-def-01".to_string()),
            "root".to_string(),
            Some("home".to_string()),
        );

        assert_eq!(item.object_id, "obj");
        assert_eq!(item.version, 7);
        assert_eq!(item.updated_at, 1234);
        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content, b"{}");
        assert_eq!(item.batch_id.as_deref(), Some("batch-1"));
        assert_eq!(item.trace_parent.as_deref(), Some("00-abc-def-01"));
        assert_eq!(item.merkle_root, "root");
        assert_eq!(item.home_instance_id.as_deref(), Some("home"));
        // Fields not passed in are reset
        assert!(item.trace_state.is_none());
        assert_eq!(item.access_count, 0);
        assert_eq!(item.last_accessed, 0);
        assert!(item.submit_options.is_none());
    }

    #[test]
    fn test_with_options_sets_submit_options() {
        let item = SyncItem::new("opt".to_string(), vec![]);
        assert!(item.submit_options.is_none());

        let item = item.with_options(SubmitOptions::default());
        assert!(item.submit_options.is_some());
    }

    #[test]
    fn test_size_bytes_calculation() {
        let item = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]}),
        );

        let size = item.size_bytes();

        // Must cover struct overhead + id + content
        assert!(
            size >= std::mem::size_of::<SyncItem>() + item.object_id.len() + item.content.len()
        );
    }

    #[test]
    fn test_size_bytes_cached() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());

        let size1 = item.size_bytes();
        let size2 = item.size_bytes();

        // Same value (cached)
        assert_eq!(size1, size2);
    }

    #[test]
    fn test_size_includes_optional_fields() {
        // Fresh items: cached_size has not been computed before the fields are set.
        let baseline = SyncItem::new("test".to_string(), vec![]).size_bytes();

        let trace_parent = "00-abc123-def456-01";
        let trace_state = "vendor=data";
        let home = "instance-1";

        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.trace_parent = Some(trace_parent.to_string());
        item.trace_state = Some(trace_state.to_string());
        item.home_instance_id = Some(home.to_string());

        let size = item.size_bytes();

        assert_eq!(
            size,
            baseline + trace_parent.len() + trace_state.len() + home.len()
        );
    }

    #[test]
    fn test_serialize_deserialize() {
        let item = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]}),
        );

        let json_str = serde_json::to_string(&item).unwrap();
        let deserialized: SyncItem = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.object_id, item.object_id);
        assert_eq!(deserialized.version, item.version);
        assert_eq!(deserialized.content, item.content);
        assert_eq!(deserialized.content_type, item.content_type);
        // Transient options are never serialized
        assert!(deserialized.submit_options.is_none());
    }

    #[test]
    fn test_deserialize_defaults_missing_fields() {
        let item = SyncItem::from_json("test".to_string(), json!({"k": 1}));
        let mut value: serde_json::Value =
            serde_json::from_str(&serde_json::to_string(&item).unwrap()).unwrap();
        let obj = value.as_object_mut().unwrap();
        obj.remove("content_type");
        obj.remove("batch_id");

        let restored: SyncItem = serde_json::from_str(&value.to_string()).unwrap();

        assert_eq!(restored.content_type, ContentType::Binary);
        assert!(restored.batch_id.is_none());
    }

    #[test]
    fn test_serialize_skips_none_batch_id() {
        let item = SyncItem::new("test".to_string(), vec![]);

        let json_str = serde_json::to_string(&item).unwrap();

        // batch_id should not appear in JSON when None
        assert!(!json_str.contains("batch_id"));
    }

    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.batch_id = Some("batch-123".to_string());

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(json_str.contains("batch_id"));
        assert!(json_str.contains("batch-123"));
    }

    #[test]
    fn test_clone() {
        let item = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let cloned = item.clone();

        assert_eq!(cloned.object_id, item.object_id);
        assert_eq!(cloned.content, item.content);
        assert_eq!(cloned.content_type, item.content_type);
    }

    #[test]
    fn test_debug_format() {
        let item = SyncItem::new("test".to_string(), vec![]);
        let debug_str = format!("{:?}", item);

        assert!(debug_str.contains("SyncItem"));
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_updated_at_is_recent() {
        let before = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        let item = SyncItem::new("test".to_string(), vec![]);

        let after = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        assert!(item.updated_at >= before);
        assert!(item.updated_at <= after);
    }

    #[test]
    fn test_large_content_size() {
        // 10000 u32 values * 4 bytes each = 40000 bytes of content
        let large_data: Vec<u8> = (0..10000u32).flat_map(|i| i.to_le_bytes()).collect();
        assert_eq!(large_data.len(), 40_000);
        let item = SyncItem::new("large".to_string(), large_data);

        let size = item.size_bytes();

        assert!(size >= 40_000, "size must include all content bytes");
    }
}