1use crate::{AdapterError, DomainInfo, PredicateInfo, SymbolTable};
41use serde::{Deserialize, Serialize};
42use std::collections::{HashMap, HashSet, VecDeque};
43use std::sync::{Arc, RwLock};
44use std::time::{SystemTime, UNIX_EPOCH};
45
46#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
48pub struct NodeId(String);
49
50impl NodeId {
51 pub fn new(id: impl Into<String>) -> Self {
53 Self(id.into())
54 }
55
56 pub fn as_str(&self) -> &str {
58 &self.0
59 }
60
61 pub fn random() -> Self {
63 use std::sync::atomic::{AtomicU64, Ordering};
64 static COUNTER: AtomicU64 = AtomicU64::new(0);
65 let id = COUNTER.fetch_add(1, Ordering::SeqCst);
66 Self(format!("node-{}", id))
67 }
68}
69
70impl std::fmt::Display for NodeId {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 write!(f, "{}", self.0)
73 }
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
78pub struct VectorClock {
79 clocks: HashMap<NodeId, u64>,
80}
81
82impl VectorClock {
83 pub fn new() -> Self {
85 Self {
86 clocks: HashMap::new(),
87 }
88 }
89
90 pub fn increment(&mut self, node: &NodeId) {
92 *self.clocks.entry(node.clone()).or_insert(0) += 1;
93 }
94
95 pub fn get(&self, node: &NodeId) -> u64 {
97 self.clocks.get(node).copied().unwrap_or(0)
98 }
99
100 pub fn merge(&mut self, other: &VectorClock) {
102 for (node, &value) in &other.clocks {
103 let current = self.clocks.entry(node.clone()).or_insert(0);
104 *current = (*current).max(value);
105 }
106 }
107
108 pub fn happens_before(&self, other: &VectorClock) -> bool {
110 let mut strictly_less = false;
111
112 for (node, &self_val) in &self.clocks {
114 let other_val = other.get(node);
115 if self_val > other_val {
116 return false; }
118 if self_val < other_val {
119 strictly_less = true;
120 }
121 }
122
123 for (node, &other_val) in &other.clocks {
125 if !self.clocks.contains_key(node) && other_val > 0 {
126 strictly_less = true;
127 }
128 }
129
130 strictly_less
131 }
132
133 pub fn is_concurrent(&self, other: &VectorClock) -> bool {
135 !self.happens_before(other) && !other.happens_before(self) && self != other
136 }
137}
138
139impl Default for VectorClock {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
147pub enum SyncChangeType {
148 DomainAdded,
150 DomainModified,
152 DomainRemoved,
154 PredicateAdded,
156 PredicateModified,
158 PredicateRemoved,
160 VariableAdded,
162 VariableRemoved,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct SyncEvent {
169 pub id: String,
171 pub origin: NodeId,
173 pub clock: VectorClock,
175 pub change_type: SyncChangeType,
177 pub entity_name: String,
179 pub entity_data: Option<String>,
181 pub timestamp: u64,
183}
184
185impl SyncEvent {
186 pub fn new(
188 origin: NodeId,
189 clock: VectorClock,
190 change_type: SyncChangeType,
191 entity_name: String,
192 entity_data: Option<String>,
193 ) -> Self {
194 let id = format!(
195 "{}-{}-{}",
196 origin.as_str(),
197 clock.get(&origin),
198 Self::current_timestamp()
199 );
200
201 Self {
202 id,
203 origin,
204 clock,
205 change_type,
206 entity_name,
207 entity_data,
208 timestamp: Self::current_timestamp(),
209 }
210 }
211
212 fn current_timestamp() -> u64 {
213 SystemTime::now()
214 .duration_since(UNIX_EPOCH)
215 .unwrap()
216 .as_millis() as u64
217 }
218}
219
220#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
222pub enum ConflictResolution {
223 LastWriteWins,
225 FirstWriteWins,
227 Manual,
229 Merge,
231 VectorClock,
233}
234
235#[derive(Debug, Clone, PartialEq, Eq)]
237pub enum ApplyResult {
238 Applied,
240 Ignored,
242 ConflictResolved,
244 ManualRequired,
246}
247
248pub trait SyncProtocol: Send + Sync {
250 fn send_event(&self, target: &NodeId, event: &SyncEvent) -> Result<(), AdapterError>;
252
253 fn broadcast_event(&self, event: &SyncEvent) -> Result<(), AdapterError>;
255
256 fn receive_events(&self) -> Result<Vec<SyncEvent>, AdapterError>;
258}
259
260#[derive(Debug, Clone)]
262pub struct InMemorySyncProtocol {
263 events: Arc<RwLock<VecDeque<SyncEvent>>>,
264}
265
266impl InMemorySyncProtocol {
267 pub fn new() -> Self {
269 Self {
270 events: Arc::new(RwLock::new(VecDeque::new())),
271 }
272 }
273}
274
275impl Default for InMemorySyncProtocol {
276 fn default() -> Self {
277 Self::new()
278 }
279}
280
281impl SyncProtocol for InMemorySyncProtocol {
282 fn send_event(&self, _target: &NodeId, event: &SyncEvent) -> Result<(), AdapterError> {
283 self.events
284 .write()
285 .map_err(|e| AdapterError::InvalidOperation(format!("Lock poisoned: {}", e)))?
286 .push_back(event.clone());
287 Ok(())
288 }
289
290 fn broadcast_event(&self, event: &SyncEvent) -> Result<(), AdapterError> {
291 self.events
292 .write()
293 .map_err(|e| AdapterError::InvalidOperation(format!("Lock poisoned: {}", e)))?
294 .push_back(event.clone());
295 Ok(())
296 }
297
298 fn receive_events(&self) -> Result<Vec<SyncEvent>, AdapterError> {
299 let mut events = self
300 .events
301 .write()
302 .map_err(|e| AdapterError::InvalidOperation(format!("Lock poisoned: {}", e)))?;
303 Ok(events.drain(..).collect())
304 }
305}
306
307pub trait EventListener: Send + Sync {
309 fn on_event_received(&self, event: &SyncEvent);
311
312 fn on_event_applied(&self, event: &SyncEvent, result: &ApplyResult);
314
315 fn on_conflict_detected(&self, event: &SyncEvent, conflict_type: &str);
317}
318
319#[derive(Debug, Clone, Default, Serialize, Deserialize)]
321pub struct SyncStatistics {
322 pub events_sent: usize,
324 pub events_received: usize,
326 pub events_applied: usize,
328 pub events_ignored: usize,
330 pub conflicts_detected: usize,
332 pub conflicts_resolved: usize,
334 pub manual_resolutions_required: usize,
336}
337
338pub struct SynchronizationManager {
340 node_id: NodeId,
342 table: SymbolTable,
344 clock: VectorClock,
346 resolution_strategy: ConflictResolution,
348 pending_events: VecDeque<SyncEvent>,
350 applied_events: HashSet<String>,
352 listeners: Vec<Arc<dyn EventListener>>,
354 stats: SyncStatistics,
356 known_nodes: HashSet<NodeId>,
358}
359
360impl SynchronizationManager {
361 pub fn new(node_id: NodeId, table: SymbolTable) -> Self {
363 let mut clock = VectorClock::new();
364 clock.increment(&node_id);
365
366 Self {
367 node_id,
368 table,
369 clock,
370 resolution_strategy: ConflictResolution::VectorClock,
371 pending_events: VecDeque::new(),
372 applied_events: HashSet::new(),
373 listeners: Vec::new(),
374 stats: SyncStatistics::default(),
375 known_nodes: HashSet::new(),
376 }
377 }
378
379 pub fn set_resolution_strategy(&mut self, strategy: ConflictResolution) {
381 self.resolution_strategy = strategy;
382 }
383
384 pub fn add_listener(&mut self, listener: Arc<dyn EventListener>) {
386 self.listeners.push(listener);
387 }
388
389 pub fn register_node(&mut self, node_id: NodeId) {
391 self.known_nodes.insert(node_id);
392 }
393
394 pub fn table(&self) -> &SymbolTable {
396 &self.table
397 }
398
399 pub fn table_mut(&mut self) -> &mut SymbolTable {
401 &mut self.table
402 }
403
404 pub fn pending_events(&self) -> Vec<SyncEvent> {
406 self.pending_events.iter().cloned().collect()
407 }
408
409 pub fn clear_pending_events(&mut self) {
411 self.pending_events.clear();
412 }
413
414 pub fn statistics(&self) -> &SyncStatistics {
416 &self.stats
417 }
418
419 pub fn add_domain(&mut self, domain: DomainInfo) -> Result<(), AdapterError> {
421 let name = domain.name.clone();
422 self.table
423 .add_domain(domain.clone())
424 .map_err(|e| AdapterError::InvalidOperation(format!("Add domain failed: {}", e)))?;
425
426 self.clock.increment(&self.node_id);
428 let entity_data = serde_json::to_string(&domain)
429 .map_err(|e| AdapterError::InvalidOperation(format!("Serialization error: {}", e)))?;
430
431 let event = SyncEvent::new(
432 self.node_id.clone(),
433 self.clock.clone(),
434 SyncChangeType::DomainAdded,
435 name,
436 Some(entity_data),
437 );
438
439 self.pending_events.push_back(event.clone());
440 self.applied_events.insert(event.id.clone());
441 self.stats.events_sent += 1;
442
443 Ok(())
444 }
445
446 pub fn add_predicate(&mut self, predicate: PredicateInfo) -> Result<(), AdapterError> {
448 let name = predicate.name.clone();
449 self.table
450 .add_predicate(predicate.clone())
451 .map_err(|e| AdapterError::InvalidOperation(format!("Add predicate failed: {}", e)))?;
452
453 self.clock.increment(&self.node_id);
455 let entity_data = serde_json::to_string(&predicate)
456 .map_err(|e| AdapterError::InvalidOperation(format!("Serialization error: {}", e)))?;
457
458 let event = SyncEvent::new(
459 self.node_id.clone(),
460 self.clock.clone(),
461 SyncChangeType::PredicateAdded,
462 name,
463 Some(entity_data),
464 );
465
466 self.pending_events.push_back(event.clone());
467 self.applied_events.insert(event.id.clone());
468 self.stats.events_sent += 1;
469
470 Ok(())
471 }
472
473 pub fn remove_domain(&mut self, name: &str) -> Result<(), AdapterError> {
475 if self.table.get_domain(name).is_none() {
477 return Err(AdapterError::DomainNotFound(name.to_string()));
478 }
479
480 self.clock.increment(&self.node_id);
484 let event = SyncEvent::new(
485 self.node_id.clone(),
486 self.clock.clone(),
487 SyncChangeType::DomainRemoved,
488 name.to_string(),
489 None,
490 );
491
492 self.pending_events.push_back(event.clone());
493 self.applied_events.insert(event.id.clone());
494 self.stats.events_sent += 1;
495
496 Ok(())
497 }
498
499 pub fn apply_event(&mut self, event: SyncEvent) -> Result<ApplyResult, AdapterError> {
501 self.stats.events_received += 1;
502
503 for listener in &self.listeners {
505 listener.on_event_received(&event);
506 }
507
508 if self.applied_events.contains(&event.id) {
510 self.stats.events_ignored += 1;
511 return Ok(ApplyResult::Ignored);
512 }
513
514 self.clock.merge(&event.clock);
516
517 let result = match event.change_type {
519 SyncChangeType::DomainAdded => self.apply_domain_added(&event)?,
520 SyncChangeType::DomainModified => self.apply_domain_modified(&event)?,
521 SyncChangeType::DomainRemoved => self.apply_domain_removed(&event)?,
522 SyncChangeType::PredicateAdded => self.apply_predicate_added(&event)?,
523 SyncChangeType::PredicateModified => self.apply_predicate_modified(&event)?,
524 SyncChangeType::PredicateRemoved => self.apply_predicate_removed(&event)?,
525 SyncChangeType::VariableAdded => self.apply_variable_added(&event)?,
526 SyncChangeType::VariableRemoved => self.apply_variable_removed(&event)?,
527 };
528
529 self.applied_events.insert(event.id.clone());
531
532 match result {
534 ApplyResult::Applied => self.stats.events_applied += 1,
535 ApplyResult::Ignored => self.stats.events_ignored += 1,
536 ApplyResult::ConflictResolved => {
537 self.stats.conflicts_resolved += 1;
538 self.stats.events_applied += 1;
539 }
540 ApplyResult::ManualRequired => self.stats.manual_resolutions_required += 1,
541 }
542
543 for listener in &self.listeners {
545 listener.on_event_applied(&event, &result);
546 }
547
548 Ok(result)
549 }
550
551 fn apply_domain_added(&mut self, event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
552 if let Some(_existing) = self.table.get_domain(&event.entity_name) {
554 self.stats.conflicts_detected += 1;
555
556 for listener in &self.listeners {
558 listener.on_conflict_detected(event, "domain_already_exists");
559 }
560
561 match self.resolution_strategy {
563 ConflictResolution::LastWriteWins => {
564 Ok(ApplyResult::ConflictResolved)
567 }
568 ConflictResolution::FirstWriteWins => {
569 Ok(ApplyResult::Ignored)
571 }
572 ConflictResolution::Manual => Ok(ApplyResult::ManualRequired),
573 ConflictResolution::Merge | ConflictResolution::VectorClock => {
574 Ok(ApplyResult::ConflictResolved)
576 }
577 }
578 } else {
579 let entity_data = event
581 .entity_data
582 .as_ref()
583 .ok_or_else(|| AdapterError::InvalidOperation("Missing entity data".to_string()))?;
584
585 let domain: DomainInfo = serde_json::from_str(entity_data).map_err(|e| {
586 AdapterError::InvalidOperation(format!("Deserialization error: {}", e))
587 })?;
588
589 self.table
590 .add_domain(domain)
591 .map_err(|e| AdapterError::InvalidOperation(format!("Add domain failed: {}", e)))?;
592 Ok(ApplyResult::Applied)
593 }
594 }
595
596 fn apply_domain_modified(&mut self, _event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
597 Ok(ApplyResult::Applied)
600 }
601
602 fn apply_domain_removed(&mut self, _event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
603 Ok(ApplyResult::Applied)
606 }
607
608 fn apply_predicate_added(&mut self, event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
609 if self.table.get_predicate(&event.entity_name).is_some() {
611 self.stats.conflicts_detected += 1;
612
613 for listener in &self.listeners {
614 listener.on_conflict_detected(event, "predicate_already_exists");
615 }
616
617 match self.resolution_strategy {
618 ConflictResolution::FirstWriteWins => Ok(ApplyResult::Ignored),
619 ConflictResolution::Manual => Ok(ApplyResult::ManualRequired),
620 _ => Ok(ApplyResult::ConflictResolved),
621 }
622 } else {
623 let entity_data = event
624 .entity_data
625 .as_ref()
626 .ok_or_else(|| AdapterError::InvalidOperation("Missing entity data".to_string()))?;
627
628 let predicate: PredicateInfo = serde_json::from_str(entity_data).map_err(|e| {
629 AdapterError::InvalidOperation(format!("Deserialization error: {}", e))
630 })?;
631
632 self.table.add_predicate(predicate).map_err(|e| {
633 AdapterError::InvalidOperation(format!("Add predicate failed: {}", e))
634 })?;
635 Ok(ApplyResult::Applied)
636 }
637 }
638
639 fn apply_predicate_modified(
640 &mut self,
641 _event: &SyncEvent,
642 ) -> Result<ApplyResult, AdapterError> {
643 Ok(ApplyResult::Applied)
644 }
645
646 fn apply_predicate_removed(&mut self, _event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
647 Ok(ApplyResult::Applied)
648 }
649
650 fn apply_variable_added(&mut self, event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
651 if let Some(data) = &event.entity_data {
653 let parts: Vec<&str> = data.split(':').collect();
654 if parts.len() == 2 {
655 let var_name = parts[0];
656 let domain_name = parts[1];
657
658 if self.table.bind_variable(var_name, domain_name).is_err() {
659 Ok(ApplyResult::Ignored)
661 } else {
662 Ok(ApplyResult::Applied)
663 }
664 } else {
665 Err(AdapterError::InvalidOperation(
666 "Invalid variable data format".to_string(),
667 ))
668 }
669 } else {
670 Err(AdapterError::InvalidOperation(
671 "Missing entity data for variable".to_string(),
672 ))
673 }
674 }
675
676 fn apply_variable_removed(&mut self, _event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
677 Ok(ApplyResult::Applied)
678 }
679
680 pub fn synchronize<P: SyncProtocol>(&mut self, protocol: &P) -> Result<(), AdapterError> {
682 for event in &self.pending_events {
684 protocol.broadcast_event(event)?;
685 }
686 self.pending_events.clear();
687
688 let events = protocol.receive_events()?;
690 for event in events {
691 self.apply_event(event)?;
692 }
693
694 Ok(())
695 }
696}
697
698#[cfg(test)]
699mod tests {
700 use super::*;
701
702 #[test]
703 fn test_node_id_creation() {
704 let node = NodeId::new("test-node");
705 assert_eq!(node.as_str(), "test-node");
706 assert_eq!(node.to_string(), "test-node");
707 }
708
709 #[test]
710 fn test_node_id_random() {
711 let node1 = NodeId::random();
712 let node2 = NodeId::random();
713 assert_ne!(node1, node2);
714 }
715
716 #[test]
717 fn test_vector_clock_basics() {
718 let mut clock = VectorClock::new();
719 let node1 = NodeId::new("node1");
720 let node2 = NodeId::new("node2");
721
722 assert_eq!(clock.get(&node1), 0);
723
724 clock.increment(&node1);
725 assert_eq!(clock.get(&node1), 1);
726
727 clock.increment(&node1);
728 assert_eq!(clock.get(&node1), 2);
729
730 clock.increment(&node2);
731 assert_eq!(clock.get(&node2), 1);
732 }
733
734 #[test]
735 fn test_vector_clock_merge() {
736 let node1 = NodeId::new("node1");
737 let node2 = NodeId::new("node2");
738
739 let mut clock1 = VectorClock::new();
740 clock1.increment(&node1);
741 clock1.increment(&node1);
742
743 let mut clock2 = VectorClock::new();
744 clock2.increment(&node2);
745
746 clock1.merge(&clock2);
747 assert_eq!(clock1.get(&node1), 2);
748 assert_eq!(clock1.get(&node2), 1);
749 }
750
751 #[test]
752 fn test_vector_clock_happens_before() {
753 let node1 = NodeId::new("node1");
754
755 let mut clock1 = VectorClock::new();
756 clock1.increment(&node1);
757
758 let mut clock2 = VectorClock::new();
759 clock2.increment(&node1);
760 clock2.increment(&node1);
761
762 assert!(clock1.happens_before(&clock2));
763 assert!(!clock2.happens_before(&clock1));
764 }
765
766 #[test]
767 fn test_vector_clock_concurrent() {
768 let node1 = NodeId::new("node1");
769 let node2 = NodeId::new("node2");
770
771 let mut clock1 = VectorClock::new();
772 clock1.increment(&node1);
773
774 let mut clock2 = VectorClock::new();
775 clock2.increment(&node2);
776
777 assert!(clock1.is_concurrent(&clock2));
778 assert!(clock2.is_concurrent(&clock1));
779 }
780
781 #[test]
782 fn test_sync_event_creation() {
783 let node = NodeId::new("test-node");
784 let clock = VectorClock::new();
785
786 let event = SyncEvent::new(
787 node.clone(),
788 clock,
789 SyncChangeType::DomainAdded,
790 "Person".to_string(),
791 Some("{}".to_string()),
792 );
793
794 assert_eq!(event.origin, node);
795 assert_eq!(event.entity_name, "Person");
796 assert!(event.timestamp > 0);
797 }
798
799 #[test]
800 fn test_in_memory_protocol() {
801 let protocol = InMemorySyncProtocol::new();
802 let node = NodeId::new("test");
803 let clock = VectorClock::new();
804
805 let event = SyncEvent::new(
806 node.clone(),
807 clock,
808 SyncChangeType::DomainAdded,
809 "Person".to_string(),
810 None,
811 );
812
813 protocol.send_event(&node, &event).unwrap();
814
815 let received = protocol.receive_events().unwrap();
816 assert_eq!(received.len(), 1);
817 assert_eq!(received[0].entity_name, "Person");
818 }
819
820 #[test]
821 fn test_sync_manager_creation() {
822 let node = NodeId::new("test");
823 let table = SymbolTable::new();
824 let mgr = SynchronizationManager::new(node.clone(), table);
825
826 assert_eq!(mgr.node_id, node);
827 assert_eq!(mgr.stats.events_sent, 0);
828 }
829
830 #[test]
831 fn test_sync_manager_add_domain() {
832 let node = NodeId::new("test");
833 let table = SymbolTable::new();
834 let mut mgr = SynchronizationManager::new(node, table);
835
836 let domain = DomainInfo::new("Person", 100);
837 mgr.add_domain(domain).unwrap();
838
839 assert_eq!(mgr.pending_events().len(), 1);
840 assert_eq!(mgr.stats.events_sent, 1);
841 assert!(mgr.table().get_domain("Person").is_some());
842 }
843
844 #[test]
845 fn test_sync_manager_add_predicate() {
846 let node = NodeId::new("test");
847 let mut table = SymbolTable::new();
848 table.add_domain(DomainInfo::new("Person", 100)).unwrap();
849
850 let mut mgr = SynchronizationManager::new(node, table);
851
852 let predicate = PredicateInfo::new("knows", vec!["Person".to_string()]);
853 mgr.add_predicate(predicate).unwrap();
854
855 assert_eq!(mgr.pending_events().len(), 1);
856 assert_eq!(mgr.stats.events_sent, 1);
857 }
858
859 #[test]
860 fn test_sync_manager_apply_domain_event() {
861 let node1 = NodeId::new("node1");
862 let node2 = NodeId::new("node2");
863
864 let table1 = SymbolTable::new();
865 let mut mgr1 = SynchronizationManager::new(node1, table1);
866
867 let table2 = SymbolTable::new();
868 let mut mgr2 = SynchronizationManager::new(node2, table2);
869
870 let domain = DomainInfo::new("Person", 100);
872 mgr1.add_domain(domain).unwrap();
873
874 let events = mgr1.pending_events();
876 let result = mgr2.apply_event(events[0].clone()).unwrap();
877
878 assert_eq!(result, ApplyResult::Applied);
879 assert!(mgr2.table().get_domain("Person").is_some());
880 assert_eq!(mgr2.stats.events_received, 1);
881 assert_eq!(mgr2.stats.events_applied, 1);
882 }
883
884 #[test]
885 fn test_sync_manager_duplicate_event() {
886 let node = NodeId::new("node1");
887 let table = SymbolTable::new();
888 let mut mgr = SynchronizationManager::new(node.clone(), table);
889
890 let domain = DomainInfo::new("Person", 100);
891 let event_data = serde_json::to_string(&domain).unwrap();
892
893 let mut clock = VectorClock::new();
894 clock.increment(&node);
895
896 let event = SyncEvent::new(
897 node,
898 clock,
899 SyncChangeType::DomainAdded,
900 "Person".to_string(),
901 Some(event_data),
902 );
903
904 let result1 = mgr.apply_event(event.clone()).unwrap();
906 assert_eq!(result1, ApplyResult::Applied);
907
908 let result2 = mgr.apply_event(event).unwrap();
910 assert_eq!(result2, ApplyResult::Ignored);
911 assert_eq!(mgr.stats.events_ignored, 1);
912 }
913
914 #[test]
915 fn test_sync_manager_conflict_resolution() {
916 let node1 = NodeId::new("node1");
917 let node2 = NodeId::new("node2");
918
919 let mut table = SymbolTable::new();
920 table.add_domain(DomainInfo::new("Person", 100)).unwrap();
921
922 let mut mgr = SynchronizationManager::new(node2.clone(), table);
923 mgr.set_resolution_strategy(ConflictResolution::FirstWriteWins);
924
925 let domain = DomainInfo::new("Person", 200);
927 let event_data = serde_json::to_string(&domain).unwrap();
928
929 let mut clock = VectorClock::new();
930 clock.increment(&node1);
931
932 let event = SyncEvent::new(
933 node1,
934 clock,
935 SyncChangeType::DomainAdded,
936 "Person".to_string(),
937 Some(event_data),
938 );
939
940 let result = mgr.apply_event(event).unwrap();
941 assert_eq!(result, ApplyResult::Ignored);
942 assert_eq!(mgr.stats.conflicts_detected, 1);
943 }
944
945 #[test]
946 fn test_sync_manager_full_synchronization() {
947 let node1 = NodeId::new("node1");
948 let node2 = NodeId::new("node2");
949
950 let table1 = SymbolTable::new();
951 let mut mgr1 = SynchronizationManager::new(node1, table1);
952
953 let table2 = SymbolTable::new();
954 let mut mgr2 = SynchronizationManager::new(node2, table2);
955
956 mgr1.add_domain(DomainInfo::new("Person", 100)).unwrap();
958 mgr1.add_domain(DomainInfo::new("Place", 50)).unwrap();
959
960 let events = mgr1.pending_events();
962 assert_eq!(events.len(), 2);
963
964 for event in events {
965 mgr2.apply_event(event).unwrap();
966 }
967
968 assert!(mgr2.table().get_domain("Person").is_some());
970 assert!(mgr2.table().get_domain("Place").is_some());
971 assert_eq!(mgr2.stats.events_applied, 2);
972 }
973
974 #[test]
975 fn test_conflict_resolution_strategies() {
976 for strategy in &[
977 ConflictResolution::LastWriteWins,
978 ConflictResolution::FirstWriteWins,
979 ConflictResolution::Manual,
980 ConflictResolution::Merge,
981 ConflictResolution::VectorClock,
982 ] {
983 let node = NodeId::new("test");
984 let table = SymbolTable::new();
985 let mut mgr = SynchronizationManager::new(node, table);
986 mgr.set_resolution_strategy(*strategy);
987 }
989 }
990
991 #[test]
992 fn test_register_nodes() {
993 let node1 = NodeId::new("node1");
994 let node2 = NodeId::new("node2");
995 let node3 = NodeId::new("node3");
996
997 let table = SymbolTable::new();
998 let mut mgr = SynchronizationManager::new(node1, table);
999
1000 mgr.register_node(node2.clone());
1001 mgr.register_node(node3.clone());
1002
1003 assert_eq!(mgr.known_nodes.len(), 2);
1004 assert!(mgr.known_nodes.contains(&node2));
1005 assert!(mgr.known_nodes.contains(&node3));
1006 }
1007}