// fionn_stream/format_crdt.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! Format-Aware CRDT Processor
3//!
4//! This module provides a CRDT-enabled processor that wraps `FormatDsonProcessor`,
5//! enabling conflict-free replicated document processing across distributed systems.
6
7use crate::format_dson::{FormatBatchProcessor, FormatBatchResult, FormatDsonProcessor};
8use crate::skiptape::CompiledSchema;
9use fionn_core::Result;
10use fionn_core::format::FormatKind;
11use fionn_ops::dson_traits::{
12    CrdtMerge, CrdtOperation, DeltaCrdt, MergeConflict, OpBasedCrdt, VectorClock,
13};
14use fionn_ops::{DsonOperation, MergeStrategy, OperationValue};
15use smallvec::SmallVec;
16use std::collections::HashMap;
17
18// =============================================================================
19// CRDT Delta Types
20// =============================================================================
21
/// Delta state for format-aware CRDT synchronization
///
/// A delta bundles the operations one replica has accumulated since a given
/// vector clock, so a peer can catch up without exchanging full state.
#[derive(Debug, Clone)]
pub struct FormatDelta {
    /// Operations included in this delta
    pub operations: Vec<CrdtOperation>,
    /// Vector clock of the sending replica at delta generation time
    pub clock: VectorClock,
    /// Format kind this delta applies to
    pub format_kind: FormatKind,
}
32
33impl FormatDelta {
34    /// Create a new empty delta
35    #[must_use]
36    pub fn new(format_kind: FormatKind) -> Self {
37        Self {
38            operations: Vec::new(),
39            clock: VectorClock::new(),
40            format_kind,
41        }
42    }
43
44    /// Create a delta with operations
45    #[must_use]
46    pub const fn with_operations(
47        operations: Vec<CrdtOperation>,
48        clock: VectorClock,
49        format_kind: FormatKind,
50    ) -> Self {
51        Self {
52            operations,
53            clock,
54            format_kind,
55        }
56    }
57
58    /// Check if delta is empty
59    #[must_use]
60    pub const fn is_empty(&self) -> bool {
61        self.operations.is_empty()
62    }
63
64    /// Get number of operations
65    #[must_use]
66    pub const fn len(&self) -> usize {
67        self.operations.len()
68    }
69}
70
71// =============================================================================
72// Document State
73// =============================================================================
74
/// State for a single document in the CRDT
#[derive(Debug, Clone)]
struct DocumentState {
    /// Current document content (JSON)
    content: String,
    /// Last write timestamp observed for each dotted field path;
    /// absent entries are treated as timestamp 0 by callers
    field_timestamps: HashMap<String, u64>,
}
83
84impl DocumentState {
85    fn new(content: String) -> Self {
86        Self {
87            content,
88            field_timestamps: HashMap::new(),
89        }
90    }
91}
92
93// =============================================================================
94// FormatCrdtProcessor
95// =============================================================================
96
/// CRDT-enabled processor for any format implementing `FormatBatchProcessor`
///
/// This wraps a `FormatDsonProcessor` and adds CRDT semantics:
/// - Vector clock tracking for causality
/// - Last-Writer-Wins (LWW) register semantics per field
/// - Delta-state synchronization
/// - Operation buffering for causal ordering
pub struct FormatCrdtProcessor<P: FormatBatchProcessor> {
    /// The underlying DSON processor
    dson_processor: FormatDsonProcessor<P>,
    /// Replica identifier (the key this node uses in vector clocks)
    replica_id: String,
    /// Vector clock for causality tracking across replicas
    vector_clock: VectorClock,
    /// Lamport timestamp counter, advanced on every local event
    lamport_timestamp: u64,
    /// Default merge strategy
    /// NOTE(review): set in `new`/`with_strategy` but never read in this file;
    /// merge paths take an explicit strategy argument — confirm intended use.
    default_strategy: MergeStrategy,
    /// Document states indexed by document ID (e.g. "doc_0", "doc_1", ...)
    document_states: HashMap<String, DocumentState>,
    /// Operation history consulted when generating deltas
    operation_history: Vec<CrdtOperation>,
    /// Buffered operations waiting for causal delivery (inline up to 16)
    operation_buffer: SmallVec<[CrdtOperation; 16]>,
}
122
123impl<P: FormatBatchProcessor> FormatCrdtProcessor<P> {
124    /// Create a new CRDT processor with a replica ID
125    #[must_use]
126    pub fn new(batch_processor: P, replica_id: impl Into<String>) -> Self {
127        Self {
128            dson_processor: FormatDsonProcessor::new(batch_processor),
129            replica_id: replica_id.into(),
130            vector_clock: VectorClock::new(),
131            lamport_timestamp: 0,
132            default_strategy: MergeStrategy::LastWriteWins,
133            document_states: HashMap::new(),
134            operation_history: Vec::new(),
135            operation_buffer: SmallVec::new(),
136        }
137    }
138
139    /// Create with a specific merge strategy
140    #[must_use]
141    pub fn with_strategy(mut self, strategy: MergeStrategy) -> Self {
142        self.default_strategy = strategy;
143        self
144    }
145
146    /// Get the format kind
147    #[must_use]
148    pub fn format_kind(&self) -> FormatKind {
149        self.dson_processor.format_kind()
150    }
151
152    /// Get the replica ID
153    #[must_use]
154    pub fn replica_id(&self) -> &str {
155        &self.replica_id
156    }
157
158    /// Get the current vector clock
159    #[must_use]
160    pub const fn vector_clock(&self) -> &VectorClock {
161        &self.vector_clock
162    }
163
164    /// Get the current Lamport timestamp
165    #[must_use]
166    pub const fn lamport_timestamp(&self) -> u64 {
167        self.lamport_timestamp
168    }
169
170    /// Process data with schema filtering and track as CRDT state
171    ///
172    /// # Errors
173    /// Returns an error if processing fails
174    pub fn process(&mut self, data: &[u8], schema: &CompiledSchema) -> Result<FormatBatchResult> {
175        let result = self
176            .dson_processor
177            .process_with_operations(data, schema, &[])?;
178
179        // Track documents in CRDT state
180        for (idx, doc) in result.documents.iter().enumerate() {
181            let doc_id = format!("doc_{idx}");
182            self.document_states
183                .insert(doc_id, DocumentState::new(doc.clone()));
184        }
185
186        // Increment local clock
187        self.vector_clock.increment(&self.replica_id);
188        self.lamport_timestamp += 1;
189
190        Ok(result)
191    }
192
193    /// Process with DSON operations and CRDT tracking
194    ///
195    /// # Errors
196    /// Returns an error if processing fails
197    pub fn process_with_operations(
198        &mut self,
199        data: &[u8],
200        schema: &CompiledSchema,
201        operations: &[DsonOperation],
202    ) -> Result<FormatBatchResult> {
203        let result = self
204            .dson_processor
205            .process_with_operations(data, schema, operations)?;
206
207        // Track operations for delta generation
208        for op in operations {
209            let crdt_op = self.prepare_operation(op);
210            self.operation_history.push(crdt_op);
211        }
212
213        // Track documents
214        for (idx, doc) in result.documents.iter().enumerate() {
215            let doc_id = format!("doc_{idx}");
216            self.document_states
217                .insert(doc_id, DocumentState::new(doc.clone()));
218        }
219
220        self.vector_clock.increment(&self.replica_id);
221        self.lamport_timestamp += 1;
222
223        Ok(result)
224    }
225
226    /// Process unfiltered data with CRDT tracking
227    ///
228    /// # Errors
229    /// Returns an error if processing fails
230    pub fn process_unfiltered(&mut self, data: &[u8]) -> Result<FormatBatchResult> {
231        let result = self
232            .dson_processor
233            .process_unfiltered_with_operations(data, &[])?;
234
235        for (idx, doc) in result.documents.iter().enumerate() {
236            let doc_id = format!("doc_{idx}");
237            self.document_states
238                .insert(doc_id, DocumentState::new(doc.clone()));
239        }
240
241        self.vector_clock.increment(&self.replica_id);
242        self.lamport_timestamp += 1;
243
244        Ok(result)
245    }
246
247    /// Apply a local operation and prepare it for replication
248    fn prepare_operation(&mut self, op: &DsonOperation) -> CrdtOperation {
249        self.lamport_timestamp += 1;
250        self.vector_clock.increment(&self.replica_id);
251
252        CrdtOperation {
253            operation: op.clone(),
254            timestamp: self.lamport_timestamp,
255            replica_id: self.replica_id.clone(),
256            vector_clock: self.vector_clock.clone(),
257        }
258    }
259
260    /// Get a document by ID
261    #[must_use]
262    pub fn get_document(&self, doc_id: &str) -> Option<&str> {
263        self.document_states.get(doc_id).map(|s| s.content.as_str())
264    }
265
266    /// Get all document IDs
267    #[must_use]
268    pub fn document_ids(&self) -> Vec<&str> {
269        self.document_states.keys().map(String::as_str).collect()
270    }
271
272    /// Get number of tracked documents
273    #[must_use]
274    pub fn document_count(&self) -> usize {
275        self.document_states.len()
276    }
277
278    /// Apply a field value to a document with LWW semantics
279    fn apply_field_value(
280        &mut self,
281        doc_id: &str,
282        path: &str,
283        value: &OperationValue,
284        timestamp: u64,
285    ) -> Option<MergeConflict> {
286        let state = self.document_states.get_mut(doc_id)?;
287
288        let current_ts = state.field_timestamps.get(path).copied().unwrap_or(0);
289
290        match timestamp.cmp(&current_ts) {
291            std::cmp::Ordering::Greater => {
292                // Remote wins - update the field
293                state.field_timestamps.insert(path.to_string(), timestamp);
294
295                // Apply the update to the document content
296                if let Ok(mut json_value) =
297                    serde_json::from_str::<serde_json::Value>(&state.content)
298                {
299                    Self::set_json_path(&mut json_value, path, value);
300                    if let Ok(new_content) = serde_json::to_string(&json_value) {
301                        state.content = new_content;
302                    }
303                }
304
305                None
306            }
307            std::cmp::Ordering::Equal => {
308                // Same timestamp - use replica_id as tiebreaker
309                // For now, just report conflict
310                Some(MergeConflict {
311                    path: path.to_string(),
312                    local_value: OperationValue::StringRef("current".to_string()),
313                    remote_value: value.clone(),
314                    local_timestamp: current_ts,
315                    remote_timestamp: timestamp,
316                    resolved_value: None,
317                })
318            }
319            std::cmp::Ordering::Less => {
320                // Local wins - no change
321                None
322            }
323        }
324    }
325
326    /// Set a value at a JSON path
327    fn set_json_path(json: &mut serde_json::Value, path: &str, value: &OperationValue) {
328        let parts: Vec<&str> = path.split('.').collect();
329        let mut current = json;
330
331        for (i, part) in parts.iter().enumerate() {
332            if i == parts.len() - 1 {
333                // Final part - set the value
334                if let serde_json::Value::Object(obj) = current {
335                    obj.insert((*part).to_string(), Self::operation_value_to_json(value));
336                }
337            } else {
338                // Navigate down
339                if let serde_json::Value::Object(obj) = current {
340                    current = obj
341                        .entry((*part).to_string())
342                        .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
343                }
344            }
345        }
346    }
347
348    /// Convert `OperationValue` to JSON
349    ///
350    /// Note: `ArrayRef` and `ObjectRef` are tape position ranges in the actual implementation,
351    /// so we represent them as placeholder values. In practice, these should be resolved
352    /// from the tape before calling this function.
353    fn operation_value_to_json(value: &OperationValue) -> serde_json::Value {
354        match value {
355            OperationValue::Null => serde_json::Value::Null,
356            OperationValue::BoolRef(b) => serde_json::Value::Bool(*b),
357            OperationValue::NumberRef(n) => n
358                .parse::<i64>()
359                .map(|i| serde_json::Value::Number(i.into()))
360                .or_else(|_| {
361                    n.parse::<f64>().map(|f| {
362                        serde_json::Value::Number(
363                            serde_json::Number::from_f64(f).unwrap_or_else(|| 0.into()),
364                        )
365                    })
366                })
367                .unwrap_or_else(|_| serde_json::Value::String(n.clone())),
368            OperationValue::StringRef(s) => serde_json::Value::String(s.clone()),
369            // ArrayRef and ObjectRef are tape position ranges - represent as empty
370            // In practice, CRDT operations work on scalar values
371            OperationValue::ArrayRef { .. } => serde_json::Value::Array(Vec::new()),
372            OperationValue::ObjectRef { .. } => serde_json::Value::Object(serde_json::Map::new()),
373        }
374    }
375
376    /// Reset the processor
377    pub fn reset(&mut self) {
378        self.dson_processor.reset();
379        self.document_states.clear();
380        self.operation_history.clear();
381        self.operation_buffer.clear();
382    }
383
384    /// Get reference to underlying DSON processor
385    #[must_use]
386    pub const fn dson_processor(&self) -> &FormatDsonProcessor<P> {
387        &self.dson_processor
388    }
389
390    /// Get mutable reference to underlying DSON processor
391    #[allow(clippy::missing_const_for_fn)] // Cannot be const: returns &mut
392    pub fn dson_processor_mut(&mut self) -> &mut FormatDsonProcessor<P> {
393        &mut self.dson_processor
394    }
395}
396
397// =============================================================================
398// CrdtMerge Implementation
399// =============================================================================
400
impl<P: FormatBatchProcessor> CrdtMerge for FormatCrdtProcessor<P> {
    /// Merge a remote operation into local state.
    ///
    /// Advances the Lamport clock past the remote timestamp, merges vector
    /// clocks, then applies the operation with per-field LWW semantics.
    fn merge_operation(&mut self, op: CrdtOperation) -> Result<Option<MergeConflict>> {
        // Lamport rule: local = max(local, remote) + 1
        self.lamport_timestamp = self.lamport_timestamp.max(op.timestamp) + 1;

        // Merge vector clocks, then count this merge as a local event
        self.vector_clock.merge(&op.vector_clock);
        self.vector_clock.increment(&self.replica_id);

        // Apply the operation based on type
        match &op.operation {
            DsonOperation::FieldAdd { path, value }
            | DsonOperation::FieldModify { path, value } => {
                // Apply to all documents (or specific document if path indicates).
                // Note: only the LAST conflict encountered is surfaced to the caller.
                let mut conflict = None;
                // Collect IDs first so &mut self methods can be called in the loop.
                let doc_ids: Vec<_> = self.document_states.keys().cloned().collect();
                for doc_id in doc_ids {
                    if let Some(c) = self.apply_field_value(&doc_id, path, value, op.timestamp) {
                        conflict = Some(c);
                    }
                }
                Ok(conflict)
            }
            DsonOperation::FieldDelete { path } => {
                // Remove field from all documents.
                // NOTE(review): deletes bypass the per-field timestamp check, so a
                // delete always wins regardless of LWW ordering — confirm intended.
                let doc_ids: Vec<_> = self.document_states.keys().cloned().collect();
                for doc_id in doc_ids {
                    if let Some(state) = self.document_states.get_mut(&doc_id)
                        && let Ok(mut json_value) =
                            serde_json::from_str::<serde_json::Value>(&state.content)
                    {
                        Self::delete_json_path(&mut json_value, path);
                        if let Ok(new_content) = serde_json::to_string(&json_value) {
                            state.content = new_content;
                        }
                    }
                }
                Ok(None)
            }
            _ => {
                // Other operations - record in history for later delta exchange
                self.operation_history.push(op);
                Ok(None)
            }
        }
    }

