leptos_sync_core/crdt/advanced/
dag.rs

1//! DAG (Directed Acyclic Graph) for complex relationships
2
3use super::common::{PositionId, AdvancedCrdtError};
4use super::super::{CRDT, Mergeable, ReplicaId};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8/// DAG (Directed Acyclic Graph) node
9#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
10pub struct DagNode<T> {
11    /// Unique node identifier
12    pub id: PositionId,
13    /// Node value
14    pub value: T,
15    /// Incoming edges (dependencies)
16    pub incoming: HashSet<PositionId>,
17    /// Outgoing edges (dependents)
18    pub outgoing: HashSet<PositionId>,
19    /// Whether the node is visible (not deleted)
20    pub visible: bool,
21}
22
23impl<T> DagNode<T> {
24    /// Create a new DAG node
25    pub fn new(id: PositionId, value: T) -> Self {
26        Self {
27            id,
28            value,
29            incoming: HashSet::new(),
30            outgoing: HashSet::new(),
31            visible: true,
32        }
33    }
34}
35
36/// DAG (Directed Acyclic Graph) for complex relationships
37#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
38pub struct Dag<T> {
39    /// Replica ID
40    replica_id: ReplicaId,
41    /// Nodes indexed by ID
42    nodes: HashMap<PositionId, DagNode<T>>,
43    /// Logical timestamp counter
44    timestamp_counter: u64,
45    /// Disambiguation counter
46    disambiguation_counter: u64,
47}
48
49impl<T: Clone + PartialEq> Dag<T> {
50    /// Create a new DAG
51    pub fn new(replica_id: ReplicaId) -> Self {
52        Self {
53            replica_id,
54            nodes: HashMap::new(),
55            timestamp_counter: 0,
56            disambiguation_counter: 0,
57        }
58    }
59    
60    /// Add a node
61    pub fn add_node(&mut self, value: T) -> Result<PositionId, AdvancedCrdtError> {
62        self.timestamp_counter += 1;
63        self.disambiguation_counter += 1;
64        
65        let id = PositionId::new(
66            self.replica_id.clone(),
67            self.timestamp_counter,
68            self.disambiguation_counter,
69        );
70        
71        let node = DagNode::new(id.clone(), value);
72        self.nodes.insert(id.clone(), node);
73        
74        Ok(id)
75    }
76    
77    /// Add an edge from source to target
78    pub fn add_edge(&mut self, from: &PositionId, to: &PositionId) -> Result<(), AdvancedCrdtError> {
79        if !self.nodes.contains_key(from) || !self.nodes.contains_key(to) {
80            return Err(AdvancedCrdtError::ElementNotFound("Node not found".to_string()));
81        }
82        
83        // Check for cycle
84        if self.would_create_cycle(from, to) {
85            return Err(AdvancedCrdtError::CycleDetected("Adding edge would create cycle".to_string()));
86        }
87        
88        // Add edge
89        if let Some(from_node) = self.nodes.get_mut(from) {
90            from_node.outgoing.insert(to.clone());
91        }
92        if let Some(to_node) = self.nodes.get_mut(to) {
93            to_node.incoming.insert(from.clone());
94        }
95        
96        Ok(())
97    }
98    
99    /// Remove an edge
100    pub fn remove_edge(&mut self, from: &PositionId, to: &PositionId) -> Result<(), AdvancedCrdtError> {
101        if let Some(from_node) = self.nodes.get_mut(from) {
102            from_node.outgoing.remove(to);
103        }
104        if let Some(to_node) = self.nodes.get_mut(to) {
105            to_node.incoming.remove(from);
106        }
107        
108        Ok(())
109    }
110    
111    /// Delete a node
112    pub fn delete_node(&mut self, node_id: &PositionId) -> Result<(), AdvancedCrdtError> {
113        if let Some(node) = self.nodes.get(node_id) {
114            let incoming_edges = node.incoming.clone();
115            let outgoing_edges = node.outgoing.clone();
116            
117            // Mark node as invisible
118            if let Some(node) = self.nodes.get_mut(node_id) {
119                node.visible = false;
120            }
121            
122            // Remove all edges involving this node
123            for incoming in &incoming_edges {
124                if let Some(incoming_node) = self.nodes.get_mut(incoming) {
125                    incoming_node.outgoing.remove(node_id);
126                }
127            }
128            for outgoing in &outgoing_edges {
129                if let Some(outgoing_node) = self.nodes.get_mut(outgoing) {
130                    outgoing_node.incoming.remove(node_id);
131                }
132            }
133            
134            Ok(())
135        } else {
136            Err(AdvancedCrdtError::ElementNotFound(format!("Node {:?}", node_id)))
137        }
138    }
139    
140    /// Check if adding an edge would create a cycle
141    fn would_create_cycle(&self, from: &PositionId, to: &PositionId) -> bool {
142        if from == to {
143            return true;
144        }
145        
146        // Use DFS to check for path from 'to' to 'from'
147        let mut visited = HashSet::new();
148        self.dfs_cycle_check(to, from, &mut visited)
149    }
150    
151    /// DFS helper for cycle detection
152    fn dfs_cycle_check(&self, current: &PositionId, target: &PositionId, visited: &mut HashSet<PositionId>) -> bool {
153        if current == target {
154            return true;
155        }
156        
157        if visited.contains(current) {
158            return false;
159        }
160        
161        visited.insert(current.clone());
162        
163        if let Some(node) = self.nodes.get(current) {
164            for next in &node.outgoing {
165                if self.dfs_cycle_check(next, target, visited) {
166                    return true;
167                }
168            }
169        }
170        
171        false
172    }
173    
174    /// Get topological sort of the DAG
175    pub fn topological_sort(&self) -> Vec<PositionId> {
176        let mut result = Vec::new();
177        let mut visited = HashSet::new();
178        
179        for node_id in self.nodes.keys() {
180            if !visited.contains(node_id) {
181                self.dfs_topological(node_id, &mut visited, &mut result);
182            }
183        }
184        
185        result.reverse();
186        result
187    }
188    
189    /// DFS helper for topological sort
190    fn dfs_topological(&self, node_id: &PositionId, visited: &mut HashSet<PositionId>, result: &mut Vec<PositionId>) {
191        if visited.contains(node_id) {
192            return;
193        }
194        
195        visited.insert(node_id.clone());
196        
197        if let Some(node) = self.nodes.get(node_id) {
198            for next in &node.outgoing {
199                self.dfs_topological(next, visited, result);
200            }
201        }
202        
203        result.push(node_id.clone());
204    }
205    
206    /// Get node count
207    pub fn len(&self) -> usize {
208        self.nodes.len()
209    }
210    
211    /// Check if DAG is empty
212    pub fn is_empty(&self) -> bool {
213        self.nodes.is_empty()
214    }
215    
216    /// Get all nodes (for debugging/inspection)
217    pub fn get_nodes(&self) -> &HashMap<PositionId, DagNode<T>> {
218        &self.nodes
219    }
220}
221
222impl<T: Clone + PartialEq> CRDT for Dag<T> {
223    fn replica_id(&self) -> &ReplicaId {
224        &self.replica_id
225    }
226}
227
228impl<T: Clone + PartialEq + Send + Sync> Mergeable for Dag<T> {
229    type Error = AdvancedCrdtError;
230    
231    fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
232        // Merge all nodes from other DAG
233        for (node_id, other_node) in &other.nodes {
234            if let Some(self_node) = self.nodes.get_mut(node_id) {
235                // Node exists in both, keep the one with higher timestamp
236                if other_node.id.timestamp > self_node.id.timestamp {
237                    *self_node = other_node.clone();
238                }
239            } else {
240                // Node only exists in other, add it
241                self.nodes.insert(node_id.clone(), other_node.clone());
242            }
243        }
244        
245        Ok(())
246    }
247    
248    fn has_conflict(&self, other: &Self) -> bool {
249        // Check for conflicting nodes (same ID, different values)
250        for (node_id, self_node) in &self.nodes {
251            if let Some(other_node) = other.nodes.get(node_id) {
252                if self_node.value != other_node.value {
253                    return true;
254                }
255            }
256        }
257        false
258    }
259}
260
261#[cfg(test)]
262mod tests {
263    use super::*;
264    use super::super::super::ReplicaId;
265    use uuid::Uuid;
266    
267    fn create_replica(id: u64) -> ReplicaId {
268        ReplicaId::from(Uuid::from_u64_pair(0, id))
269    }
270    
271    #[test]
272    fn test_dag_creation() {
273        let replica_id = create_replica(1);
274        let dag = Dag::<String>::new(replica_id.clone());
275        
276        assert_eq!(dag.replica_id(), &replica_id);
277        assert!(dag.is_empty());
278        assert_eq!(dag.len(), 0);
279    }
280    
281    #[test]
282    fn test_dag_operations() {
283        let replica_id = create_replica(1);
284        let mut dag = Dag::<String>::new(replica_id);
285        
286        // Add nodes
287        let node1_id = dag.add_node("node1".to_string()).unwrap();
288        let node2_id = dag.add_node("node2".to_string()).unwrap();
289        let node3_id = dag.add_node("node3".to_string()).unwrap();
290        
291        assert_eq!(dag.len(), 3);
292        
293        // Add edges
294        dag.add_edge(&node1_id, &node2_id).unwrap();
295        dag.add_edge(&node2_id, &node3_id).unwrap();
296        
297        // Test topological sort
298        let sorted = dag.topological_sort();
299        assert_eq!(sorted.len(), 3);
300        assert_eq!(sorted[0], node1_id);
301        assert_eq!(sorted[1], node2_id);
302        assert_eq!(sorted[2], node3_id);
303        
304        // Remove edge
305        dag.remove_edge(&node1_id, &node2_id).unwrap();
306        
307        // Delete node
308        dag.delete_node(&node2_id).unwrap();
309        assert_eq!(dag.len(), 3); // Node still exists but is invisible
310    }
311    
312    #[test]
313    fn test_dag_cycle_detection() {
314        let replica_id = create_replica(1);
315        let mut dag = Dag::<String>::new(replica_id);
316        
317        // Add nodes
318        let node1_id = dag.add_node("node1".to_string()).unwrap();
319        let node2_id = dag.add_node("node2".to_string()).unwrap();
320        let node3_id = dag.add_node("node3".to_string()).unwrap();
321        
322        // Add edges to create a cycle
323        dag.add_edge(&node1_id, &node2_id).unwrap();
324        dag.add_edge(&node2_id, &node3_id).unwrap();
325        
326        // Try to add edge that would create cycle
327        let result = dag.add_edge(&node3_id, &node1_id);
328        assert!(result.is_err());
329        assert_eq!(result.unwrap_err(), AdvancedCrdtError::CycleDetected("Adding edge would create cycle".to_string()));
330    }
331    
332    #[test]
333    fn test_dag_merge() {
334        let replica_id1 = create_replica(1);
335        let replica_id2 = create_replica(2);
336        
337        let mut dag1 = Dag::<String>::new(replica_id1);
338        let mut dag2 = Dag::<String>::new(replica_id2);
339        
340        // Add different nodes
341        let node1_id = dag1.add_node("node1".to_string()).unwrap();
342        let node2_id = dag2.add_node("node2".to_string()).unwrap();
343        
344        // Add edges (no self-loops in DAG)
345        // Just add nodes without edges for merge test
346        
347        // Merge
348        dag1.merge(&dag2).unwrap();
349        
350        // Should contain both nodes
351        assert_eq!(dag1.len(), 2);
352    }
353}