leptos_sync_core/crdt/advanced/
rga.rs1use super::common::{PositionId, AdvancedCrdtError};
4use super::super::{CRDT, Mergeable, ReplicaId};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
10pub struct RgaElement<T> {
11 pub position: PositionId,
13 pub value: T,
15 pub visible: bool,
17 pub prev: Option<PositionId>,
19}
20
21impl<T> RgaElement<T> {
22 pub fn new(position: PositionId, value: T, prev: Option<PositionId>) -> Self {
24 Self {
25 position,
26 value,
27 visible: true,
28 prev,
29 }
30 }
31}
32
33#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
35pub struct Rga<T> {
36 replica_id: ReplicaId,
38 elements: HashMap<PositionId, RgaElement<T>>,
40 timestamp_counter: u64,
42 disambiguation_counter: u64,
44}
45
46impl<T: Clone + PartialEq> Rga<T> {
47 pub fn new(replica_id: ReplicaId) -> Self {
49 Self {
50 replica_id,
51 elements: HashMap::new(),
52 timestamp_counter: 0,
53 disambiguation_counter: 0,
54 }
55 }
56
57 pub fn insert_after(&mut self, value: T, after: Option<PositionId>) -> Result<PositionId, AdvancedCrdtError> {
59 self.timestamp_counter += 1;
60 self.disambiguation_counter += 1;
61
62 let position = PositionId::new(
63 self.replica_id.clone(),
64 self.timestamp_counter,
65 self.disambiguation_counter,
66 );
67
68 let element = RgaElement::new(position.clone(), value, after);
69 self.elements.insert(position.clone(), element);
70
71 Ok(position)
72 }
73
74 pub fn delete(&mut self, position: &PositionId) -> Result<(), AdvancedCrdtError> {
76 if let Some(element) = self.elements.get_mut(position) {
77 element.visible = false;
78 Ok(())
79 } else {
80 Err(AdvancedCrdtError::ElementNotFound(format!("Position {:?}", position)))
81 }
82 }
83
84 pub fn to_vec(&self) -> Vec<T> {
86 let mut result = Vec::new();
87
88 let mut elements: Vec<_> = self.elements.values()
91 .filter(|e| e.visible)
92 .collect();
93
94 elements.sort_by(|a, b| a.position.cmp(&b.position));
96
97 for element in elements {
99 result.push(element.value.clone());
100 }
101
102 result
103 }
104
105 fn find_first_element(&self) -> Option<PositionId> {
107 self.elements.values()
109 .find(|e| e.prev.is_none())
110 .map(|e| e.position.clone())
111 }
112
113 fn find_next_element(&self, position: &PositionId) -> Option<PositionId> {
115 self.elements.values()
116 .find(|e| e.prev.as_ref() == Some(position))
117 .map(|e| e.position.clone())
118 }
119
120 pub fn len(&self) -> usize {
122 self.elements.len()
123 }
124
125 pub fn is_empty(&self) -> bool {
127 self.elements.is_empty()
128 }
129}
130
131impl<T: Clone + PartialEq> CRDT for Rga<T> {
132 fn replica_id(&self) -> &ReplicaId {
133 &self.replica_id
134 }
135}
136
137impl<T: Clone + PartialEq + Send + Sync> Mergeable for Rga<T> {
138 type Error = AdvancedCrdtError;
139
140 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
141 for (position, other_element) in &other.elements {
143 if let Some(self_element) = self.elements.get_mut(position) {
144 if other_element.position.timestamp > self_element.position.timestamp {
146 *self_element = other_element.clone();
147 }
148 } else {
149 self.elements.insert(position.clone(), other_element.clone());
151 }
152 }
153
154 Ok(())
155 }
156
157 fn has_conflict(&self, other: &Self) -> bool {
158 for (position, self_element) in &self.elements {
160 if let Some(other_element) = other.elements.get(position) {
161 if self_element.value != other_element.value {
162 return true;
163 }
164 }
165 }
166 false
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 use super::*;
173 use super::super::super::ReplicaId;
174 use uuid::Uuid;
175
176 fn create_replica(id: u64) -> ReplicaId {
177 ReplicaId::from(Uuid::from_u64_pair(0, id))
178 }
179
180 #[test]
181 fn test_rga_creation() {
182 let replica_id = create_replica(1);
183 let rga = Rga::<String>::new(replica_id.clone());
184
185 assert_eq!(rga.replica_id(), &replica_id);
186 assert!(rga.is_empty());
187 assert_eq!(rga.len(), 0);
188 }
189
190 #[test]
191 fn test_rga_insert_and_delete() {
192 let replica_id = create_replica(1);
193 let mut rga = Rga::<String>::new(replica_id);
194
195 let pos1 = rga.insert_after("hello".to_string(), None).unwrap();
197 let pos2 = rga.insert_after("world".to_string(), Some(pos1.clone())).unwrap();
198 let pos3 = rga.insert_after("!".to_string(), Some(pos2.clone())).unwrap();
199
200 assert_eq!(rga.len(), 3);
201 assert_eq!(rga.to_vec(), vec!["hello", "world", "!"]);
202
203 rga.delete(&pos2).unwrap();
205 assert_eq!(rga.to_vec(), vec!["hello", "!"]);
206
207 rga.delete(&pos1).unwrap();
209 assert_eq!(rga.to_vec(), vec!["!"]);
210
211 rga.delete(&pos3).unwrap();
213 assert_eq!(rga.to_vec(), Vec::<String>::new());
214 }
215
216 #[test]
217 fn test_rga_merge() {
218 let replica_id1 = create_replica(1);
219 let replica_id2 = create_replica(2);
220
221 let mut rga1 = Rga::<String>::new(replica_id1);
222 let mut rga2 = Rga::<String>::new(replica_id2);
223
224 let _pos1 = rga1.insert_after("hello".to_string(), None).unwrap();
226 let _pos2 = rga2.insert_after("world".to_string(), None).unwrap();
227
228 rga1.merge(&rga2).unwrap();
230
231 let elements = rga1.to_vec();
233 assert!(elements.contains(&"hello".to_string()));
234 assert!(elements.contains(&"world".to_string()));
235 }
236}