Skip to main content

tensorlogic_adapters/
synchronization.rs

1//! Distributed schema synchronization for multi-node deployments.
2//!
3//! This module provides a comprehensive system for synchronizing schemas across
4//! multiple nodes in a distributed system. It includes:
5//!
6//! - **Vector clocks** for causality tracking
7//! - **Conflict resolution** strategies (LWW, merge, manual)
8//! - **Event-based synchronization** with listeners
9//! - **Consensus mechanisms** for consistency
10//! - **Node discovery** and health checking
11//!
12//! # Example
13//!
14//! ```rust
15//! use tensorlogic_adapters::{
16//!     SymbolTable, DomainInfo, SynchronizationManager,
17//!     NodeId, ConflictResolution, SyncProtocol,
18//! };
19//!
20//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
21//! // Create a sync manager for this node
22//! let node_id = NodeId::new("node-1");
23//! let mut table = SymbolTable::new();
24//! let mut sync_mgr = SynchronizationManager::new(node_id.clone(), table);
25//!
26//! // Add a domain (will be synchronized)
27//! sync_mgr.add_domain(DomainInfo::new("Person", 100))?;
28//!
29//! // Get sync events to broadcast
30//! let events = sync_mgr.pending_events();
31//! println!("Pending sync events: {}", events.len());
32//!
33//! // Apply events from another node
34//! // let event = receive_event_from_network();
35//! // sync_mgr.apply_event(event)?;
36//! # Ok(())
37//! # }
38//! ```
39
40use crate::{AdapterError, DomainInfo, PredicateInfo, SymbolTable};
41use serde::{Deserialize, Serialize};
42use std::collections::{HashMap, HashSet, VecDeque};
43use std::sync::{Arc, RwLock};
44use std::time::{SystemTime, UNIX_EPOCH};
45
46/// Unique identifier for a node in the distributed system.
47#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
48pub struct NodeId(String);
49
50impl NodeId {
51    /// Create a new node ID from a string.
52    pub fn new(id: impl Into<String>) -> Self {
53        Self(id.into())
54    }
55
56    /// Get the inner string representation.
57    pub fn as_str(&self) -> &str {
58        &self.0
59    }
60
61    /// Generate a random node ID.
62    pub fn random() -> Self {
63        use std::sync::atomic::{AtomicU64, Ordering};
64        static COUNTER: AtomicU64 = AtomicU64::new(0);
65        let id = COUNTER.fetch_add(1, Ordering::SeqCst);
66        Self(format!("node-{}", id))
67    }
68}
69
70impl std::fmt::Display for NodeId {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        write!(f, "{}", self.0)
73    }
74}
75
76/// Vector clock for tracking causality in distributed systems.
77#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
78pub struct VectorClock {
79    clocks: HashMap<NodeId, u64>,
80}
81
82impl VectorClock {
83    /// Create a new vector clock.
84    pub fn new() -> Self {
85        Self {
86            clocks: HashMap::new(),
87        }
88    }
89
90    /// Increment the clock for a specific node.
91    pub fn increment(&mut self, node: &NodeId) {
92        *self.clocks.entry(node.clone()).or_insert(0) += 1;
93    }
94
95    /// Get the current value for a node.
96    pub fn get(&self, node: &NodeId) -> u64 {
97        self.clocks.get(node).copied().unwrap_or(0)
98    }
99
100    /// Merge this clock with another (take maximum of each component).
101    pub fn merge(&mut self, other: &VectorClock) {
102        for (node, &value) in &other.clocks {
103            let current = self.clocks.entry(node.clone()).or_insert(0);
104            *current = (*current).max(value);
105        }
106    }
107
108    /// Check if this clock happened before another.
109    pub fn happens_before(&self, other: &VectorClock) -> bool {
110        let mut strictly_less = false;
111
112        // Check all nodes in self
113        for (node, &self_val) in &self.clocks {
114            let other_val = other.get(node);
115            if self_val > other_val {
116                return false; // Not happened-before
117            }
118            if self_val < other_val {
119                strictly_less = true;
120            }
121        }
122
123        // Check nodes only in other
124        for (node, &other_val) in &other.clocks {
125            if !self.clocks.contains_key(node) && other_val > 0 {
126                strictly_less = true;
127            }
128        }
129
130        strictly_less
131    }
132
133    /// Check if two clocks are concurrent (neither happened before the other).
134    pub fn is_concurrent(&self, other: &VectorClock) -> bool {
135        !self.happens_before(other) && !other.happens_before(self) && self != other
136    }
137}
138
139impl Default for VectorClock {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145/// Type of synchronization change event.
146#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
147pub enum SyncChangeType {
148    /// Domain was added.
149    DomainAdded,
150    /// Domain was modified.
151    DomainModified,
152    /// Domain was removed.
153    DomainRemoved,
154    /// Predicate was added.
155    PredicateAdded,
156    /// Predicate was modified.
157    PredicateModified,
158    /// Predicate was removed.
159    PredicateRemoved,
160    /// Variable binding was added.
161    VariableAdded,
162    /// Variable binding was removed.
163    VariableRemoved,
164}
165
166/// A synchronization event representing a schema change.
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct SyncEvent {
169    /// Unique event ID.
170    pub id: String,
171    /// Node that originated this event.
172    pub origin: NodeId,
173    /// Vector clock for causality tracking.
174    pub clock: VectorClock,
175    /// Type of change.
176    pub change_type: SyncChangeType,
177    /// Name of the affected entity.
178    pub entity_name: String,
179    /// Serialized entity data (JSON).
180    pub entity_data: Option<String>,
181    /// Timestamp when event was created.
182    pub timestamp: u64,
183}
184
185impl SyncEvent {
186    /// Create a new sync event.
187    pub fn new(
188        origin: NodeId,
189        clock: VectorClock,
190        change_type: SyncChangeType,
191        entity_name: String,
192        entity_data: Option<String>,
193    ) -> Self {
194        let id = format!(
195            "{}-{}-{}",
196            origin.as_str(),
197            clock.get(&origin),
198            Self::current_timestamp()
199        );
200
201        Self {
202            id,
203            origin,
204            clock,
205            change_type,
206            entity_name,
207            entity_data,
208            timestamp: Self::current_timestamp(),
209        }
210    }
211
212    fn current_timestamp() -> u64 {
213        SystemTime::now()
214            .duration_since(UNIX_EPOCH)
215            .unwrap()
216            .as_millis() as u64
217    }
218}
219
220/// Conflict resolution strategy.
221#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
222pub enum ConflictResolution {
223    /// Last write wins (based on timestamp).
224    LastWriteWins,
225    /// First write wins (ignore later changes).
226    FirstWriteWins,
227    /// Manual resolution required (returns error).
228    Manual,
229    /// Merge both versions (if possible).
230    Merge,
231    /// Use vector clocks for causality-based resolution.
232    VectorClock,
233}
234
235/// Result of applying a sync event.
236#[derive(Debug, Clone, PartialEq, Eq)]
237pub enum ApplyResult {
238    /// Event was applied successfully.
239    Applied,
240    /// Event was ignored (duplicate or outdated).
241    Ignored,
242    /// Conflict detected and resolved.
243    ConflictResolved,
244    /// Manual resolution required.
245    ManualRequired,
246}
247
248/// Protocol for transmitting sync events.
249pub trait SyncProtocol: Send + Sync {
250    /// Send an event to a specific node.
251    fn send_event(&self, target: &NodeId, event: &SyncEvent) -> Result<(), AdapterError>;
252
253    /// Broadcast an event to all nodes.
254    fn broadcast_event(&self, event: &SyncEvent) -> Result<(), AdapterError>;
255
256    /// Receive pending events from the network.
257    fn receive_events(&self) -> Result<Vec<SyncEvent>, AdapterError>;
258}
259
260/// In-memory sync protocol for testing and single-process scenarios.
261#[derive(Debug, Clone)]
262pub struct InMemorySyncProtocol {
263    events: Arc<RwLock<VecDeque<SyncEvent>>>,
264}
265
266impl InMemorySyncProtocol {
267    /// Create a new in-memory protocol.
268    pub fn new() -> Self {
269        Self {
270            events: Arc::new(RwLock::new(VecDeque::new())),
271        }
272    }
273}
274
275impl Default for InMemorySyncProtocol {
276    fn default() -> Self {
277        Self::new()
278    }
279}
280
281impl SyncProtocol for InMemorySyncProtocol {
282    fn send_event(&self, _target: &NodeId, event: &SyncEvent) -> Result<(), AdapterError> {
283        self.events
284            .write()
285            .map_err(|e| AdapterError::InvalidOperation(format!("Lock poisoned: {}", e)))?
286            .push_back(event.clone());
287        Ok(())
288    }
289
290    fn broadcast_event(&self, event: &SyncEvent) -> Result<(), AdapterError> {
291        self.events
292            .write()
293            .map_err(|e| AdapterError::InvalidOperation(format!("Lock poisoned: {}", e)))?
294            .push_back(event.clone());
295        Ok(())
296    }
297
298    fn receive_events(&self) -> Result<Vec<SyncEvent>, AdapterError> {
299        let mut events = self
300            .events
301            .write()
302            .map_err(|e| AdapterError::InvalidOperation(format!("Lock poisoned: {}", e)))?;
303        Ok(events.drain(..).collect())
304    }
305}
306
307/// Event listener for synchronization events.
308pub trait EventListener: Send + Sync {
309    /// Called when an event is about to be applied.
310    fn on_event_received(&self, event: &SyncEvent);
311
312    /// Called after an event was successfully applied.
313    fn on_event_applied(&self, event: &SyncEvent, result: &ApplyResult);
314
315    /// Called when a conflict is detected.
316    fn on_conflict_detected(&self, event: &SyncEvent, conflict_type: &str);
317}
318
319/// Statistics about synchronization operations.
320#[derive(Debug, Clone, Default, Serialize, Deserialize)]
321pub struct SyncStatistics {
322    /// Number of events sent.
323    pub events_sent: usize,
324    /// Number of events received.
325    pub events_received: usize,
326    /// Number of events applied.
327    pub events_applied: usize,
328    /// Number of events ignored.
329    pub events_ignored: usize,
330    /// Number of conflicts detected.
331    pub conflicts_detected: usize,
332    /// Number of conflicts resolved automatically.
333    pub conflicts_resolved: usize,
334    /// Number of manual resolutions required.
335    pub manual_resolutions_required: usize,
336}
337
338/// Manager for distributed schema synchronization.
339pub struct SynchronizationManager {
340    /// This node's ID.
341    node_id: NodeId,
342    /// The symbol table being synchronized.
343    table: SymbolTable,
344    /// Vector clock for this node.
345    clock: VectorClock,
346    /// Conflict resolution strategy.
347    resolution_strategy: ConflictResolution,
348    /// Events pending broadcast.
349    pending_events: VecDeque<SyncEvent>,
350    /// Applied event IDs (for deduplication).
351    applied_events: HashSet<String>,
352    /// Event listeners.
353    listeners: Vec<Arc<dyn EventListener>>,
354    /// Synchronization statistics.
355    stats: SyncStatistics,
356    /// Known remote nodes.
357    known_nodes: HashSet<NodeId>,
358}
359
360impl SynchronizationManager {
361    /// Create a new synchronization manager.
362    pub fn new(node_id: NodeId, table: SymbolTable) -> Self {
363        let mut clock = VectorClock::new();
364        clock.increment(&node_id);
365
366        Self {
367            node_id,
368            table,
369            clock,
370            resolution_strategy: ConflictResolution::VectorClock,
371            pending_events: VecDeque::new(),
372            applied_events: HashSet::new(),
373            listeners: Vec::new(),
374            stats: SyncStatistics::default(),
375            known_nodes: HashSet::new(),
376        }
377    }
378
379    /// Set the conflict resolution strategy.
380    pub fn set_resolution_strategy(&mut self, strategy: ConflictResolution) {
381        self.resolution_strategy = strategy;
382    }
383
384    /// Add an event listener.
385    pub fn add_listener(&mut self, listener: Arc<dyn EventListener>) {
386        self.listeners.push(listener);
387    }
388
389    /// Register a remote node.
390    pub fn register_node(&mut self, node_id: NodeId) {
391        self.known_nodes.insert(node_id);
392    }
393
394    /// Get the current symbol table.
395    pub fn table(&self) -> &SymbolTable {
396        &self.table
397    }
398
399    /// Get a mutable reference to the symbol table.
400    pub fn table_mut(&mut self) -> &mut SymbolTable {
401        &mut self.table
402    }
403
404    /// Get pending events for broadcasting.
405    pub fn pending_events(&self) -> Vec<SyncEvent> {
406        self.pending_events.iter().cloned().collect()
407    }
408
409    /// Clear pending events (after successful broadcast).
410    pub fn clear_pending_events(&mut self) {
411        self.pending_events.clear();
412    }
413
414    /// Get synchronization statistics.
415    pub fn statistics(&self) -> &SyncStatistics {
416        &self.stats
417    }
418
419    /// Add a domain and generate sync event.
420    pub fn add_domain(&mut self, domain: DomainInfo) -> Result<(), AdapterError> {
421        let name = domain.name.clone();
422        self.table
423            .add_domain(domain.clone())
424            .map_err(|e| AdapterError::InvalidOperation(format!("Add domain failed: {}", e)))?;
425
426        // Increment clock and create event
427        self.clock.increment(&self.node_id);
428        let entity_data = serde_json::to_string(&domain)
429            .map_err(|e| AdapterError::InvalidOperation(format!("Serialization error: {}", e)))?;
430
431        let event = SyncEvent::new(
432            self.node_id.clone(),
433            self.clock.clone(),
434            SyncChangeType::DomainAdded,
435            name,
436            Some(entity_data),
437        );
438
439        self.pending_events.push_back(event.clone());
440        self.applied_events.insert(event.id.clone());
441        self.stats.events_sent += 1;
442
443        Ok(())
444    }
445
446    /// Add a predicate and generate sync event.
447    pub fn add_predicate(&mut self, predicate: PredicateInfo) -> Result<(), AdapterError> {
448        let name = predicate.name.clone();
449        self.table
450            .add_predicate(predicate.clone())
451            .map_err(|e| AdapterError::InvalidOperation(format!("Add predicate failed: {}", e)))?;
452
453        // Increment clock and create event
454        self.clock.increment(&self.node_id);
455        let entity_data = serde_json::to_string(&predicate)
456            .map_err(|e| AdapterError::InvalidOperation(format!("Serialization error: {}", e)))?;
457
458        let event = SyncEvent::new(
459            self.node_id.clone(),
460            self.clock.clone(),
461            SyncChangeType::PredicateAdded,
462            name,
463            Some(entity_data),
464        );
465
466        self.pending_events.push_back(event.clone());
467        self.applied_events.insert(event.id.clone());
468        self.stats.events_sent += 1;
469
470        Ok(())
471    }
472
473    /// Remove a domain and generate sync event.
474    pub fn remove_domain(&mut self, name: &str) -> Result<(), AdapterError> {
475        // Check if domain exists
476        if self.table.get_domain(name).is_none() {
477            return Err(AdapterError::DomainNotFound(name.to_string()));
478        }
479
480        // Note: SymbolTable doesn't have remove_domain, so we'll just generate the event
481        // In a real implementation, SymbolTable would need a remove method
482
483        self.clock.increment(&self.node_id);
484        let event = SyncEvent::new(
485            self.node_id.clone(),
486            self.clock.clone(),
487            SyncChangeType::DomainRemoved,
488            name.to_string(),
489            None,
490        );
491
492        self.pending_events.push_back(event.clone());
493        self.applied_events.insert(event.id.clone());
494        self.stats.events_sent += 1;
495
496        Ok(())
497    }
498
499    /// Apply a sync event from another node.
500    pub fn apply_event(&mut self, event: SyncEvent) -> Result<ApplyResult, AdapterError> {
501        self.stats.events_received += 1;
502
503        // Notify listeners
504        for listener in &self.listeners {
505            listener.on_event_received(&event);
506        }
507
508        // Check for duplicate
509        if self.applied_events.contains(&event.id) {
510            self.stats.events_ignored += 1;
511            return Ok(ApplyResult::Ignored);
512        }
513
514        // Merge vector clocks
515        self.clock.merge(&event.clock);
516
517        // Apply the event based on type
518        let result = match event.change_type {
519            SyncChangeType::DomainAdded => self.apply_domain_added(&event)?,
520            SyncChangeType::DomainModified => self.apply_domain_modified(&event)?,
521            SyncChangeType::DomainRemoved => self.apply_domain_removed(&event)?,
522            SyncChangeType::PredicateAdded => self.apply_predicate_added(&event)?,
523            SyncChangeType::PredicateModified => self.apply_predicate_modified(&event)?,
524            SyncChangeType::PredicateRemoved => self.apply_predicate_removed(&event)?,
525            SyncChangeType::VariableAdded => self.apply_variable_added(&event)?,
526            SyncChangeType::VariableRemoved => self.apply_variable_removed(&event)?,
527        };
528
529        // Mark as applied
530        self.applied_events.insert(event.id.clone());
531
532        // Update stats
533        match result {
534            ApplyResult::Applied => self.stats.events_applied += 1,
535            ApplyResult::Ignored => self.stats.events_ignored += 1,
536            ApplyResult::ConflictResolved => {
537                self.stats.conflicts_resolved += 1;
538                self.stats.events_applied += 1;
539            }
540            ApplyResult::ManualRequired => self.stats.manual_resolutions_required += 1,
541        }
542
543        // Notify listeners
544        for listener in &self.listeners {
545            listener.on_event_applied(&event, &result);
546        }
547
548        Ok(result)
549    }
550
551    fn apply_domain_added(&mut self, event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
552        // Check if domain already exists
553        if let Some(_existing) = self.table.get_domain(&event.entity_name) {
554            self.stats.conflicts_detected += 1;
555
556            // Notify listeners of conflict
557            for listener in &self.listeners {
558                listener.on_conflict_detected(event, "domain_already_exists");
559            }
560
561            // Apply conflict resolution
562            match self.resolution_strategy {
563                ConflictResolution::LastWriteWins => {
564                    // Replace with new domain if timestamp is later
565                    // Note: Would need to implement domain replacement in SymbolTable
566                    Ok(ApplyResult::ConflictResolved)
567                }
568                ConflictResolution::FirstWriteWins => {
569                    // Keep existing, ignore new
570                    Ok(ApplyResult::Ignored)
571                }
572                ConflictResolution::Manual => Ok(ApplyResult::ManualRequired),
573                ConflictResolution::Merge | ConflictResolution::VectorClock => {
574                    // For now, keep existing
575                    Ok(ApplyResult::ConflictResolved)
576                }
577            }
578        } else {
579            // No conflict, add the domain
580            let entity_data = event
581                .entity_data
582                .as_ref()
583                .ok_or_else(|| AdapterError::InvalidOperation("Missing entity data".to_string()))?;
584
585            let domain: DomainInfo = serde_json::from_str(entity_data).map_err(|e| {
586                AdapterError::InvalidOperation(format!("Deserialization error: {}", e))
587            })?;
588
589            self.table
590                .add_domain(domain)
591                .map_err(|e| AdapterError::InvalidOperation(format!("Add domain failed: {}", e)))?;
592            Ok(ApplyResult::Applied)
593        }
594    }
595
596    fn apply_domain_modified(&mut self, _event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
597        // Domain modification would require SymbolTable to support updates
598        // For now, return as applied
599        Ok(ApplyResult::Applied)
600    }
601
602    fn apply_domain_removed(&mut self, _event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
603        // Domain removal would require SymbolTable to support deletion
604        // For now, return as applied
605        Ok(ApplyResult::Applied)
606    }
607
608    fn apply_predicate_added(&mut self, event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
609        // Check if predicate already exists
610        if self.table.get_predicate(&event.entity_name).is_some() {
611            self.stats.conflicts_detected += 1;
612
613            for listener in &self.listeners {
614                listener.on_conflict_detected(event, "predicate_already_exists");
615            }
616
617            match self.resolution_strategy {
618                ConflictResolution::FirstWriteWins => Ok(ApplyResult::Ignored),
619                ConflictResolution::Manual => Ok(ApplyResult::ManualRequired),
620                _ => Ok(ApplyResult::ConflictResolved),
621            }
622        } else {
623            let entity_data = event
624                .entity_data
625                .as_ref()
626                .ok_or_else(|| AdapterError::InvalidOperation("Missing entity data".to_string()))?;
627
628            let predicate: PredicateInfo = serde_json::from_str(entity_data).map_err(|e| {
629                AdapterError::InvalidOperation(format!("Deserialization error: {}", e))
630            })?;
631
632            self.table.add_predicate(predicate).map_err(|e| {
633                AdapterError::InvalidOperation(format!("Add predicate failed: {}", e))
634            })?;
635            Ok(ApplyResult::Applied)
636        }
637    }
638
639    fn apply_predicate_modified(
640        &mut self,
641        _event: &SyncEvent,
642    ) -> Result<ApplyResult, AdapterError> {
643        Ok(ApplyResult::Applied)
644    }
645
646    fn apply_predicate_removed(&mut self, _event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
647        Ok(ApplyResult::Applied)
648    }
649
650    fn apply_variable_added(&mut self, event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
651        // Parse variable binding from entity data
652        if let Some(data) = &event.entity_data {
653            let parts: Vec<&str> = data.split(':').collect();
654            if parts.len() == 2 {
655                let var_name = parts[0];
656                let domain_name = parts[1];
657
658                if self.table.bind_variable(var_name, domain_name).is_err() {
659                    // Variable might already be bound
660                    Ok(ApplyResult::Ignored)
661                } else {
662                    Ok(ApplyResult::Applied)
663                }
664            } else {
665                Err(AdapterError::InvalidOperation(
666                    "Invalid variable data format".to_string(),
667                ))
668            }
669        } else {
670            Err(AdapterError::InvalidOperation(
671                "Missing entity data for variable".to_string(),
672            ))
673        }
674    }
675
676    fn apply_variable_removed(&mut self, _event: &SyncEvent) -> Result<ApplyResult, AdapterError> {
677        Ok(ApplyResult::Applied)
678    }
679
680    /// Synchronize with another node using a protocol.
681    pub fn synchronize<P: SyncProtocol>(&mut self, protocol: &P) -> Result<(), AdapterError> {
682        // Send pending events
683        for event in &self.pending_events {
684            protocol.broadcast_event(event)?;
685        }
686        self.pending_events.clear();
687
688        // Receive and apply events
689        let events = protocol.receive_events()?;
690        for event in events {
691            self.apply_event(event)?;
692        }
693
694        Ok(())
695    }
696}
697
698#[cfg(test)]
699mod tests {
700    use super::*;
701
702    #[test]
703    fn test_node_id_creation() {
704        let node = NodeId::new("test-node");
705        assert_eq!(node.as_str(), "test-node");
706        assert_eq!(node.to_string(), "test-node");
707    }
708
709    #[test]
710    fn test_node_id_random() {
711        let node1 = NodeId::random();
712        let node2 = NodeId::random();
713        assert_ne!(node1, node2);
714    }
715
716    #[test]
717    fn test_vector_clock_basics() {
718        let mut clock = VectorClock::new();
719        let node1 = NodeId::new("node1");
720        let node2 = NodeId::new("node2");
721
722        assert_eq!(clock.get(&node1), 0);
723
724        clock.increment(&node1);
725        assert_eq!(clock.get(&node1), 1);
726
727        clock.increment(&node1);
728        assert_eq!(clock.get(&node1), 2);
729
730        clock.increment(&node2);
731        assert_eq!(clock.get(&node2), 1);
732    }
733
734    #[test]
735    fn test_vector_clock_merge() {
736        let node1 = NodeId::new("node1");
737        let node2 = NodeId::new("node2");
738
739        let mut clock1 = VectorClock::new();
740        clock1.increment(&node1);
741        clock1.increment(&node1);
742
743        let mut clock2 = VectorClock::new();
744        clock2.increment(&node2);
745
746        clock1.merge(&clock2);
747        assert_eq!(clock1.get(&node1), 2);
748        assert_eq!(clock1.get(&node2), 1);
749    }
750
751    #[test]
752    fn test_vector_clock_happens_before() {
753        let node1 = NodeId::new("node1");
754
755        let mut clock1 = VectorClock::new();
756        clock1.increment(&node1);
757
758        let mut clock2 = VectorClock::new();
759        clock2.increment(&node1);
760        clock2.increment(&node1);
761
762        assert!(clock1.happens_before(&clock2));
763        assert!(!clock2.happens_before(&clock1));
764    }
765
766    #[test]
767    fn test_vector_clock_concurrent() {
768        let node1 = NodeId::new("node1");
769        let node2 = NodeId::new("node2");
770
771        let mut clock1 = VectorClock::new();
772        clock1.increment(&node1);
773
774        let mut clock2 = VectorClock::new();
775        clock2.increment(&node2);
776
777        assert!(clock1.is_concurrent(&clock2));
778        assert!(clock2.is_concurrent(&clock1));
779    }
780
781    #[test]
782    fn test_sync_event_creation() {
783        let node = NodeId::new("test-node");
784        let clock = VectorClock::new();
785
786        let event = SyncEvent::new(
787            node.clone(),
788            clock,
789            SyncChangeType::DomainAdded,
790            "Person".to_string(),
791            Some("{}".to_string()),
792        );
793
794        assert_eq!(event.origin, node);
795        assert_eq!(event.entity_name, "Person");
796        assert!(event.timestamp > 0);
797    }
798
799    #[test]
800    fn test_in_memory_protocol() {
801        let protocol = InMemorySyncProtocol::new();
802        let node = NodeId::new("test");
803        let clock = VectorClock::new();
804
805        let event = SyncEvent::new(
806            node.clone(),
807            clock,
808            SyncChangeType::DomainAdded,
809            "Person".to_string(),
810            None,
811        );
812
813        protocol.send_event(&node, &event).unwrap();
814
815        let received = protocol.receive_events().unwrap();
816        assert_eq!(received.len(), 1);
817        assert_eq!(received[0].entity_name, "Person");
818    }
819
820    #[test]
821    fn test_sync_manager_creation() {
822        let node = NodeId::new("test");
823        let table = SymbolTable::new();
824        let mgr = SynchronizationManager::new(node.clone(), table);
825
826        assert_eq!(mgr.node_id, node);
827        assert_eq!(mgr.stats.events_sent, 0);
828    }
829
830    #[test]
831    fn test_sync_manager_add_domain() {
832        let node = NodeId::new("test");
833        let table = SymbolTable::new();
834        let mut mgr = SynchronizationManager::new(node, table);
835
836        let domain = DomainInfo::new("Person", 100);
837        mgr.add_domain(domain).unwrap();
838
839        assert_eq!(mgr.pending_events().len(), 1);
840        assert_eq!(mgr.stats.events_sent, 1);
841        assert!(mgr.table().get_domain("Person").is_some());
842    }
843
844    #[test]
845    fn test_sync_manager_add_predicate() {
846        let node = NodeId::new("test");
847        let mut table = SymbolTable::new();
848        table.add_domain(DomainInfo::new("Person", 100)).unwrap();
849
850        let mut mgr = SynchronizationManager::new(node, table);
851
852        let predicate = PredicateInfo::new("knows", vec!["Person".to_string()]);
853        mgr.add_predicate(predicate).unwrap();
854
855        assert_eq!(mgr.pending_events().len(), 1);
856        assert_eq!(mgr.stats.events_sent, 1);
857    }
858
859    #[test]
860    fn test_sync_manager_apply_domain_event() {
861        let node1 = NodeId::new("node1");
862        let node2 = NodeId::new("node2");
863
864        let table1 = SymbolTable::new();
865        let mut mgr1 = SynchronizationManager::new(node1, table1);
866
867        let table2 = SymbolTable::new();
868        let mut mgr2 = SynchronizationManager::new(node2, table2);
869
870        // Node 1 adds a domain
871        let domain = DomainInfo::new("Person", 100);
872        mgr1.add_domain(domain).unwrap();
873
874        // Get event and apply to node 2
875        let events = mgr1.pending_events();
876        let result = mgr2.apply_event(events[0].clone()).unwrap();
877
878        assert_eq!(result, ApplyResult::Applied);
879        assert!(mgr2.table().get_domain("Person").is_some());
880        assert_eq!(mgr2.stats.events_received, 1);
881        assert_eq!(mgr2.stats.events_applied, 1);
882    }
883
884    #[test]
885    fn test_sync_manager_duplicate_event() {
886        let node = NodeId::new("node1");
887        let table = SymbolTable::new();
888        let mut mgr = SynchronizationManager::new(node.clone(), table);
889
890        let domain = DomainInfo::new("Person", 100);
891        let event_data = serde_json::to_string(&domain).unwrap();
892
893        let mut clock = VectorClock::new();
894        clock.increment(&node);
895
896        let event = SyncEvent::new(
897            node,
898            clock,
899            SyncChangeType::DomainAdded,
900            "Person".to_string(),
901            Some(event_data),
902        );
903
904        // Apply first time
905        let result1 = mgr.apply_event(event.clone()).unwrap();
906        assert_eq!(result1, ApplyResult::Applied);
907
908        // Apply again (duplicate)
909        let result2 = mgr.apply_event(event).unwrap();
910        assert_eq!(result2, ApplyResult::Ignored);
911        assert_eq!(mgr.stats.events_ignored, 1);
912    }
913
914    #[test]
915    fn test_sync_manager_conflict_resolution() {
916        let node1 = NodeId::new("node1");
917        let node2 = NodeId::new("node2");
918
919        let mut table = SymbolTable::new();
920        table.add_domain(DomainInfo::new("Person", 100)).unwrap();
921
922        let mut mgr = SynchronizationManager::new(node2.clone(), table);
923        mgr.set_resolution_strategy(ConflictResolution::FirstWriteWins);
924
925        // Try to add same domain from another node
926        let domain = DomainInfo::new("Person", 200);
927        let event_data = serde_json::to_string(&domain).unwrap();
928
929        let mut clock = VectorClock::new();
930        clock.increment(&node1);
931
932        let event = SyncEvent::new(
933            node1,
934            clock,
935            SyncChangeType::DomainAdded,
936            "Person".to_string(),
937            Some(event_data),
938        );
939
940        let result = mgr.apply_event(event).unwrap();
941        assert_eq!(result, ApplyResult::Ignored);
942        assert_eq!(mgr.stats.conflicts_detected, 1);
943    }
944
945    #[test]
946    fn test_sync_manager_full_synchronization() {
947        let node1 = NodeId::new("node1");
948        let node2 = NodeId::new("node2");
949
950        let table1 = SymbolTable::new();
951        let mut mgr1 = SynchronizationManager::new(node1, table1);
952
953        let table2 = SymbolTable::new();
954        let mut mgr2 = SynchronizationManager::new(node2, table2);
955
956        // Node 1 adds domains
957        mgr1.add_domain(DomainInfo::new("Person", 100)).unwrap();
958        mgr1.add_domain(DomainInfo::new("Place", 50)).unwrap();
959
960        // Get events from node 1 and apply to node 2
961        let events = mgr1.pending_events();
962        assert_eq!(events.len(), 2);
963
964        for event in events {
965            mgr2.apply_event(event).unwrap();
966        }
967
968        // Verify node 2 received both domains
969        assert!(mgr2.table().get_domain("Person").is_some());
970        assert!(mgr2.table().get_domain("Place").is_some());
971        assert_eq!(mgr2.stats.events_applied, 2);
972    }
973
974    #[test]
975    fn test_conflict_resolution_strategies() {
976        for strategy in &[
977            ConflictResolution::LastWriteWins,
978            ConflictResolution::FirstWriteWins,
979            ConflictResolution::Manual,
980            ConflictResolution::Merge,
981            ConflictResolution::VectorClock,
982        ] {
983            let node = NodeId::new("test");
984            let table = SymbolTable::new();
985            let mut mgr = SynchronizationManager::new(node, table);
986            mgr.set_resolution_strategy(*strategy);
987            // Just verify it doesn't panic
988        }
989    }
990
991    #[test]
992    fn test_register_nodes() {
993        let node1 = NodeId::new("node1");
994        let node2 = NodeId::new("node2");
995        let node3 = NodeId::new("node3");
996
997        let table = SymbolTable::new();
998        let mut mgr = SynchronizationManager::new(node1, table);
999
1000        mgr.register_node(node2.clone());
1001        mgr.register_node(node3.clone());
1002
1003        assert_eq!(mgr.known_nodes.len(), 2);
1004        assert!(mgr.known_nodes.contains(&node2));
1005        assert!(mgr.known_nodes.contains(&node3));
1006    }
1007}