Skip to main content

fionn_ops/processor/
simd_dson.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! DSON Trait Implementations for SIMD-DSON
3//!
4//! This module provides implementations of the DSON trait abstractions
5//! for the SIMD-DSON types, enabling comparison across implementations.
6
7use super::BlackBoxProcessor;
8use crate::dson_traits::{
9    ArrayOperations, CrdtMerge, CrdtOperation, DeltaCrdt, DocumentProcessor, DsonImplementation,
10    FieldOperations, ImplementationCharacteristics, MergeConflict, OpBasedCrdt, SchemaAware,
11    VectorClock,
12};
13use crate::{
14    DsonOperation, FilterPredicate, MergeStrategy, OperationValue, ReduceFunction,
15    TransformFunction,
16};
17use fionn_core::{DsonError, Result};
18
19use ahash::AHashMap;
20use rayon::prelude::*;
21
22// =============================================================================
23// SimdDsonProcessor - Main implementation wrapping BlackBoxProcessor
24// =============================================================================
25
26/// Delta for delta-state CRDT synchronization
27#[derive(Debug, Clone)]
28pub struct SimdDelta {
29    /// Operations since the given vector clock
30    pub operations: Vec<(String, OperationValue, u64)>,
31    /// The vector clock at time of delta generation
32    pub since_clock: VectorClock,
33    /// The current vector clock
34    pub current_clock: VectorClock,
35}
36
37/// SIMD-accelerated DSON processor implementing all DSON traits
38pub struct SimdDsonProcessor {
39    /// Underlying black box processor
40    processor: BlackBoxProcessor,
41    /// Replica ID for CRDT operations
42    replica_id: String,
43    /// Vector clock for causality tracking
44    vector_clock: VectorClock,
45    /// Local Lamport timestamp
46    local_timestamp: u64,
47    /// Buffered operations awaiting causal delivery
48    operation_buffer: Vec<CrdtOperation>,
49    /// Field value cache for reads (populated from tape, not `serde_json`)
50    field_cache: AHashMap<String, OperationValue>,
51    /// Operation log for delta generation
52    operation_log: Vec<(String, OperationValue, u64, VectorClock)>,
53    /// Parallel processing enabled
54    parallel_enabled: bool,
55}
56
57impl SimdDsonProcessor {
58    /// Create a new SIMD DSON processor
59    #[inline]
60    #[must_use]
61    pub fn new(replica_id: &str) -> Self {
62        Self {
63            processor: BlackBoxProcessor::new_unfiltered(),
64            replica_id: replica_id.to_string(),
65            vector_clock: VectorClock::new(),
66            local_timestamp: 0,
67            operation_buffer: Vec::new(),
68            field_cache: AHashMap::with_capacity(32), // Pre-allocate for typical docs
69            operation_log: Vec::new(),
70            parallel_enabled: false,
71        }
72    }
73
74    /// Create with schema filtering
75    #[inline]
76    #[must_use]
77    pub fn with_schema(
78        replica_id: &str,
79        input_schema: Vec<String>,
80        output_schema: Vec<String>,
81    ) -> Self {
82        Self {
83            processor: BlackBoxProcessor::new(input_schema, output_schema),
84            replica_id: replica_id.to_string(),
85            vector_clock: VectorClock::new(),
86            local_timestamp: 0,
87            operation_buffer: Vec::new(),
88            field_cache: AHashMap::with_capacity(32),
89            operation_log: Vec::new(),
90            parallel_enabled: false,
91        }
92    }
93
94    /// Enable parallel processing
95    #[must_use]
96    pub const fn with_parallel(mut self, enabled: bool) -> Self {
97        self.parallel_enabled = enabled;
98        self
99    }
100
101    /// Check if parallel processing is enabled
102    #[must_use]
103    pub const fn is_parallel(&self) -> bool {
104        self.parallel_enabled
105    }
106
107    /// Read a value directly from the processor's tape for a given path
108    /// This avoids the need to maintain a separate cache that requires `serde_json`
109    #[inline]
110    fn read_from_tape(&self, path: &str) -> Option<OperationValue> {
111        // Use the processor's tape-based read functionality
112        // This leverages the SIMD-parsed tape directly
113        self.processor.read_field_value(path).ok().flatten()
114    }
115}
116
117// =============================================================================
118// DocumentProcessor Implementation
119// =============================================================================
120
121impl DocumentProcessor for SimdDsonProcessor {
122    #[inline]
123    fn process(&mut self, input: &str) -> Result<String> {
124        // Clear cache - will be lazily populated from tape
125        self.field_cache.clear();
126        // Process with SIMD-JSON tape (no double-parsing!)
127        self.processor.process(input)
128    }
129
130    #[inline]
131    fn apply_operation(&mut self, op: &DsonOperation) -> Result<()> {
132        self.processor.apply_operation(op)?;
133
134        // Increment timestamp and vector clock for CRDT tracking
135        self.local_timestamp += 1;
136        self.vector_clock.increment(&self.replica_id);
137
138        // Update cache and operation log for modified fields
139        match op {
140            DsonOperation::FieldAdd { path, value }
141            | DsonOperation::FieldModify { path, value } => {
142                self.field_cache.insert(path.clone(), value.clone());
143                // Log operation for delta generation
144                self.operation_log.push((
145                    path.clone(),
146                    value.clone(),
147                    self.local_timestamp,
148                    self.vector_clock.clone(),
149                ));
150            }
151            DsonOperation::FieldDelete { path } => {
152                self.field_cache.remove(path);
153                // Log delete as a Null value for delta generation
154                self.operation_log.push((
155                    path.clone(),
156                    OperationValue::Null,
157                    self.local_timestamp,
158                    self.vector_clock.clone(),
159                ));
160            }
161            _ => {}
162        }
163        Ok(())
164    }
165
166    #[inline]
167    fn output(&self) -> Result<String> {
168        self.processor.generate_output()
169    }
170}
171
172// =============================================================================
173// SchemaAware Implementation
174// =============================================================================
175
176impl SchemaAware for SimdDsonProcessor {
177    fn matches_input_schema(&self, path: &str) -> bool {
178        let schema = self.processor.input_schema();
179        if schema.is_empty() {
180            return true;
181        }
182        schema.contains(&path.to_string())
183            || schema
184                .iter()
185                .any(|s| path.starts_with(s) || s.starts_with(path))
186    }
187
188    fn matches_output_schema(&self, path: &str) -> bool {
189        let schema = self.processor.output_schema();
190        if schema.is_empty() {
191            return true;
192        }
193        schema.contains(&path.to_string())
194            || schema
195                .iter()
196                .any(|s| path.starts_with(s) || s.starts_with(path))
197    }
198
199    fn input_schema(&self) -> Vec<String> {
200        self.processor.input_schema()
201    }
202
203    fn output_schema(&self) -> Vec<String> {
204        self.processor.output_schema()
205    }
206}
207
208// =============================================================================
209// FieldOperations Implementation
210// =============================================================================
211
212impl FieldOperations for SimdDsonProcessor {
213    #[inline]
214    fn field_add(&mut self, path: &str, value: OperationValue) -> Result<()> {
215        self.apply_operation(&DsonOperation::FieldAdd {
216            path: path.to_string(),
217            value: value.clone(),
218        })?;
219        self.field_cache.insert(path.to_string(), value);
220        Ok(())
221    }
222
223    #[inline]
224    fn field_modify(&mut self, path: &str, value: OperationValue) -> Result<()> {
225        self.apply_operation(&DsonOperation::FieldModify {
226            path: path.to_string(),
227            value: value.clone(),
228        })?;
229        self.field_cache.insert(path.to_string(), value);
230        Ok(())
231    }
232
233    #[inline]
234    fn field_delete(&mut self, path: &str) -> Result<()> {
235        self.apply_operation(&DsonOperation::FieldDelete {
236            path: path.to_string(),
237        })?;
238        self.field_cache.remove(path);
239        Ok(())
240    }
241
242    #[inline]
243    fn field_read(&self, path: &str) -> Result<Option<OperationValue>> {
244        // First check cache (for modified values), then fall back to tape
245        if let Some(value) = self.field_cache.get(path) {
246            return Ok(Some(value.clone()));
247        }
248        // Read directly from the SIMD-parsed tape
249        Ok(self.read_from_tape(path))
250    }
251
252    #[inline]
253    fn field_exists(&self, path: &str) -> bool {
254        // Check cache first, then tape
255        self.field_cache.contains_key(path) || self.read_from_tape(path).is_some()
256    }
257}
258
259// =============================================================================
260// ArrayOperations Implementation
261// =============================================================================
262
263impl ArrayOperations for SimdDsonProcessor {
264    fn array_insert(&mut self, path: &str, index: usize, value: OperationValue) -> Result<()> {
265        self.apply_operation(&DsonOperation::ArrayInsert {
266            path: path.to_string(),
267            index,
268            value,
269        })
270    }
271
272    fn array_remove(&mut self, path: &str, index: usize) -> Result<()> {
273        self.apply_operation(&DsonOperation::ArrayRemove {
274            path: path.to_string(),
275            index,
276        })
277    }
278
279    fn array_replace(&mut self, path: &str, index: usize, value: OperationValue) -> Result<()> {
280        self.apply_operation(&DsonOperation::ArrayReplace {
281            path: path.to_string(),
282            index,
283            value,
284        })
285    }
286
287    fn array_len(&self, path: &str) -> Result<usize> {
288        // Count array elements in cache
289        let prefix = format!("{path}[");
290        let count = self
291            .field_cache
292            .keys()
293            .filter(|k| k.starts_with(&prefix))
294            .filter(|k| {
295                // Only count direct children
296                let suffix = &k[prefix.len()..];
297                suffix.chars().take_while(char::is_ascii_digit).count() > 0 && !suffix.contains('.')
298            })
299            .count();
300        Ok(count)
301    }
302
303    fn array_build(&mut self, path: &str, elements: Vec<OperationValue>) -> Result<()> {
304        self.apply_operation(&DsonOperation::ArrayBuild {
305            path: path.to_string(),
306            elements,
307        })
308    }
309
310    fn array_filter(&mut self, path: &str, predicate: &FilterPredicate) -> Result<()> {
311        self.apply_operation(&DsonOperation::ArrayFilter {
312            path: path.to_string(),
313            predicate: predicate.clone(),
314        })
315    }
316
317    fn array_map(&mut self, path: &str, transform: &TransformFunction) -> Result<()> {
318        self.apply_operation(&DsonOperation::ArrayMap {
319            path: path.to_string(),
320            transform: transform.clone(),
321        })
322    }
323
324    fn array_reduce(
325        &mut self,
326        path: &str,
327        initial: OperationValue,
328        reducer: &ReduceFunction,
329    ) -> Result<OperationValue> {
330        self.apply_operation(&DsonOperation::ArrayReduce {
331            path: path.to_string(),
332            initial: initial.clone(),
333            reducer: reducer.clone(),
334        })?;
335        // Return the reduced value (would need actual implementation)
336        Ok(initial)
337    }
338}
339
340// =============================================================================
341// CrdtMerge Implementation
342// =============================================================================
343
344impl CrdtMerge for SimdDsonProcessor {
345    fn merge_operation(&mut self, op: CrdtOperation) -> Result<Option<MergeConflict>> {
346        // Update vector clock
347        self.vector_clock.merge(&op.vector_clock);
348        self.local_timestamp = self.local_timestamp.max(op.timestamp);
349
350        // Apply the operation
351        if let DsonOperation::MergeField {
352            path,
353            value,
354            timestamp,
355        } = &op.operation
356        {
357            self.merge_field(
358                path,
359                value.clone(),
360                *timestamp,
361                &MergeStrategy::LastWriteWins,
362            )
363        } else {
364            self.apply_operation(&op.operation)?;
365            Ok(None)
366        }
367    }
368
369    fn merge_field(
370        &mut self,
371        path: &str,
372        value: OperationValue,
373        timestamp: u64,
374        strategy: &MergeStrategy,
375    ) -> Result<Option<MergeConflict>> {
376        let local_value = self.field_read(path)?;
377
378        if let Some(local) = local_value {
379            // Conflict - resolve using strategy
380            let local_ts = self.local_timestamp;
381            let resolved = strategy.resolve(&local, &value, local_ts, timestamp);
382
383            let conflict = MergeConflict {
384                path: path.to_string(),
385                local_value: local,
386                remote_value: value,
387                local_timestamp: local_ts,
388                remote_timestamp: timestamp,
389                resolved_value: Some(resolved.clone()),
390            };
391
392            // Apply resolved value
393            self.field_modify(path, resolved)?;
394
395            Ok(Some(conflict))
396        } else {
397            // No conflict - just add
398            self.field_add(path, value)?;
399            Ok(None)
400        }
401    }
402
403    fn vector_clock(&self) -> &VectorClock {
404        &self.vector_clock
405    }
406
407    fn replica_id(&self) -> &str {
408        &self.replica_id
409    }
410
411    fn resolve_conflict(
412        &mut self,
413        conflict: &MergeConflict,
414        strategy: &MergeStrategy,
415    ) -> Result<OperationValue> {
416        let resolved = strategy.resolve(
417            &conflict.local_value,
418            &conflict.remote_value,
419            conflict.local_timestamp,
420            conflict.remote_timestamp,
421        );
422        self.field_modify(&conflict.path, resolved.clone())?;
423        Ok(resolved)
424    }
425}
426
427// =============================================================================
428// OpBasedCrdt Implementation
429// =============================================================================
430
431impl OpBasedCrdt for SimdDsonProcessor {
432    fn prepare(&self, op: &DsonOperation) -> Result<CrdtOperation> {
433        Ok(CrdtOperation {
434            operation: op.clone(),
435            timestamp: self.local_timestamp + 1,
436            replica_id: self.replica_id.clone(),
437            vector_clock: {
438                let mut vc = self.vector_clock.clone();
439                vc.increment(&self.replica_id);
440                vc
441            },
442        })
443    }
444
445    fn effect(&mut self, op: CrdtOperation) -> Result<Option<MergeConflict>> {
446        self.merge_operation(op)
447    }
448
449    fn is_causally_ready(&self, op: &CrdtOperation) -> bool {
450        // Check if all dependencies are satisfied using the clocks() method
451        let op_clocks = op.vector_clock.clocks();
452        for (replica, &time) in &op_clocks {
453            if replica == &op.replica_id {
454                // The originating replica's clock should be exactly one more than ours
455                if time != self.vector_clock.get(replica) + 1 {
456                    return false;
457                }
458            } else {
459                // Other replicas should be <= our clock
460                if time > self.vector_clock.get(replica) {
461                    return false;
462                }
463            }
464        }
465        true
466    }
467
468    fn buffer_operation(&mut self, op: CrdtOperation) {
469        self.operation_buffer.push(op);
470    }
471
472    fn process_buffered(&mut self) -> Result<Vec<MergeConflict>> {
473        let mut conflicts = Vec::new();
474
475        // Collect ready operations first (to avoid borrow issues)
476        let ready_ops: Vec<CrdtOperation> = self
477            .operation_buffer
478            .iter()
479            .filter(|op| {
480                // Inline is_causally_ready check to avoid borrowing self
481                let mut ready = true;
482                let op_clocks = op.vector_clock.clocks();
483                for (replica, &time) in &op_clocks {
484                    if replica == &op.replica_id {
485                        if time != self.vector_clock.get(replica) + 1 {
486                            ready = false;
487                            break;
488                        }
489                    } else if time > self.vector_clock.get(replica) {
490                        ready = false;
491                        break;
492                    }
493                }
494                ready
495            })
496            .cloned()
497            .collect();
498
499        // Remove ready operations from buffer
500        self.operation_buffer.retain(|op| {
501            !ready_ops
502                .iter()
503                .any(|ready| ready.timestamp == op.timestamp && ready.replica_id == op.replica_id)
504        });
505
506        // Process ready operations
507        for op in ready_ops {
508            if let Some(conflict) = self.effect(op)? {
509                conflicts.push(conflict);
510            }
511        }
512
513        Ok(conflicts)
514    }
515}
516
517// =============================================================================
518// DeltaCrdt Implementation
519// =============================================================================
520
521impl DeltaCrdt for SimdDsonProcessor {
522    type Delta = SimdDelta;
523
524    fn generate_delta(&self, since: &VectorClock) -> Self::Delta {
525        // Filter operations that happened after the given vector clock
526        let operations: Vec<(String, OperationValue, u64)> = self
527            .operation_log
528            .iter()
529            .filter(|(_, _, _, vc)| {
530                // Include if any component is newer than since
531                let vc_clocks = vc.clocks();
532                for (replica, &time) in &vc_clocks {
533                    if time > since.get(replica) {
534                        return true;
535                    }
536                }
537                false
538            })
539            .map(|(path, value, ts, _)| (path.clone(), value.clone(), *ts))
540            .collect();
541
542        SimdDelta {
543            operations,
544            since_clock: since.clone(),
545            current_clock: self.vector_clock.clone(),
546        }
547    }
548
549    fn apply_delta(&mut self, delta: Self::Delta) -> Result<Vec<MergeConflict>> {
550        let mut conflicts = Vec::new();
551
552        // Merge vector clock
553        self.vector_clock.merge(&delta.current_clock);
554
555        if self.parallel_enabled && delta.operations.len() > 100 {
556            // Parallel application for large deltas
557            let results: Vec<_> = delta
558                .operations
559                .into_par_iter()
560                .map(|(path, value, ts)| (path, value, ts))
561                .collect();
562
563            for (path, value, ts) in results {
564                if let Some(conflict) =
565                    self.merge_field(&path, value, ts, &MergeStrategy::LastWriteWins)?
566                {
567                    conflicts.push(conflict);
568                }
569            }
570        } else {
571            // Sequential application
572            for (path, value, ts) in delta.operations {
573                if let Some(conflict) =
574                    self.merge_field(&path, value, ts, &MergeStrategy::LastWriteWins)?
575                {
576                    conflicts.push(conflict);
577                }
578            }
579        }
580
581        Ok(conflicts)
582    }
583
584    fn compact(&mut self) {
585        // Remove operations that are dominated by the current vector clock
586        // Keep only the most recent operation per path
587        let mut latest: AHashMap<String, (OperationValue, u64, VectorClock)> = AHashMap::new();
588
589        for (path, value, ts, vc) in self.operation_log.drain(..) {
590            let should_insert = match latest.get(&path) {
591                Some((_, existing_ts, _)) => ts > *existing_ts,
592                None => true,
593            };
594            if should_insert {
595                latest.insert(path, (value, ts, vc));
596            }
597        }
598
599        self.operation_log = latest
600            .into_iter()
601            .map(|(path, (value, ts, vc))| (path, value, ts, vc))
602            .collect();
603    }
604}
605
606// =============================================================================
607// Parallel Processing Support
608// =============================================================================
609
610impl SimdDsonProcessor {
611    /// Process multiple documents in parallel
612    ///
613    /// # Errors
614    ///
615    /// Returns an error if parallel processing is not enabled or if any document fails to process.
616    pub fn process_batch_parallel(&self, documents: Vec<String>) -> Result<Vec<String>> {
617        if !self.parallel_enabled {
618            return Err(DsonError::InvalidField(
619                "Parallel processing not enabled".to_string(),
620            ));
621        }
622
623        let input_schema = self.processor.input_schema();
624        let output_schema = self.processor.output_schema();
625
626        let results: Result<Vec<String>> = documents
627            .into_par_iter()
628            .map(|doc| {
629                let mut proc = BlackBoxProcessor::new(input_schema.clone(), output_schema.clone());
630                proc.process(&doc)
631            })
632            .collect();
633
634        results
635    }
636
637    /// Apply operations in parallel where order-independent
638    ///
639    /// # Errors
640    ///
641    /// Returns an error if any operation fails to apply.
642    pub fn apply_operations_parallel(
643        &mut self,
644        ops: Vec<DsonOperation>,
645    ) -> Result<Vec<Option<MergeConflict>>> {
646        if !self.parallel_enabled || ops.len() < 10 {
647            // Fall back to sequential for small batches
648            let mut results = Vec::new();
649            for op in ops {
650                self.apply_operation(&op)?;
651                results.push(None);
652            }
653            return Ok(results);
654        }
655
656        // Group operations by path for conflict detection
657        let mut by_path: AHashMap<String, Vec<DsonOperation>> = AHashMap::new();
658        for op in &ops {
659            let path = match op {
660                DsonOperation::FieldAdd { path, .. }
661                | DsonOperation::FieldModify { path, .. }
662                | DsonOperation::FieldDelete { path }
663                | DsonOperation::MergeField { path, .. } => path.clone(),
664                _ => String::new(),
665            };
666            by_path.entry(path).or_default().push(op.clone());
667        }
668
669        // Collect paths for parallel processing
670        let path_groups: Vec<(String, Vec<DsonOperation>)> = by_path.into_iter().collect();
671
672        // Process non-conflicting path groups in parallel
673        let results: Vec<Option<MergeConflict>> = path_groups
674            .into_par_iter()
675            .flat_map(|(_, path_ops)| {
676                // For each path, operations are processed - no conflicts in this simplified model
677                path_ops.into_iter().map(|_| None).collect::<Vec<_>>()
678            })
679            .collect();
680
681        // Apply all operations sequentially (parallel was for grouping analysis)
682        for op in ops {
683            self.apply_operation(&op)?;
684        }
685
686        Ok(results)
687    }
688
689    /// Merge operations from multiple replicas in parallel
690    ///
691    /// # Errors
692    ///
693    /// Returns an error if any merge operation fails.
694    pub fn merge_replicas_parallel(
695        &mut self,
696        replica_ops: Vec<(String, Vec<CrdtOperation>)>,
697        _strategy: &MergeStrategy,
698    ) -> Result<Vec<MergeConflict>> {
699        if !self.parallel_enabled {
700            let mut all_conflicts = Vec::new();
701            for (_, ops) in replica_ops {
702                for op in ops {
703                    if let Some(conflict) = self.merge_operation(op)? {
704                        all_conflicts.push(conflict);
705                    }
706                }
707            }
708            return Ok(all_conflicts);
709        }
710
711        // Collect all operations with their replica info
712        let all_ops: Vec<CrdtOperation> =
713            replica_ops.into_iter().flat_map(|(_, ops)| ops).collect();
714
715        // Sort by timestamp for deterministic ordering
716        let mut sorted_ops = all_ops;
717        sorted_ops.sort_by_key(|op| op.timestamp);
718
719        // Apply in order (CRDT semantics require causal ordering)
720        let mut conflicts = Vec::new();
721        for op in sorted_ops {
722            if let Some(conflict) = self.merge_operation(op)? {
723                conflicts.push(conflict);
724            }
725        }
726
727        Ok(conflicts)
728    }
729}
730
731// =============================================================================
732// DsonImplementation Marker Trait
733// =============================================================================
734
735impl DsonImplementation for SimdDsonProcessor {
736    fn name(&self) -> &'static str {
737        "SIMD-DSON"
738    }
739
740    fn version(&self) -> &'static str {
741        "0.1.0"
742    }
743
744    fn features(&self) -> Vec<&str> {
745        vec![
746            "simd_acceleration",
747            "zero_copy_parsing",
748            "schema_filtering",
749            "crdt_merge",
750            "delta_crdt",
751            "streaming",
752            "canonical",
753            "parallel_processing",
754        ]
755    }
756
757    fn characteristics(&self) -> ImplementationCharacteristics {
758        ImplementationCharacteristics {
759            zero_copy: true,
760            simd_accelerated: true,
761            streaming: true,
762            crdt_support: true,
763            schema_filtering: true,
764            parallel: self.parallel_enabled,
765            memory_overhead: 64, // Approximate per-field overhead
766            max_document_size: 100 * 1024 * 1024, // 100MB recommended max
767        }
768    }
769}
770
771// =============================================================================
772// Comparison Utilities
773// =============================================================================
774
775/// Compare two DSON implementations
776pub fn compare_implementations<A, B>(impl_a: &A, impl_b: &B) -> ImplementationComparison
777where
778    A: DsonImplementation,
779    B: DsonImplementation,
780{
781    let chars_a = impl_a.characteristics();
782    let chars_b = impl_b.characteristics();
783
784    ImplementationComparison {
785        name_a: impl_a.name().to_string(),
786        name_b: impl_b.name().to_string(),
787        features_a: impl_a
788            .features()
789            .iter()
790            .map(std::string::ToString::to_string)
791            .collect(),
792        features_b: impl_b
793            .features()
794            .iter()
795            .map(std::string::ToString::to_string)
796            .collect(),
797        common_features: impl_a
798            .features()
799            .iter()
800            .filter(|f| impl_b.features().contains(f))
801            .map(std::string::ToString::to_string)
802            .collect(),
803        characteristics_a: chars_a,
804        characteristics_b: chars_b,
805    }
806}
807
808/// Result of comparing two implementations
809#[derive(Debug)]
810pub struct ImplementationComparison {
811    /// Name of first implementation
812    pub name_a: String,
813    /// Name of second implementation
814    pub name_b: String,
815    /// Features supported by first implementation
816    pub features_a: Vec<String>,
817    /// Features supported by second implementation
818    pub features_b: Vec<String>,
819    /// Features common to both implementations
820    pub common_features: Vec<String>,
821    /// Characteristics of first implementation
822    pub characteristics_a: ImplementationCharacteristics,
823    /// Characteristics of second implementation
824    pub characteristics_b: ImplementationCharacteristics,
825}
826
827impl ImplementationComparison {
828    /// Generate comparison report
829    #[must_use]
830    pub fn report(&self) -> String {
831        format!(
832            "Implementation Comparison: {} vs {}\n\
833             ========================================\n\
834             \n\
835             Features ({}): {:?}\n\
836             Features ({}): {:?}\n\
837             Common Features: {:?}\n\
838             \n\
839             Characteristics:\n\
840             {} - SIMD: {}, Zero-copy: {}, CRDT: {}, Streaming: {}\n\
841             {} - SIMD: {}, Zero-copy: {}, CRDT: {}, Streaming: {}\n",
842            self.name_a,
843            self.name_b,
844            self.name_a,
845            self.features_a,
846            self.name_b,
847            self.features_b,
848            self.common_features,
849            self.name_a,
850            self.characteristics_a.simd_accelerated,
851            self.characteristics_a.zero_copy,
852            self.characteristics_a.crdt_support,
853            self.characteristics_a.streaming,
854            self.name_b,
855            self.characteristics_b.simd_accelerated,
856            self.characteristics_b.zero_copy,
857            self.characteristics_b.crdt_support,
858            self.characteristics_b.streaming,
859        )
860    }
861}
862
863// =============================================================================
864// Tests
865// =============================================================================
866
#[cfg(test)]
mod tests {
    // Unit tests for `SimdDsonProcessor`, `SimdDelta` and the implementation
    // comparison helpers. Several tests are deliberately smoke tests: they only
    // check that an operation completes without error, because the exact
    // conflict set depends on merge internals not pinned down here.
    use super::*;

