leptos_sync_core/crdt/advanced/
dag.rs1use super::common::{PositionId, AdvancedCrdtError};
4use super::super::{CRDT, Mergeable, ReplicaId};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
10pub struct DagNode<T> {
11 pub id: PositionId,
13 pub value: T,
15 pub incoming: HashSet<PositionId>,
17 pub outgoing: HashSet<PositionId>,
19 pub visible: bool,
21}
22
23impl<T> DagNode<T> {
24 pub fn new(id: PositionId, value: T) -> Self {
26 Self {
27 id,
28 value,
29 incoming: HashSet::new(),
30 outgoing: HashSet::new(),
31 visible: true,
32 }
33 }
34}
35
36#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
38pub struct Dag<T> {
39 replica_id: ReplicaId,
41 nodes: HashMap<PositionId, DagNode<T>>,
43 timestamp_counter: u64,
45 disambiguation_counter: u64,
47}
48
49impl<T: Clone + PartialEq> Dag<T> {
50 pub fn new(replica_id: ReplicaId) -> Self {
52 Self {
53 replica_id,
54 nodes: HashMap::new(),
55 timestamp_counter: 0,
56 disambiguation_counter: 0,
57 }
58 }
59
60 pub fn add_node(&mut self, value: T) -> Result<PositionId, AdvancedCrdtError> {
62 self.timestamp_counter += 1;
63 self.disambiguation_counter += 1;
64
65 let id = PositionId::new(
66 self.replica_id.clone(),
67 self.timestamp_counter,
68 self.disambiguation_counter,
69 );
70
71 let node = DagNode::new(id.clone(), value);
72 self.nodes.insert(id.clone(), node);
73
74 Ok(id)
75 }
76
77 pub fn add_edge(&mut self, from: &PositionId, to: &PositionId) -> Result<(), AdvancedCrdtError> {
79 if !self.nodes.contains_key(from) || !self.nodes.contains_key(to) {
80 return Err(AdvancedCrdtError::ElementNotFound("Node not found".to_string()));
81 }
82
83 if self.would_create_cycle(from, to) {
85 return Err(AdvancedCrdtError::CycleDetected("Adding edge would create cycle".to_string()));
86 }
87
88 if let Some(from_node) = self.nodes.get_mut(from) {
90 from_node.outgoing.insert(to.clone());
91 }
92 if let Some(to_node) = self.nodes.get_mut(to) {
93 to_node.incoming.insert(from.clone());
94 }
95
96 Ok(())
97 }
98
99 pub fn remove_edge(&mut self, from: &PositionId, to: &PositionId) -> Result<(), AdvancedCrdtError> {
101 if let Some(from_node) = self.nodes.get_mut(from) {
102 from_node.outgoing.remove(to);
103 }
104 if let Some(to_node) = self.nodes.get_mut(to) {
105 to_node.incoming.remove(from);
106 }
107
108 Ok(())
109 }
110
111 pub fn delete_node(&mut self, node_id: &PositionId) -> Result<(), AdvancedCrdtError> {
113 if let Some(node) = self.nodes.get(node_id) {
114 let incoming_edges = node.incoming.clone();
115 let outgoing_edges = node.outgoing.clone();
116
117 if let Some(node) = self.nodes.get_mut(node_id) {
119 node.visible = false;
120 }
121
122 for incoming in &incoming_edges {
124 if let Some(incoming_node) = self.nodes.get_mut(incoming) {
125 incoming_node.outgoing.remove(node_id);
126 }
127 }
128 for outgoing in &outgoing_edges {
129 if let Some(outgoing_node) = self.nodes.get_mut(outgoing) {
130 outgoing_node.incoming.remove(node_id);
131 }
132 }
133
134 Ok(())
135 } else {
136 Err(AdvancedCrdtError::ElementNotFound(format!("Node {:?}", node_id)))
137 }
138 }
139
140 fn would_create_cycle(&self, from: &PositionId, to: &PositionId) -> bool {
142 if from == to {
143 return true;
144 }
145
146 let mut visited = HashSet::new();
148 self.dfs_cycle_check(to, from, &mut visited)
149 }
150
151 fn dfs_cycle_check(&self, current: &PositionId, target: &PositionId, visited: &mut HashSet<PositionId>) -> bool {
153 if current == target {
154 return true;
155 }
156
157 if visited.contains(current) {
158 return false;
159 }
160
161 visited.insert(current.clone());
162
163 if let Some(node) = self.nodes.get(current) {
164 for next in &node.outgoing {
165 if self.dfs_cycle_check(next, target, visited) {
166 return true;
167 }
168 }
169 }
170
171 false
172 }
173
174 pub fn topological_sort(&self) -> Vec<PositionId> {
176 let mut result = Vec::new();
177 let mut visited = HashSet::new();
178
179 for node_id in self.nodes.keys() {
180 if !visited.contains(node_id) {
181 self.dfs_topological(node_id, &mut visited, &mut result);
182 }
183 }
184
185 result.reverse();
186 result
187 }
188
189 fn dfs_topological(&self, node_id: &PositionId, visited: &mut HashSet<PositionId>, result: &mut Vec<PositionId>) {
191 if visited.contains(node_id) {
192 return;
193 }
194
195 visited.insert(node_id.clone());
196
197 if let Some(node) = self.nodes.get(node_id) {
198 for next in &node.outgoing {
199 self.dfs_topological(next, visited, result);
200 }
201 }
202
203 result.push(node_id.clone());
204 }
205
206 pub fn len(&self) -> usize {
208 self.nodes.len()
209 }
210
211 pub fn is_empty(&self) -> bool {
213 self.nodes.is_empty()
214 }
215
216 pub fn get_nodes(&self) -> &HashMap<PositionId, DagNode<T>> {
218 &self.nodes
219 }
220}
221
222impl<T: Clone + PartialEq> CRDT for Dag<T> {
223 fn replica_id(&self) -> &ReplicaId {
224 &self.replica_id
225 }
226}
227
228impl<T: Clone + PartialEq + Send + Sync> Mergeable for Dag<T> {
229 type Error = AdvancedCrdtError;
230
231 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
232 for (node_id, other_node) in &other.nodes {
234 if let Some(self_node) = self.nodes.get_mut(node_id) {
235 if other_node.id.timestamp > self_node.id.timestamp {
237 *self_node = other_node.clone();
238 }
239 } else {
240 self.nodes.insert(node_id.clone(), other_node.clone());
242 }
243 }
244
245 Ok(())
246 }
247
248 fn has_conflict(&self, other: &Self) -> bool {
249 for (node_id, self_node) in &self.nodes {
251 if let Some(other_node) = other.nodes.get(node_id) {
252 if self_node.value != other_node.value {
253 return true;
254 }
255 }
256 }
257 false
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264 use super::super::super::ReplicaId;
265 use uuid::Uuid;
266
267 fn create_replica(id: u64) -> ReplicaId {
268 ReplicaId::from(Uuid::from_u64_pair(0, id))
269 }
270
271 #[test]
272 fn test_dag_creation() {
273 let replica_id = create_replica(1);
274 let dag = Dag::<String>::new(replica_id.clone());
275
276 assert_eq!(dag.replica_id(), &replica_id);
277 assert!(dag.is_empty());
278 assert_eq!(dag.len(), 0);
279 }
280
281 #[test]
282 fn test_dag_operations() {
283 let replica_id = create_replica(1);
284 let mut dag = Dag::<String>::new(replica_id);
285
286 let node1_id = dag.add_node("node1".to_string()).unwrap();
288 let node2_id = dag.add_node("node2".to_string()).unwrap();
289 let node3_id = dag.add_node("node3".to_string()).unwrap();
290
291 assert_eq!(dag.len(), 3);
292
293 dag.add_edge(&node1_id, &node2_id).unwrap();
295 dag.add_edge(&node2_id, &node3_id).unwrap();
296
297 let sorted = dag.topological_sort();
299 assert_eq!(sorted.len(), 3);
300 assert_eq!(sorted[0], node1_id);
301 assert_eq!(sorted[1], node2_id);
302 assert_eq!(sorted[2], node3_id);
303
304 dag.remove_edge(&node1_id, &node2_id).unwrap();
306
307 dag.delete_node(&node2_id).unwrap();
309 assert_eq!(dag.len(), 3); }
311
312 #[test]
313 fn test_dag_cycle_detection() {
314 let replica_id = create_replica(1);
315 let mut dag = Dag::<String>::new(replica_id);
316
317 let node1_id = dag.add_node("node1".to_string()).unwrap();
319 let node2_id = dag.add_node("node2".to_string()).unwrap();
320 let node3_id = dag.add_node("node3".to_string()).unwrap();
321
322 dag.add_edge(&node1_id, &node2_id).unwrap();
324 dag.add_edge(&node2_id, &node3_id).unwrap();
325
326 let result = dag.add_edge(&node3_id, &node1_id);
328 assert!(result.is_err());
329 assert_eq!(result.unwrap_err(), AdvancedCrdtError::CycleDetected("Adding edge would create cycle".to_string()));
330 }
331
332 #[test]
333 fn test_dag_merge() {
334 let replica_id1 = create_replica(1);
335 let replica_id2 = create_replica(2);
336
337 let mut dag1 = Dag::<String>::new(replica_id1);
338 let mut dag2 = Dag::<String>::new(replica_id2);
339
340 let node1_id = dag1.add_node("node1".to_string()).unwrap();
342 let node2_id = dag2.add_node("node2".to_string()).unwrap();
343
344 dag1.merge(&dag2).unwrap();
349
350 assert_eq!(dag1.len(), 2);
352 }
353}