1use super::BlackBoxProcessor;
8use crate::dson_traits::{
9 ArrayOperations, CrdtMerge, CrdtOperation, DeltaCrdt, DocumentProcessor, DsonImplementation,
10 FieldOperations, ImplementationCharacteristics, MergeConflict, OpBasedCrdt, SchemaAware,
11 VectorClock,
12};
13use crate::{
14 DsonOperation, FilterPredicate, MergeStrategy, OperationValue, ReduceFunction,
15 TransformFunction,
16};
17use fionn_core::{DsonError, Result};
18
19use ahash::AHashMap;
20use rayon::prelude::*;
21
22#[derive(Debug, Clone)]
28pub struct SimdDelta {
29 pub operations: Vec<(String, OperationValue, u64)>,
31 pub since_clock: VectorClock,
33 pub current_clock: VectorClock,
35}
36
37pub struct SimdDsonProcessor {
39 processor: BlackBoxProcessor,
41 replica_id: String,
43 vector_clock: VectorClock,
45 local_timestamp: u64,
47 operation_buffer: Vec<CrdtOperation>,
49 field_cache: AHashMap<String, OperationValue>,
51 operation_log: Vec<(String, OperationValue, u64, VectorClock)>,
53 parallel_enabled: bool,
55}
56
57impl SimdDsonProcessor {
58 #[inline]
60 #[must_use]
61 pub fn new(replica_id: &str) -> Self {
62 Self {
63 processor: BlackBoxProcessor::new_unfiltered(),
64 replica_id: replica_id.to_string(),
65 vector_clock: VectorClock::new(),
66 local_timestamp: 0,
67 operation_buffer: Vec::new(),
68 field_cache: AHashMap::with_capacity(32), operation_log: Vec::new(),
70 parallel_enabled: false,
71 }
72 }
73
74 #[inline]
76 #[must_use]
77 pub fn with_schema(
78 replica_id: &str,
79 input_schema: Vec<String>,
80 output_schema: Vec<String>,
81 ) -> Self {
82 Self {
83 processor: BlackBoxProcessor::new(input_schema, output_schema),
84 replica_id: replica_id.to_string(),
85 vector_clock: VectorClock::new(),
86 local_timestamp: 0,
87 operation_buffer: Vec::new(),
88 field_cache: AHashMap::with_capacity(32),
89 operation_log: Vec::new(),
90 parallel_enabled: false,
91 }
92 }
93
94 #[must_use]
96 pub const fn with_parallel(mut self, enabled: bool) -> Self {
97 self.parallel_enabled = enabled;
98 self
99 }
100
101 #[must_use]
103 pub const fn is_parallel(&self) -> bool {
104 self.parallel_enabled
105 }
106
107 #[inline]
110 fn read_from_tape(&self, path: &str) -> Option<OperationValue> {
111 self.processor.read_field_value(path).ok().flatten()
114 }
115}
116
117impl DocumentProcessor for SimdDsonProcessor {
122 #[inline]
123 fn process(&mut self, input: &str) -> Result<String> {
124 self.field_cache.clear();
126 self.processor.process(input)
128 }
129
130 #[inline]
131 fn apply_operation(&mut self, op: &DsonOperation) -> Result<()> {
132 self.processor.apply_operation(op)?;
133
134 self.local_timestamp += 1;
136 self.vector_clock.increment(&self.replica_id);
137
138 match op {
140 DsonOperation::FieldAdd { path, value }
141 | DsonOperation::FieldModify { path, value } => {
142 self.field_cache.insert(path.clone(), value.clone());
143 self.operation_log.push((
145 path.clone(),
146 value.clone(),
147 self.local_timestamp,
148 self.vector_clock.clone(),
149 ));
150 }
151 DsonOperation::FieldDelete { path } => {
152 self.field_cache.remove(path);
153 self.operation_log.push((
155 path.clone(),
156 OperationValue::Null,
157 self.local_timestamp,
158 self.vector_clock.clone(),
159 ));
160 }
161 _ => {}
162 }
163 Ok(())
164 }
165
166 #[inline]
167 fn output(&self) -> Result<String> {
168 self.processor.generate_output()
169 }
170}
171
172impl SchemaAware for SimdDsonProcessor {
177 fn matches_input_schema(&self, path: &str) -> bool {
178 let schema = self.processor.input_schema();
179 if schema.is_empty() {
180 return true;
181 }
182 schema.contains(&path.to_string())
183 || schema
184 .iter()
185 .any(|s| path.starts_with(s) || s.starts_with(path))
186 }
187
188 fn matches_output_schema(&self, path: &str) -> bool {
189 let schema = self.processor.output_schema();
190 if schema.is_empty() {
191 return true;
192 }
193 schema.contains(&path.to_string())
194 || schema
195 .iter()
196 .any(|s| path.starts_with(s) || s.starts_with(path))
197 }
198
199 fn input_schema(&self) -> Vec<String> {
200 self.processor.input_schema()
201 }
202
203 fn output_schema(&self) -> Vec<String> {
204 self.processor.output_schema()
205 }
206}
207
208impl FieldOperations for SimdDsonProcessor {
213 #[inline]
214 fn field_add(&mut self, path: &str, value: OperationValue) -> Result<()> {
215 self.apply_operation(&DsonOperation::FieldAdd {
216 path: path.to_string(),
217 value: value.clone(),
218 })?;
219 self.field_cache.insert(path.to_string(), value);
220 Ok(())
221 }
222
223 #[inline]
224 fn field_modify(&mut self, path: &str, value: OperationValue) -> Result<()> {
225 self.apply_operation(&DsonOperation::FieldModify {
226 path: path.to_string(),
227 value: value.clone(),
228 })?;
229 self.field_cache.insert(path.to_string(), value);
230 Ok(())
231 }
232
233 #[inline]
234 fn field_delete(&mut self, path: &str) -> Result<()> {
235 self.apply_operation(&DsonOperation::FieldDelete {
236 path: path.to_string(),
237 })?;
238 self.field_cache.remove(path);
239 Ok(())
240 }
241
242 #[inline]
243 fn field_read(&self, path: &str) -> Result<Option<OperationValue>> {
244 if let Some(value) = self.field_cache.get(path) {
246 return Ok(Some(value.clone()));
247 }
248 Ok(self.read_from_tape(path))
250 }
251
252 #[inline]
253 fn field_exists(&self, path: &str) -> bool {
254 self.field_cache.contains_key(path) || self.read_from_tape(path).is_some()
256 }
257}
258
259impl ArrayOperations for SimdDsonProcessor {
264 fn array_insert(&mut self, path: &str, index: usize, value: OperationValue) -> Result<()> {
265 self.apply_operation(&DsonOperation::ArrayInsert {
266 path: path.to_string(),
267 index,
268 value,
269 })
270 }
271
272 fn array_remove(&mut self, path: &str, index: usize) -> Result<()> {
273 self.apply_operation(&DsonOperation::ArrayRemove {
274 path: path.to_string(),
275 index,
276 })
277 }
278
279 fn array_replace(&mut self, path: &str, index: usize, value: OperationValue) -> Result<()> {
280 self.apply_operation(&DsonOperation::ArrayReplace {
281 path: path.to_string(),
282 index,
283 value,
284 })
285 }
286
287 fn array_len(&self, path: &str) -> Result<usize> {
288 let prefix = format!("{path}[");
290 let count = self
291 .field_cache
292 .keys()
293 .filter(|k| k.starts_with(&prefix))
294 .filter(|k| {
295 let suffix = &k[prefix.len()..];
297 suffix.chars().take_while(char::is_ascii_digit).count() > 0 && !suffix.contains('.')
298 })
299 .count();
300 Ok(count)
301 }
302
303 fn array_build(&mut self, path: &str, elements: Vec<OperationValue>) -> Result<()> {
304 self.apply_operation(&DsonOperation::ArrayBuild {
305 path: path.to_string(),
306 elements,
307 })
308 }
309
310 fn array_filter(&mut self, path: &str, predicate: &FilterPredicate) -> Result<()> {
311 self.apply_operation(&DsonOperation::ArrayFilter {
312 path: path.to_string(),
313 predicate: predicate.clone(),
314 })
315 }
316
317 fn array_map(&mut self, path: &str, transform: &TransformFunction) -> Result<()> {
318 self.apply_operation(&DsonOperation::ArrayMap {
319 path: path.to_string(),
320 transform: transform.clone(),
321 })
322 }
323
324 fn array_reduce(
325 &mut self,
326 path: &str,
327 initial: OperationValue,
328 reducer: &ReduceFunction,
329 ) -> Result<OperationValue> {
330 self.apply_operation(&DsonOperation::ArrayReduce {
331 path: path.to_string(),
332 initial: initial.clone(),
333 reducer: reducer.clone(),
334 })?;
335 Ok(initial)
337 }
338}
339
340impl CrdtMerge for SimdDsonProcessor {
345 fn merge_operation(&mut self, op: CrdtOperation) -> Result<Option<MergeConflict>> {
346 self.vector_clock.merge(&op.vector_clock);
348 self.local_timestamp = self.local_timestamp.max(op.timestamp);
349
350 if let DsonOperation::MergeField {
352 path,
353 value,
354 timestamp,
355 } = &op.operation
356 {
357 self.merge_field(
358 path,
359 value.clone(),
360 *timestamp,
361 &MergeStrategy::LastWriteWins,
362 )
363 } else {
364 self.apply_operation(&op.operation)?;
365 Ok(None)
366 }
367 }
368
369 fn merge_field(
370 &mut self,
371 path: &str,
372 value: OperationValue,
373 timestamp: u64,
374 strategy: &MergeStrategy,
375 ) -> Result<Option<MergeConflict>> {
376 let local_value = self.field_read(path)?;
377
378 if let Some(local) = local_value {
379 let local_ts = self.local_timestamp;
381 let resolved = strategy.resolve(&local, &value, local_ts, timestamp);
382
383 let conflict = MergeConflict {
384 path: path.to_string(),
385 local_value: local,
386 remote_value: value,
387 local_timestamp: local_ts,
388 remote_timestamp: timestamp,
389 resolved_value: Some(resolved.clone()),
390 };
391
392 self.field_modify(path, resolved)?;
394
395 Ok(Some(conflict))
396 } else {
397 self.field_add(path, value)?;
399 Ok(None)
400 }
401 }
402
403 fn vector_clock(&self) -> &VectorClock {
404 &self.vector_clock
405 }
406
407 fn replica_id(&self) -> &str {
408 &self.replica_id
409 }
410
411 fn resolve_conflict(
412 &mut self,
413 conflict: &MergeConflict,
414 strategy: &MergeStrategy,
415 ) -> Result<OperationValue> {
416 let resolved = strategy.resolve(
417 &conflict.local_value,
418 &conflict.remote_value,
419 conflict.local_timestamp,
420 conflict.remote_timestamp,
421 );
422 self.field_modify(&conflict.path, resolved.clone())?;
423 Ok(resolved)
424 }
425}
426
427impl OpBasedCrdt for SimdDsonProcessor {
432 fn prepare(&self, op: &DsonOperation) -> Result<CrdtOperation> {
433 Ok(CrdtOperation {
434 operation: op.clone(),
435 timestamp: self.local_timestamp + 1,
436 replica_id: self.replica_id.clone(),
437 vector_clock: {
438 let mut vc = self.vector_clock.clone();
439 vc.increment(&self.replica_id);
440 vc
441 },
442 })
443 }
444
445 fn effect(&mut self, op: CrdtOperation) -> Result<Option<MergeConflict>> {
446 self.merge_operation(op)
447 }
448
449 fn is_causally_ready(&self, op: &CrdtOperation) -> bool {
450 let op_clocks = op.vector_clock.clocks();
452 for (replica, &time) in &op_clocks {
453 if replica == &op.replica_id {
454 if time != self.vector_clock.get(replica) + 1 {
456 return false;
457 }
458 } else {
459 if time > self.vector_clock.get(replica) {
461 return false;
462 }
463 }
464 }
465 true
466 }
467
468 fn buffer_operation(&mut self, op: CrdtOperation) {
469 self.operation_buffer.push(op);
470 }
471
472 fn process_buffered(&mut self) -> Result<Vec<MergeConflict>> {
473 let mut conflicts = Vec::new();
474
475 let ready_ops: Vec<CrdtOperation> = self
477 .operation_buffer
478 .iter()
479 .filter(|op| {
480 let mut ready = true;
482 let op_clocks = op.vector_clock.clocks();
483 for (replica, &time) in &op_clocks {
484 if replica == &op.replica_id {
485 if time != self.vector_clock.get(replica) + 1 {
486 ready = false;
487 break;
488 }
489 } else if time > self.vector_clock.get(replica) {
490 ready = false;
491 break;
492 }
493 }
494 ready
495 })
496 .cloned()
497 .collect();
498
499 self.operation_buffer.retain(|op| {
501 !ready_ops
502 .iter()
503 .any(|ready| ready.timestamp == op.timestamp && ready.replica_id == op.replica_id)
504 });
505
506 for op in ready_ops {
508 if let Some(conflict) = self.effect(op)? {
509 conflicts.push(conflict);
510 }
511 }
512
513 Ok(conflicts)
514 }
515}
516
517impl DeltaCrdt for SimdDsonProcessor {
522 type Delta = SimdDelta;
523
524 fn generate_delta(&self, since: &VectorClock) -> Self::Delta {
525 let operations: Vec<(String, OperationValue, u64)> = self
527 .operation_log
528 .iter()
529 .filter(|(_, _, _, vc)| {
530 let vc_clocks = vc.clocks();
532 for (replica, &time) in &vc_clocks {
533 if time > since.get(replica) {
534 return true;
535 }
536 }
537 false
538 })
539 .map(|(path, value, ts, _)| (path.clone(), value.clone(), *ts))
540 .collect();
541
542 SimdDelta {
543 operations,
544 since_clock: since.clone(),
545 current_clock: self.vector_clock.clone(),
546 }
547 }
548
549 fn apply_delta(&mut self, delta: Self::Delta) -> Result<Vec<MergeConflict>> {
550 let mut conflicts = Vec::new();
551
552 self.vector_clock.merge(&delta.current_clock);
554
555 if self.parallel_enabled && delta.operations.len() > 100 {
556 let results: Vec<_> = delta
558 .operations
559 .into_par_iter()
560 .map(|(path, value, ts)| (path, value, ts))
561 .collect();
562
563 for (path, value, ts) in results {
564 if let Some(conflict) =
565 self.merge_field(&path, value, ts, &MergeStrategy::LastWriteWins)?
566 {
567 conflicts.push(conflict);
568 }
569 }
570 } else {
571 for (path, value, ts) in delta.operations {
573 if let Some(conflict) =
574 self.merge_field(&path, value, ts, &MergeStrategy::LastWriteWins)?
575 {
576 conflicts.push(conflict);
577 }
578 }
579 }
580
581 Ok(conflicts)
582 }
583
584 fn compact(&mut self) {
585 let mut latest: AHashMap<String, (OperationValue, u64, VectorClock)> = AHashMap::new();
588
589 for (path, value, ts, vc) in self.operation_log.drain(..) {
590 let should_insert = match latest.get(&path) {
591 Some((_, existing_ts, _)) => ts > *existing_ts,
592 None => true,
593 };
594 if should_insert {
595 latest.insert(path, (value, ts, vc));
596 }
597 }
598
599 self.operation_log = latest
600 .into_iter()
601 .map(|(path, (value, ts, vc))| (path, value, ts, vc))
602 .collect();
603 }
604}
605
606impl SimdDsonProcessor {
611 pub fn process_batch_parallel(&self, documents: Vec<String>) -> Result<Vec<String>> {
617 if !self.parallel_enabled {
618 return Err(DsonError::InvalidField(
619 "Parallel processing not enabled".to_string(),
620 ));
621 }
622
623 let input_schema = self.processor.input_schema();
624 let output_schema = self.processor.output_schema();
625
626 let results: Result<Vec<String>> = documents
627 .into_par_iter()
628 .map(|doc| {
629 let mut proc = BlackBoxProcessor::new(input_schema.clone(), output_schema.clone());
630 proc.process(&doc)
631 })
632 .collect();
633
634 results
635 }
636
637 pub fn apply_operations_parallel(
643 &mut self,
644 ops: Vec<DsonOperation>,
645 ) -> Result<Vec<Option<MergeConflict>>> {
646 if !self.parallel_enabled || ops.len() < 10 {
647 let mut results = Vec::new();
649 for op in ops {
650 self.apply_operation(&op)?;
651 results.push(None);
652 }
653 return Ok(results);
654 }
655
656 let mut by_path: AHashMap<String, Vec<DsonOperation>> = AHashMap::new();
658 for op in &ops {
659 let path = match op {
660 DsonOperation::FieldAdd { path, .. }
661 | DsonOperation::FieldModify { path, .. }
662 | DsonOperation::FieldDelete { path }
663 | DsonOperation::MergeField { path, .. } => path.clone(),
664 _ => String::new(),
665 };
666 by_path.entry(path).or_default().push(op.clone());
667 }
668
669 let path_groups: Vec<(String, Vec<DsonOperation>)> = by_path.into_iter().collect();
671
672 let results: Vec<Option<MergeConflict>> = path_groups
674 .into_par_iter()
675 .flat_map(|(_, path_ops)| {
676 path_ops.into_iter().map(|_| None).collect::<Vec<_>>()
678 })
679 .collect();
680
681 for op in ops {
683 self.apply_operation(&op)?;
684 }
685
686 Ok(results)
687 }
688
689 pub fn merge_replicas_parallel(
695 &mut self,
696 replica_ops: Vec<(String, Vec<CrdtOperation>)>,
697 _strategy: &MergeStrategy,
698 ) -> Result<Vec<MergeConflict>> {
699 if !self.parallel_enabled {
700 let mut all_conflicts = Vec::new();
701 for (_, ops) in replica_ops {
702 for op in ops {
703 if let Some(conflict) = self.merge_operation(op)? {
704 all_conflicts.push(conflict);
705 }
706 }
707 }
708 return Ok(all_conflicts);
709 }
710
711 let all_ops: Vec<CrdtOperation> =
713 replica_ops.into_iter().flat_map(|(_, ops)| ops).collect();
714
715 let mut sorted_ops = all_ops;
717 sorted_ops.sort_by_key(|op| op.timestamp);
718
719 let mut conflicts = Vec::new();
721 for op in sorted_ops {
722 if let Some(conflict) = self.merge_operation(op)? {
723 conflicts.push(conflict);
724 }
725 }
726
727 Ok(conflicts)
728 }
729}
730
731impl DsonImplementation for SimdDsonProcessor {
736 fn name(&self) -> &'static str {
737 "SIMD-DSON"
738 }
739
740 fn version(&self) -> &'static str {
741 "0.1.0"
742 }
743
744 fn features(&self) -> Vec<&str> {
745 vec![
746 "simd_acceleration",
747 "zero_copy_parsing",
748 "schema_filtering",
749 "crdt_merge",
750 "delta_crdt",
751 "streaming",
752 "canonical",
753 "parallel_processing",
754 ]
755 }
756
757 fn characteristics(&self) -> ImplementationCharacteristics {
758 ImplementationCharacteristics {
759 zero_copy: true,
760 simd_accelerated: true,
761 streaming: true,
762 crdt_support: true,
763 schema_filtering: true,
764 parallel: self.parallel_enabled,
765 memory_overhead: 64, max_document_size: 100 * 1024 * 1024, }
768 }
769}
770
771pub fn compare_implementations<A, B>(impl_a: &A, impl_b: &B) -> ImplementationComparison
777where
778 A: DsonImplementation,
779 B: DsonImplementation,
780{
781 let chars_a = impl_a.characteristics();
782 let chars_b = impl_b.characteristics();
783
784 ImplementationComparison {
785 name_a: impl_a.name().to_string(),
786 name_b: impl_b.name().to_string(),
787 features_a: impl_a
788 .features()
789 .iter()
790 .map(std::string::ToString::to_string)
791 .collect(),
792 features_b: impl_b
793 .features()
794 .iter()
795 .map(std::string::ToString::to_string)
796 .collect(),
797 common_features: impl_a
798 .features()
799 .iter()
800 .filter(|f| impl_b.features().contains(f))
801 .map(std::string::ToString::to_string)
802 .collect(),
803 characteristics_a: chars_a,
804 characteristics_b: chars_b,
805 }
806}
807
808#[derive(Debug)]
810pub struct ImplementationComparison {
811 pub name_a: String,
813 pub name_b: String,
815 pub features_a: Vec<String>,
817 pub features_b: Vec<String>,
819 pub common_features: Vec<String>,
821 pub characteristics_a: ImplementationCharacteristics,
823 pub characteristics_b: ImplementationCharacteristics,
825}
826
827impl ImplementationComparison {
828 #[must_use]
830 pub fn report(&self) -> String {
831 format!(
832 "Implementation Comparison: {} vs {}\n\
833 ========================================\n\
834 \n\
835 Features ({}): {:?}\n\
836 Features ({}): {:?}\n\
837 Common Features: {:?}\n\
838 \n\
839 Characteristics:\n\
840 {} - SIMD: {}, Zero-copy: {}, CRDT: {}, Streaming: {}\n\
841 {} - SIMD: {}, Zero-copy: {}, CRDT: {}, Streaming: {}\n",
842 self.name_a,
843 self.name_b,
844 self.name_a,
845 self.features_a,
846 self.name_b,
847 self.features_b,
848 self.common_features,
849 self.name_a,
850 self.characteristics_a.simd_accelerated,
851 self.characteristics_a.zero_copy,
852 self.characteristics_a.crdt_support,
853 self.characteristics_a.streaming,
854 self.name_b,
855 self.characteristics_b.simd_accelerated,
856 self.characteristics_b.zero_copy,
857 self.characteristics_b.crdt_support,
858 self.characteristics_b.streaming,
859 )
860 }
861}
862
863#[cfg(test)]
868mod tests {
869 use super::*;
870
871 #[test]
872 fn test_simd_dson_processor_basic() {
873 let mut proc = SimdDsonProcessor::new("replica_1");
874
875 let result = proc.process(r#"{"name":"Alice","age":30}"#);
876 assert!(result.is_ok());
877
878 assert!(proc.field_exists("name"));
879 assert!(proc.field_exists("age"));
880
881 let name = proc.field_read("name").unwrap();
882 assert_eq!(name, Some(OperationValue::StringRef("Alice".to_string())));
883 }
884
885 #[test]
886 fn test_simd_dson_processor_crdt_merge() {
887 let mut proc = SimdDsonProcessor::new("replica_1");
888 proc.process(r#"{"counter":10}"#).unwrap();
889
890 let remote_op = CrdtOperation {
892 operation: DsonOperation::MergeField {
893 path: "counter".to_string(),
894 value: OperationValue::NumberRef("20".to_string()),
895 timestamp: 5,
896 },
897 timestamp: 5,
898 replica_id: "replica_2".to_string(),
899 vector_clock: {
900 let mut vc = VectorClock::new();
901 vc.increment("replica_2");
902 vc
903 },
904 };
905
906 let conflict = proc.merge_operation(remote_op).unwrap();
907 assert!(conflict.is_some());
908
909 let c = conflict.unwrap();
910 assert_eq!(c.path, "counter");
911 }
912
913 #[test]
914 fn test_implementation_characteristics() {
915 let proc = SimdDsonProcessor::new("test");
916 let chars = proc.characteristics();
917
918 assert!(chars.simd_accelerated);
919 assert!(chars.zero_copy);
920 assert!(chars.crdt_support);
921 }
922
923 #[test]
924 fn test_simd_delta_clone() {
925 let delta = SimdDelta {
926 operations: vec![("path".to_string(), OperationValue::Null, 1)],
927 since_clock: VectorClock::new(),
928 current_clock: VectorClock::new(),
929 };
930 let cloned = delta;
931 assert_eq!(cloned.operations.len(), 1);
932 }
933
934 #[test]
935 fn test_simd_dson_with_schema() {
936 let proc = SimdDsonProcessor::with_schema(
937 "replica_1",
938 vec!["name".to_string()],
939 vec!["name".to_string()],
940 );
941 assert!(proc.matches_input_schema("name"));
942 assert!(proc.matches_output_schema("name"));
943 }
944
945 #[test]
946 fn test_simd_dson_with_parallel() {
947 let proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
948 assert!(proc.is_parallel());
949 }
950
951 #[test]
952 fn test_simd_dson_field_operations() {
953 let mut proc = SimdDsonProcessor::new("replica_1");
954 proc.process(r#"{"name":"Alice"}"#).unwrap();
955
956 proc.field_add("age", OperationValue::NumberRef("30".to_string()))
958 .unwrap();
959 assert!(proc.field_exists("age"));
960
961 proc.field_modify("age", OperationValue::NumberRef("31".to_string()))
963 .unwrap();
964
965 proc.field_delete("age").unwrap();
967 }
969
970 #[test]
971 fn test_simd_dson_array_operations() {
972 let mut proc = SimdDsonProcessor::new("replica_1");
973 proc.process(r#"{"items":[]}"#).unwrap();
974
975 proc.array_insert("items", 0, OperationValue::StringRef("first".to_string()))
977 .unwrap();
978
979 proc.array_replace(
981 "items",
982 0,
983 OperationValue::StringRef("replaced".to_string()),
984 )
985 .unwrap();
986
987 proc.array_remove("items", 0).unwrap();
989
990 proc.array_build(
992 "items",
993 vec![
994 OperationValue::StringRef("a".to_string()),
995 OperationValue::StringRef("b".to_string()),
996 ],
997 )
998 .unwrap();
999
1000 let len = proc.array_len("items").unwrap();
1002 assert!(len <= 2); }
1004
1005 #[test]
1006 fn test_simd_dson_array_filter() {
1007 let mut proc = SimdDsonProcessor::new("replica_1");
1008 proc.process(r#"{"items":[1,2,3]}"#).unwrap();
1009
1010 proc.array_filter("items", &FilterPredicate::Even).unwrap();
1011 }
1012
1013 #[test]
1014 fn test_simd_dson_array_map() {
1015 let mut proc = SimdDsonProcessor::new("replica_1");
1016 proc.process(r#"{"items":[1,2,3]}"#).unwrap();
1017
1018 proc.array_map("items", &TransformFunction::Add(10))
1019 .unwrap();
1020 }
1021
1022 #[test]
1023 fn test_simd_dson_array_reduce() {
1024 let mut proc = SimdDsonProcessor::new("replica_1");
1025 proc.process(r#"{"items":[1,2,3]}"#).unwrap();
1026
1027 let result = proc
1028 .array_reduce(
1029 "items",
1030 OperationValue::NumberRef("0".to_string()),
1031 &ReduceFunction::Sum,
1032 )
1033 .unwrap();
1034 assert!(matches!(result, OperationValue::NumberRef(_)));
1035 }
1036
1037 #[test]
1038 fn test_simd_dson_input_output_schema() {
1039 let proc = SimdDsonProcessor::new("replica_1");
1040 let input = proc.input_schema();
1041 let output = proc.output_schema();
1042 assert!(input.is_empty());
1044 assert!(output.is_empty());
1045 }
1046
1047 #[test]
1048 fn test_simd_dson_schema_matching_prefix() {
1049 let proc = SimdDsonProcessor::with_schema(
1050 "replica_1",
1051 vec!["user".to_string()],
1052 vec!["user".to_string()],
1053 );
1054 assert!(proc.matches_input_schema("user.name"));
1056 assert!(proc.matches_output_schema("user.name"));
1057 }
1058
1059 #[test]
1060 fn test_simd_dson_vector_clock() {
1061 let proc = SimdDsonProcessor::new("replica_1");
1062 let vc = proc.vector_clock();
1063 assert!(vc.clocks().is_empty());
1064 }
1065
1066 #[test]
1067 fn test_simd_dson_replica_id() {
1068 let proc = SimdDsonProcessor::new("my_replica");
1069 assert_eq!(proc.replica_id(), "my_replica");
1070 }
1071
1072 #[test]
1073 fn test_simd_dson_prepare_operation() {
1074 let proc = SimdDsonProcessor::new("replica_1");
1075 let op = DsonOperation::FieldAdd {
1076 path: "test".to_string(),
1077 value: OperationValue::Null,
1078 };
1079 let crdt_op = proc.prepare(&op).unwrap();
1080 assert_eq!(crdt_op.replica_id, "replica_1");
1081 assert_eq!(crdt_op.timestamp, 1);
1082 }
1083
1084 #[test]
1085 fn test_simd_dson_buffer_operation() {
1086 let mut proc = SimdDsonProcessor::new("replica_1");
1087 let op = CrdtOperation {
1088 operation: DsonOperation::FieldAdd {
1089 path: "test".to_string(),
1090 value: OperationValue::Null,
1091 },
1092 timestamp: 1,
1093 replica_id: "replica_2".to_string(),
1094 vector_clock: VectorClock::new(),
1095 };
1096 proc.buffer_operation(op);
1097 }
1099
1100 #[test]
1101 fn test_simd_dson_process_buffered() {
1102 let mut proc = SimdDsonProcessor::new("replica_1");
1103 let conflicts = proc.process_buffered().unwrap();
1104 assert!(conflicts.is_empty());
1105 }
1106
1107 #[test]
1108 fn test_simd_dson_is_causally_ready() {
1109 let proc = SimdDsonProcessor::new("replica_1");
1110 let op = CrdtOperation {
1111 operation: DsonOperation::FieldAdd {
1112 path: "test".to_string(),
1113 value: OperationValue::Null,
1114 },
1115 timestamp: 1,
1116 replica_id: "replica_2".to_string(),
1117 vector_clock: {
1118 let mut vc = VectorClock::new();
1119 vc.increment("replica_2");
1120 vc
1121 },
1122 };
1123 let ready = proc.is_causally_ready(&op);
1124 assert!(ready);
1125 }
1126
1127 #[test]
1128 fn test_simd_dson_generate_delta() {
1129 let proc = SimdDsonProcessor::new("replica_1");
1130 let since = VectorClock::new();
1131 let delta = proc.generate_delta(&since);
1132 assert!(delta.operations.is_empty());
1133 }
1134
1135 #[test]
1136 fn test_simd_dson_apply_delta() {
1137 let mut proc = SimdDsonProcessor::new("replica_1");
1138 proc.process(r"{}").unwrap();
1139 let delta = SimdDelta {
1140 operations: vec![],
1141 since_clock: VectorClock::new(),
1142 current_clock: VectorClock::new(),
1143 };
1144 let conflicts = proc.apply_delta(delta).unwrap();
1145 assert!(conflicts.is_empty());
1146 }
1147
1148 #[test]
1149 fn test_simd_dson_compact() {
1150 let mut proc = SimdDsonProcessor::new("replica_1");
1151 proc.compact();
1152 }
1154
1155 #[test]
1156 fn test_simd_dson_implementation_name() {
1157 let proc = SimdDsonProcessor::new("test");
1158 assert_eq!(proc.name(), "SIMD-DSON");
1159 }
1160
1161 #[test]
1162 fn test_simd_dson_implementation_version() {
1163 let proc = SimdDsonProcessor::new("test");
1164 assert_eq!(proc.version(), "0.1.0");
1165 }
1166
1167 #[test]
1168 fn test_simd_dson_implementation_features() {
1169 let proc = SimdDsonProcessor::new("test");
1170 let features = proc.features();
1171 assert!(features.contains(&"simd_acceleration"));
1172 assert!(features.contains(&"crdt_merge"));
1173 }
1174
1175 #[test]
1176 fn test_simd_dson_output() {
1177 let mut proc = SimdDsonProcessor::new("replica_1");
1178 proc.process(r#"{"name":"test"}"#).unwrap();
1179 let output = proc.output().unwrap();
1180 assert!(!output.is_empty());
1181 }
1182
1183 #[test]
1184 fn test_simd_dson_merge_field_no_conflict() {
1185 let mut proc = SimdDsonProcessor::new("replica_1");
1186 proc.process(r"{}").unwrap();
1187
1188 let conflict = proc
1189 .merge_field(
1190 "newfield",
1191 OperationValue::StringRef("value".to_string()),
1192 1,
1193 &MergeStrategy::LastWriteWins,
1194 )
1195 .unwrap();
1196 assert!(conflict.is_none());
1197 }
1198
1199 #[test]
1200 fn test_simd_dson_resolve_conflict() {
1201 let mut proc = SimdDsonProcessor::new("replica_1");
1202 proc.process(r#"{"field":"local"}"#).unwrap();
1203
1204 let conflict = MergeConflict {
1205 path: "field".to_string(),
1206 local_value: OperationValue::StringRef("local".to_string()),
1207 remote_value: OperationValue::StringRef("remote".to_string()),
1208 local_timestamp: 1,
1209 remote_timestamp: 2,
1210 resolved_value: None,
1211 };
1212
1213 let resolved = proc
1214 .resolve_conflict(&conflict, &MergeStrategy::LastWriteWins)
1215 .unwrap();
1216 assert!(matches!(resolved, OperationValue::StringRef(_)));
1217 }
1218
1219 #[test]
1220 fn test_simd_dson_effect() {
1221 let mut proc = SimdDsonProcessor::new("replica_1");
1222 proc.process(r"{}").unwrap();
1223
1224 let op = CrdtOperation {
1225 operation: DsonOperation::FieldAdd {
1226 path: "test".to_string(),
1227 value: OperationValue::Null,
1228 },
1229 timestamp: 1,
1230 replica_id: "replica_2".to_string(),
1231 vector_clock: VectorClock::new(),
1232 };
1233 let conflict = proc.effect(op).unwrap();
1234 assert!(conflict.is_none());
1235 }
1236
1237 #[test]
1238 fn test_compare_implementations() {
1239 let impl_a = SimdDsonProcessor::new("a");
1240 let impl_b = SimdDsonProcessor::new("b");
1241 let comparison = compare_implementations(&impl_a, &impl_b);
1242 assert_eq!(comparison.name_a, "SIMD-DSON");
1243 assert_eq!(comparison.name_b, "SIMD-DSON");
1244 }
1245
1246 #[test]
1247 fn test_implementation_comparison_report() {
1248 let impl_a = SimdDsonProcessor::new("a");
1249 let impl_b = SimdDsonProcessor::new("b");
1250 let comparison = compare_implementations(&impl_a, &impl_b);
1251 let report = comparison.report();
1252 assert!(report.contains("SIMD-DSON"));
1253 }
1254
1255 #[test]
1256 fn test_simd_dson_process_batch_parallel_disabled() {
1257 let proc = SimdDsonProcessor::new("replica_1");
1258 let result = proc.process_batch_parallel(vec!["{}".to_string()]);
1259 assert!(result.is_err()); }
1261
1262 #[test]
1263 fn test_simd_dson_process_batch_parallel_enabled() {
1264 let proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
1265 let result =
1266 proc.process_batch_parallel(vec![r#"{"a":1}"#.to_string(), r#"{"b":2}"#.to_string()]);
1267 assert!(result.is_ok());
1268 }
1269
1270 #[test]
1271 fn test_simd_dson_apply_operations_parallel_small_batch() {
1272 let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
1273 proc.process(r"{}").unwrap();
1274
1275 let ops = vec![DsonOperation::FieldAdd {
1276 path: "test".to_string(),
1277 value: OperationValue::Null,
1278 }];
1279 let results = proc.apply_operations_parallel(ops).unwrap();
1280 assert_eq!(results.len(), 1);
1281 }
1282
1283 #[test]
1284 fn test_simd_dson_merge_replicas_parallel() {
1285 let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
1286 proc.process(r"{}").unwrap();
1287
1288 let replica_ops: Vec<(String, Vec<CrdtOperation>)> = vec![];
1289 let conflicts = proc
1290 .merge_replicas_parallel(replica_ops, &MergeStrategy::LastWriteWins)
1291 .unwrap();
1292 assert!(conflicts.is_empty());
1293 }
1294
1295 #[test]
1296 fn test_simd_dson_merge_replicas_sequential() {
1297 let mut proc = SimdDsonProcessor::new("replica_1");
1298 proc.process(r"{}").unwrap();
1299
1300 let replica_ops: Vec<(String, Vec<CrdtOperation>)> = vec![];
1301 let conflicts = proc
1302 .merge_replicas_parallel(replica_ops, &MergeStrategy::LastWriteWins)
1303 .unwrap();
1304 assert!(conflicts.is_empty());
1305 }
1306
1307 #[test]
1308 fn test_implementation_comparison_debug() {
1309 let impl_a = SimdDsonProcessor::new("a");
1310 let impl_b = SimdDsonProcessor::new("b");
1311 let comparison = compare_implementations(&impl_a, &impl_b);
1312 let debug = format!("{comparison:?}");
1313 assert!(!debug.is_empty());
1314 }
1315
1316 #[test]
1319 fn test_simd_delta_debug() {
1320 let delta = SimdDelta {
1321 operations: vec![],
1322 since_clock: VectorClock::new(),
1323 current_clock: VectorClock::new(),
1324 };
1325 let debug = format!("{delta:?}");
1326 assert!(debug.contains("SimdDelta"));
1327 }
1328
1329 #[test]
1330 fn test_matches_input_schema_empty() {
1331 let proc = SimdDsonProcessor::new("replica_1");
1332 assert!(proc.matches_input_schema("anything.goes"));
1334 }
1335
1336 #[test]
1337 fn test_matches_output_schema_empty() {
1338 let proc = SimdDsonProcessor::new("replica_1");
1339 assert!(proc.matches_output_schema("anything.goes"));
1341 }
1342
1343 #[test]
1344 fn test_field_read_from_cache() {
1345 let mut proc = SimdDsonProcessor::new("replica_1");
1346 proc.process(r"{}").unwrap();
1347
1348 proc.field_add("cached", OperationValue::StringRef("value".to_string()))
1350 .unwrap();
1351
1352 let value = proc.field_read("cached").unwrap();
1354 assert!(value.is_some());
1355 }
1356
/// A field added with a `Null` value must still be reported by `field_exists`.
#[test]
fn test_field_exists_from_cache() {
    let mut replica = SimdDsonProcessor::new("replica_1");
    replica.process(r"{}").unwrap();

    let added = replica.field_add("exists", OperationValue::Null);
    added.unwrap();

    let present = replica.field_exists("exists");
    assert!(present);
}
1365
/// Three `items[N]` entries written straight into `field_cache` must make
/// `array_len("items")` report a length of three.
#[test]
fn test_array_len_with_elements() {
    let mut replica = SimdDsonProcessor::new("replica_1");
    replica.process(r#"{"items":[]}"#).unwrap();

    // Seed the cache directly, one entry per array slot.
    for (slot, text) in [("items[0]", "a"), ("items[1]", "b"), ("items[2]", "c")] {
        replica
            .field_cache
            .insert(slot.to_string(), OperationValue::StringRef(text.to_string()));
    }

    assert_eq!(replica.array_len("items").unwrap(), 3);
}
1388
/// A nested key such as `items[0].nested` must not be counted as an extra
/// array element by `array_len`.
#[test]
fn test_array_len_excludes_nested() {
    let mut replica = SimdDsonProcessor::new("replica_1");
    replica.process(r#"{"items":[]}"#).unwrap();

    // One real element plus one field nested below it.
    for cache_key in ["items[0]", "items[0].nested"] {
        replica
            .field_cache
            .insert(cache_key.to_string(), OperationValue::Null);
    }

    let element_count = replica.array_len("items").unwrap();
    assert_eq!(element_count, 1);
}
1403
/// An operation whose clock records three events from `replica_2` must not be
/// causally ready on a freshly created processor.
#[test]
fn test_is_causally_ready_false_replica_ahead() {
    let replica = SimdDsonProcessor::new("replica_1");

    // Advance the sender's entry three times.
    let mut remote_clock = VectorClock::new();
    for _ in 0..3 {
        remote_clock.increment("replica_2");
    }

    let operation = DsonOperation::FieldAdd {
        path: String::from("test"),
        value: OperationValue::Null,
    };
    let remote_op = CrdtOperation {
        operation,
        timestamp: 3,
        replica_id: String::from("replica_2"),
        vector_clock: remote_clock,
    };

    assert!(!replica.is_causally_ready(&remote_op));
}
1429
/// An operation from `replica_2` whose clock also records an event from
/// `replica_3` must not be causally ready on a freshly created processor.
#[test]
fn test_is_causally_ready_false_other_replica_ahead() {
    let replica = SimdDsonProcessor::new("replica_1");

    // One event from the sender and one from a third replica.
    let mut remote_clock = VectorClock::new();
    for peer in ["replica_2", "replica_3"] {
        remote_clock.increment(peer);
    }

    let operation = DsonOperation::FieldAdd {
        path: String::from("test"),
        value: OperationValue::Null,
    };
    let remote_op = CrdtOperation {
        operation,
        timestamp: 1,
        replica_id: String::from("replica_2"),
        vector_clock: remote_clock,
    };

    assert!(!replica.is_causally_ready(&remote_op));
}
1453
/// Buffers one remote operation whose vector clock records a single event from
/// `replica_2`, then checks that draining the buffer completes without error.
///
/// The returned conflict list is intentionally not inspected: this test only
/// pins that `process_buffered` returns `Ok`. An assertion of the form
/// `is_empty() || !is_empty()` would always hold and therefore verify nothing.
#[test]
fn test_process_buffered_with_ready_ops() {
    let mut proc = SimdDsonProcessor::new("replica_1");
    proc.process(r"{}").unwrap();

    let mut op_vc = VectorClock::new();
    op_vc.increment("replica_2");

    let op = CrdtOperation {
        operation: DsonOperation::FieldAdd {
            path: "test".to_string(),
            value: OperationValue::StringRef("value".to_string()),
        },
        timestamp: 1,
        replica_id: "replica_2".to_string(),
        vector_clock: op_vc,
    };

    proc.buffer_operation(op);

    // Success of the call is the only property under test.
    let _conflicts = proc
        .process_buffered()
        .expect("draining the operation buffer should succeed");
}
1478
/// Draining a buffer that holds an operation whose clock records three events
/// from `replica_2` must succeed and report no conflicts.
#[test]
fn test_process_buffered_with_not_ready_ops() {
    let mut replica = SimdDsonProcessor::new("replica_1");
    replica.process(r"{}").unwrap();

    // Advance the sender's entry three times.
    let mut remote_clock = VectorClock::new();
    for _ in 0..3 {
        remote_clock.increment("replica_2");
    }

    let operation = DsonOperation::FieldAdd {
        path: String::from("test"),
        value: OperationValue::Null,
    };
    replica.buffer_operation(CrdtOperation {
        operation,
        timestamp: 5,
        replica_id: String::from("replica_2"),
        vector_clock: remote_clock,
    });

    let reported = replica.process_buffered().unwrap();
    assert!(reported.is_empty());
}
1504
/// With two log entries stamped at `replica_1` counts one and two, a delta
/// generated against an empty clock must contain both operations.
#[test]
fn test_generate_delta_with_operations() {
    let mut replica = SimdDsonProcessor::new("replica_1");
    replica.process(r"{}").unwrap();

    // Clocks for the first and second logged operation.
    let mut first_clock = VectorClock::new();
    first_clock.increment("replica_1");
    let mut second_clock = first_clock.clone();
    second_clock.increment("replica_1");

    let log = &mut replica.operation_log;
    log.push((String::from("field1"), OperationValue::Null, 1, first_clock));
    log.push((String::from("field2"), OperationValue::Null, 2, second_clock));

    let baseline = VectorClock::new();
    let produced = replica.generate_delta(&baseline);
    assert_eq!(produced.operations.len(), 2);
}
1524
/// A log entry stamped with an empty clock must be left out of a delta
/// generated against a clock that has advanced twice for `replica_1`.
#[test]
fn test_generate_delta_filters_old() {
    let mut replica = SimdDsonProcessor::new("replica_1");
    replica.process(r"{}").unwrap();

    // The logged operation carries a clock with no recorded events.
    let stale_entry = (
        String::from("old"),
        OperationValue::Null,
        1,
        VectorClock::new(),
    );
    replica.operation_log.push(stale_entry);

    let mut baseline = VectorClock::new();
    for _ in 0..2 {
        baseline.increment("replica_1");
    }

    let produced = replica.generate_delta(&baseline);
    assert!(produced.operations.is_empty());
}
1543
/// Applies a delta carrying a single string-valued operation and checks that
/// `apply_delta` completes without error.
///
/// The returned conflict list is intentionally not inspected: this test only
/// pins that the call returns `Ok`. An assertion of the form
/// `is_empty() || !is_empty()` would always hold and therefore verify nothing.
#[test]
fn test_apply_delta_with_operations() {
    let mut proc = SimdDsonProcessor::new("replica_1");
    proc.process(r"{}").unwrap();

    let delta = SimdDelta {
        operations: vec![(
            "new_field".to_string(),
            OperationValue::StringRef("value".to_string()),
            1,
        )],
        since_clock: VectorClock::new(),
        current_clock: VectorClock::new(),
    };

    // Success of the call is the only property under test.
    let _conflicts = proc
        .apply_delta(delta)
        .expect("applying a single-operation delta should succeed");
}
1563
/// Applies a delta of 150 operations on distinct paths to a processor created
/// with `with_parallel(true)` and checks that `apply_delta` completes without
/// error.
///
/// The returned conflict list is intentionally not inspected: this test only
/// pins that the call returns `Ok`. An assertion of the form
/// `is_empty() || !is_empty()` would always hold and therefore verify nothing.
#[test]
fn test_apply_delta_parallel_large() {
    let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
    proc.process(r"{}").unwrap();

    // One operation per path, using the index as both value and timestamp.
    let operations: Vec<(String, OperationValue, u64)> = (0..150)
        .map(|i| {
            (
                format!("field_{i}"),
                OperationValue::NumberRef(i.to_string()),
                i,
            )
        })
        .collect();

    let delta = SimdDelta {
        operations,
        since_clock: VectorClock::new(),
        current_clock: VectorClock::new(),
    };

    // Success of the call is the only property under test.
    let _conflicts = proc
        .apply_delta(delta)
        .expect("applying a 150-operation delta should succeed");
}
1590
/// Logs two operations on the same path (`"field"`) and checks that `compact`
/// shrinks the operation log from two entries to one.
#[test]
fn test_compact_with_operations() {
    let mut proc = SimdDsonProcessor::new("replica_1");
    proc.process(r"{}").unwrap();

    let mut vc1 = VectorClock::new();
    vc1.increment("replica_1");
    proc.operation_log.push((
        "field".to_string(),
        OperationValue::NumberRef("1".to_string()),
        1,
        // `vc1` is not used again, so it is moved rather than cloned.
        vc1,
    ));

    let mut vc2 = VectorClock::new();
    vc2.increment("replica_1");
    vc2.increment("replica_1");
    proc.operation_log.push((
        "field".to_string(),
        OperationValue::NumberRef("2".to_string()),
        2,
        vc2,
    ));

    assert_eq!(proc.operation_log.len(), 2);

    proc.compact();

    assert_eq!(proc.operation_log.len(), 1);
}
1624
/// Twenty `FieldAdd` operations on distinct paths must yield twenty results
/// from `apply_operations_parallel` on a parallel-enabled processor.
#[test]
fn test_apply_operations_parallel_large_batch() {
    let mut replica = SimdDsonProcessor::new("replica_1").with_parallel(true);
    replica.process(r"{}").unwrap();

    // One add per path: field_0 .. field_19.
    let mut batch = Vec::new();
    for index in 0..20 {
        batch.push(DsonOperation::FieldAdd {
            path: format!("field_{index}"),
            value: OperationValue::NumberRef(index.to_string()),
        });
    }

    let outcome = replica.apply_operations_parallel(batch).unwrap();
    assert_eq!(outcome.len(), 20);
}
1641
/// Fifteen `FieldModify` operations that all target the path `"shared"` must
/// produce a non-empty result from `apply_operations_parallel`.
#[test]
fn test_apply_operations_parallel_groups_by_path() {
    let mut replica = SimdDsonProcessor::new("replica_1").with_parallel(true);
    replica.process(r"{}").unwrap();

    // Every operation hits the same path with a different value.
    let mut batch = Vec::new();
    for index in 0..15 {
        batch.push(DsonOperation::FieldModify {
            path: String::from("shared"),
            value: OperationValue::NumberRef(index.to_string()),
        });
    }

    let outcome = replica.apply_operations_parallel(batch).unwrap();
    assert!(!outcome.is_empty());
}
1658
/// Merges one `FieldAdd` operation from `replica_2` into a processor created
/// with `with_parallel(true)` and checks that the merge completes without error.
///
/// The returned conflict list is intentionally not inspected: this test only
/// pins that `merge_replicas_parallel` returns `Ok`. An assertion of the form
/// `is_empty() || !is_empty()` would always hold and therefore verify nothing.
#[test]
fn test_merge_replicas_parallel_with_ops() {
    let mut proc = SimdDsonProcessor::new("replica_1").with_parallel(true);
    proc.process(r"{}").unwrap();

    let mut vc = VectorClock::new();
    vc.increment("replica_2");

    let replica_ops = vec![(
        "replica_2".to_string(),
        vec![CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "from_replica_2".to_string(),
                value: OperationValue::StringRef("value".to_string()),
            },
            timestamp: 1,
            replica_id: "replica_2".to_string(),
            // `vc` is not used again, so it is moved rather than cloned.
            vector_clock: vc,
        }],
    )];

    // Success of the call is the only property under test.
    let _conflicts = proc
        .merge_replicas_parallel(replica_ops, &MergeStrategy::LastWriteWins)
        .expect("merging one remote operation should succeed");
}
1686
/// Merges one `FieldAdd` operation from `replica_2` into a processor created
/// without `with_parallel` and checks that the merge completes without error.
///
/// The returned conflict list is intentionally not inspected: this test only
/// pins that `merge_replicas_parallel` returns `Ok`. An assertion of the form
/// `is_empty() || !is_empty()` would always hold and therefore verify nothing.
#[test]
fn test_merge_replicas_sequential_with_ops() {
    let mut proc = SimdDsonProcessor::new("replica_1");
    proc.process(r"{}").unwrap();

    let mut vc = VectorClock::new();
    vc.increment("replica_2");

    let replica_ops = vec![(
        "replica_2".to_string(),
        vec![CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "from_replica_2".to_string(),
                value: OperationValue::StringRef("value".to_string()),
            },
            timestamp: 1,
            replica_id: "replica_2".to_string(),
            // `vc` is not used again, so it is moved rather than cloned.
            vector_clock: vc,
        }],
    )];

    // Success of the call is the only property under test.
    let _conflicts = proc
        .merge_replicas_parallel(replica_ops, &MergeStrategy::LastWriteWins)
        .expect("merging one remote operation should succeed");
}
1714
/// `apply_operation` must return `Ok` for the structural variants
/// `ObjectStart`, `ObjectEnd`, `ArrayStart` and `ArrayEnd`.
#[test]
fn test_apply_operation_other_variants() {
    let mut replica = SimdDsonProcessor::new("replica_1");
    replica.process(r"{}").unwrap();

    // Applied in this order: open/close an object, then open/close an array.
    let structural_ops = [
        DsonOperation::ObjectStart {
            path: String::from("obj"),
        },
        DsonOperation::ObjectEnd {
            path: String::from("obj"),
        },
        DsonOperation::ArrayStart {
            path: String::from("arr"),
        },
        DsonOperation::ArrayEnd {
            path: String::from("arr"),
        },
    ];

    for structural_op in &structural_ops {
        replica.apply_operation(structural_op).unwrap();
    }
}
1741
/// Merging a plain `FieldAdd` operation from `replica_2` must succeed and
/// report no conflict.
#[test]
fn test_merge_operation_non_merge_field() {
    let mut replica = SimdDsonProcessor::new("replica_1");
    replica.process(r"{}").unwrap();

    let mut remote_clock = VectorClock::new();
    remote_clock.increment("replica_2");

    let operation = DsonOperation::FieldAdd {
        path: String::from("test"),
        value: OperationValue::StringRef(String::from("value")),
    };
    let remote_op = CrdtOperation {
        operation,
        timestamp: 1,
        replica_id: String::from("replica_2"),
        vector_clock: remote_clock,
    };

    let outcome = replica.merge_operation(remote_op);
    assert!(outcome.unwrap().is_none());
}
1764}