    /// Merge a single field write, resolving concurrent writes via `strategy`.
    ///
    /// A timestamp equal to the stored one is treated as concurrent; all other
    /// cases fall through to plain LWW inside `apply_field_value`.
    fn merge_field(
        &mut self,
        path: &str,
        value: OperationValue,
        timestamp: u64,
        strategy: &MergeStrategy,
    ) -> Result<Option<MergeConflict>> {
        // Only the last document's conflict (if any) is returned.
        let mut conflict = None;
        let doc_ids: Vec<_> = self.document_states.keys().cloned().collect();

        for doc_id in doc_ids {
            if let Some(state) = self.document_states.get(&doc_id) {
                let current_ts = state.field_timestamps.get(path).copied().unwrap_or(0);

                if current_ts == timestamp {
                    // Concurrent writes - need to resolve
                    let local_value = self
                        .get_field_value(&doc_id, path)
                        .unwrap_or(OperationValue::Null);

                    let resolved = strategy.resolve(&local_value, &value, current_ts, timestamp);

                    conflict = Some(MergeConflict {
                        path: path.to_string(),
                        local_value,
                        remote_value: value.clone(),
                        local_timestamp: current_ts,
                        remote_timestamp: timestamp,
                        resolved_value: Some(resolved.clone()),
                    });

                    // Apply resolved value with a timestamp above both writers
                    // so it wins the subsequent LWW comparison
                    self.apply_field_value(&doc_id, path, &resolved, timestamp.max(current_ts) + 1);
                } else {
                    self.apply_field_value(&doc_id, path, &value, timestamp);
                }
            }
        }

        Ok(conflict)
    }