    #[test]
    fn test_simd_dson_processor_basic() {
        let mut proc = SimdDsonProcessor::new("replica_1");

        proc.process(r#"{"name":"Alice","age":30}"#)
            .expect("well-formed JSON object should be processed");

        assert!(proc.field_exists("name"));
        assert!(proc.field_exists("age"));

        let name = proc.field_read("name").unwrap();
        assert_eq!(name, Some(OperationValue::StringRef("Alice".to_string())));
    }

    #[test]
    fn test_simd_dson_processor_crdt_merge() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"counter":10}"#).unwrap();

        // Simulate remote operation
        let remote_op = CrdtOperation {
            operation: DsonOperation::MergeField {
                path: "counter".to_string(),
                value: OperationValue::NumberRef("20".to_string()),
                timestamp: 5,
            },
            timestamp: 5,
            replica_id: "replica_2".to_string(),
            vector_clock: {
                let mut vc = VectorClock::new();
                vc.increment("replica_2");
                vc
            },
        };

        let conflict = proc
            .merge_operation(remote_op)
            .unwrap()
            .expect("remote write to an existing field should report a conflict");
        assert_eq!(conflict.path, "counter");
    }

    #[test]
    fn test_implementation_characteristics() {
        let proc = SimdDsonProcessor::new("test");
        let chars = proc.characteristics();

        assert!(chars.simd_accelerated);
        assert!(chars.zero_copy);
        assert!(chars.crdt_support);
    }

    #[test]
    fn test_simd_delta_clone() {
        let delta = SimdDelta {
            operations: vec![("path".to_string(), OperationValue::Null, 1)],
            since_clock: VectorClock::new(),
            current_clock: VectorClock::new(),
        };
        // Actually exercise `Clone`. A plain `let cloned = delta;` is a move
        // and would not test the derive at all.
        let cloned = delta.clone();
        assert_eq!(cloned.operations.len(), 1);
        // The original must still be usable and hold identical data.
        assert_eq!(cloned.operations, delta.operations);
    }

    #[test]
    fn test_simd_dson_with_schema() {
        let proc = SimdDsonProcessor::with_schema(
            "replica_1",
            vec!["name".to_string()],
            vec!["name".to_string()],
        );
        assert!(proc.matches_input_schema("name"));
        assert!(proc.matches_output_schema("name"));
    }

    #[test]
    fn test_simd_dson_with_parallel() {
        let proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
        assert!(proc.is_parallel());
    }

    #[test]
    fn test_simd_dson_field_operations() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"name":"Alice"}"#).unwrap();

        // Field add
        proc.field_add("age", OperationValue::NumberRef("30".to_string()))
            .unwrap();
        assert!(proc.field_exists("age"));

        // Field modify
        proc.field_modify("age", OperationValue::NumberRef("31".to_string()))
            .unwrap();

        // Field delete: only checks that the delete succeeds.
        proc.field_delete("age").unwrap();
    }

    #[test]
    fn test_simd_dson_array_operations() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"items":[]}"#).unwrap();

        // Array insert
        proc.array_insert("items", 0, OperationValue::StringRef("first".to_string()))
            .unwrap();

        // Array replace
        proc.array_replace(
            "items",
            0,
            OperationValue::StringRef("replaced".to_string()),
        )
        .unwrap();

        // Array remove
        proc.array_remove("items", 0).unwrap();

        // Array build
        proc.array_build(
            "items",
            vec![
                OperationValue::StringRef("a".to_string()),
                OperationValue::StringRef("b".to_string()),
            ],
        )
        .unwrap();

        // Array len: the build above wrote at most two elements.
        let len = proc.array_len("items").unwrap();
        assert!(len <= 2);
    }

    #[test]
    fn test_simd_dson_array_filter() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"items":[1,2,3]}"#).unwrap();

        proc.array_filter("items", &FilterPredicate::Even).unwrap();
    }

    #[test]
    fn test_simd_dson_array_map() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"items":[1,2,3]}"#).unwrap();

        proc.array_map("items", &TransformFunction::Add(10))
            .unwrap();
    }

    #[test]
    fn test_simd_dson_array_reduce() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"items":[1,2,3]}"#).unwrap();

        let result = proc
            .array_reduce(
                "items",
                OperationValue::NumberRef("0".to_string()),
                &ReduceFunction::Sum,
            )
            .unwrap();
        assert!(matches!(result, OperationValue::NumberRef(_)));
    }

    #[test]
    fn test_simd_dson_input_output_schema() {
        let proc = SimdDsonProcessor::new("replica_1");
        let input = proc.input_schema();
        let output = proc.output_schema();
        // Empty schema means all paths allowed
        assert!(input.is_empty());
        assert!(output.is_empty());
    }

    #[test]
    fn test_simd_dson_schema_matching_prefix() {
        let proc = SimdDsonProcessor::with_schema(
            "replica_1",
            vec!["user".to_string()],
            vec!["user".to_string()],
        );
        // path.starts_with("user") is true for "user.name"
        assert!(proc.matches_input_schema("user.name"));
        assert!(proc.matches_output_schema("user.name"));
    }

    #[test]
    fn test_simd_dson_vector_clock() {
        let proc = SimdDsonProcessor::new("replica_1");
        let vc = proc.vector_clock();
        assert!(vc.clocks().is_empty());
    }

    #[test]
    fn test_simd_dson_replica_id() {
        let proc = SimdDsonProcessor::new("my_replica");
        assert_eq!(proc.replica_id(), "my_replica");
    }

    #[test]
    fn test_simd_dson_prepare_operation() {
        let proc = SimdDsonProcessor::new("replica_1");
        let op = DsonOperation::FieldAdd {
            path: "test".to_string(),
            value: OperationValue::Null,
        };
        let crdt_op = proc.prepare(&op).unwrap();
        assert_eq!(crdt_op.replica_id, "replica_1");
        assert_eq!(crdt_op.timestamp, 1);
    }

    #[test]
    fn test_simd_dson_buffer_operation() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 1,
            replica_id: "replica_2".to_string(),
            vector_clock: VectorClock::new(),
        };
        // Smoke test: buffering must not panic.
        proc.buffer_operation(op);
    }

    #[test]
    fn test_simd_dson_process_buffered() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        let conflicts = proc.process_buffered().unwrap();
        assert!(conflicts.is_empty());
    }

    #[test]
    fn test_simd_dson_is_causally_ready() {
        let proc = SimdDsonProcessor::new("replica_1");
        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 1,
            replica_id: "replica_2".to_string(),
            vector_clock: {
                let mut vc = VectorClock::new();
                vc.increment("replica_2");
                vc
            },
        };
        let ready = proc.is_causally_ready(&op);
        assert!(ready);
    }

    #[test]
    fn test_simd_dson_generate_delta() {
        let proc = SimdDsonProcessor::new("replica_1");
        let since = VectorClock::new();
        let delta = proc.generate_delta(&since);
        assert!(delta.operations.is_empty());
    }

    #[test]
    fn test_simd_dson_apply_delta() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();
        let delta = SimdDelta {
            operations: vec![],
            since_clock: VectorClock::new(),
            current_clock: VectorClock::new(),
        };
        let conflicts = proc.apply_delta(delta).unwrap();
        assert!(conflicts.is_empty());
    }

    #[test]
    fn test_simd_dson_compact() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        // Compacting an empty log must not panic.
        proc.compact();
    }

    #[test]
    fn test_simd_dson_implementation_name() {
        let proc = SimdDsonProcessor::new("test");
        assert_eq!(proc.name(), "SIMD-DSON");
    }

    #[test]
    fn test_simd_dson_implementation_version() {
        let proc = SimdDsonProcessor::new("test");
        assert_eq!(proc.version(), "0.1.0");
    }

    #[test]
    fn test_simd_dson_implementation_features() {
        let proc = SimdDsonProcessor::new("test");
        let features = proc.features();
        assert!(features.contains(&"simd_acceleration"));
        assert!(features.contains(&"crdt_merge"));
    }

    #[test]
    fn test_simd_dson_output() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"name":"test"}"#).unwrap();
        let output = proc.output().unwrap();
        assert!(!output.is_empty());
    }

    #[test]
    fn test_simd_dson_merge_field_no_conflict() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        let conflict = proc
            .merge_field(
                "newfield",
                OperationValue::StringRef("value".to_string()),
                1,
                &MergeStrategy::LastWriteWins,
            )
            .unwrap();
        assert!(conflict.is_none());
    }

    #[test]
    fn test_simd_dson_resolve_conflict() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"field":"local"}"#).unwrap();

        let conflict = MergeConflict {
            path: "field".to_string(),
            local_value: OperationValue::StringRef("local".to_string()),
            remote_value: OperationValue::StringRef("remote".to_string()),
            local_timestamp: 1,
            remote_timestamp: 2,
            resolved_value: None,
        };

        let resolved = proc
            .resolve_conflict(&conflict, &MergeStrategy::LastWriteWins)
            .unwrap();
        assert!(matches!(resolved, OperationValue::StringRef(_)));
    }

    #[test]
    fn test_simd_dson_effect() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 1,
            replica_id: "replica_2".to_string(),
            vector_clock: VectorClock::new(),
        };
        let conflict = proc.effect(op).unwrap();
        assert!(conflict.is_none());
    }

    #[test]
    fn test_compare_implementations() {
        let impl_a = SimdDsonProcessor::new("a");
        let impl_b = SimdDsonProcessor::new("b");
        let comparison = compare_implementations(&impl_a, &impl_b);
        assert_eq!(comparison.name_a, "SIMD-DSON");
        assert_eq!(comparison.name_b, "SIMD-DSON");
    }

    #[test]
    fn test_implementation_comparison_report() {
        let impl_a = SimdDsonProcessor::new("a");
        let impl_b = SimdDsonProcessor::new("b");
        let comparison = compare_implementations(&impl_a, &impl_b);
        let report = comparison.report();
        assert!(report.contains("SIMD-DSON"));
    }

    #[test]
    fn test_simd_dson_process_batch_parallel_disabled() {
        let proc = SimdDsonProcessor::new("replica_1");
        let result = proc.process_batch_parallel(vec!["{}".to_string()]);
        assert!(result.is_err()); // Parallel not enabled
    }

    #[test]
    fn test_simd_dson_process_batch_parallel_enabled() {
        let proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
        let result =
            proc.process_batch_parallel(vec![r#"{"a":1}"#.to_string(), r#"{"b":2}"#.to_string()]);
        assert!(result.is_ok());
    }

    #[test]
    fn test_simd_dson_apply_operations_parallel_small_batch() {
        let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
        proc.process(r"{}").unwrap();

        let ops = vec![DsonOperation::FieldAdd {
            path: "test".to_string(),
            value: OperationValue::Null,
        }];
        let results = proc.apply_operations_parallel(ops).unwrap();
        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_simd_dson_merge_replicas_parallel() {
        let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
        proc.process(r"{}").unwrap();

        let replica_ops: Vec<(String, Vec<CrdtOperation>)> = vec![];
        let conflicts = proc
            .merge_replicas_parallel(replica_ops, &MergeStrategy::LastWriteWins)
            .unwrap();
        assert!(conflicts.is_empty());
    }

    #[test]
    fn test_simd_dson_merge_replicas_sequential() {
        // Parallel mode is not enabled, so the sequential fallback is used.
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        let replica_ops: Vec<(String, Vec<CrdtOperation>)> = vec![];
        let conflicts = proc
            .merge_replicas_parallel(replica_ops, &MergeStrategy::LastWriteWins)
            .unwrap();
        assert!(conflicts.is_empty());
    }

    #[test]
    fn test_implementation_comparison_debug() {
        let impl_a = SimdDsonProcessor::new("a");
        let impl_b = SimdDsonProcessor::new("b");
        let comparison = compare_implementations(&impl_a, &impl_b);
        let debug = format!("{comparison:?}");
        assert!(!debug.is_empty());
    }

    // Additional tests for coverage

    #[test]
    fn test_simd_delta_debug() {
        let delta = SimdDelta {
            operations: vec![],
            since_clock: VectorClock::new(),
            current_clock: VectorClock::new(),
        };
        let debug = format!("{delta:?}");
        assert!(debug.contains("SimdDelta"));
    }

    #[test]
    fn test_matches_input_schema_empty() {
        let proc = SimdDsonProcessor::new("replica_1");
        // Empty schema should allow all paths
        assert!(proc.matches_input_schema("anything.goes"));
    }

    #[test]
    fn test_matches_output_schema_empty() {
        let proc = SimdDsonProcessor::new("replica_1");
        // Empty schema should allow all paths
        assert!(proc.matches_output_schema("anything.goes"));
    }

    #[test]
    fn test_field_read_from_cache() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        // Add to cache via field_add
        proc.field_add("cached", OperationValue::StringRef("value".to_string()))
            .unwrap();

        // Read should come from cache
        let value = proc.field_read("cached").unwrap();
        assert!(value.is_some());
    }

    #[test]
    fn test_field_exists_from_cache() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        proc.field_add("exists", OperationValue::Null).unwrap();
        assert!(proc.field_exists("exists"));
    }

    #[test]
    fn test_array_len_with_elements() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"items":[]}"#).unwrap();

        // Add some array elements to cache
        proc.field_cache.insert(
            "items[0]".to_string(),
            OperationValue::StringRef("a".to_string()),
        );
        proc.field_cache.insert(
            "items[1]".to_string(),
            OperationValue::StringRef("b".to_string()),
        );
        proc.field_cache.insert(
            "items[2]".to_string(),
            OperationValue::StringRef("c".to_string()),
        );

        let len = proc.array_len("items").unwrap();
        assert_eq!(len, 3);
    }

    #[test]
    fn test_array_len_excludes_nested() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r#"{"items":[]}"#).unwrap();

        proc.field_cache
            .insert("items[0]".to_string(), OperationValue::Null);
        proc.field_cache
            .insert("items[0].nested".to_string(), OperationValue::Null);

        // Should only count direct children, not nested
        let len = proc.array_len("items").unwrap();
        assert_eq!(len, 1);
    }

    #[test]
    fn test_is_causally_ready_false_replica_ahead() {
        let proc = SimdDsonProcessor::new("replica_1");

        // Create an operation from replica_2 that's ahead of what we know
        let mut op_vc = VectorClock::new();
        op_vc.increment("replica_2");
        op_vc.increment("replica_2");
        op_vc.increment("replica_2"); // replica_2 is at 3

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 3,
            replica_id: "replica_2".to_string(),
            vector_clock: op_vc,
        };

        // We have seen nothing from replica_2, so the next deliverable op from it
        // would be at 1; an op at 3 must wait.
        let ready = proc.is_causally_ready(&op);
        assert!(!ready);
    }

    #[test]
    fn test_is_causally_ready_false_other_replica_ahead() {
        let proc = SimdDsonProcessor::new("replica_1");

        // Create an operation that depends on another replica we haven't seen
        let mut op_vc = VectorClock::new();
        op_vc.increment("replica_2");
        // Also include dependency on replica_3 that we haven't seen
        op_vc.increment("replica_3");

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 1,
            replica_id: "replica_2".to_string(),
            vector_clock: op_vc,
        };

        let ready = proc.is_causally_ready(&op);
        assert!(!ready);
    }

    #[test]
    fn test_process_buffered_with_ready_ops() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        // Create an operation that's causally ready (no dependencies)
        let mut op_vc = VectorClock::new();
        op_vc.increment("replica_2"); // replica_2 is at 1

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::StringRef("value".to_string()),
            },
            timestamp: 1,
            replica_id: "replica_2".to_string(),
            vector_clock: op_vc,
        };

        proc.buffer_operation(op);
        // Smoke test: draining a buffer holding a ready op must succeed. The
        // resulting conflict set is not constrained here.
        let _conflicts = proc.process_buffered().unwrap();
    }

    #[test]
    fn test_process_buffered_with_not_ready_ops() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        // Create an operation that's not causally ready
        let mut op_vc = VectorClock::new();
        op_vc.increment("replica_2");
        op_vc.increment("replica_2");
        op_vc.increment("replica_2"); // Very ahead

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 5,
            replica_id: "replica_2".to_string(),
            vector_clock: op_vc,
        };

        proc.buffer_operation(op);
        let conflicts = proc.process_buffered().unwrap();
        assert!(conflicts.is_empty());
    }

    #[test]
    fn test_generate_delta_with_operations() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        // Add some operations to the log. The clock is cloned for the first
        // entry because it keeps advancing for the second.
        let mut vc = VectorClock::new();
        vc.increment("replica_1");
        proc.operation_log
            .push(("field1".to_string(), OperationValue::Null, 1, vc.clone()));
        vc.increment("replica_1");
        proc.operation_log
            .push(("field2".to_string(), OperationValue::Null, 2, vc));

        // Generate delta since empty clock
        let since = VectorClock::new();
        let delta = proc.generate_delta(&since);
        assert_eq!(delta.operations.len(), 2);
    }

    #[test]
    fn test_generate_delta_filters_old() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        // Add old operation
        let old_vc = VectorClock::new(); // All zeros
        proc.operation_log
            .push(("old".to_string(), OperationValue::Null, 1, old_vc));

        // Generate delta since clock that's already ahead
        let mut since = VectorClock::new();
        since.increment("replica_1");
        since.increment("replica_1");

        let delta = proc.generate_delta(&since);
        assert!(delta.operations.is_empty());
    }

    #[test]
    fn test_apply_delta_with_operations() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        let delta = SimdDelta {
            operations: vec![(
                "new_field".to_string(),
                OperationValue::StringRef("value".to_string()),
                1,
            )],
            since_clock: VectorClock::new(),
            current_clock: VectorClock::new(),
        };

        // Smoke test: applying a non-empty delta must succeed. The conflict
        // set is not constrained here.
        let _conflicts = proc.apply_delta(delta).unwrap();
    }

    #[test]
    fn test_apply_delta_parallel_large() {
        let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
        proc.process(r"{}").unwrap();

        // Create a large delta (over 100 operations), intended to exceed the
        // threshold for the parallel apply path.
        let operations: Vec<(String, OperationValue, u64)> = (0..150)
            .map(|i| {
                (
                    format!("field_{i}"),
                    OperationValue::NumberRef(i.to_string()),
                    i,
                )
            })
            .collect();

        let delta = SimdDelta {
            operations,
            since_clock: VectorClock::new(),
            current_clock: VectorClock::new(),
        };

        // Smoke test: only checks that the large delta applies without error.
        let _conflicts = proc.apply_delta(delta).unwrap();
    }

    #[test]
    fn test_compact_with_operations() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        // Add multiple operations for same path (later ones should dominate)
        let mut vc1 = VectorClock::new();
        vc1.increment("replica_1");
        proc.operation_log.push((
            "field".to_string(),
            OperationValue::NumberRef("1".to_string()),
            1,
            vc1,
        ));

        let mut vc2 = VectorClock::new();
        vc2.increment("replica_1");
        vc2.increment("replica_1");
        proc.operation_log.push((
            "field".to_string(),
            OperationValue::NumberRef("2".to_string()),
            2,
            vc2,
        ));

        // Before compact
        assert_eq!(proc.operation_log.len(), 2);

        proc.compact();

        // After compact - should have only one entry per path
        assert_eq!(proc.operation_log.len(), 1);
    }

    #[test]
    fn test_apply_operations_parallel_large_batch() {
        let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
        proc.process(r"{}").unwrap();

        // Create a batch larger than 10 operations
        let ops: Vec<DsonOperation> = (0..20)
            .map(|i| DsonOperation::FieldAdd {
                path: format!("field_{i}"),
                value: OperationValue::NumberRef(i.to_string()),
            })
            .collect();

        let results = proc.apply_operations_parallel(ops).unwrap();
        assert_eq!(results.len(), 20);
    }

    #[test]
    fn test_apply_operations_parallel_groups_by_path() {
        let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
        proc.process(r"{}").unwrap();

        // Create operations that affect the same path
        let ops: Vec<DsonOperation> = (0..15)
            .map(|i| DsonOperation::FieldModify {
                path: "shared".to_string(),
                value: OperationValue::NumberRef(i.to_string()),
            })
            .collect();

        let results = proc.apply_operations_parallel(ops).unwrap();
        assert!(!results.is_empty());
    }

    #[test]
    fn test_merge_replicas_parallel_with_ops() {
        let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
        proc.process(r"{}").unwrap();

        let mut vc = VectorClock::new();
        vc.increment("replica_2");

        let replica_ops = vec![(
            "replica_2".to_string(),
            vec![CrdtOperation {
                operation: DsonOperation::FieldAdd {
                    path: "from_replica_2".to_string(),
                    value: OperationValue::StringRef("value".to_string()),
                },
                timestamp: 1,
                replica_id: "replica_2".to_string(),
                vector_clock: vc,
            }],
        )];

        // Smoke test: the parallel merge of one remote op must succeed.
        let _conflicts = proc
            .merge_replicas_parallel(replica_ops, &MergeStrategy::LastWriteWins)
            .unwrap();
    }

    #[test]
    fn test_merge_replicas_sequential_with_ops() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        let mut vc = VectorClock::new();
        vc.increment("replica_2");

        let replica_ops = vec![(
            "replica_2".to_string(),
            vec![CrdtOperation {
                operation: DsonOperation::FieldAdd {
                    path: "from_replica_2".to_string(),
                    value: OperationValue::StringRef("value".to_string()),
                },
                timestamp: 1,
                replica_id: "replica_2".to_string(),
                vector_clock: vc,
            }],
        )];

        // Not parallel, so sequential path. Smoke test: the merge must succeed.
        let _conflicts = proc
            .merge_replicas_parallel(replica_ops, &MergeStrategy::LastWriteWins)
            .unwrap();
    }

    #[test]
    fn test_apply_operation_other_variants() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        // Test operations that go through the _ => {} branch
        proc.apply_operation(&DsonOperation::ObjectStart {
            path: "obj".to_string(),
        })
        .unwrap();

        proc.apply_operation(&DsonOperation::ObjectEnd {
            path: "obj".to_string(),
        })
        .unwrap();

        proc.apply_operation(&DsonOperation::ArrayStart {
            path: "arr".to_string(),
        })
        .unwrap();

        proc.apply_operation(&DsonOperation::ArrayEnd {
            path: "arr".to_string(),
        })
        .unwrap();
    }

    #[test]
    fn test_merge_operation_non_merge_field() {
        let mut proc = SimdDsonProcessor::new("replica_1");
        proc.process(r"{}").unwrap();

        let mut vc = VectorClock::new();
        vc.increment("replica_2");

        // Use a non-MergeField operation
        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::StringRef("value".to_string()),
            },
            timestamp: 1,
            replica_id: "replica_2".to_string(),
            vector_clock: vc,
        };

        let conflict = proc.merge_operation(op).unwrap();
        assert!(conflict.is_none());
    }
}