logicaffeine_data/crdt/sequence/
rga.rs1use crate::crdt::causal::VClock;
7use crate::crdt::delta::DeltaCrdt;
8use crate::crdt::replica::{generate_replica_id, ReplicaId};
9use crate::crdt::Merge;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use std::cmp::Ordering;
13use std::collections::HashMap;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RGADelta<T> {
18 pub nodes: Vec<RgaNode<T>>,
19 pub timestamp: u64,
20}
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
24pub struct RgaId {
25 pub timestamp: u64,
27 pub replica: ReplicaId,
29}
30
31impl RgaId {
32 fn new(timestamp: u64, replica: ReplicaId) -> Self {
33 Self { timestamp, replica }
34 }
35}
36
37impl Ord for RgaId {
38 fn cmp(&self, other: &Self) -> Ordering {
39 match self.timestamp.cmp(&other.timestamp) {
41 Ordering::Equal => self.replica.cmp(&other.replica),
42 ord => ord,
43 }
44 }
45}
46
47impl PartialOrd for RgaId {
48 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
49 Some(self.cmp(other))
50 }
51}
52
53#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
55pub struct RgaNode<T> {
56 pub id: RgaId,
57 pub value: T,
58 pub deleted: bool,
59 pub parent: Option<RgaId>,
61}
62
63#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
68pub struct RGA<T> {
69 nodes: Vec<RgaNode<T>>,
71 timestamp: u64,
73 replica_id: ReplicaId,
75}
76
77impl<T: Clone + PartialEq> RGA<T> {
78 pub fn new(replica_id: ReplicaId) -> Self {
80 Self {
81 nodes: Vec::new(),
82 timestamp: 0,
83 replica_id,
84 }
85 }
86
87 pub fn new_random() -> Self {
89 Self::new(generate_replica_id())
90 }
91
92 pub fn append(&mut self, value: T) {
94 self.timestamp += 1;
95 let id = RgaId::new(self.timestamp, self.replica_id);
96 let parent = self.last_visible_id();
97
98 self.nodes.push(RgaNode {
99 id,
100 value,
101 deleted: false,
102 parent,
103 });
104 }
105
106 pub fn insert_before(&mut self, index: usize, value: T) {
108 if index == 0 {
109 self.timestamp += 1;
111 let id = RgaId::new(self.timestamp, self.replica_id);
112 self.nodes.push(RgaNode {
113 id,
114 value,
115 deleted: false,
116 parent: None,
117 });
118 } else {
119 self.insert_after(index - 1, value);
121 }
122 }
123
124 pub fn insert_after(&mut self, index: usize, value: T) {
126 let parent_id = self.visible_id_at(index);
127 self.timestamp += 1;
128 let id = RgaId::new(self.timestamp, self.replica_id);
129
130 self.nodes.push(RgaNode {
131 id,
132 value,
133 deleted: false,
134 parent: parent_id,
135 });
136 }
137
138 pub fn remove(&mut self, index: usize) {
140 if let Some(id) = self.visible_id_at(index) {
141 if let Some(node) = self.nodes.iter_mut().find(|n| n.id == id) {
142 node.deleted = true;
143 }
144 }
145 }
146
147 pub fn get(&self, index: usize) -> Option<&T> {
149 self.visible_nodes().nth(index).map(|n| &n.value)
150 }
151
152 pub fn len(&self) -> usize {
154 self.visible_nodes().count()
155 }
156
157 pub fn is_empty(&self) -> bool {
159 self.len() == 0
160 }
161
162 pub fn to_vec(&self) -> Vec<T> {
164 self.visible_nodes().map(|n| n.value.clone()).collect()
165 }
166
167 pub fn iter(&self) -> impl Iterator<Item = &T> {
169 self.visible_nodes().map(|n| &n.value)
170 }
171
172 fn visible_nodes(&self) -> impl Iterator<Item = &RgaNode<T>> {
174 self.sorted_nodes()
175 .into_iter()
176 .filter(|n| !n.deleted)
177 }
178
179 fn sorted_nodes(&self) -> Vec<&RgaNode<T>> {
181 let mut children_map: HashMap<Option<RgaId>, Vec<&RgaNode<T>>> = HashMap::new();
183 for node in &self.nodes {
184 children_map.entry(node.parent).or_default().push(node);
185 }
186
187 for children in children_map.values_mut() {
189 children.sort_by(|a, b| b.id.cmp(&a.id));
190 }
191
192 let mut result: Vec<&RgaNode<T>> = Vec::new();
194 let mut stack: Vec<&RgaNode<T>> = Vec::new();
195
196 if let Some(heads) = children_map.get(&None) {
198 for node in heads.iter().rev() {
199 stack.push(node);
200 }
201 }
202
203 while let Some(node) = stack.pop() {
204 result.push(node);
205
206 if let Some(children) = children_map.get(&Some(node.id)) {
208 for child in children.iter().rev() {
209 stack.push(child);
210 }
211 }
212 }
213
214 result
215 }
216
217 fn last_visible_id(&self) -> Option<RgaId> {
219 self.sorted_nodes()
220 .into_iter()
221 .filter(|n| !n.deleted)
222 .last()
223 .map(|n| n.id)
224 }
225
226 fn visible_id_at(&self, index: usize) -> Option<RgaId> {
228 self.visible_nodes().nth(index).map(|n| n.id)
229 }
230}
231
232impl<T: Clone + PartialEq> Merge for RGA<T> {
233 fn merge(&mut self, other: &Self) {
234 self.timestamp = self.timestamp.max(other.timestamp);
236
237 for other_node in &other.nodes {
239 let exists = self.nodes.iter().any(|n| n.id == other_node.id);
240 if !exists {
241 self.nodes.push(other_node.clone());
242 } else {
243 if let Some(my_node) = self.nodes.iter_mut().find(|n| n.id == other_node.id) {
245 if other_node.deleted {
246 my_node.deleted = true;
247 }
248 }
249 }
250 }
251 }
252}
253
254impl<T: Clone + PartialEq + Serialize + DeserializeOwned + Send + 'static> DeltaCrdt for RGA<T> {
255 type Delta = RGADelta<T>;
256
257 fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
258 let current = self.version();
259 if since.dominates(¤t) {
260 return None;
261 }
262
263 Some(RGADelta {
265 nodes: self.nodes.clone(),
266 timestamp: self.timestamp,
267 })
268 }
269
270 fn apply_delta(&mut self, delta: &Self::Delta) {
271 self.timestamp = self.timestamp.max(delta.timestamp);
272
273 for delta_node in &delta.nodes {
274 let exists = self.nodes.iter().any(|n| n.id == delta_node.id);
275 if !exists {
276 self.nodes.push(delta_node.clone());
277 } else if let Some(my_node) = self.nodes.iter_mut().find(|n| n.id == delta_node.id) {
278 if delta_node.deleted {
279 my_node.deleted = true;
280 }
281 }
282 }
283 }
284
285 fn version(&self) -> VClock {
286 let mut clock = VClock::new();
288 for node in &self.nodes {
289 let current = clock.get(node.id.replica);
290 if node.id.timestamp > current {
291 for _ in current..node.id.timestamp {
293 clock.increment(node.id.replica);
294 }
295 }
296 }
297 clock
298 }
299}
300
301impl<T: Clone + PartialEq> Default for RGA<T> {
302 fn default() -> Self {
303 Self::new_random()
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `String` sequence for `replica` by appending `items` in order.
    fn sequence_of(replica: ReplicaId, items: &[&str]) -> RGA<String> {
        let mut seq = RGA::new(replica);
        for item in items {
            seq.append(String::from(*item));
        }
        seq
    }

    /// Appended values come back in insertion order.
    #[test]
    fn test_rga_append() {
        let seq = sequence_of(1, &["a", "b"]);
        assert_eq!(seq.to_vec(), vec!["a", "b"]);
    }

    /// Inserting before index 0 places the value at the front.
    #[test]
    fn test_rga_insert_before() {
        let mut seq = sequence_of(1, &["b"]);
        seq.insert_before(0, String::from("a"));
        assert_eq!(seq.to_vec(), vec!["a", "b"]);
    }

    /// Removing an element hides it from the visible sequence.
    #[test]
    fn test_rga_remove() {
        let mut seq = sequence_of(1, &["a", "b"]);
        seq.remove(0);
        assert_eq!(seq.to_vec(), vec!["b"]);
    }

    /// Two replicas that append concurrently converge after merging both ways.
    #[test]
    fn test_rga_concurrent_append() {
        let mut a = sequence_of(1, &["from-a"]);
        let mut b = sequence_of(2, &["from-b"]);

        a.merge(&b);
        b.merge(&a);

        assert_eq!(a.to_vec(), b.to_vec());
    }
}