    fn vector_clock(&self) -> &VectorClock {
        &self.vector_clock
    }

    fn replica_id(&self) -> &str {
        &self.replica_id
    }

    /// Resolve a reported conflict with `strategy` and write the winner back
    /// to every tracked document.
    fn resolve_conflict(
        &mut self,
        conflict: &MergeConflict,
        strategy: &MergeStrategy,
    ) -> Result<OperationValue> {
        let resolved = strategy.resolve(
            &conflict.local_value,
            &conflict.remote_value,
            conflict.local_timestamp,
            conflict.remote_timestamp,
        );

        // Apply the resolution with a timestamp above both conflicting writes
        let doc_ids: Vec<_> = self.document_states.keys().cloned().collect();
        for doc_id in doc_ids {
            self.apply_field_value(
                &doc_id,
                &conflict.path,
                &resolved,
                conflict.local_timestamp.max(conflict.remote_timestamp) + 1,
            );
        }

        Ok(resolved)
    }
}
524
525impl<P: FormatBatchProcessor> FormatCrdtProcessor<P> {
526    /// Delete a value at a JSON path
527    fn delete_json_path(json: &mut serde_json::Value, path: &str) {
528        let parts: Vec<&str> = path.split('.').collect();
529        let mut current = json;
530
531        for (i, part) in parts.iter().enumerate() {
532            if i == parts.len() - 1 {
533                // Final part - remove the value
534                if let serde_json::Value::Object(obj) = current {
535                    obj.remove(*part);
536                }
537            } else {
538                // Navigate down
539                if let serde_json::Value::Object(obj) = current {
540                    match obj.get_mut(*part) {
541                        Some(v) => current = v,
542                        None => return,
543                    }
544                } else {
545                    return;
546                }
547            }
548        }
549    }
550
551    /// Get field value from a document
552    fn get_field_value(&self, doc_id: &str, path: &str) -> Option<OperationValue> {
553        let state = self.document_states.get(doc_id)?;
554        let json_value: serde_json::Value = serde_json::from_str(&state.content).ok()?;
555
556        let parts: Vec<&str> = path.split('.').collect();
557        let mut current = &json_value;
558
559        for part in &parts {
560            match current {
561                serde_json::Value::Object(obj) => {
562                    current = obj.get(*part)?;
563                }
564                _ => return None,
565            }
566        }
567
568        Some(Self::json_to_operation_value(current))
569    }
570
571    /// Convert JSON to `OperationValue`
572    ///
573    /// Note: Arrays and objects are represented as JSON strings in `StringRef`,
574    /// since ArrayRef/ObjectRef require tape positions which we don't have here.
575    fn json_to_operation_value(json: &serde_json::Value) -> OperationValue {
576        match json {
577            serde_json::Value::Null => OperationValue::Null,
578            serde_json::Value::Bool(b) => OperationValue::BoolRef(*b),
579            serde_json::Value::Number(n) => OperationValue::NumberRef(n.to_string()),
580            serde_json::Value::String(s) => OperationValue::StringRef(s.clone()),
581            // Arrays and objects are serialized to JSON strings
582            // since ArrayRef/ObjectRef use tape positions
583            serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
584                OperationValue::StringRef(json.to_string())
585            }
586        }
587    }
588}
589
590// =============================================================================
591// DeltaCrdt Implementation
592// =============================================================================
593
594impl<P: FormatBatchProcessor> DeltaCrdt for FormatCrdtProcessor<P> {
595    type Delta = FormatDelta;
596
597    fn generate_delta(&self, since: &VectorClock) -> Self::Delta {
598        // Collect operations that happened after the given vector clock
599        let ops: Vec<CrdtOperation> = self
600            .operation_history
601            .iter()
602            .filter(|op| !op.vector_clock.happened_before(since))
603            .cloned()
604            .collect();
605
606        FormatDelta::with_operations(ops, self.vector_clock.clone(), self.format_kind())
607    }
608
609    fn apply_delta(&mut self, delta: Self::Delta) -> Result<Vec<MergeConflict>> {
610        let mut conflicts = Vec::new();
611
612        for op in delta.operations {
613            if !self.is_causally_ready(&op) {
614                // Buffer for later
615                self.buffer_operation(op);
616            } else if let Some(conflict) = self.merge_operation(op)? {
617                conflicts.push(conflict);
618            }
619        }
620
621        // Process any buffered operations that are now ready
622        conflicts.extend(self.process_buffered()?);
623
624        // Merge the delta clock
625        self.vector_clock.merge(&delta.clock);
626
627        Ok(conflicts)
628    }
629
630    fn compact(&mut self) {
631        // Keep only recent operations (e.g., last 1000)
632        const MAX_HISTORY: usize = 1000;
633        if self.operation_history.len() > MAX_HISTORY {
634            let drain_count = self.operation_history.len() - MAX_HISTORY;
635            self.operation_history.drain(0..drain_count);
636        }
637    }
638}
639
640// =============================================================================
641// OpBasedCrdt Implementation
642// =============================================================================
643
impl<P: FormatBatchProcessor> OpBasedCrdt for FormatCrdtProcessor<P> {
    /// Prepare a local operation for replication without mutating local state.
    ///
    /// The returned operation carries a speculative clock (local clock plus
    /// one tick for this replica); actual state only advances in `effect`.
    fn prepare(&self, op: &DsonOperation) -> Result<CrdtOperation> {
        let mut clock = self.vector_clock.clone();
        clock.increment(&self.replica_id);

        Ok(CrdtOperation {
            operation: op.clone(),
            timestamp: self.lamport_timestamp + 1,
            replica_id: self.replica_id.clone(),
            vector_clock: clock,
        })
    }

