use crate::format_dson::{FormatBatchProcessor, FormatBatchResult, FormatDsonProcessor};
use crate::skiptape::CompiledSchema;
use fionn_core::Result;
use fionn_core::format::FormatKind;
use fionn_ops::dson_traits::{
    CrdtMerge, CrdtOperation, DeltaCrdt, MergeConflict, OpBasedCrdt, VectorClock,
};
use fionn_ops::{DsonOperation, MergeStrategy, OperationValue};
use smallvec::SmallVec;
use std::collections::HashMap;

/// A delta of CRDT operations exchanged between replicas.
#[derive(Debug, Clone)]
pub struct FormatDelta {
    /// Operations contained in this delta.
    pub operations: Vec<CrdtOperation>,
    /// Vector clock of the sending replica when the delta was generated.
    pub clock: VectorClock,
    /// Format the delta was produced from.
    pub format_kind: FormatKind,
}

impl FormatDelta {
    #[must_use]
    pub fn new(format_kind: FormatKind) -> Self {
        Self {
            operations: Vec::new(),
            clock: VectorClock::new(),
            format_kind,
        }
    }

    #[must_use]
    pub const fn with_operations(
        operations: Vec<CrdtOperation>,
        clock: VectorClock,
        format_kind: FormatKind,
    ) -> Self {
        Self {
            operations,
            clock,
            format_kind,
        }
    }

    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.operations.is_empty()
    }

    #[must_use]
    pub const fn len(&self) -> usize {
        self.operations.len()
    }
}

/// Tracked state for a single processed document: its serialized content and
/// a last-write timestamp per field path.
#[derive(Debug, Clone)]
struct DocumentState {
    content: String,
    field_timestamps: HashMap<String, u64>,
}

impl DocumentState {
    fn new(content: String) -> Self {
        Self {
            content,
            field_timestamps: HashMap::new(),
        }
    }
}

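/// CRDT-aware wrapper around [`FormatDsonProcessor`] that stamps processed
/// documents with a replica id, a vector clock, and a Lamport timestamp so the
/// results can be merged across replicas.
///
/// Illustrative usage sketch (not taken from the original source); it assumes
/// some type `MyBatchProcessor` implementing [`FormatBatchProcessor`] exists:
///
/// ```ignore
/// let schema = CompiledSchema::compile(&[])?;
/// let mut a = FormatCrdtProcessor::new(MyBatchProcessor::default(), "replica_a");
/// let mut b = FormatCrdtProcessor::new(MyBatchProcessor::default(), "replica_b");
///
/// a.process(b"{}", &schema)?;
/// b.process(b"{}", &schema)?;
///
/// // Replica A ships everything replica B has not observed yet.
/// let delta = a.generate_delta(b.vector_clock());
/// let conflicts = b.apply_delta(delta)?;
/// assert!(conflicts.is_empty());
/// ```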
pub struct FormatCrdtProcessor<P: FormatBatchProcessor> {
    /// Underlying format-aware DSON processor.
    dson_processor: FormatDsonProcessor<P>,
    /// Identifier of this replica.
    replica_id: String,
    /// Vector clock tracking causal history across replicas.
    vector_clock: VectorClock,
    /// Lamport timestamp used to order local operations.
    lamport_timestamp: u64,
    /// Default strategy applied when resolving merge conflicts.
    default_strategy: MergeStrategy,
    /// Per-document state keyed by document id (e.g. `doc_0`).
    document_states: HashMap<String, DocumentState>,
    /// History of CRDT operations prepared or applied locally.
    operation_history: Vec<CrdtOperation>,
    /// Buffer for operations that are not yet causally ready.
    operation_buffer: SmallVec<[CrdtOperation; 16]>,
}

impl<P: FormatBatchProcessor> FormatCrdtProcessor<P> {
    #[must_use]
    pub fn new(batch_processor: P, replica_id: impl Into<String>) -> Self {
        Self {
            dson_processor: FormatDsonProcessor::new(batch_processor),
            replica_id: replica_id.into(),
            vector_clock: VectorClock::new(),
            lamport_timestamp: 0,
            default_strategy: MergeStrategy::LastWriteWins,
            document_states: HashMap::new(),
            operation_history: Vec::new(),
            operation_buffer: SmallVec::new(),
        }
    }

    #[must_use]
    pub fn with_strategy(mut self, strategy: MergeStrategy) -> Self {
        self.default_strategy = strategy;
        self
    }

    #[must_use]
    pub fn format_kind(&self) -> FormatKind {
        self.dson_processor.format_kind()
    }

    #[must_use]
    pub fn replica_id(&self) -> &str {
        &self.replica_id
    }

    #[must_use]
    pub const fn vector_clock(&self) -> &VectorClock {
        &self.vector_clock
    }

    #[must_use]
    pub const fn lamport_timestamp(&self) -> u64 {
        self.lamport_timestamp
    }

    pub fn process(&mut self, data: &[u8], schema: &CompiledSchema) -> Result<FormatBatchResult> {
        let result = self
            .dson_processor
            .process_with_operations(data, schema, &[])?;

        for (idx, doc) in result.documents.iter().enumerate() {
            let doc_id = format!("doc_{idx}");
            self.document_states
                .insert(doc_id, DocumentState::new(doc.clone()));
        }

        self.vector_clock.increment(&self.replica_id);
        self.lamport_timestamp += 1;

        Ok(result)
    }

    pub fn process_with_operations(
        &mut self,
        data: &[u8],
        schema: &CompiledSchema,
        operations: &[DsonOperation],
    ) -> Result<FormatBatchResult> {
        let result = self
            .dson_processor
            .process_with_operations(data, schema, operations)?;

        for op in operations {
            let crdt_op = self.prepare_operation(op);
            self.operation_history.push(crdt_op);
        }

        for (idx, doc) in result.documents.iter().enumerate() {
            let doc_id = format!("doc_{idx}");
            self.document_states
                .insert(doc_id, DocumentState::new(doc.clone()));
        }

        self.vector_clock.increment(&self.replica_id);
        self.lamport_timestamp += 1;

        Ok(result)
    }

    pub fn process_unfiltered(&mut self, data: &[u8]) -> Result<FormatBatchResult> {
        let result = self
            .dson_processor
            .process_unfiltered_with_operations(data, &[])?;

        for (idx, doc) in result.documents.iter().enumerate() {
            let doc_id = format!("doc_{idx}");
            self.document_states
                .insert(doc_id, DocumentState::new(doc.clone()));
        }

        self.vector_clock.increment(&self.replica_id);
        self.lamport_timestamp += 1;

        Ok(result)
    }

    fn prepare_operation(&mut self, op: &DsonOperation) -> CrdtOperation {
        self.lamport_timestamp += 1;
        self.vector_clock.increment(&self.replica_id);

        CrdtOperation {
            operation: op.clone(),
            timestamp: self.lamport_timestamp,
            replica_id: self.replica_id.clone(),
            vector_clock: self.vector_clock.clone(),
        }
    }

    #[must_use]
    pub fn get_document(&self, doc_id: &str) -> Option<&str> {
        self.document_states.get(doc_id).map(|s| s.content.as_str())
    }

    #[must_use]
    pub fn document_ids(&self) -> Vec<&str> {
        self.document_states.keys().map(String::as_str).collect()
    }

    #[must_use]
    pub fn document_count(&self) -> usize {
        self.document_states.len()
    }

    fn apply_field_value(
        &mut self,
        doc_id: &str,
        path: &str,
        value: &OperationValue,
        timestamp: u64,
    ) -> Option<MergeConflict> {
        let state = self.document_states.get_mut(doc_id)?;

        let current_ts = state.field_timestamps.get(path).copied().unwrap_or(0);

        match timestamp.cmp(&current_ts) {
            std::cmp::Ordering::Greater => {
                state.field_timestamps.insert(path.to_string(), timestamp);

                if let Ok(mut json_value) =
                    serde_json::from_str::<serde_json::Value>(&state.content)
                {
                    Self::set_json_path(&mut json_value, path, value);
                    if let Ok(new_content) = serde_json::to_string(&json_value) {
                        state.content = new_content;
                    }
                }

                None
            }
            std::cmp::Ordering::Equal => {
                // Concurrent write with the same timestamp: surface a conflict.
                Some(MergeConflict {
                    path: path.to_string(),
                    local_value: OperationValue::StringRef("current".to_string()),
                    remote_value: value.clone(),
                    local_timestamp: current_ts,
                    remote_timestamp: timestamp,
                    resolved_value: None,
                })
            }
            std::cmp::Ordering::Less => {
                // Older write: keep the current value.
                None
            }
        }
    }

    fn set_json_path(json: &mut serde_json::Value, path: &str, value: &OperationValue) {
        let parts: Vec<&str> = path.split('.').collect();
        let mut current = json;

        for (i, part) in parts.iter().enumerate() {
            if i == parts.len() - 1 {
                if let serde_json::Value::Object(obj) = current {
                    obj.insert((*part).to_string(), Self::operation_value_to_json(value));
                }
            } else if let serde_json::Value::Object(obj) = current {
                current = obj
                    .entry((*part).to_string())
                    .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
            }
        }
    }

    fn operation_value_to_json(value: &OperationValue) -> serde_json::Value {
        match value {
            OperationValue::Null => serde_json::Value::Null,
            OperationValue::BoolRef(b) => serde_json::Value::Bool(*b),
            OperationValue::NumberRef(n) => n
                .parse::<i64>()
                .map(|i| serde_json::Value::Number(i.into()))
                .or_else(|_| {
                    n.parse::<f64>().map(|f| {
                        serde_json::Value::Number(
                            serde_json::Number::from_f64(f).unwrap_or_else(|| 0.into()),
                        )
                    })
                })
                .unwrap_or_else(|_| serde_json::Value::String(n.clone())),
            OperationValue::StringRef(s) => serde_json::Value::String(s.clone()),
            OperationValue::ArrayRef { .. } => serde_json::Value::Array(Vec::new()),
            OperationValue::ObjectRef { .. } => serde_json::Value::Object(serde_json::Map::new()),
        }
    }

    pub fn reset(&mut self) {
        self.dson_processor.reset();
        self.document_states.clear();
        self.operation_history.clear();
        self.operation_buffer.clear();
    }

    #[must_use]
    pub const fn dson_processor(&self) -> &FormatDsonProcessor<P> {
        &self.dson_processor
    }

    #[allow(clippy::missing_const_for_fn)]
    pub fn dson_processor_mut(&mut self) -> &mut FormatDsonProcessor<P> {
        &mut self.dson_processor
    }
}

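// State-based merge entry points. `merge_operation` advances the local Lamport
// clock past the incoming operation's timestamp, merges vector clocks, and then
// applies field adds/modifies/deletes to every tracked document; per-field
// last-write-wins timestamps in `apply_field_value` decide whether the remote
// value replaces the local one or surfaces a `MergeConflict`.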
impl<P: FormatBatchProcessor> CrdtMerge for FormatCrdtProcessor<P> {
    fn merge_operation(&mut self, op: CrdtOperation) -> Result<Option<MergeConflict>> {
        self.lamport_timestamp = self.lamport_timestamp.max(op.timestamp) + 1;

        self.vector_clock.merge(&op.vector_clock);
        self.vector_clock.increment(&self.replica_id);

        match &op.operation {
            DsonOperation::FieldAdd { path, value }
            | DsonOperation::FieldModify { path, value } => {
                let mut conflict = None;
                let doc_ids: Vec<_> = self.document_states.keys().cloned().collect();
                for doc_id in doc_ids {
                    if let Some(c) = self.apply_field_value(&doc_id, path, value, op.timestamp) {
                        conflict = Some(c);
                    }
                }
                Ok(conflict)
            }
            DsonOperation::FieldDelete { path } => {
                let doc_ids: Vec<_> = self.document_states.keys().cloned().collect();
                for doc_id in doc_ids {
                    if let Some(state) = self.document_states.get_mut(&doc_id)
                        && let Ok(mut json_value) =
                            serde_json::from_str::<serde_json::Value>(&state.content)
                    {
                        Self::delete_json_path(&mut json_value, path);
                        if let Ok(new_content) = serde_json::to_string(&json_value) {
                            state.content = new_content;
                        }
                    }
                }
                Ok(None)
            }
            _ => {
                self.operation_history.push(op);
                Ok(None)
            }
        }
    }

    fn merge_field(
        &mut self,
        path: &str,
        value: OperationValue,
        timestamp: u64,
        strategy: &MergeStrategy,
    ) -> Result<Option<MergeConflict>> {
        let mut conflict = None;
        let doc_ids: Vec<_> = self.document_states.keys().cloned().collect();

        for doc_id in doc_ids {
            if let Some(state) = self.document_states.get(&doc_id) {
                let current_ts = state.field_timestamps.get(path).copied().unwrap_or(0);

                if current_ts == timestamp {
                    let local_value = self
                        .get_field_value(&doc_id, path)
                        .unwrap_or(OperationValue::Null);

                    let resolved = strategy.resolve(&local_value, &value, current_ts, timestamp);

                    conflict = Some(MergeConflict {
                        path: path.to_string(),
                        local_value,
                        remote_value: value.clone(),
                        local_timestamp: current_ts,
                        remote_timestamp: timestamp,
                        resolved_value: Some(resolved.clone()),
                    });

                    self.apply_field_value(&doc_id, path, &resolved, timestamp.max(current_ts) + 1);
                } else {
                    self.apply_field_value(&doc_id, path, &value, timestamp);
                }
            }
        }

        Ok(conflict)
    }

    fn vector_clock(&self) -> &VectorClock {
        &self.vector_clock
    }

    fn replica_id(&self) -> &str {
        &self.replica_id
    }

    fn resolve_conflict(
        &mut self,
        conflict: &MergeConflict,
        strategy: &MergeStrategy,
    ) -> Result<OperationValue> {
        let resolved = strategy.resolve(
            &conflict.local_value,
            &conflict.remote_value,
            conflict.local_timestamp,
            conflict.remote_timestamp,
        );

        let doc_ids: Vec<_> = self.document_states.keys().cloned().collect();
        for doc_id in doc_ids {
            self.apply_field_value(
                &doc_id,
                &conflict.path,
                &resolved,
                conflict.local_timestamp.max(conflict.remote_timestamp) + 1,
            );
        }

        Ok(resolved)
    }
}

impl<P: FormatBatchProcessor> FormatCrdtProcessor<P> {
    fn delete_json_path(json: &mut serde_json::Value, path: &str) {
        let parts: Vec<&str> = path.split('.').collect();
        let mut current = json;

        for (i, part) in parts.iter().enumerate() {
            if i == parts.len() - 1 {
                if let serde_json::Value::Object(obj) = current {
                    obj.remove(*part);
                }
            } else if let serde_json::Value::Object(obj) = current {
                match obj.get_mut(*part) {
                    Some(v) => current = v,
                    None => return,
                }
            } else {
                return;
            }
        }
    }

    fn get_field_value(&self, doc_id: &str, path: &str) -> Option<OperationValue> {
        let state = self.document_states.get(doc_id)?;
        let json_value: serde_json::Value = serde_json::from_str(&state.content).ok()?;

        let parts: Vec<&str> = path.split('.').collect();
        let mut current = &json_value;

        for part in &parts {
            match current {
                serde_json::Value::Object(obj) => {
                    current = obj.get(*part)?;
                }
                _ => return None,
            }
        }

        Some(Self::json_to_operation_value(current))
    }

    fn json_to_operation_value(json: &serde_json::Value) -> OperationValue {
        match json {
            serde_json::Value::Null => OperationValue::Null,
            serde_json::Value::Bool(b) => OperationValue::BoolRef(*b),
            serde_json::Value::Number(n) => OperationValue::NumberRef(n.to_string()),
            serde_json::Value::String(s) => OperationValue::StringRef(s.clone()),
            serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
                OperationValue::StringRef(json.to_string())
            }
        }
    }
}

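// Delta-state replication. `generate_delta(since)` collects every operation in
// the local history whose vector clock did not happen before `since`, i.e.
// everything the requesting replica may not have seen yet; `apply_delta`
// replays those operations, buffering any that are not yet causally ready and
// folding the sender's clock into the local one.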
impl<P: FormatBatchProcessor> DeltaCrdt for FormatCrdtProcessor<P> {
    type Delta = FormatDelta;

    fn generate_delta(&self, since: &VectorClock) -> Self::Delta {
        let ops: Vec<CrdtOperation> = self
            .operation_history
            .iter()
            .filter(|op| !op.vector_clock.happened_before(since))
            .cloned()
            .collect();

        FormatDelta::with_operations(ops, self.vector_clock.clone(), self.format_kind())
    }

    fn apply_delta(&mut self, delta: Self::Delta) -> Result<Vec<MergeConflict>> {
        let mut conflicts = Vec::new();

        for op in delta.operations {
            if !self.is_causally_ready(&op) {
                self.buffer_operation(op);
            } else if let Some(conflict) = self.merge_operation(op)? {
                conflicts.push(conflict);
            }
        }

        conflicts.extend(self.process_buffered()?);

        self.vector_clock.merge(&delta.clock);

        Ok(conflicts)
    }

    fn compact(&mut self) {
        const MAX_HISTORY: usize = 1000;
        if self.operation_history.len() > MAX_HISTORY {
            let drain_count = self.operation_history.len() - MAX_HISTORY;
            self.operation_history.drain(0..drain_count);
        }
    }
}

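// Operation-based CRDT flow: `prepare` stamps a local operation with the next
// Lamport timestamp and an incremented vector clock, `effect` applies a
// prepared (possibly remote) operation, and `is_causally_ready` holds an
// operation back until everything that causally precedes it has already been
// applied locally.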
impl<P: FormatBatchProcessor> OpBasedCrdt for FormatCrdtProcessor<P> {
    fn prepare(&self, op: &DsonOperation) -> Result<CrdtOperation> {
        let mut clock = self.vector_clock.clone();
        clock.increment(&self.replica_id);

        Ok(CrdtOperation {
            operation: op.clone(),
            timestamp: self.lamport_timestamp + 1,
            replica_id: self.replica_id.clone(),
            vector_clock: clock,
        })
    }

    fn effect(&mut self, op: CrdtOperation) -> Result<Option<MergeConflict>> {
        self.merge_operation(op)
    }

    fn is_causally_ready(&self, op: &CrdtOperation) -> bool {
        for (replica, &time) in &op.vector_clock.clocks() {
            if replica == &op.replica_id {
                if time > 1 && self.vector_clock.get(replica) < time - 1 {
                    return false;
                }
            } else if self.vector_clock.get(replica) < time {
                return false;
            }
        }
        true
    }

    fn buffer_operation(&mut self, op: CrdtOperation) {
        self.operation_buffer.push(op);
    }

    fn process_buffered(&mut self) -> Result<Vec<MergeConflict>> {
        let mut conflicts = Vec::new();
        let mut processed = Vec::new();

        loop {
            let mut made_progress = false;

            for (i, op) in self.operation_buffer.iter().enumerate() {
                if self.is_causally_ready(op) {
                    processed.push(i);
                    made_progress = true;
                }
            }

            if !made_progress {
                break;
            }

            for &idx in processed.iter().rev() {
                let op = self.operation_buffer.remove(idx);
                if let Some(conflict) = self.merge_operation(op)? {
                    conflicts.push(conflict);
                }
            }

            processed.clear();
        }

        Ok(conflicts)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::format_dson::BatchStatistics;

    struct MockBatchProcessor;

    impl FormatBatchProcessor for MockBatchProcessor {
        fn format_kind(&self) -> FormatKind {
            FormatKind::Json
        }

        fn process_batch(
            &mut self,
            _data: &[u8],
            _schema: &CompiledSchema,
        ) -> Result<FormatBatchResult> {
            Ok(FormatBatchResult {
                documents: vec![r#"{"name":"test","value":42}"#.to_string()],
                errors: vec![],
                statistics: BatchStatistics::default(),
            })
        }

        fn process_batch_unfiltered(&mut self, _data: &[u8]) -> Result<FormatBatchResult> {
            self.process_batch(&[], &CompiledSchema::compile(&[]).unwrap())
        }

        fn reset(&mut self) {}
    }

    #[test]
    fn test_crdt_processor_creation() {
        let processor = FormatCrdtProcessor::new(MockBatchProcessor, "replica_1");
        assert_eq!(processor.replica_id(), "replica_1");
        assert_eq!(processor.format_kind(), FormatKind::Json);
    }

    #[test]
    fn test_crdt_processor_with_strategy() {
        let processor =
            FormatCrdtProcessor::new(MockBatchProcessor, "r1").with_strategy(MergeStrategy::Max);
        assert_eq!(processor.replica_id(), "r1");
    }

    #[test]
    fn test_process_increments_clock() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();

        let initial_ts = processor.lamport_timestamp();
        processor.process(b"{}", &schema).unwrap();

        assert!(processor.lamport_timestamp() > initial_ts);
        assert_eq!(processor.vector_clock().get("r1"), 1);
    }

    #[test]
    fn test_process_tracks_documents() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();

        processor.process(b"{}", &schema).unwrap();

        assert_eq!(processor.document_count(), 1);
        assert!(processor.get_document("doc_0").is_some());
    }

    #[test]
    fn test_format_delta_creation() {
        let delta = FormatDelta::new(FormatKind::Json);
        assert!(delta.is_empty());
        assert_eq!(delta.len(), 0);
    }

    #[test]
    fn test_format_delta_with_operations() {
        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::StringRef("value".to_string()),
            },
            timestamp: 1,
            replica_id: "r1".to_string(),
            vector_clock: VectorClock::new(),
        };

        let delta = FormatDelta::with_operations(vec![op], VectorClock::new(), FormatKind::Json);
        assert!(!delta.is_empty());
        assert_eq!(delta.len(), 1);
    }

    #[test]
    fn test_merge_operation() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();
        processor.process(b"{}", &schema).unwrap();

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "new_field".to_string(),
                value: OperationValue::StringRef("new_value".to_string()),
            },
            timestamp: 10,
            replica_id: "r2".to_string(),
            vector_clock: VectorClock::new(),
        };

        let conflict = processor.merge_operation(op).unwrap();
        assert!(conflict.is_none());
    }

    #[test]
    fn test_generate_and_apply_delta() {
        let mut processor1 = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();
        processor1.process(b"{}", &schema).unwrap();

        let delta = processor1.generate_delta(&VectorClock::new());

        let mut processor2 = FormatCrdtProcessor::new(MockBatchProcessor, "r2");
        processor2.process(b"{}", &schema).unwrap();

        let conflicts = processor2.apply_delta(delta).unwrap();
        assert!(conflicts.is_empty());
    }

    #[test]
    fn test_prepare_operation() {
        let processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let op = DsonOperation::FieldAdd {
            path: "test".to_string(),
            value: OperationValue::Null,
        };

        let crdt_op = processor.prepare(&op).unwrap();
        assert_eq!(crdt_op.replica_id, "r1");
        assert!(crdt_op.timestamp > 0);
    }

    #[test]
    fn test_is_causally_ready() {
        let processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 1,
            replica_id: "r2".to_string(),
            vector_clock: VectorClock::new(),
        };

        assert!(processor.is_causally_ready(&op));
    }

    #[test]
    fn test_buffer_and_process() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 1,
            replica_id: "r2".to_string(),
            vector_clock: VectorClock::new(),
        };

        processor.buffer_operation(op);
        assert_eq!(processor.operation_buffer.len(), 1);

        let conflicts = processor.process_buffered().unwrap();
        assert!(conflicts.is_empty());
    }

    #[test]
    fn test_compact() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");

        for i in 0..2000_u64 {
            processor.operation_history.push(CrdtOperation {
                operation: DsonOperation::FieldAdd {
                    path: format!("field_{i}"),
                    value: OperationValue::Null,
                },
                timestamp: i,
                replica_id: "r1".to_string(),
                vector_clock: VectorClock::new(),
            });
        }

        processor.compact();
        assert!(processor.operation_history.len() <= 1000);
    }

    #[test]
    fn test_reset() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();
        processor.process(b"{}", &schema).unwrap();

        assert!(processor.document_count() > 0);

        processor.reset();
        assert_eq!(processor.document_count(), 0);
    }

    #[test]
    fn test_document_ids() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();
        processor.process(b"{}", &schema).unwrap();

        let ids = processor.document_ids();
        assert!(!ids.is_empty());
    }

    #[test]
    fn test_merge_field_with_conflict() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();
        processor.process(b"{}", &schema).unwrap();

        processor.apply_field_value(
            "doc_0",
            "test_field",
            &OperationValue::StringRef("value1".to_string()),
            1,
        );

        let conflict = processor
            .merge_field(
                "test_field",
                OperationValue::StringRef("value2".to_string()),
                1,
                &MergeStrategy::LastWriteWins,
            )
            .unwrap();

        assert!(conflict.is_some());
    }

    #[test]
    fn test_resolve_conflict() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();
        processor.process(b"{}", &schema).unwrap();

        let conflict = MergeConflict {
            path: "test".to_string(),
            local_value: OperationValue::NumberRef("10".to_string()),
            remote_value: OperationValue::NumberRef("20".to_string()),
            local_timestamp: 1,
            remote_timestamp: 2,
            resolved_value: None,
        };

        let resolved = processor
            .resolve_conflict(&conflict, &MergeStrategy::Max)
            .unwrap();

        match resolved {
            OperationValue::NumberRef(n) => assert_eq!(n, "20"),
            _ => panic!("Expected NumberRef"),
        }
    }

    #[test]
    fn test_operation_value_to_json_all_types() {
        let null_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
            &OperationValue::Null,
        );
        assert!(null_json.is_null());

        let bool_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
            &OperationValue::BoolRef(true),
        );
        assert_eq!(bool_json, serde_json::Value::Bool(true));

        let int_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
            &OperationValue::NumberRef("42".to_string()),
        );
        assert_eq!(int_json, serde_json::json!(42));

        let float_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
            &OperationValue::NumberRef("3.14".to_string()),
        );
        assert!(float_json.is_number());

        let string_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
            &OperationValue::StringRef("hello".to_string()),
        );
        assert_eq!(string_json, serde_json::Value::String("hello".to_string()));

        let array_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
            &OperationValue::ArrayRef { start: 0, end: 0 },
        );
        assert!(array_json.is_array());

        let obj_json = FormatCrdtProcessor::<MockBatchProcessor>::operation_value_to_json(
            &OperationValue::ObjectRef { start: 0, end: 0 },
        );
        assert!(obj_json.is_object());
    }

    #[test]
    fn test_json_to_operation_value_all_types() {
        let null_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
            &serde_json::Value::Null,
        );
        assert!(matches!(null_val, OperationValue::Null));

        let bool_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
            &serde_json::Value::Bool(false),
        );
        assert!(matches!(bool_val, OperationValue::BoolRef(false)));

        let num_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
            &serde_json::json!(123),
        );
        assert!(matches!(num_val, OperationValue::NumberRef(_)));

        let str_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
            &serde_json::Value::String("test".to_string()),
        );
        assert!(matches!(str_val, OperationValue::StringRef(_)));

        let arr_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
            &serde_json::json!([1, 2]),
        );
        assert!(matches!(arr_val, OperationValue::StringRef(_)));

        let obj_val = FormatCrdtProcessor::<MockBatchProcessor>::json_to_operation_value(
            &serde_json::json!({"a": 1}),
        );
        assert!(matches!(obj_val, OperationValue::StringRef(_)));
    }

    #[test]
    fn test_dson_processor_access() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let _ = processor.dson_processor();
        let _ = processor.dson_processor_mut();
    }

    #[test]
    fn test_field_delete_operation() {
        let mut processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");
        let schema = CompiledSchema::compile(&[]).unwrap();
        processor.process(b"{}", &schema).unwrap();

        let op = CrdtOperation {
            operation: DsonOperation::FieldDelete {
                path: "name".to_string(),
            },
            timestamp: 10,
            replica_id: "r2".to_string(),
            vector_clock: VectorClock::new(),
        };

        let conflict = processor.merge_operation(op).unwrap();
        assert!(conflict.is_none());
    }
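
    // Added illustrative test (not in the original source): an operation whose
    // origin replica has already advanced past what we have observed locally
    // should be held back by `is_causally_ready`.
    #[test]
    fn test_is_causally_ready_rejects_gapped_operation() {
        let processor = FormatCrdtProcessor::new(MockBatchProcessor, "r1");

        // Build a clock claiming this is the second operation from "r2",
        // while the local processor has seen nothing from "r2" yet.
        let mut clock = VectorClock::new();
        clock.increment(&"r2".to_string());
        clock.increment(&"r2".to_string());

        let op = CrdtOperation {
            operation: DsonOperation::FieldAdd {
                path: "test".to_string(),
                value: OperationValue::Null,
            },
            timestamp: 2,
            replica_id: "r2".to_string(),
            vector_clock: clock,
        };

        assert!(!processor.is_causally_ready(&op));
    }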
}