Skip to main content

fionn_ops/
operations.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! Canonical DSON Operations and Zero-Allocation Processing
3//!
4//! This module defines the complete set of operations for schema-aware DSON processing.
5//! Operations are designed for zero-allocation streaming with JSON path-based filtering.
6
7use fionn_core::{DsonError, Result};
8use std::collections::{HashMap, HashSet};
9
10/// Complete set of DSON operations
11#[derive(Debug, Clone, PartialEq)]
12pub enum DsonOperation {
13    // Structural Operations
14    /// Start a new object at the specified path
15    ObjectStart {
16        /// Path where the object should be created
17        path: String,
18    },
19    /// End the current object at the specified path
20    ObjectEnd {
21        /// Path of the object being ended
22        path: String,
23    },
24    /// Start a new array at the specified path
25    ArrayStart {
26        /// Path where the array should be created
27        path: String,
28    },
29    /// End the current array at the specified path
30    ArrayEnd {
31        /// Path of the array being ended
32        path: String,
33    },
34
35    /// Add a new field at path with value
36    FieldAdd {
37        /// Target path
38        path: String,
39        /// Value to add
40        value: OperationValue,
41    },
42    /// Modify existing field at path
43    FieldModify {
44        /// Target path
45        path: String,
46        /// New value
47        value: OperationValue,
48    },
49    /// Delete field at path
50    FieldDelete {
51        /// Target path
52        path: String,
53    },
54
55    /// Insert element into array at index
56    ArrayInsert {
57        /// Array path
58        path: String,
59        /// Insert position
60        index: usize,
61        /// Value to insert
62        value: OperationValue,
63    },
64    /// Remove element from array at index
65    ArrayRemove {
66        /// Array path
67        path: String,
68        /// Remove position
69        index: usize,
70    },
71    /// Replace element in array at index
72    ArrayReplace {
73        /// Array path
74        path: String,
75        /// Replace position
76        index: usize,
77        /// New value
78        value: OperationValue,
79    },
80
81    /// Check that path exists
82    CheckPresence {
83        /// Path to check
84        path: String,
85    },
86    /// Check that path does not exist
87    CheckAbsence {
88        /// Path to check
89        path: String,
90    },
91    /// Check that value at path is null
92    CheckNull {
93        /// Path to check
94        path: String,
95    },
96    /// Check that value at path is not null
97    CheckNotNull {
98        /// Path to check
99        path: String,
100    },
101
102    /// Merge field with CRDT semantics
103    MergeField {
104        /// Target path
105        path: String,
106        /// Value to merge
107        value: OperationValue,
108        /// Operation timestamp
109        timestamp: u64,
110    },
111    /// Resolve conflict using strategy
112    ConflictResolve {
113        /// Conflict path
114        path: String,
115        /// Resolution strategy
116        strategy: MergeStrategy,
117    },
118
119    /// Build array from elements
120    ArrayBuild {
121        /// Array path
122        path: String,
123        /// Array elements
124        elements: Vec<OperationValue>,
125    },
126    /// Filter array elements
127    ArrayFilter {
128        /// Array path
129        path: String,
130        /// Filter predicate
131        predicate: FilterPredicate,
132    },
133    /// Transform array elements
134    ArrayMap {
135        /// Array path
136        path: String,
137        /// Transform function
138        transform: TransformFunction,
139    },
140    /// Reduce array to single value
141    ArrayReduce {
142        /// Array path
143        path: String,
144        /// Initial accumulator
145        initial: OperationValue,
146        /// Reducer function
147        reducer: ReduceFunction,
148    },
149
150    /// Build stream from generator
151    StreamBuild {
152        /// Stream path
153        path: String,
154        /// Value generator
155        generator: StreamGenerator,
156    },
157    /// Filter stream elements
158    StreamFilter {
159        /// Stream path
160        path: String,
161        /// Filter predicate
162        predicate: FilterPredicate,
163    },
164    /// Transform stream elements
165    StreamMap {
166        /// Stream path
167        path: String,
168        /// Transform function
169        transform: TransformFunction,
170    },
171    /// Emit stream batch
172    StreamEmit {
173        /// Stream path
174        path: String,
175        /// Batch size
176        batch_size: usize,
177    },
178
179    /// Execute batch of operations
180    BatchExecute {
181        /// Operations to execute
182        operations: Vec<Self>,
183    },
184}
185
186/// Values that can be operated on (re-exported from fionn-core)
187pub use fionn_core::OperationValue;
188
189/// Merge strategies for CRDT conflict resolution
190#[derive(Debug, Clone, PartialEq, Eq)]
191pub enum MergeStrategy {
192    /// Most recent write wins
193    LastWriteWins,
194    /// Combine values additively
195    Additive,
196    /// Keep maximum value
197    Max,
198    /// Keep minimum value
199    Min,
200    /// Union of collections
201    Union,
202    /// Custom merge function
203    Custom(String),
204}
205
206/// Filter predicates for array/stream filtering
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub enum FilterPredicate {
209    /// Keep every nth element
210    EveryNth(usize),
211    /// Keep alternating elements (1st, 3rd, 5th...)
212    Alternate,
213    /// Keep even indices
214    Even,
215    /// Keep odd indices
216    Odd,
217    /// Keep values greater than threshold
218    GreaterThan(i64),
219    /// Keep values less than threshold
220    LessThan(i64),
221    /// Keep values equal to target
222    Equals(OperationValue),
223    /// Custom predicate expression
224    Custom(String),
225}
226
227/// Transform functions for mapping operations
228#[derive(Debug, Clone)]
229pub enum TransformFunction {
230    /// Add to numeric values
231    Add(i64),
232    /// Multiply numeric values
233    Multiply(i64),
234    /// Convert string to uppercase
235    ToUppercase,
236    /// Convert string to lowercase
237    ToLowercase,
238    /// Append suffix to strings
239    Append(String),
240    /// Prepend prefix to strings
241    Prepend(String),
242    /// Custom transform expression
243    Custom(String),
244}
245
246impl PartialEq for TransformFunction {
247    fn eq(&self, other: &Self) -> bool {
248        match (self, other) {
249            (Self::Add(a), Self::Add(b)) | (Self::Multiply(a), Self::Multiply(b)) => a == b,
250            (Self::ToUppercase, Self::ToUppercase) | (Self::ToLowercase, Self::ToLowercase) => true,
251            (Self::Append(a), Self::Append(b))
252            | (Self::Prepend(a), Self::Prepend(b))
253            | (Self::Custom(a), Self::Custom(b)) => a == b,
254            _ => false,
255        }
256    }
257}
258
259impl Eq for TransformFunction {}
260
261/// Reduce functions for aggregation
262#[derive(Debug, Clone)]
263pub enum ReduceFunction {
264    /// Sum numeric values
265    Sum,
266    /// Product of numeric values
267    Product,
268    /// Minimum value
269    Min,
270    /// Maximum value
271    Max,
272    /// Count elements
273    Count,
274    /// Concatenate strings
275    Concat,
276    /// Custom reduction expression
277    Custom(String),
278}
279
280impl PartialEq for ReduceFunction {
281    fn eq(&self, other: &Self) -> bool {
282        match (self, other) {
283            (Self::Sum, Self::Sum)
284            | (Self::Product, Self::Product)
285            | (Self::Min, Self::Min)
286            | (Self::Max, Self::Max)
287            | (Self::Count, Self::Count)
288            | (Self::Concat, Self::Concat) => true,
289            (Self::Custom(a), Self::Custom(b)) => a == b,
290            _ => false,
291        }
292    }
293}
294
295impl Eq for ReduceFunction {}
296
297/// Stream generators for streaming operations
298#[derive(Debug, Clone)]
299pub enum StreamGenerator {
300    /// Generate numeric range
301    Range {
302        /// Start value
303        start: i64,
304        /// End value (exclusive)
305        end: i64,
306        /// Step between values
307        step: i64,
308    },
309    /// Repeat value N times
310    Repeat(OperationValue, usize),
311    /// Generate fibonacci sequence
312    Fibonacci(usize),
313    /// Custom generator expression
314    Custom(String),
315}
316
317impl PartialEq for StreamGenerator {
318    fn eq(&self, other: &Self) -> bool {
319        match (self, other) {
320            (
321                Self::Range {
322                    start: s1,
323                    end: e1,
324                    step: st1,
325                },
326                Self::Range {
327                    start: s2,
328                    end: e2,
329                    step: st2,
330                },
331            ) => s1 == s2 && e1 == e2 && st1 == st2,
332            (Self::Repeat(v1, c1), Self::Repeat(v2, c2)) => v1 == v2 && c1 == c2,
333            (Self::Fibonacci(f1), Self::Fibonacci(f2)) => f1 == f2,
334            (Self::Custom(c1), Self::Custom(c2)) => c1 == c2,
335            _ => false,
336        }
337    }
338}
339
340impl Eq for StreamGenerator {}
341
342/// Canonical operation processor that optimizes and filters operations
343pub struct CanonicalOperationProcessor {
344    input_schema: HashSet<String>,             // Allowed input paths
345    output_schema: HashSet<String>,            // Allowed output paths
346    operations: Vec<DsonOperation>,            // Original operations
347    canonical_ops: Vec<DsonOperation>,         // Optimized canonical sequence
348    field_states: HashMap<String, FieldState>, // Track field lifecycle
349}
350
351#[derive(Debug, Clone)]
352enum FieldState {
353    NotPresent,
354    Present,
355    Deleted,
356    Modified,
357}
358
359impl CanonicalOperationProcessor {
360    /// Create a new canonical operation processor with input and output schemas.
361    #[must_use]
362    pub fn new(input_schema: HashSet<String>, output_schema: HashSet<String>) -> Self {
363        Self {
364            input_schema,
365            output_schema,
366            operations: Vec::new(),
367            canonical_ops: Vec::new(),
368            field_states: HashMap::new(),
369        }
370    }
371
372    /// Add an operation to be processed
373    pub fn add_operation(&mut self, op: DsonOperation) {
374        self.operations.push(op);
375    }
376
377    /// Compute canonical operation sequence
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if an operation is invalid, such as:
382    /// - Modifying a non-existent field
383    /// - Modifying a deleted field
384    pub fn compute_canonical(&mut self) -> Result<&[DsonOperation]> {
385        // Reset state
386        self.canonical_ops.clear();
387        self.field_states.clear();
388
389        // Process each operation
390        let operations = self.operations.clone(); // Clone to avoid borrowing issues
391        for op in operations {
392            self.process_operation(op)?;
393        }
394
395        // Filter for output schema
396        self.filter_output_schema();
397
398        Ok(&self.canonical_ops)
399    }
400
401    fn process_operation(&mut self, op: DsonOperation) -> Result<()> {
402        match op {
403            // Field operations - track state and optimize
404            DsonOperation::FieldAdd { path, value } => {
405                self.process_field_add(path, value);
406            }
407            DsonOperation::FieldModify { path, value } => {
408                self.process_field_modify(path, value)?;
409            }
410            DsonOperation::FieldDelete { path } => {
411                self.process_field_delete(path);
412            }
413
414            // Presence operations - always pass through (they don't modify)
415            DsonOperation::CheckPresence { .. }
416            | DsonOperation::CheckAbsence { .. }
417            | DsonOperation::CheckNull { .. }
418            | DsonOperation::CheckNotNull { .. } => {
419                self.canonical_ops.push(op);
420            }
421
422            // CRDT MergeField needs special handling
423            DsonOperation::MergeField {
424                path,
425                value,
426                timestamp,
427            } => {
428                self.process_merge_field(path, value, timestamp);
429            }
430
431            // Batch operations - recurse into the batch
432            DsonOperation::BatchExecute { operations } => {
433                for batch_op in operations {
434                    self.process_operation(batch_op)?;
435                }
436            }
437
438            // All other operations with paths - filter by input schema
439            DsonOperation::ObjectStart { ref path }
440            | DsonOperation::ObjectEnd { ref path }
441            | DsonOperation::ArrayStart { ref path }
442            | DsonOperation::ArrayEnd { ref path }
443            | DsonOperation::ArrayInsert { ref path, .. }
444            | DsonOperation::ArrayRemove { ref path, .. }
445            | DsonOperation::ArrayReplace { ref path, .. }
446            | DsonOperation::ConflictResolve { ref path, .. }
447            | DsonOperation::ArrayBuild { ref path, .. }
448            | DsonOperation::ArrayFilter { ref path, .. }
449            | DsonOperation::ArrayMap { ref path, .. }
450            | DsonOperation::ArrayReduce { ref path, .. }
451            | DsonOperation::StreamBuild { ref path, .. }
452            | DsonOperation::StreamFilter { ref path, .. }
453            | DsonOperation::StreamMap { ref path, .. }
454            | DsonOperation::StreamEmit { ref path, .. } => {
455                if self.should_process_path(path) {
456                    self.canonical_ops.push(op);
457                }
458            }
459        }
460        Ok(())
461    }
462
463    fn process_field_add(&mut self, path: String, value: OperationValue) {
464        if !self.should_process_path(&path) {
465            return; // Skip if not in input schema
466        }
467
468        let current_state = self
469            .field_states
470            .get(&path)
471            .cloned()
472            .unwrap_or(FieldState::NotPresent);
473
474        match current_state {
475            FieldState::NotPresent => {
476                // Normal add
477                self.field_states.insert(path.clone(), FieldState::Present);
478                self.canonical_ops
479                    .push(DsonOperation::FieldAdd { path, value });
480            }
481            FieldState::Deleted => {
482                // Add after delete - becomes modify
483                self.field_states.insert(path.clone(), FieldState::Present);
484                self.canonical_ops
485                    .push(DsonOperation::FieldModify { path, value });
486            }
487            FieldState::Present | FieldState::Modified => {
488                // Add after existing - becomes modify
489                self.field_states.insert(path.clone(), FieldState::Modified);
490                self.canonical_ops
491                    .push(DsonOperation::FieldModify { path, value });
492            }
493        }
494    }
495
496    fn process_field_modify(&mut self, path: String, value: OperationValue) -> Result<()> {
497        if !self.should_process_path(&path) {
498            return Ok(()); // Skip if not in input schema
499        }
500
501        let current_state = self
502            .field_states
503            .get(&path)
504            .cloned()
505            .unwrap_or(FieldState::NotPresent);
506
507        match current_state {
508            FieldState::NotPresent => {
509                // Modify of non-existent field - error unless checking absence
510                return Err(DsonError::InvalidOperation(format!(
511                    "Cannot modify non-existent field: {path}"
512                )));
513            }
514            FieldState::Deleted => {
515                // Modify after delete - invalid
516                return Err(DsonError::InvalidOperation(format!(
517                    "Cannot modify deleted field: {path}"
518                )));
519            }
520            FieldState::Present | FieldState::Modified => {
521                // Normal modify
522                self.field_states.insert(path.clone(), FieldState::Modified);
523                self.canonical_ops
524                    .push(DsonOperation::FieldModify { path, value });
525            }
526        }
527        Ok(())
528    }
529
530    fn process_field_delete(&mut self, path: String) {
531        if !self.should_process_path(&path) {
532            return; // Skip if not in input schema
533        }
534
535        let current_state = self
536            .field_states
537            .get(&path)
538            .cloned()
539            .unwrap_or(FieldState::NotPresent);
540
541        match current_state {
542            FieldState::NotPresent => {
543                // Delete of non-existent field - mark as deleted for future operations
544                self.field_states.insert(path, FieldState::Deleted);
545                // Don't add the delete operation since it was a no-op, but track the state
546            }
547            FieldState::Deleted => {
548                // Already deleted - no-op
549            }
550            FieldState::Present | FieldState::Modified => {
551                // Normal delete
552                self.field_states.insert(path.clone(), FieldState::Deleted);
553                self.canonical_ops.push(DsonOperation::FieldDelete { path });
554            }
555        }
556    }
557
558    fn process_merge_field(&mut self, path: String, value: OperationValue, timestamp: u64) {
559        if !self.should_process_path(&path) {
560            return; // Skip if not in input schema
561        }
562
563        // For CRDT merges, we always add the operation
564        // Conflict resolution happens at merge time
565        self.canonical_ops.push(DsonOperation::MergeField {
566            path,
567            value,
568            timestamp,
569        });
570    }
571
572    fn should_process_path(&self, path: &str) -> bool {
573        // Check if path is in input schema (allows wildcards)
574        self.input_schema.contains(path)
575            || self.input_schema.iter().any(|schema_path| {
576                schema_path.ends_with(".*")
577                    && path.starts_with(&schema_path[..schema_path.len() - 2])
578            })
579    }
580
581    fn filter_output_schema(&mut self) {
582        // Remove operations for fields not in output schema
583        self.canonical_ops.retain(|op| {
584            match op {
585                DsonOperation::FieldAdd { path, .. }
586                | DsonOperation::FieldModify { path, .. }
587                | DsonOperation::FieldDelete { path }
588                | DsonOperation::ArrayInsert { path, .. }
589                | DsonOperation::ArrayRemove { path, .. }
590                | DsonOperation::ArrayReplace { path, .. }
591                | DsonOperation::MergeField { path, .. }
592                | DsonOperation::ConflictResolve { path, .. } => {
593                    self.output_schema.contains(path)
594                        || self.output_schema.iter().any(|schema_path| {
595                            schema_path.ends_with(".*")
596                                && path.starts_with(&schema_path[..schema_path.len() - 2])
597                        })
598                }
599                // Structural and presence operations always pass through
600                _ => true,
601            }
602        });
603    }
604}
605
606/// Operation sequence optimizer
607pub struct OperationOptimizer {
608    operations: Vec<DsonOperation>,
609}
610
611impl OperationOptimizer {
612    /// Create a new operation optimizer with the given operations.
613    #[must_use]
614    pub const fn new(operations: Vec<DsonOperation>) -> Self {
615        Self { operations }
616    }
617
618    /// Optimize operation sequence for zero-allocation processing
619    #[must_use]
620    pub fn optimize(mut self) -> Vec<DsonOperation> {
621        // Phase 1: Coalesce adjacent operations
622        self.coalesce_operations();
623
624        // Phase 2: Reorder for efficiency
625        self.reorder_for_efficiency();
626
627        // Phase 3: Remove redundant operations
628        self.remove_redundant_ops();
629
630        self.operations
631    }
632
633    fn remove_redundant_ops(&mut self) {
634        let mut i = 0;
635        while i < self.operations.len() {
636            if self.is_redundant_operation(i) {
637                self.operations.remove(i);
638            } else {
639                i += 1;
640            }
641        }
642    }
643
644    fn is_redundant_operation(&self, index: usize) -> bool {
645        let op = &self.operations[index];
646
647        match op {
648            DsonOperation::FieldAdd { path, .. } => {
649                // Check if this field is deleted later without being re-added
650                self.is_field_deleted_after_without_readd(path, index)
651            }
652            DsonOperation::FieldDelete { path } => {
653                // Check if this field is added/modified after (making the delete redundant)
654                self.is_field_modified_after(path, index)
655            }
656            // All other operations (including FieldModify) are never redundant
657            _ => false,
658        }
659    }
660
661    fn is_field_deleted_after_without_readd(&self, path: &str, start_index: usize) -> bool {
662        let mut saw_delete = false;
663        for i in start_index + 1..self.operations.len() {
664            match &self.operations[i] {
665                DsonOperation::FieldDelete { path: del_path } if del_path == path => {
666                    saw_delete = true;
667                }
668                DsonOperation::FieldAdd { path: add_path, .. } if add_path == path => {
669                    // If we see an add after a delete, it's not redundant
670                    if saw_delete {
671                        return false;
672                    }
673                }
674                _ => {}
675            }
676        }
677        saw_delete
678    }
679
680    fn is_field_modified_after(&self, path: &str, start_index: usize) -> bool {
681        for i in start_index + 1..self.operations.len() {
682            match &self.operations[i] {
683                DsonOperation::FieldAdd { path: add_path, .. } if add_path == path => {
684                    return true;
685                }
686                DsonOperation::FieldModify { path: mod_path, .. } if mod_path == path => {
687                    return true;
688                }
689                _ => {}
690            }
691        }
692        false
693    }
694
695    fn reorder_for_efficiency(&mut self) {
696        // Reorder operations to minimize tape navigation
697        // Group operations by path prefix for locality
698        let mut operations_with_paths: Vec<(String, DsonOperation)> = self
699            .operations
700            .drain(..)
701            .map(|op| {
702                let path = Self::get_operation_path(&op);
703                // For sorting, append a high character to End operations so they come after children
704                let sort_key = match op {
705                    DsonOperation::ObjectEnd { .. } | DsonOperation::ArrayEnd { .. } => {
706                        format!("{path}\u{10FFFF}")
707                    }
708                    _ => path,
709                };
710                (sort_key, op)
711            })
712            .collect();
713
714        operations_with_paths.sort_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));
715
716        self.operations = operations_with_paths
717            .into_iter()
718            .map(|(_, op)| op)
719            .collect();
720    }
721
722    fn get_operation_path(op: &DsonOperation) -> String {
723        match op {
724            DsonOperation::ObjectStart { path }
725            | DsonOperation::ObjectEnd { path }
726            | DsonOperation::ArrayStart { path }
727            | DsonOperation::ArrayEnd { path }
728            | DsonOperation::FieldAdd { path, .. }
729            | DsonOperation::FieldModify { path, .. }
730            | DsonOperation::FieldDelete { path }
731            | DsonOperation::ArrayInsert { path, .. }
732            | DsonOperation::ArrayRemove { path, .. }
733            | DsonOperation::ArrayReplace { path, .. }
734            | DsonOperation::CheckPresence { path }
735            | DsonOperation::CheckAbsence { path }
736            | DsonOperation::CheckNull { path }
737            | DsonOperation::CheckNotNull { path }
738            | DsonOperation::MergeField { path, .. }
739            | DsonOperation::ConflictResolve { path, .. }
740            | DsonOperation::ArrayBuild { path, .. }
741            | DsonOperation::ArrayFilter { path, .. }
742            | DsonOperation::ArrayMap { path, .. }
743            | DsonOperation::ArrayReduce { path, .. }
744            | DsonOperation::StreamBuild { path, .. }
745            | DsonOperation::StreamFilter { path, .. }
746            | DsonOperation::StreamMap { path, .. }
747            | DsonOperation::StreamEmit { path, .. } => path.clone(),
748            DsonOperation::BatchExecute { .. } => "batch".to_string(),
749        }
750    }
751
752    fn coalesce_operations(&mut self) {
753        let mut i = 0;
754        while i + 1 < self.operations.len() {
755            if self.can_coalesce(i, i + 1) {
756                self.coalesce_pair(i, i + 1);
757                // Remove the second operation (now coalesced)
758                self.operations.remove(i + 1);
759            } else {
760                i += 1;
761            }
762        }
763    }
764
765    fn can_coalesce(&self, i: usize, j: usize) -> bool {
766        match (&self.operations[i], &self.operations[j]) {
767            (
768                DsonOperation::FieldModify { path: p1, .. },
769                DsonOperation::FieldModify { path: p2, .. },
770            ) => {
771                p1 == p2 // Multiple modifies on same field can be coalesced to last one
772            }
773            _ => false,
774        }
775    }
776
777    fn coalesce_pair(&mut self, i: usize, j: usize) {
778        // For multiple modifies, keep the last one
779        let (left, right) = self.operations.split_at_mut(j);
780        if let DsonOperation::FieldModify { value, .. } = &right[0]
781            && let DsonOperation::FieldModify {
782                value: old_value, ..
783            } = &mut left[i]
784        {
785            *old_value = value.clone();
786        }
787    }
788}
789
790#[cfg(test)]
791mod tests {
792    use super::*;
793
794    #[test]
795    fn test_canonical_delete_then_add() {
796        let mut processor = CanonicalOperationProcessor::new(
797            HashSet::from(["field1".to_string()]),
798            HashSet::from(["field1".to_string()]),
799        );
800
801        // Delete then add - should become modify
802        processor.add_operation(DsonOperation::FieldDelete {
803            path: "field1".to_string(),
804        });
805        processor.add_operation(DsonOperation::FieldAdd {
806            path: "field1".to_string(),
807            value: OperationValue::StringRef("value".to_string()),
808        });
809
810        let canonical = processor.compute_canonical().unwrap();
811
812        assert_eq!(canonical.len(), 1);
813        match &canonical[0] {
814            DsonOperation::FieldModify { path, .. } => assert_eq!(path, "field1"),
815            _ => panic!("Expected FieldModify"),
816        }
817    }
818
819    #[test]
820    fn test_input_schema_filtering() {
821        let mut processor = CanonicalOperationProcessor::new(
822            HashSet::from(["allowed".to_string()]), // Only "allowed" in input schema
823            HashSet::from(["allowed".to_string()]), // Only "allowed" in output schema
824        );
825
826        processor.add_operation(DsonOperation::FieldAdd {
827            path: "allowed".to_string(),
828            value: OperationValue::StringRef("ok".to_string()),
829        });
830        processor.add_operation(DsonOperation::FieldAdd {
831            path: "not_allowed".to_string(),
832            value: OperationValue::StringRef("filtered".to_string()),
833        });
834
835        let canonical = processor.compute_canonical().unwrap();
836
837        // Only the allowed operation should remain
838        assert_eq!(canonical.len(), 1);
839        match &canonical[0] {
840            DsonOperation::FieldAdd { path, .. } => assert_eq!(path, "allowed"),
841            _ => panic!("Expected FieldAdd for allowed field"),
842        }
843    }
844
845    #[test]
846    fn test_operation_optimizer() {
847        let operations = vec![
848            DsonOperation::FieldModify {
849                path: "field1".to_string(),
850                value: OperationValue::StringRef("first".to_string()),
851            },
852            DsonOperation::FieldModify {
853                path: "field1".to_string(),
854                value: OperationValue::StringRef("second".to_string()),
855            },
856            DsonOperation::FieldDelete {
857                path: "field1".to_string(),
858            },
859        ];
860
861        let optimizer = OperationOptimizer::new(operations);
862        let ops = optimizer.optimize();
863
864        // Should coalesce the two modifies and keep the delete
865        assert_eq!(ops.len(), 2);
866        match &ops[0] {
867            DsonOperation::FieldModify { value, .. } => match value {
868                OperationValue::StringRef(s) => assert_eq!(s, "second"),
869                _ => panic!("Expected StringRef with 'second'"),
870            },
871            _ => panic!("Expected FieldModify"),
872        }
873        match &ops[1] {
874            DsonOperation::FieldDelete { path } => assert_eq!(path, "field1"),
875            _ => panic!("Expected FieldDelete"),
876        }
877    }
878
879    #[test]
880    fn test_operation_value_equality() {
881        assert_eq!(OperationValue::Null, OperationValue::Null);
882        assert_eq!(OperationValue::BoolRef(true), OperationValue::BoolRef(true));
883        assert_ne!(
884            OperationValue::BoolRef(true),
885            OperationValue::BoolRef(false)
886        );
887        assert_eq!(
888            OperationValue::StringRef("test".to_string()),
889            OperationValue::StringRef("test".to_string())
890        );
891        assert_eq!(
892            OperationValue::NumberRef("42".to_string()),
893            OperationValue::NumberRef("42".to_string())
894        );
895        assert_eq!(
896            OperationValue::ObjectRef { start: 0, end: 10 },
897            OperationValue::ObjectRef { start: 0, end: 10 }
898        );
899        assert_eq!(
900            OperationValue::ArrayRef { start: 0, end: 5 },
901            OperationValue::ArrayRef { start: 0, end: 5 }
902        );
903    }
904
905    #[test]
906    fn test_merge_strategy_equality() {
907        assert_eq!(MergeStrategy::LastWriteWins, MergeStrategy::LastWriteWins);
908        assert_eq!(MergeStrategy::Additive, MergeStrategy::Additive);
909        assert_eq!(MergeStrategy::Max, MergeStrategy::Max);
910        assert_eq!(MergeStrategy::Min, MergeStrategy::Min);
911        assert_eq!(MergeStrategy::Union, MergeStrategy::Union);
912        assert_eq!(
913            MergeStrategy::Custom("test".to_string()),
914            MergeStrategy::Custom("test".to_string())
915        );
916    }
917
918    #[test]
919    fn test_filter_predicate_equality() {
920        assert_eq!(FilterPredicate::Even, FilterPredicate::Even);
921        assert_eq!(FilterPredicate::Odd, FilterPredicate::Odd);
922        assert_eq!(FilterPredicate::Alternate, FilterPredicate::Alternate);
923        assert_eq!(FilterPredicate::EveryNth(3), FilterPredicate::EveryNth(3));
924        assert_eq!(
925            FilterPredicate::GreaterThan(10),
926            FilterPredicate::GreaterThan(10)
927        );
928        assert_eq!(FilterPredicate::LessThan(5), FilterPredicate::LessThan(5));
929        assert_eq!(
930            FilterPredicate::Equals(OperationValue::Null),
931            FilterPredicate::Equals(OperationValue::Null)
932        );
933        assert_eq!(
934            FilterPredicate::Custom("custom".to_string()),
935            FilterPredicate::Custom("custom".to_string())
936        );
937    }
938
939    #[test]
940    fn test_transform_function_equality() {
941        assert_eq!(TransformFunction::Add(5), TransformFunction::Add(5));
942        assert_ne!(TransformFunction::Add(5), TransformFunction::Add(10));
943        assert_eq!(
944            TransformFunction::Multiply(2),
945            TransformFunction::Multiply(2)
946        );
947        assert_eq!(
948            TransformFunction::ToUppercase,
949            TransformFunction::ToUppercase
950        );
951        assert_eq!(
952            TransformFunction::ToLowercase,
953            TransformFunction::ToLowercase
954        );
955        assert_eq!(
956            TransformFunction::Append("x".to_string()),
957            TransformFunction::Append("x".to_string())
958        );
959        assert_eq!(
960            TransformFunction::Prepend("y".to_string()),
961            TransformFunction::Prepend("y".to_string())
962        );
963        assert_eq!(
964            TransformFunction::Custom("c".to_string()),
965            TransformFunction::Custom("c".to_string())
966        );
967        assert_ne!(TransformFunction::Add(1), TransformFunction::Multiply(1));
968    }
969
970    #[test]
971    fn test_reduce_function_equality() {
972        assert_eq!(ReduceFunction::Sum, ReduceFunction::Sum);
973        assert_eq!(ReduceFunction::Product, ReduceFunction::Product);
974        assert_eq!(ReduceFunction::Min, ReduceFunction::Min);
975        assert_eq!(ReduceFunction::Max, ReduceFunction::Max);
976        assert_eq!(ReduceFunction::Count, ReduceFunction::Count);
977        assert_eq!(ReduceFunction::Concat, ReduceFunction::Concat);
978        assert_eq!(
979            ReduceFunction::Custom("r".to_string()),
980            ReduceFunction::Custom("r".to_string())
981        );
982        assert_ne!(ReduceFunction::Sum, ReduceFunction::Product);
983    }
984
985    #[test]
986    fn test_stream_generator_equality() {
987        assert_eq!(
988            StreamGenerator::Range {
989                start: 0,
990                end: 10,
991                step: 1
992            },
993            StreamGenerator::Range {
994                start: 0,
995                end: 10,
996                step: 1
997            }
998        );
999        assert_ne!(
1000            StreamGenerator::Range {
1001                start: 0,
1002                end: 10,
1003                step: 1
1004            },
1005            StreamGenerator::Range {
1006                start: 0,
1007                end: 20,
1008                step: 1
1009            }
1010        );
1011        assert_eq!(
1012            StreamGenerator::Repeat(OperationValue::Null, 5),
1013            StreamGenerator::Repeat(OperationValue::Null, 5)
1014        );
1015        assert_eq!(
1016            StreamGenerator::Fibonacci(10),
1017            StreamGenerator::Fibonacci(10)
1018        );
1019        assert_eq!(
1020            StreamGenerator::Custom("g".to_string()),
1021            StreamGenerator::Custom("g".to_string())
1022        );
1023    }
1024
1025    #[test]
1026    fn test_dson_operation_clone() {
1027        let op = DsonOperation::FieldAdd {
1028            path: "test".to_string(),
1029            value: OperationValue::Null,
1030        };
1031        let cloned = op.clone();
1032        assert_eq!(op, cloned);
1033    }
1034
1035    #[test]
1036    fn test_dson_operation_debug() {
1037        let op = DsonOperation::FieldAdd {
1038            path: "test".to_string(),
1039            value: OperationValue::Null,
1040        };
1041        let debug = format!("{op:?}");
1042        assert!(debug.contains("FieldAdd"));
1043    }
1044
1045    #[test]
1046    fn test_canonical_processor_structural_ops() {
1047        let mut processor = CanonicalOperationProcessor::new(
1048            HashSet::from(["obj".to_string()]),
1049            HashSet::from(["obj".to_string()]),
1050        );
1051
1052        processor.add_operation(DsonOperation::ObjectStart {
1053            path: "obj".to_string(),
1054        });
1055        processor.add_operation(DsonOperation::ObjectEnd {
1056            path: "obj".to_string(),
1057        });
1058
1059        let canonical = processor.compute_canonical().unwrap();
1060        assert_eq!(canonical.len(), 2);
1061    }
1062
1063    #[test]
1064    fn test_canonical_processor_array_structural() {
1065        let mut processor = CanonicalOperationProcessor::new(
1066            HashSet::from(["arr".to_string()]),
1067            HashSet::from(["arr".to_string()]),
1068        );
1069
1070        processor.add_operation(DsonOperation::ArrayStart {
1071            path: "arr".to_string(),
1072        });
1073        processor.add_operation(DsonOperation::ArrayEnd {
1074            path: "arr".to_string(),
1075        });
1076
1077        let canonical = processor.compute_canonical().unwrap();
1078        assert_eq!(canonical.len(), 2);
1079    }
1080
1081    #[test]
1082    fn test_canonical_processor_array_operations() {
1083        let mut processor = CanonicalOperationProcessor::new(
1084            HashSet::from(["items".to_string()]),
1085            HashSet::from(["items".to_string()]),
1086        );
1087
1088        processor.add_operation(DsonOperation::ArrayInsert {
1089            path: "items".to_string(),
1090            index: 0,
1091            value: OperationValue::Null,
1092        });
1093        processor.add_operation(DsonOperation::ArrayRemove {
1094            path: "items".to_string(),
1095            index: 0,
1096        });
1097        processor.add_operation(DsonOperation::ArrayReplace {
1098            path: "items".to_string(),
1099            index: 0,
1100            value: OperationValue::Null,
1101        });
1102
1103        let canonical = processor.compute_canonical().unwrap();
1104        assert_eq!(canonical.len(), 3);
1105    }
1106
1107    #[test]
1108    fn test_canonical_processor_presence_ops() {
1109        let mut processor = CanonicalOperationProcessor::new(
1110            HashSet::from(["field".to_string()]),
1111            HashSet::from(["field".to_string()]),
1112        );
1113
1114        processor.add_operation(DsonOperation::CheckPresence {
1115            path: "field".to_string(),
1116        });
1117        processor.add_operation(DsonOperation::CheckAbsence {
1118            path: "field".to_string(),
1119        });
1120        processor.add_operation(DsonOperation::CheckNull {
1121            path: "field".to_string(),
1122        });
1123        processor.add_operation(DsonOperation::CheckNotNull {
1124            path: "field".to_string(),
1125        });
1126
1127        let canonical = processor.compute_canonical().unwrap();
1128        assert_eq!(canonical.len(), 4);
1129    }
1130
1131    #[test]
1132    fn test_canonical_processor_merge_field() {
1133        let mut processor = CanonicalOperationProcessor::new(
1134            HashSet::from(["field".to_string()]),
1135            HashSet::from(["field".to_string()]),
1136        );
1137
1138        processor.add_operation(DsonOperation::MergeField {
1139            path: "field".to_string(),
1140            value: OperationValue::Null,
1141            timestamp: 1,
1142        });
1143
1144        let canonical = processor.compute_canonical().unwrap();
1145        assert_eq!(canonical.len(), 1);
1146    }
1147
1148    #[test]
1149    fn test_canonical_processor_conflict_resolve() {
1150        let mut processor = CanonicalOperationProcessor::new(
1151            HashSet::from(["field".to_string()]),
1152            HashSet::from(["field".to_string()]),
1153        );
1154
1155        processor.add_operation(DsonOperation::ConflictResolve {
1156            path: "field".to_string(),
1157            strategy: MergeStrategy::LastWriteWins,
1158        });
1159
1160        let canonical = processor.compute_canonical().unwrap();
1161        assert_eq!(canonical.len(), 1);
1162    }
1163
1164    #[test]
1165    fn test_canonical_processor_advanced_array_ops() {
1166        let mut processor = CanonicalOperationProcessor::new(
1167            HashSet::from(["arr".to_string()]),
1168            HashSet::from(["arr".to_string()]),
1169        );
1170
1171        processor.add_operation(DsonOperation::ArrayBuild {
1172            path: "arr".to_string(),
1173            elements: vec![OperationValue::Null],
1174        });
1175        processor.add_operation(DsonOperation::ArrayFilter {
1176            path: "arr".to_string(),
1177            predicate: FilterPredicate::Even,
1178        });
1179        processor.add_operation(DsonOperation::ArrayMap {
1180            path: "arr".to_string(),
1181            transform: TransformFunction::Add(1),
1182        });
1183        processor.add_operation(DsonOperation::ArrayReduce {
1184            path: "arr".to_string(),
1185            initial: OperationValue::NumberRef("0".to_string()),
1186            reducer: ReduceFunction::Sum,
1187        });
1188
1189        let canonical = processor.compute_canonical().unwrap();
1190        assert_eq!(canonical.len(), 4);
1191    }
1192
1193    #[test]
1194    fn test_canonical_processor_streaming_ops() {
1195        let mut processor = CanonicalOperationProcessor::new(
1196            HashSet::from(["stream".to_string()]),
1197            HashSet::from(["stream".to_string()]),
1198        );
1199
1200        processor.add_operation(DsonOperation::StreamBuild {
1201            path: "stream".to_string(),
1202            generator: StreamGenerator::Fibonacci(10),
1203        });
1204        processor.add_operation(DsonOperation::StreamFilter {
1205            path: "stream".to_string(),
1206            predicate: FilterPredicate::Even,
1207        });
1208        processor.add_operation(DsonOperation::StreamMap {
1209            path: "stream".to_string(),
1210            transform: TransformFunction::Multiply(2),
1211        });
1212        processor.add_operation(DsonOperation::StreamEmit {
1213            path: "stream".to_string(),
1214            batch_size: 10,
1215        });
1216
1217        let canonical = processor.compute_canonical().unwrap();
1218        assert_eq!(canonical.len(), 4);
1219    }
1220
1221    #[test]
1222    fn test_canonical_processor_batch_execute() {
1223        let mut processor = CanonicalOperationProcessor::new(
1224            HashSet::from(["field".to_string()]),
1225            HashSet::from(["field".to_string()]),
1226        );
1227
1228        processor.add_operation(DsonOperation::BatchExecute {
1229            operations: vec![DsonOperation::FieldAdd {
1230                path: "field".to_string(),
1231                value: OperationValue::Null,
1232            }],
1233        });
1234
1235        let canonical = processor.compute_canonical().unwrap();
1236        assert_eq!(canonical.len(), 1);
1237    }
1238
1239    #[test]
1240    fn test_canonical_processor_wildcard_schema() {
1241        let mut processor = CanonicalOperationProcessor::new(
1242            HashSet::from(["user.*".to_string()]),
1243            HashSet::from(["user.*".to_string()]),
1244        );
1245
1246        processor.add_operation(DsonOperation::FieldAdd {
1247            path: "user.name".to_string(),
1248            value: OperationValue::StringRef("test".to_string()),
1249        });
1250
1251        let canonical = processor.compute_canonical().unwrap();
1252        assert_eq!(canonical.len(), 1);
1253    }
1254
1255    #[test]
1256    fn test_canonical_processor_add_then_add() {
1257        let mut processor = CanonicalOperationProcessor::new(
1258            HashSet::from(["field".to_string()]),
1259            HashSet::from(["field".to_string()]),
1260        );
1261
1262        processor.add_operation(DsonOperation::FieldAdd {
1263            path: "field".to_string(),
1264            value: OperationValue::StringRef("first".to_string()),
1265        });
1266        processor.add_operation(DsonOperation::FieldAdd {
1267            path: "field".to_string(),
1268            value: OperationValue::StringRef("second".to_string()),
1269        });
1270
1271        let canonical = processor.compute_canonical().unwrap();
1272        // Second add becomes modify
1273        assert_eq!(canonical.len(), 2);
1274    }
1275
1276    #[test]
1277    fn test_operation_optimizer_remove_redundant() {
1278        let operations = vec![
1279            DsonOperation::FieldAdd {
1280                path: "field".to_string(),
1281                value: OperationValue::Null,
1282            },
1283            DsonOperation::FieldDelete {
1284                path: "field".to_string(),
1285            },
1286        ];
1287
1288        let optimizer = OperationOptimizer::new(operations);
1289        let ops = optimizer.optimize();
1290        // Add followed by delete - add is redundant
1291        assert_eq!(ops.len(), 1);
1292    }
1293
1294    #[test]
1295    fn test_operation_optimizer_delete_then_add() {
1296        let operations = vec![
1297            DsonOperation::FieldDelete {
1298                path: "field".to_string(),
1299            },
1300            DsonOperation::FieldAdd {
1301                path: "field".to_string(),
1302                value: OperationValue::Null,
1303            },
1304        ];
1305
1306        let optimizer = OperationOptimizer::new(operations);
1307        let ops = optimizer.optimize();
1308        // Delete followed by add - delete is redundant
1309        assert!(ops.len() <= 2);
1310    }
1311
1312    #[test]
1313    fn test_operation_value_clone() {
1314        let val = OperationValue::StringRef("test".to_string());
1315        let cloned = val.clone();
1316        assert_eq!(val, cloned);
1317    }
1318
1319    #[test]
1320    fn test_merge_strategy_clone() {
1321        let strategy = MergeStrategy::LastWriteWins;
1322        let cloned = strategy.clone();
1323        assert_eq!(strategy, cloned);
1324    }
1325
1326    #[test]
1327    fn test_filter_predicate_clone() {
1328        let pred = FilterPredicate::Even;
1329        let cloned = pred.clone();
1330        assert_eq!(pred, cloned);
1331    }
1332
1333    #[test]
1334    fn test_transform_function_clone() {
1335        let transform = TransformFunction::Add(5);
1336        let cloned = transform.clone();
1337        assert_eq!(transform, cloned);
1338    }
1339
1340    #[test]
1341    fn test_reduce_function_clone() {
1342        let reduce = ReduceFunction::Sum;
1343        let cloned = reduce.clone();
1344        assert_eq!(reduce, cloned);
1345    }
1346
1347    #[test]
1348    fn test_stream_generator_clone() {
1349        let generator = StreamGenerator::Fibonacci(10);
1350        let cloned = generator.clone();
1351        assert_eq!(generator, cloned);
1352    }
1353
1354    #[test]
1355    fn test_field_state_clone() {
1356        let state = FieldState::Present;
1357        let cloned = state;
1358        assert!(matches!(cloned, FieldState::Present));
1359    }
1360
1361    #[test]
1362    fn test_transform_function_debug() {
1363        let transform = TransformFunction::Add(5);
1364        let debug = format!("{transform:?}");
1365        assert!(debug.contains("Add"));
1366    }
1367
1368    #[test]
1369    fn test_reduce_function_debug() {
1370        let reduce = ReduceFunction::Sum;
1371        let debug = format!("{reduce:?}");
1372        assert!(debug.contains("Sum"));
1373    }
1374
1375    #[test]
1376    fn test_stream_generator_debug() {
1377        let generator = StreamGenerator::Fibonacci(10);
1378        let debug = format!("{generator:?}");
1379        assert!(debug.contains("Fibonacci"));
1380    }
1381
1382    #[test]
1383    fn test_canonical_processor_modify_nonexistent_field() {
1384        let mut processor = CanonicalOperationProcessor::new(
1385            HashSet::from(["field".to_string()]),
1386            HashSet::from(["field".to_string()]),
1387        );
1388
1389        processor.add_operation(DsonOperation::FieldModify {
1390            path: "field".to_string(),
1391            value: OperationValue::StringRef("value".to_string()),
1392        });
1393
1394        let result = processor.compute_canonical();
1395        assert!(result.is_err());
1396        let err = result.unwrap_err();
1397        assert!(format!("{err:?}").contains("non-existent"));
1398    }
1399
1400    #[test]
1401    fn test_canonical_processor_modify_deleted_field() {
1402        let mut processor = CanonicalOperationProcessor::new(
1403            HashSet::from(["field".to_string()]),
1404            HashSet::from(["field".to_string()]),
1405        );
1406
1407        // First add then delete the field
1408        processor.add_operation(DsonOperation::FieldAdd {
1409            path: "field".to_string(),
1410            value: OperationValue::StringRef("initial".to_string()),
1411        });
1412        processor.add_operation(DsonOperation::FieldDelete {
1413            path: "field".to_string(),
1414        });
1415        processor.add_operation(DsonOperation::FieldModify {
1416            path: "field".to_string(),
1417            value: OperationValue::StringRef("modified".to_string()),
1418        });
1419
1420        let result = processor.compute_canonical();
1421        assert!(result.is_err());
1422        let err = result.unwrap_err();
1423        assert!(format!("{err:?}").contains("deleted"));
1424    }
1425
1426    #[test]
1427    fn test_canonical_processor_delete_already_deleted() {
1428        let mut processor = CanonicalOperationProcessor::new(
1429            HashSet::from(["field".to_string()]),
1430            HashSet::from(["field".to_string()]),
1431        );
1432
1433        // Add, delete, then delete again
1434        processor.add_operation(DsonOperation::FieldAdd {
1435            path: "field".to_string(),
1436            value: OperationValue::StringRef("value".to_string()),
1437        });
1438        processor.add_operation(DsonOperation::FieldDelete {
1439            path: "field".to_string(),
1440        });
1441        processor.add_operation(DsonOperation::FieldDelete {
1442            path: "field".to_string(),
1443        });
1444
1445        let canonical = processor.compute_canonical().unwrap();
1446        // Should have add and one delete only (second delete is no-op)
1447        assert_eq!(canonical.len(), 2);
1448    }
1449
1450    #[test]
1451    fn test_canonical_processor_output_schema_filtering() {
1452        let mut processor = CanonicalOperationProcessor::new(
1453            HashSet::from(["input".to_string(), "output".to_string()]),
1454            HashSet::from(["output".to_string()]), // Only "output" in output schema
1455        );
1456
1457        processor.add_operation(DsonOperation::FieldAdd {
1458            path: "input".to_string(),
1459            value: OperationValue::StringRef("input_val".to_string()),
1460        });
1461        processor.add_operation(DsonOperation::FieldAdd {
1462            path: "output".to_string(),
1463            value: OperationValue::StringRef("output_val".to_string()),
1464        });
1465
1466        let canonical = processor.compute_canonical().unwrap();
1467        // Only output field should remain
1468        assert_eq!(canonical.len(), 1);
1469        match &canonical[0] {
1470            DsonOperation::FieldAdd { path, .. } => assert_eq!(path, "output"),
1471            _ => panic!("Expected FieldAdd"),
1472        }
1473    }
1474
1475    #[test]
1476    fn test_canonical_processor_output_wildcard_filtering() {
1477        let mut processor = CanonicalOperationProcessor::new(
1478            HashSet::from(["user.*".to_string()]),
1479            HashSet::from(["user.name".to_string()]), // Only user.name in output
1480        );
1481
1482        processor.add_operation(DsonOperation::FieldAdd {
1483            path: "user.name".to_string(),
1484            value: OperationValue::StringRef("Alice".to_string()),
1485        });
1486        processor.add_operation(DsonOperation::FieldAdd {
1487            path: "user.email".to_string(),
1488            value: OperationValue::StringRef("alice@example.com".to_string()),
1489        });
1490
1491        let canonical = processor.compute_canonical().unwrap();
1492        // Only user.name should remain
1493        assert_eq!(canonical.len(), 1);
1494    }
1495
1496    #[test]
1497    fn test_canonical_processor_merge_field_filtered() {
1498        let mut processor = CanonicalOperationProcessor::new(
1499            HashSet::from(["field".to_string()]),
1500            HashSet::from([]), // Empty output schema
1501        );
1502
1503        processor.add_operation(DsonOperation::MergeField {
1504            path: "field".to_string(),
1505            value: OperationValue::Null,
1506            timestamp: 1,
1507        });
1508
1509        let canonical = processor.compute_canonical().unwrap();
1510        assert_eq!(canonical.len(), 0); // Filtered out by output schema
1511    }
1512
1513    #[test]
1514    fn test_canonical_processor_conflict_resolve_filtered() {
1515        let mut processor = CanonicalOperationProcessor::new(
1516            HashSet::from(["field".to_string()]),
1517            HashSet::from([]), // Empty output schema
1518        );
1519
1520        processor.add_operation(DsonOperation::ConflictResolve {
1521            path: "field".to_string(),
1522            strategy: MergeStrategy::Max,
1523        });
1524
1525        let canonical = processor.compute_canonical().unwrap();
1526        assert_eq!(canonical.len(), 0); // Filtered out by output schema
1527    }
1528
1529    #[test]
1530    fn test_canonical_processor_array_insert_filtered() {
1531        let mut processor = CanonicalOperationProcessor::new(
1532            HashSet::from(["arr".to_string()]),
1533            HashSet::from([]), // Empty output schema
1534        );
1535
1536        processor.add_operation(DsonOperation::ArrayInsert {
1537            path: "arr".to_string(),
1538            index: 0,
1539            value: OperationValue::Null,
1540        });
1541
1542        let canonical = processor.compute_canonical().unwrap();
1543        assert_eq!(canonical.len(), 0); // Filtered out by output schema
1544    }
1545
1546    #[test]
1547    fn test_canonical_processor_array_remove_filtered() {
1548        let mut processor = CanonicalOperationProcessor::new(
1549            HashSet::from(["arr".to_string()]),
1550            HashSet::from([]), // Empty output schema
1551        );
1552
1553        processor.add_operation(DsonOperation::ArrayRemove {
1554            path: "arr".to_string(),
1555            index: 0,
1556        });
1557
1558        let canonical = processor.compute_canonical().unwrap();
1559        assert_eq!(canonical.len(), 0); // Filtered out by output schema
1560    }
1561
1562    #[test]
1563    fn test_canonical_processor_array_replace_filtered() {
1564        let mut processor = CanonicalOperationProcessor::new(
1565            HashSet::from(["arr".to_string()]),
1566            HashSet::from([]), // Empty output schema
1567        );
1568
1569        processor.add_operation(DsonOperation::ArrayReplace {
1570            path: "arr".to_string(),
1571            index: 0,
1572            value: OperationValue::Null,
1573        });
1574
1575        let canonical = processor.compute_canonical().unwrap();
1576        assert_eq!(canonical.len(), 0); // Filtered out by output schema
1577    }
1578
1579    #[test]
1580    fn test_field_state_not_present_clone() {
1581        let state = FieldState::NotPresent;
1582        let cloned = state;
1583        assert!(matches!(cloned, FieldState::NotPresent));
1584    }
1585
1586    #[test]
1587    fn test_field_state_deleted_clone() {
1588        let state = FieldState::Deleted;
1589        let cloned = state;
1590        assert!(matches!(cloned, FieldState::Deleted));
1591    }
1592
1593    #[test]
1594    fn test_field_state_modified_clone() {
1595        let state = FieldState::Modified;
1596        let cloned = state;
1597        assert!(matches!(cloned, FieldState::Modified));
1598    }
1599
1600    #[test]
1601    fn test_field_state_debug() {
1602        assert!(format!("{:?}", FieldState::NotPresent).contains("NotPresent"));
1603        assert!(format!("{:?}", FieldState::Present).contains("Present"));
1604        assert!(format!("{:?}", FieldState::Deleted).contains("Deleted"));
1605        assert!(format!("{:?}", FieldState::Modified).contains("Modified"));
1606    }
1607
1608    #[test]
1609    fn test_operation_value_debug() {
1610        assert!(format!("{:?}", OperationValue::Null).contains("Null"));
1611        assert!(format!("{:?}", OperationValue::BoolRef(true)).contains("BoolRef"));
1612        assert!(format!("{:?}", OperationValue::StringRef("s".to_string())).contains("StringRef"));
1613        assert!(format!("{:?}", OperationValue::NumberRef("1".to_string())).contains("NumberRef"));
1614        assert!(
1615            format!("{:?}", OperationValue::ObjectRef { start: 0, end: 1 }).contains("ObjectRef")
1616        );
1617        assert!(
1618            format!("{:?}", OperationValue::ArrayRef { start: 0, end: 1 }).contains("ArrayRef")
1619        );
1620    }
1621
1622    #[test]
1623    fn test_merge_strategy_debug() {
1624        assert!(format!("{:?}", MergeStrategy::LastWriteWins).contains("LastWriteWins"));
1625        assert!(format!("{:?}", MergeStrategy::Additive).contains("Additive"));
1626        assert!(format!("{:?}", MergeStrategy::Max).contains("Max"));
1627        assert!(format!("{:?}", MergeStrategy::Min).contains("Min"));
1628        assert!(format!("{:?}", MergeStrategy::Union).contains("Union"));
1629        assert!(format!("{:?}", MergeStrategy::Custom("c".to_string())).contains("Custom"));
1630    }
1631
1632    #[test]
1633    fn test_filter_predicate_debug() {
1634        assert!(format!("{:?}", FilterPredicate::EveryNth(2)).contains("EveryNth"));
1635        assert!(format!("{:?}", FilterPredicate::Alternate).contains("Alternate"));
1636        assert!(format!("{:?}", FilterPredicate::Even).contains("Even"));
1637        assert!(format!("{:?}", FilterPredicate::Odd).contains("Odd"));
1638        assert!(format!("{:?}", FilterPredicate::GreaterThan(5)).contains("GreaterThan"));
1639        assert!(format!("{:?}", FilterPredicate::LessThan(5)).contains("LessThan"));
1640        assert!(format!("{:?}", FilterPredicate::Equals(OperationValue::Null)).contains("Equals"));
1641        assert!(format!("{:?}", FilterPredicate::Custom("c".to_string())).contains("Custom"));
1642    }
1643
1644    #[test]
1645    fn test_transform_function_inequality() {
1646        // Test cross-variant inequality
1647        assert_ne!(
1648            TransformFunction::ToUppercase,
1649            TransformFunction::ToLowercase
1650        );
1651        assert_ne!(TransformFunction::Add(1), TransformFunction::ToUppercase);
1652        assert_ne!(
1653            TransformFunction::Multiply(1),
1654            TransformFunction::ToLowercase
1655        );
1656        assert_ne!(
1657            TransformFunction::Append("a".to_string()),
1658            TransformFunction::Prepend("a".to_string())
1659        );
1660        assert_ne!(
1661            TransformFunction::Custom("a".to_string()),
1662            TransformFunction::Add(1)
1663        );
1664    }
1665
1666    #[test]
1667    fn test_reduce_function_inequality() {
1668        // Test cross-variant inequality
1669        assert_ne!(ReduceFunction::Sum, ReduceFunction::Min);
1670        assert_ne!(ReduceFunction::Product, ReduceFunction::Max);
1671        assert_ne!(ReduceFunction::Count, ReduceFunction::Concat);
1672        assert_ne!(ReduceFunction::Custom("a".to_string()), ReduceFunction::Sum);
1673    }
1674
1675    #[test]
1676    fn test_stream_generator_inequality() {
1677        // Test cross-variant inequality
1678        assert_ne!(
1679            StreamGenerator::Fibonacci(10),
1680            StreamGenerator::Custom("c".to_string())
1681        );
1682        assert_ne!(
1683            StreamGenerator::Range {
1684                start: 0,
1685                end: 10,
1686                step: 1
1687            },
1688            StreamGenerator::Repeat(OperationValue::Null, 5)
1689        );
1690        assert_ne!(
1691            StreamGenerator::Repeat(OperationValue::Null, 5),
1692            StreamGenerator::Fibonacci(5)
1693        );
1694    }
1695
1696    #[test]
1697    fn test_operation_optimizer_add_delete_readd() {
1698        let operations = vec![
1699            DsonOperation::FieldAdd {
1700                path: "field".to_string(),
1701                value: OperationValue::StringRef("first".to_string()),
1702            },
1703            DsonOperation::FieldDelete {
1704                path: "field".to_string(),
1705            },
1706            DsonOperation::FieldAdd {
1707                path: "field".to_string(),
1708                value: OperationValue::StringRef("second".to_string()),
1709            },
1710        ];
1711
1712        let optimizer = OperationOptimizer::new(operations);
1713        let ops = optimizer.optimize();
1714        // Add, delete, add - add is not redundant because there's a readd after delete
1715        assert!(ops.len() >= 2);
1716    }
1717
1718    #[test]
1719    fn test_operation_optimizer_delete_then_modify() {
1720        let operations = vec![
1721            DsonOperation::FieldDelete {
1722                path: "field".to_string(),
1723            },
1724            DsonOperation::FieldModify {
1725                path: "field".to_string(),
1726                value: OperationValue::Null,
1727            },
1728        ];
1729
1730        let optimizer = OperationOptimizer::new(operations);
1731        let ops = optimizer.optimize();
1732        // Delete followed by modify - delete is redundant
1733        assert_eq!(ops.len(), 1);
1734    }
1735
1736    #[test]
1737    fn test_operation_optimizer_reorder_paths() {
1738        let operations = vec![
1739            DsonOperation::FieldAdd {
1740                path: "z.field".to_string(),
1741                value: OperationValue::Null,
1742            },
1743            DsonOperation::FieldAdd {
1744                path: "a.field".to_string(),
1745                value: OperationValue::Null,
1746            },
1747        ];
1748
1749        let optimizer = OperationOptimizer::new(operations);
1750        let ops = optimizer.optimize();
1751        // Should be reordered by path
1752        assert_eq!(ops.len(), 2);
1753        match &ops[0] {
1754            DsonOperation::FieldAdd { path, .. } => assert_eq!(path, "a.field"),
1755            _ => panic!("Expected FieldAdd"),
1756        }
1757    }
1758
1759    #[test]
1760    fn test_operation_optimizer_batch_execute_path() {
1761        let operations = vec![DsonOperation::BatchExecute { operations: vec![] }];
1762
1763        let optimizer = OperationOptimizer::new(operations);
1764        let ops = optimizer.optimize();
1765        assert_eq!(ops.len(), 1);
1766    }
1767
1768    #[test]
1769    fn test_operation_optimizer_object_end_ordering() {
1770        let operations = vec![
1771            DsonOperation::ObjectEnd {
1772                path: "obj".to_string(),
1773            },
1774            DsonOperation::FieldAdd {
1775                path: "obj.child".to_string(),
1776                value: OperationValue::Null,
1777            },
1778        ];
1779
1780        let optimizer = OperationOptimizer::new(operations);
1781        let ops = optimizer.optimize();
1782        // Object end should come after its children
1783        assert_eq!(ops.len(), 2);
1784    }
1785
1786    #[test]
1787    fn test_operation_optimizer_array_end_ordering() {
1788        let operations = vec![
1789            DsonOperation::ArrayEnd {
1790                path: "arr".to_string(),
1791            },
1792            DsonOperation::ArrayInsert {
1793                path: "arr[0]".to_string(),
1794                index: 0,
1795                value: OperationValue::Null,
1796            },
1797        ];
1798
1799        let optimizer = OperationOptimizer::new(operations);
1800        let ops = optimizer.optimize();
1801        assert_eq!(ops.len(), 2);
1802    }
1803
1804    #[test]
1805    fn test_canonical_processor_not_in_input_schema() {
1806        let mut processor = CanonicalOperationProcessor::new(
1807            HashSet::from(["allowed".to_string()]), // Only "allowed" in input schema
1808            HashSet::from(["allowed".to_string(), "not_allowed".to_string()]),
1809        );
1810
1811        // Try structural operations on a path not in input schema
1812        processor.add_operation(DsonOperation::ObjectStart {
1813            path: "not_allowed".to_string(),
1814        });
1815        processor.add_operation(DsonOperation::ObjectEnd {
1816            path: "not_allowed".to_string(),
1817        });
1818        processor.add_operation(DsonOperation::ArrayStart {
1819            path: "not_allowed".to_string(),
1820        });
1821        processor.add_operation(DsonOperation::ArrayEnd {
1822            path: "not_allowed".to_string(),
1823        });
1824
1825        let canonical = processor.compute_canonical().unwrap();
1826        // None should be added because they're not in input schema
1827        assert_eq!(canonical.len(), 0);
1828    }
1829
1830    #[test]
1831    fn test_canonical_processor_array_ops_not_in_input() {
1832        let mut processor = CanonicalOperationProcessor::new(
1833            HashSet::from(["allowed".to_string()]),
1834            HashSet::from(["allowed".to_string(), "arr".to_string()]),
1835        );
1836
1837        processor.add_operation(DsonOperation::ArrayInsert {
1838            path: "arr".to_string(),
1839            index: 0,
1840            value: OperationValue::Null,
1841        });
1842        processor.add_operation(DsonOperation::ArrayRemove {
1843            path: "arr".to_string(),
1844            index: 0,
1845        });
1846        processor.add_operation(DsonOperation::ArrayReplace {
1847            path: "arr".to_string(),
1848            index: 0,
1849            value: OperationValue::Null,
1850        });
1851
1852        let canonical = processor.compute_canonical().unwrap();
1853        // None should be added because arr is not in input schema
1854        assert_eq!(canonical.len(), 0);
1855    }
1856
1857    #[test]
1858    fn test_canonical_processor_conflict_resolve_not_in_input() {
1859        let mut processor = CanonicalOperationProcessor::new(
1860            HashSet::from([]), // Empty input schema
1861            HashSet::from(["field".to_string()]),
1862        );
1863
1864        processor.add_operation(DsonOperation::ConflictResolve {
1865            path: "field".to_string(),
1866            strategy: MergeStrategy::Max,
1867        });
1868
1869        let canonical = processor.compute_canonical().unwrap();
1870        assert_eq!(canonical.len(), 0);
1871    }
1872
1873    #[test]
1874    fn test_canonical_processor_advanced_array_not_in_input() {
1875        let mut processor = CanonicalOperationProcessor::new(
1876            HashSet::from([]), // Empty input schema
1877            HashSet::from(["arr".to_string()]),
1878        );
1879
1880        processor.add_operation(DsonOperation::ArrayBuild {
1881            path: "arr".to_string(),
1882            elements: vec![],
1883        });
1884        processor.add_operation(DsonOperation::ArrayFilter {
1885            path: "arr".to_string(),
1886            predicate: FilterPredicate::Even,
1887        });
1888        processor.add_operation(DsonOperation::ArrayMap {
1889            path: "arr".to_string(),
1890            transform: TransformFunction::Add(1),
1891        });
1892        processor.add_operation(DsonOperation::ArrayReduce {
1893            path: "arr".to_string(),
1894            initial: OperationValue::Null,
1895            reducer: ReduceFunction::Sum,
1896        });
1897
1898        let canonical = processor.compute_canonical().unwrap();
1899        assert_eq!(canonical.len(), 0);
1900    }
1901
1902    #[test]
1903    fn test_canonical_processor_streaming_not_in_input() {
1904        let mut processor = CanonicalOperationProcessor::new(
1905            HashSet::from([]), // Empty input schema
1906            HashSet::from(["stream".to_string()]),
1907        );
1908
1909        processor.add_operation(DsonOperation::StreamBuild {
1910            path: "stream".to_string(),
1911            generator: StreamGenerator::Fibonacci(10),
1912        });
1913        processor.add_operation(DsonOperation::StreamFilter {
1914            path: "stream".to_string(),
1915            predicate: FilterPredicate::Even,
1916        });
1917        processor.add_operation(DsonOperation::StreamMap {
1918            path: "stream".to_string(),
1919            transform: TransformFunction::Add(1),
1920        });
1921        processor.add_operation(DsonOperation::StreamEmit {
1922            path: "stream".to_string(),
1923            batch_size: 10,
1924        });
1925
1926        let canonical = processor.compute_canonical().unwrap();
1927        assert_eq!(canonical.len(), 0);
1928    }
1929
1930    #[test]
1931    #[allow(clippy::too_many_lines)] // Comprehensive variant coverage test
1932    fn test_dson_operation_all_variants_equality() {
1933        // Test equality for all DsonOperation variants
1934        assert_eq!(
1935            DsonOperation::ObjectStart {
1936                path: "a".to_string()
1937            },
1938            DsonOperation::ObjectStart {
1939                path: "a".to_string()
1940            }
1941        );
1942        assert_eq!(
1943            DsonOperation::ObjectEnd {
1944                path: "a".to_string()
1945            },
1946            DsonOperation::ObjectEnd {
1947                path: "a".to_string()
1948            }
1949        );
1950        assert_eq!(
1951            DsonOperation::ArrayStart {
1952                path: "a".to_string()
1953            },
1954            DsonOperation::ArrayStart {
1955                path: "a".to_string()
1956            }
1957        );
1958        assert_eq!(
1959            DsonOperation::ArrayEnd {
1960                path: "a".to_string()
1961            },
1962            DsonOperation::ArrayEnd {
1963                path: "a".to_string()
1964            }
1965        );
1966        assert_eq!(
1967            DsonOperation::ArrayInsert {
1968                path: "a".to_string(),
1969                index: 0,
1970                value: OperationValue::Null
1971            },
1972            DsonOperation::ArrayInsert {
1973                path: "a".to_string(),
1974                index: 0,
1975                value: OperationValue::Null
1976            }
1977        );
1978        assert_eq!(
1979            DsonOperation::ArrayRemove {
1980                path: "a".to_string(),
1981                index: 0
1982            },
1983            DsonOperation::ArrayRemove {
1984                path: "a".to_string(),
1985                index: 0
1986            }
1987        );
1988        assert_eq!(
1989            DsonOperation::ArrayReplace {
1990                path: "a".to_string(),
1991                index: 0,
1992                value: OperationValue::Null
1993            },
1994            DsonOperation::ArrayReplace {
1995                path: "a".to_string(),
1996                index: 0,
1997                value: OperationValue::Null
1998            }
1999        );
2000        assert_eq!(
2001            DsonOperation::CheckPresence {
2002                path: "a".to_string()
2003            },
2004            DsonOperation::CheckPresence {
2005                path: "a".to_string()
2006            }
2007        );
2008        assert_eq!(
2009            DsonOperation::CheckAbsence {
2010                path: "a".to_string()
2011            },
2012            DsonOperation::CheckAbsence {
2013                path: "a".to_string()
2014            }
2015        );
2016        assert_eq!(
2017            DsonOperation::CheckNull {
2018                path: "a".to_string()
2019            },
2020            DsonOperation::CheckNull {
2021                path: "a".to_string()
2022            }
2023        );
2024        assert_eq!(
2025            DsonOperation::CheckNotNull {
2026                path: "a".to_string()
2027            },
2028            DsonOperation::CheckNotNull {
2029                path: "a".to_string()
2030            }
2031        );
2032        assert_eq!(
2033            DsonOperation::MergeField {
2034                path: "a".to_string(),
2035                value: OperationValue::Null,
2036                timestamp: 1
2037            },
2038            DsonOperation::MergeField {
2039                path: "a".to_string(),
2040                value: OperationValue::Null,
2041                timestamp: 1
2042            }
2043        );
2044        assert_eq!(
2045            DsonOperation::ConflictResolve {
2046                path: "a".to_string(),
2047                strategy: MergeStrategy::Max
2048            },
2049            DsonOperation::ConflictResolve {
2050                path: "a".to_string(),
2051                strategy: MergeStrategy::Max
2052            }
2053        );
2054        assert_eq!(
2055            DsonOperation::ArrayBuild {
2056                path: "a".to_string(),
2057                elements: vec![]
2058            },
2059            DsonOperation::ArrayBuild {
2060                path: "a".to_string(),
2061                elements: vec![]
2062            }
2063        );
2064        assert_eq!(
2065            DsonOperation::ArrayFilter {
2066                path: "a".to_string(),
2067                predicate: FilterPredicate::Even
2068            },
2069            DsonOperation::ArrayFilter {
2070                path: "a".to_string(),
2071                predicate: FilterPredicate::Even
2072            }
2073        );
2074        assert_eq!(
2075            DsonOperation::ArrayMap {
2076                path: "a".to_string(),
2077                transform: TransformFunction::Add(1)
2078            },
2079            DsonOperation::ArrayMap {
2080                path: "a".to_string(),
2081                transform: TransformFunction::Add(1)
2082            }
2083        );
2084        assert_eq!(
2085            DsonOperation::ArrayReduce {
2086                path: "a".to_string(),
2087                initial: OperationValue::Null,
2088                reducer: ReduceFunction::Sum
2089            },
2090            DsonOperation::ArrayReduce {
2091                path: "a".to_string(),
2092                initial: OperationValue::Null,
2093                reducer: ReduceFunction::Sum
2094            }
2095        );
2096        assert_eq!(
2097            DsonOperation::StreamBuild {
2098                path: "a".to_string(),
2099                generator: StreamGenerator::Fibonacci(5)
2100            },
2101            DsonOperation::StreamBuild {
2102                path: "a".to_string(),
2103                generator: StreamGenerator::Fibonacci(5)
2104            }
2105        );
2106        assert_eq!(
2107            DsonOperation::StreamFilter {
2108                path: "a".to_string(),
2109                predicate: FilterPredicate::Odd
2110            },
2111            DsonOperation::StreamFilter {
2112                path: "a".to_string(),
2113                predicate: FilterPredicate::Odd
2114            }
2115        );
2116        assert_eq!(
2117            DsonOperation::StreamMap {
2118                path: "a".to_string(),
2119                transform: TransformFunction::ToUppercase
2120            },
2121            DsonOperation::StreamMap {
2122                path: "a".to_string(),
2123                transform: TransformFunction::ToUppercase
2124            }
2125        );
2126        assert_eq!(
2127            DsonOperation::StreamEmit {
2128                path: "a".to_string(),
2129                batch_size: 10
2130            },
2131            DsonOperation::StreamEmit {
2132                path: "a".to_string(),
2133                batch_size: 10
2134            }
2135        );
2136        assert_eq!(
2137            DsonOperation::BatchExecute { operations: vec![] },
2138            DsonOperation::BatchExecute { operations: vec![] }
2139        );
2140    }
2141
2142    #[test]
2143    fn test_operation_value_inequality() {
2144        assert_ne!(OperationValue::Null, OperationValue::BoolRef(true));
2145        assert_ne!(
2146            OperationValue::BoolRef(true),
2147            OperationValue::StringRef("s".to_string())
2148        );
2149        assert_ne!(
2150            OperationValue::StringRef("a".to_string()),
2151            OperationValue::NumberRef("1".to_string())
2152        );
2153        assert_ne!(
2154            OperationValue::ObjectRef { start: 0, end: 1 },
2155            OperationValue::ArrayRef { start: 0, end: 1 }
2156        );
2157    }
2158
2159    #[test]
2160    fn test_merge_strategy_inequality() {
2161        assert_ne!(MergeStrategy::LastWriteWins, MergeStrategy::Additive);
2162        assert_ne!(MergeStrategy::Max, MergeStrategy::Min);
2163        assert_ne!(MergeStrategy::Union, MergeStrategy::Custom("c".to_string()));
2164    }
2165
2166    #[test]
2167    fn test_filter_predicate_inequality() {
2168        assert_ne!(FilterPredicate::Even, FilterPredicate::Odd);
2169        assert_ne!(FilterPredicate::EveryNth(2), FilterPredicate::EveryNth(3));
2170        assert_ne!(
2171            FilterPredicate::GreaterThan(5),
2172            FilterPredicate::LessThan(5)
2173        );
2174        assert_ne!(FilterPredicate::Alternate, FilterPredicate::Even);
2175        assert_ne!(
2176            FilterPredicate::Custom("a".to_string()),
2177            FilterPredicate::Custom("b".to_string())
2178        );
2179    }
2180}