sync_engine/
sync_item.rs

1//! Sync item data structure.
2//!
3//! The [`SyncItem`] is the core data unit that flows through the sync engine.
4//! Each item has a hierarchical ID (reverse DNS style), version, and binary content.
5//!
6//! # Binary Content
7//!
8//! The `content` field is `Vec<u8>` - raw bytes that sync-engine treats as opaque.
9//! The caller is responsible for serialization (JSON, MessagePack, Cap'n Proto, etc.).
10//!
11//! ```rust
12//! use sync_engine::SyncItem;
13//! use serde_json::json;
14//!
15//! // Store JSON as bytes
16//! let json_bytes = serde_json::to_vec(&json!({"name": "Alice"})).unwrap();
17//! let item = SyncItem::new("user.123".into(), json_bytes);
18//!
19//! // Or store any binary format
20//! let binary_data = vec![0x01, 0x02, 0x03];
21//! let item = SyncItem::new("binary.456".into(), binary_data);
22//! ```
23
24use std::sync::OnceLock;
25use serde::{Deserialize, Serialize};
26use crate::batching::hybrid_batcher::{SizedItem, BatchableItem};
27use crate::submit_options::SubmitOptions;
28
29/// Content type classification for storage routing.
30///
31/// This enables intelligent storage: JSON content can be stored in Redis as
32/// searchable hashes (HSET) and in SQL as queryable JSON columns, while binary
33/// content uses efficient blob storage.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
35#[serde(rename_all = "lowercase")]
36pub enum ContentType {
37    /// JSON content - stored as Redis HASH, SQL JSON column
38    /// Enables: RedisSearch FT.SEARCH, SQL JSON path queries
39    Json,
40    /// Binary/opaque content - stored as Redis STRING, SQL BLOB
41    /// Fast path for non-structured data
42    #[default]
43    Binary,
44}
45
46impl ContentType {
47    /// Detect content type from raw bytes.
48    /// 
49    /// Fast heuristic: checks first non-whitespace byte for JSON indicators,
50    /// then validates with a full parse only for likely-JSON content.
51    #[must_use]
52    pub fn detect(content: &[u8]) -> Self {
53        // Empty content is binary
54        if content.is_empty() {
55            return ContentType::Binary;
56        }
57        
58        // Fast path: check first non-whitespace byte
59        let first = content.iter().find(|b| !b.is_ascii_whitespace());
60        match first {
61            Some(b'{') | Some(b'[') | Some(b'"') => {
62                // Likely JSON - validate with parse
63                if serde_json::from_slice::<serde_json::Value>(content).is_ok() {
64                    ContentType::Json
65                } else {
66                    ContentType::Binary
67                }
68            }
69            _ => ContentType::Binary
70        }
71    }
72    
73    /// Check if this is JSON content
74    #[inline]
75    #[must_use]
76    pub fn is_json(&self) -> bool {
77        matches!(self, ContentType::Json)
78    }
79    
80    /// Check if this is binary content
81    #[inline]
82    #[must_use]
83    pub fn is_binary(&self) -> bool {
84        matches!(self, ContentType::Binary)
85    }
86}
87
88/// A wrapper struct that separates metadata from content.
89///
90/// # Binary-First Design
91///
92/// sync-engine is a **dumb storage layer** - it stores your bytes and routes
93/// them to L1/L2/L3 based on [`SubmitOptions`]. The
94/// `content` field is opaque `Vec<u8>` that we never interpret.
95///
96/// # Example
97///
98/// ```rust
99/// use sync_engine::SyncItem;
100/// use serde_json::json;
101///
102/// // JSON content (serialize to bytes yourself)
103/// let json_bytes = serde_json::to_vec(&json!({"name": "John Doe"})).unwrap();
104/// let item = SyncItem::new("uk.nhs.patient.12345".into(), json_bytes);
105///
106/// assert_eq!(item.object_id, "uk.nhs.patient.12345");
107/// assert_eq!(item.version, 1);
108/// ```
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct SyncItem {
111    /// Reverse DNS style ID (e.g., `uk.nhs.patient.record.1234567890`)
112    pub object_id: String,
113    /// Version number (monotonically increasing within this item)
114    pub version: u64,
115    /// Last update timestamp (epoch millis)
116    pub updated_at: i64,
117    /// Content type (json or binary) - determines storage format
118    /// JSON → Redis HSET (searchable), SQL JSON column
119    /// Binary → Redis SET, SQL BLOB column
120    #[serde(default)]
121    pub content_type: ContentType,
122    /// Batch ID for tracking batch writes (UUID, set during batch flush)
123    #[serde(default, skip_serializing_if = "Option::is_none")]
124    pub batch_id: Option<String>,
125    /// W3C Trace Context traceparent header (for cross-item trace linking)
126    /// Format: "00-{trace_id}-{span_id}-{flags}"
127    /// This is NOT for in-process tracing (that flows via Span::current()),
128    /// but for linking related operations across items/time.
129    pub trace_parent: Option<String>,
130    /// W3C Trace Context tracestate header (optional vendor-specific data)
131    pub trace_state: Option<String>,
132    /// Reserved for future use. Currently unused.
133    #[doc(hidden)]
134    pub priority_score: f64,
135    /// Hash of the content for quick integrity checks
136    pub merkle_root: String,
137    /// Timestamp of last access (epoch millis)
138    pub last_accessed: u64,
139    /// Number of times accessed
140    pub access_count: u64,
141    /// The actual payload (opaque binary, caller handles serialization)
142    #[serde(with = "serde_bytes")]
143    pub content: Vec<u8>,
144    /// Optional guest data owner ID (for routing engine)
145    pub home_instance_id: Option<String>,
146    /// Arbitrary state tag for caller-defined grouping (e.g., "delta", "base", "pending").
147    /// Indexed in SQL and tracked via Redis SETs for fast state-based queries.
148    /// Default: "default"
149    #[serde(default = "default_state")]
150    pub state: String,
151    
152    /// Transient submit options (travels with item through pipeline, not serialized)
153    /// Set via `submit_with()`, defaults to `SubmitOptions::default()` if None.
154    #[serde(skip)]
155    pub(crate) submit_options: Option<SubmitOptions>,
156    
157    /// Cached computed size in bytes (lazily computed, not serialized)
158    #[serde(skip)]
159    cached_size: OnceLock<usize>,
160}
161
/// Serde fallback for `SyncItem::state`: the tag used when a serialized item
/// carries no `state` field.
fn default_state() -> String {
    String::from("default")
}
166
167impl SyncItem {
168    /// Create a new SyncItem with binary content.
169    ///
170    /// The content type is auto-detected: if the bytes are valid JSON,
171    /// `content_type` will be `Json`, otherwise `Binary`. This enables
172    /// intelligent storage routing (HSET vs SET in Redis, JSON vs BLOB in SQL).
173    ///
174    /// # Example
175    ///
176    /// ```rust
177    /// use sync_engine::{SyncItem, ContentType};
178    ///
179    /// // From raw bytes (detected as Binary)
180    /// let item = SyncItem::new("id".into(), vec![1, 2, 3]);
181    /// assert_eq!(item.content_type, ContentType::Binary);
182    ///
183    /// // From JSON bytes (detected as Json)
184    /// let json = serde_json::to_vec(&serde_json::json!({"key": "value"})).unwrap();
185    /// let item = SyncItem::new("id".into(), json);
186    /// assert_eq!(item.content_type, ContentType::Json);
187    /// ```
188    pub fn new(object_id: String, content: Vec<u8>) -> Self {
189        let content_type = ContentType::detect(&content);
190        Self {
191            object_id,
192            version: 1,
193            updated_at: std::time::SystemTime::now()
194                .duration_since(std::time::UNIX_EPOCH)
195                .unwrap_or_default()
196                .as_millis() as i64,
197            content_type,
198            batch_id: None,
199            trace_parent: None,
200            trace_state: None,
201            priority_score: 0.0,
202            merkle_root: String::new(),
203            last_accessed: 0,
204            access_count: 0,
205            content,
206            home_instance_id: None,
207            state: "default".to_string(),
208            submit_options: None,  // Set via submit_with() if needed
209            cached_size: OnceLock::new(),
210        }
211    }
212
213    /// Create a new SyncItem from a JSON value (convenience method).
214    ///
215    /// This serializes the JSON to bytes and sets `content_type` to `Json`.
216    /// For binary formats (MessagePack, Cap'n Proto), use [`new`](Self::new).
217    pub fn from_json(object_id: String, value: serde_json::Value) -> Self {
218        let content = serde_json::to_vec(&value).unwrap_or_default();
219        let mut item = Self::new(object_id, content);
220        item.content_type = ContentType::Json; // Explicit, since we know it's JSON
221        item
222    }
223
224    /// Create a new SyncItem from any serializable type.
225    ///
226    /// This avoids creating an intermediate `serde_json::Value` if you have a struct.
227    /// This is more efficient than `from_json` if you already have a typed object.
228    pub fn from_serializable<T: Serialize>(object_id: String, value: &T) -> Result<Self, serde_json::Error> {
229        let content = serde_json::to_vec(value)?;
230        let mut item = Self::new(object_id, content);
231        item.content_type = ContentType::Json;
232        Ok(item)
233    }
234    
235    /// Reconstruct a SyncItem from stored components (used by storage backends).
236    /// 
237    /// This allows storage backends to rebuild a SyncItem from flattened data
238    /// (e.g., Redis HGETALL, SQL column reads) without accessing private fields.
239    #[doc(hidden)]
240    #[allow(clippy::too_many_arguments)]
241    pub fn reconstruct(
242        object_id: String,
243        version: u64,
244        updated_at: i64,
245        content_type: ContentType,
246        content: Vec<u8>,
247        batch_id: Option<String>,
248        trace_parent: Option<String>,
249        merkle_root: String,
250        home_instance_id: Option<String>,
251        state: String,
252    ) -> Self {
253        Self {
254            object_id,
255            version,
256            updated_at,
257            content_type,
258            batch_id,
259            trace_parent,
260            trace_state: None,
261            priority_score: 0.0,
262            merkle_root,
263            last_accessed: 0,
264            access_count: 0,
265            content,
266            home_instance_id,
267            state,
268            submit_options: None,
269            cached_size: OnceLock::new(),
270        }
271    }
272
273    /// Set submit options for this item (builder pattern).
274    ///
275    /// These options control where the item is stored (Redis, SQL) and
276    /// how it's compressed. Options travel with the item through the
277    /// batch pipeline.
278    ///
279    /// # Example
280    ///
281    /// ```rust
282    /// use sync_engine::{SyncItem, SubmitOptions, CacheTtl};
283    ///
284    /// let item = SyncItem::new("cache.key".into(), b"data".to_vec())
285    ///     .with_options(SubmitOptions::cache(CacheTtl::Minute));
286    /// ```
287    #[must_use]
288    pub fn with_options(mut self, options: SubmitOptions) -> Self {
289        self.submit_options = Some(options);
290        self
291    }
292    
293    /// Set state tag for this item (builder pattern).
294    ///
295    /// State is an arbitrary string for caller-defined grouping.
296    /// Common uses: "delta"/"base" for CRDTs, "pending"/"approved" for workflows.
297    ///
298    /// # Example
299    ///
300    /// ```rust
301    /// use sync_engine::SyncItem;
302    ///
303    /// let item = SyncItem::new("crdt.123".into(), b"data".to_vec())
304    ///     .with_state("delta");
305    /// ```
306    #[must_use]
307    pub fn with_state(mut self, state: impl Into<String>) -> Self {
308        self.state = state.into();
309        self
310    }
311
312    /// Get the effective submit options (returns default if not set).
313    #[must_use]
314    pub fn effective_options(&self) -> SubmitOptions {
315        self.submit_options.clone().unwrap_or_default()
316    }
317
318    /// Try to parse content as JSON.
319    ///
320    /// Returns `None` if content is not valid JSON.
321    #[must_use]
322    pub fn content_as_json(&self) -> Option<serde_json::Value> {
323        serde_json::from_slice(&self.content).ok()
324    }
325
326    /// Attach trace context from current span (for distributed trace linking)
327    #[cfg(feature = "otel")]
328    pub fn with_current_trace_context(mut self) -> Self {
329        use opentelemetry::trace::TraceContextExt;
330        use tracing_opentelemetry::OpenTelemetrySpanExt;
331        
332        let cx = tracing::Span::current().context();
333        let span_ref = cx.span();
334        let sc = span_ref.span_context();
335        if sc.is_valid() {
336            self.trace_parent = Some(format!(
337                "00-{}-{}-{:02x}",
338                sc.trace_id(),
339                sc.span_id(),
340                sc.trace_flags().to_u8()
341            ));
342        }
343        self
344    }
345}
346
347impl SizedItem for SyncItem {
348    fn size_bytes(&self) -> usize {
349        *self.cached_size.get_or_init(|| {
350            // Approximate size: struct overhead + string lengths + content bytes
351            std::mem::size_of::<Self>()
352                + self.object_id.len()
353                + self.trace_parent.as_ref().map_or(0, String::len)
354                + self.trace_state.as_ref().map_or(0, String::len)
355                + self.merkle_root.len()
356                + self.content.len()
357                + self.home_instance_id.as_ref().map_or(0, String::len)
358        })
359    }
360}
361
362impl BatchableItem for SyncItem {
363    fn id(&self) -> &str {
364        &self.object_id
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_new_sync_item() {
        let item = SyncItem::new("test-id".to_string(), b"hello".to_vec());

        assert_eq!(item.object_id, "test-id");
        assert_eq!(item.version, 1);
        assert!(item.updated_at > 0);
        assert!(item.batch_id.is_none());
        assert!(item.trace_parent.is_none());
        assert!(item.trace_state.is_none());
        assert_eq!(item.priority_score, 0.0);
        assert!(item.merkle_root.is_empty());
        assert_eq!(item.last_accessed, 0);
        assert_eq!(item.access_count, 0);
        assert!(item.home_instance_id.is_none());
        assert_eq!(item.content, b"hello");
        // "hello" is not JSON, so it is detected as binary.
        assert_eq!(item.content_type, ContentType::Binary);
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_detect_content_type() {
        assert_eq!(ContentType::detect(b""), ContentType::Binary);
        assert_eq!(ContentType::detect(b"   "), ContentType::Binary);
        assert_eq!(ContentType::detect(br#"  {"a": 1}"#), ContentType::Json);
        assert_eq!(ContentType::detect(b"[1, 2, 3]"), ContentType::Json);
        assert_eq!(ContentType::detect(br#""text""#), ContentType::Json);
        // Opens like JSON but does not parse.
        assert_eq!(ContentType::detect(b"{not json"), ContentType::Binary);
        // Top-level scalars are not sniffed as JSON.
        assert_eq!(ContentType::detect(b"42"), ContentType::Binary);
        assert_eq!(ContentType::detect(b"null"), ContentType::Binary);
    }

    #[test]
    fn test_json_constructors_set_json_content_type() {
        let item = SyncItem::from_json("j".into(), json!({"k": 1}));
        assert_eq!(item.content_type, ContentType::Json);

        // The typed constructors tag scalars as Json even though detect()
        // alone would classify them as binary.
        let item = SyncItem::from_json("n".into(), json!(42));
        assert_eq!(item.content_type, ContentType::Json);

        let item = SyncItem::from_serializable("s".into(), &vec![1u8, 2, 3]).unwrap();
        assert_eq!(item.content_type, ContentType::Json);
        assert_eq!(item.content, b"[1,2,3]");
    }

    #[test]
    fn test_from_json() {
        let item = SyncItem::from_json("test-id".to_string(), json!({"key": "value"}));

        assert_eq!(item.object_id, "test-id");
        // Content should be serialized JSON bytes
        let parsed: serde_json::Value = serde_json::from_slice(&item.content).unwrap();
        assert_eq!(parsed, json!({"key": "value"}));
    }

    #[test]
    fn test_content_as_json() {
        let item = SyncItem::from_json("test".into(), json!({"nested": {"key": 42}}));

        let parsed = item.content_as_json().unwrap();
        assert_eq!(parsed["nested"]["key"], 42);

        // Binary content should return None
        let binary_item = SyncItem::new("bin".into(), vec![0xFF, 0xFE, 0x00]);
        assert!(binary_item.content_as_json().is_none());
    }

    #[test]
    fn test_size_bytes_calculation() {
        let item = SyncItem::from_json(
            "uk.nhs.patient.record.123456".to_string(),
            json!({"name": "John Doe", "age": 42, "conditions": ["diabetes", "hypertension"]})
        );

        let size = item.size_bytes();

        // Should be non-zero
        assert!(size > 0);

        // Should include struct overhead + content
        assert!(size > std::mem::size_of::<SyncItem>());
    }

    #[test]
    fn test_size_bytes_cached() {
        let mut item = SyncItem::new("test".to_string(), b"data".to_vec());

        let size1 = item.size_bytes();
        // Mutating a counted field after the first call must not change the
        // result: the size is computed once and cached.
        item.content.extend_from_slice(b"more data");
        let size2 = item.size_bytes();

        assert_eq!(size1, size2);
    }

    #[test]
    fn test_size_includes_optional_fields() {
        let plain = SyncItem::new("test".to_string(), vec![]);

        // Fields are set before the first size_bytes() call, so the lazily
        // cached size is computed with them present.
        let mut decorated = SyncItem::new("test".to_string(), vec![]);
        decorated.trace_parent = Some("00-abc123-def456-01".to_string());
        decorated.trace_state = Some("vendor=data".to_string());
        decorated.home_instance_id = Some("instance-1".to_string());

        // The two items differ only in these three fields, so the size
        // difference must be exactly their combined length.
        let extra = "00-abc123-def456-01".len() + "vendor=data".len() + "instance-1".len();
        assert_eq!(decorated.size_bytes(), plain.size_bytes() + extra);
    }

    #[test]
    fn test_serialize_deserialize() {
        let item = SyncItem::from_json(
            "test-id".to_string(),
            json!({"nested": {"key": "value"}, "array": [1, 2, 3]})
        );

        let json_str = serde_json::to_string(&item).unwrap();
        let deserialized: SyncItem = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.object_id, item.object_id);
        assert_eq!(deserialized.version, item.version);
        assert_eq!(deserialized.content, item.content);
        assert_eq!(deserialized.content_type, item.content_type);
    }

    #[test]
    fn test_serialize_skips_none_batch_id() {
        let item = SyncItem::new("test".to_string(), vec![]);

        let json_str = serde_json::to_string(&item).unwrap();

        // batch_id should not appear in JSON when None
        assert!(!json_str.contains("batch_id"));
    }

    #[test]
    fn test_serialize_includes_batch_id_when_some() {
        let mut item = SyncItem::new("test".to_string(), vec![]);
        item.batch_id = Some("batch-123".to_string());

        let json_str = serde_json::to_string(&item).unwrap();

        assert!(json_str.contains("batch_id"));
        assert!(json_str.contains("batch-123"));
    }

    #[test]
    fn test_clone() {
        let item = SyncItem::from_json("original".to_string(), json!({"key": "value"}));
        let cloned = item.clone();

        assert_eq!(cloned.object_id, item.object_id);
        assert_eq!(cloned.content, item.content);
    }

    #[test]
    fn test_debug_format() {
        let item = SyncItem::new("test".to_string(), vec![]);
        let debug_str = format!("{:?}", item);

        assert!(debug_str.contains("SyncItem"));
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_updated_at_is_recent() {
        let before = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        let item = SyncItem::new("test".to_string(), vec![]);

        let after = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;

        assert!(item.updated_at >= before);
        assert!(item.updated_at <= after);
    }

    #[test]
    fn test_large_content_size() {
        // 10 000 little-endian u32 values = 40 000 bytes of binary content
        let large_data: Vec<u8> = (0..10000u32).flat_map(|i| i.to_le_bytes()).collect();
        assert_eq!(large_data.len(), 40_000);
        let item = SyncItem::new("large".to_string(), large_data);

        let size = item.size_bytes();

        // Every content byte is counted on top of the struct overhead.
        assert!(
            size >= std::mem::size_of::<SyncItem>() + 40_000,
            "Large content should result in large size"
        );
    }

    #[test]
    fn test_state_default() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec());
        assert_eq!(item.state, "default");
    }

    #[test]
    fn test_state_with_state_builder() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec())
            .with_state("delta");
        assert_eq!(item.state, "delta");
    }

    #[test]
    fn test_state_with_state_chaining() {
        let item = SyncItem::from_json("test".into(), json!({"key": "value"}))
            .with_state("pending");

        assert_eq!(item.state, "pending");
        assert_eq!(item.object_id, "test");
    }

    #[test]
    fn test_state_serialization() {
        let item = SyncItem::new("test".to_string(), b"data".to_vec())
            .with_state("custom_state");

        let json = serde_json::to_string(&item).unwrap();
        assert!(json.contains("\"state\":\"custom_state\""));

        // Deserialize back
        let parsed: SyncItem = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.state, "custom_state");
    }

    #[test]
    fn test_state_deserialization_default() {
        // JSON without state field should default to "default"
        let json = r#"{
            "object_id": "test",
            "version": 1,
            "updated_at": 12345,
            "priority_score": 0.0,
            "merkle_root": "",
            "last_accessed": 0,
            "access_count": 0,
            "content": [100, 97, 116, 97]
        }"#;

        let item: SyncItem = serde_json::from_str(json).unwrap();
        assert_eq!(item.state, "default");
    }
}