    /// Apply a prepared operation (local or remote) to this replica.
    fn effect(&mut self, op: CrdtOperation) -> Result<Option<MergeConflict>> {
        self.merge_operation(op)
    }

    fn is_causally_ready(&self, op: &CrdtOperation) -> bool {
        // Check if all causal dependencies are satisfied
        // For each replica in op's clock, our clock should be >= op's clock - 1 for that replica
        // (we need to have seen all operations before this one)
        for (replica, &time) in &op.vector_clock.clocks() {
            if replica == &op.replica_id {
                // For the originating replica, we expect time - 1
                if time > 1 && self.vector_clock.get(replica) < time - 1 {
                    return false;
                }
            } else {
                // For other replicas, we need to have seen at least that many
                if self.vector_clock.get(replica) < time {
                    return false;
                }
            }
        }
        true
    }

    /// Hold back an operation whose causal dependencies are not yet met.
    fn buffer_operation(&mut self, op: CrdtOperation) {
        self.operation_buffer.push(op);
    }

    /// Drain and apply every buffered operation that has become causally
    /// ready, repeating until a full pass makes no progress (applying one
    /// operation can unblock others by advancing our clocks).
    fn process_buffered(&mut self) -> Result<Vec<MergeConflict>> {
        let mut conflicts = Vec::new();
        let mut processed = Vec::new();

        // Keep trying to process buffered operations until no progress
        loop {
            let mut made_progress = false;

            for (i, op) in self.operation_buffer.iter().enumerate() {
                if self.is_causally_ready(op) {
                    processed.push(i);
                    made_progress = true;
                }
            }

            if !made_progress {
                break;
            }

            // Process in reverse order to maintain indices
            // (removing from the back keeps earlier indices valid; note this
            // merges later-buffered operations first within a pass)
            for &idx in processed.iter().rev() {
                let op = self.operation_buffer.remove(idx);
                if let Some(conflict) = self.merge_operation(op)? {
                    conflicts.push(conflict);
                }
            }

            processed.clear();
        }

        Ok(conflicts)
    }
}
718
719// =============================================================================
720// Tests
721// =============================================================================
722
723#[cfg(test)]
724mod tests {
725    use super::*;
726    use crate::format_dson::BatchStatistics;
727
    // Minimal in-memory `FormatBatchProcessor` used to drive the CRDT wrapper in tests.
    struct MockBatchProcessor;
730
731    impl FormatBatchProcessor for MockBatchProcessor {
732        fn format_kind(&self) -> FormatKind {
733            FormatKind::Json
734        }
735
736        fn process_batch(
737            &mut self,
738            _data: &[u8],
739            _schema: &CompiledSchema,
740        ) -> Result<FormatBatchResult> {
741            Ok(FormatBatchResult {
742                documents: vec![r#"{"name":"test","value":42}"#.to_string()],
743                errors: vec![],
744                statistics: BatchStatistics::default(),
745            })
746        }
747
748        fn process_batch_unfiltered(&mut self, _data: &[u8]) -> Result<FormatBatchResult> {
749            self.process_batch(&[], &CompiledSchema::compile(&[]).unwrap())
750        }
751
752        fn reset(&mut self) {}
753    }
754
755    #[test]
756    fn test_crdt_processor_creation() {
757        let processor = FormatCrdtProcessor::new(MockBatchProcessor, "replica_1");
758        assert_eq!(processor.replica_id(), "replica_1");
759        assert_eq!(processor.format_kind(), FormatKind::Json);
760    }
761
762    #[test]
763    fn test_crdt_processor_with_strategy() {
764        let processor =
765            FormatCrdtProcessor::new(MockBatchProcessor, "r1").with_strategy(MergeStrategy::Max);
766        assert_eq!(processor.replica_id(), "r1");
767    }
768
769    #[test]
770    fn test_process_increments_clock() {
771        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
772        let schema = CompiledSchema::compile(&[]).unwrap();
773
774        let initial_ts = processor.lamport_timestamp();
775        processor.process(b"{}", &schema).unwrap();
776
777        assert!(processor.lamport_timestamp() > initial_ts);
778        assert_eq!(processor.vector_clock().get("r1"), 1);
779    }
780
781    #[test]
782    fn test_process_tracks_documents() {
783        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
784        let schema = CompiledSchema::compile(&[]).unwrap();
785
786        processor.process(b"{}", &schema).unwrap();
787
788        assert_eq!(processor.document_count(), 1);
789        assert!(processor.get_document("doc_0").is_some());
790    }
791
792    #[test]
793    fn test_format_delta_creation() {
794        let delta = FormatDelta::new(FormatKind::Json);
795        assert!(delta.is_empty());
796        assert_eq!(delta.len(), 0);
797    }
798
799    #[test]
800    fn test_format_delta_with_operations() {
801        let op = CrdtOperation {
802            operation: DsonOperation::FieldAdd {
803                path: "test".to_string(),
804                value: OperationValue::StringRef("value".to_string()),
805            },
806            timestamp: 1,
807            replica_id: "r1".to_string(),
808            vector_clock: VectorClock::new(),
809        };
810
811        let delta = FormatDelta::with_operations(vec![op], VectorClock::new(), FormatKind::Json);
812        assert!(!delta.is_empty());
813        assert_eq!(delta.len(), 1);
814    }
815
816    #[test]
817    fn test_merge_operation() {
818        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
819        let schema = CompiledSchema::compile(&[]).unwrap();
820        processor.process(b"{}", &schema).unwrap();
821
822        let op = CrdtOperation {
823            operation: DsonOperation::FieldAdd {
824                path: "new_field".to_string(),
825                value: OperationValue::StringRef("new_value".to_string()),
826            },
827            timestamp: 10,
828            replica_id: "r2".to_string(),
829            vector_clock: VectorClock::new(),
830        };
831
832        let conflict = processor.merge_operation(op).unwrap();
833        // No conflict expected for new field
834        assert!(conflict.is_none());
835    }
836
837    #[test]
838    fn test_generate_and_apply_delta() {
839        let mut processor1 = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
840        let schema = CompiledSchema::compile(&[]).unwrap();
841        processor1.process(b"{}", &schema).unwrap();
842
843        // Generate delta from empty clock
844        let delta = processor1.generate_delta(&VectorClock::new());
845
846        let mut processor2 = FormatCrdtProcessor::new(MockBatchProcessor, "r2");
847        processor2.process(b"{}", &schema).unwrap();
848
849        // Apply delta
850        let conflicts = processor2.apply_delta(delta).unwrap();
851        // Should be no conflicts for initial sync
852        assert!(conflicts.is_empty());
853    }
854
855    #[test]
856    fn test_prepare_operation() {
857        let processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
858        let op = DsonOperation::FieldAdd {
859            path: "test".to_string(),
860            value: OperationValue::Null,
861        };
862
863        let crdt_op = processor.prepare(&op).unwrap();
864        assert_eq!(crdt_op.replica_id, "r1");
865        assert!(crdt_op.timestamp > 0);
866    }
867
868    #[test]
869    fn test_is_causally_ready() {
870        let processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
871
872        // Operation with empty clock is always ready
873        let op = CrdtOperation {
874            operation: DsonOperation::FieldAdd {
875                path: "test".to_string(),
876                value: OperationValue::Null,
877            },
878            timestamp: 1,
879            replica_id: "r2".to_string(),
880            vector_clock: VectorClock::new(),
881        };
882
883        assert!(processor.is_causally_ready(&op));
884    }
885
886    #[test]
887    fn test_buffer_and_process() {
888        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
889
890        let op = CrdtOperation {
891            operation: DsonOperation::FieldAdd {
892                path: "test".to_string(),
893                value: OperationValue::Null,
894            },
895            timestamp: 1,
896            replica_id: "r2".to_string(),
897            vector_clock: VectorClock::new(),
898        };
899
900        processor.buffer_operation(op);
901        assert_eq!(processor.operation_buffer.len(), 1);
902
903        let conflicts = processor.process_buffered().unwrap();
904        // Empty vector clock means causally ready
905        assert!(conflicts.is_empty());
906    }
907
908    #[test]
909    fn test_compact() {
910        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
911
912        // Add many operations
913        for i in 0..2000_u64 {
914            processor.operation_history.push(CrdtOperation {
915                operation: DsonOperation::FieldAdd {
916                    path: format!("field_{i}"),
917                    value: OperationValue::Null,
918                },
919                timestamp: i,
920                replica_id: "r1".to_string(),
921                vector_clock: VectorClock::new(),
922            });
923        }
924
925        processor.compact();
926        assert!(processor.operation_history.len() <= 1000);
927    }
928
929    #[test]
930    fn test_reset() {
931        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
932        let schema = CompiledSchema::compile(&[]).unwrap();
933        processor.process(b"{}", &schema).unwrap();
934
935        assert!(processor.document_count() > 0);
936
937        processor.reset();
938        assert_eq!(processor.document_count(), 0);
939    }
940
941    #[test]
942    fn test_document_ids() {
943        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
944        let schema = CompiledSchema::compile(&[]).unwrap();
945        processor.process(b"{}", &schema).unwrap();
946
947        let ids = processor.document_ids();
948        assert!(!ids.is_empty());
949    }
950
951    #[test]
952    fn test_merge_field_with_conflict() {
953        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
954        let schema = CompiledSchema::compile(&[]).unwrap();
955        processor.process(b"{}", &schema).unwrap();
956
957        // First, set a field
958        processor.apply_field_value(
959            "doc_0",
960            "test_field",
961            &OperationValue::StringRef("value1".to_string()),
962            1,
963        );
964
965        // Now merge with same timestamp (conflict)
966        let conflict = processor
967            .merge_field(
968                "test_field",
969                OperationValue::StringRef("value2".to_string()),
970                1, // Same timestamp
971                &MergeStrategy::LastWriteWins,
972            )
973            .unwrap();
974
975        // Should report conflict for same timestamp
976        assert!(conflict.is_some());
977    }
978
979    #[test]
980    fn test_resolve_conflict() {
981        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
982        let schema = CompiledSchema::compile(&[]).unwrap();
983        processor.process(b"{}", &schema).unwrap();
984
985        let conflict = MergeConflict {
986            path: "test".to_string(),
987            local_value: OperationValue::NumberRef("10".to_string()),
988            remote_value: OperationValue::NumberRef("20".to_string()),
989            local_timestamp: 1,
990            remote_timestamp: 2,
991            resolved_value: None,
992        };
993
994        let resolved = processor
995            .resolve_conflict(&conflict, &MergeStrategy::Max)
996            .unwrap();
997
998        // Max should pick 20
999        match resolved {
1000            OperationValue::NumberRef(n) => assert_eq!(n, "20"),
1001            _ => panic!("Expected NumberRef"),
1002        }
1003    }
1004
1005    #[test]
1006    fn test_operation_value_to_json_all_types() {
1007        // Test all OperationValue variants
1008        let null_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
1009            &OperationValue::Null,
1010        );
1011        assert!(null_json.is_null());
1012
1013        let bool_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
1014            &OperationValue::BoolRef(true),
1015        );
1016        assert_eq!(bool_json, serde_json::Value::Bool(true));
1017
1018        let int_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
1019            &OperationValue::NumberRef("42".to_string()),
1020        );
1021        assert_eq!(int_json, serde_json::json!(42));
1022
1023        let float_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
1024            &OperationValue::NumberRef("3.14".to_string()),
1025        );
1026        assert!(float_json.is_number());
1027
1028        let string_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
1029            &OperationValue::StringRef("hello".to_string()),
1030        );
1031        assert_eq!(string_json, serde_json::Value::String("hello".to_string()));
1032
1033        // ArrayRef and ObjectRef use tape positions, so they return empty containers
1034        let array_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
1035            &OperationValue::ArrayRef { start: 0, end: 0 },
1036        );
1037        assert!(array_json.is_array());
1038
1039        let obj_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
1040            &OperationValue::ObjectRef { start: 0, end: 0 },
1041        );
1042        assert!(obj_json.is_object());
1043    }
1044
1045    #[test]
1046    fn test_json_to_operation_value_all_types() {
1047        let null_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
1048            &serde_json::Value::Null,
1049        );
1050        assert!(matches!(null_val, OperationValue::Null));
1051
1052        let bool_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
1053            &serde_json::Value::Bool(false),
1054        );
1055        assert!(matches!(bool_val, OperationValue::BoolRef(false)));
1056
1057        let num_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
1058            &serde_json::json!(123),
1059        );
1060        assert!(matches!(num_val, OperationValue::NumberRef(_)));
1061
1062        let str_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
1063            &serde_json::Value::String("test".to_string()),
1064        );
1065        assert!(matches!(str_val, OperationValue::StringRef(_)));
1066
1067        // Arrays and objects are serialized to JSON strings in StringRef
1068        let arr_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
1069            &serde_json::json!([1, 2]),
1070        );
1071        assert!(matches!(arr_val, OperationValue::StringRef(_)));
1072
1073        let obj_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
1074            &serde_json::json!({"a": 1}),
1075        );
1076        assert!(matches!(obj_val, OperationValue::StringRef(_)));
1077    }
1078
1079    #[test]
1080    fn test_dson_processor_access() {
1081        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
1082        let _ = processor.dson_processor();
1083        let _ = processor.dson_processor_mut();
1084    }
1085
1086    #[test]
1087    fn test_field_delete_operation() {
1088        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
1089        let schema = CompiledSchema::compile(&[]).unwrap();
1090        processor.process(b"{}", &schema).unwrap();
1091
1092        let op = CrdtOperation {
1093            operation: DsonOperation::FieldDelete {
1094                path: "name".to_string(),
1095            },
1096            timestamp: 10,
1097            replica_id: "r2".to_string(),
1098            vector_clock: VectorClock::new(),
1099        };
1100
1101        let conflict = processor.merge_operation(op).unwrap();
1102        assert!(conflict.is_none());
1103    }
1104}