Skip to main content

crdt_kit/
rga.rs

1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::string::String;
3use alloc::vec::Vec;
4
5use crate::Crdt;
6
/// Parent-to-children index used while linearising the node tree.
///
/// The key is the id of the node that children were inserted after
/// (`None` for the list head); the value lists those children's ids.
type ChildrenMap = BTreeMap<Option<NodeId>, Vec<NodeId>>;
/// Identifier of a single RGA element: the inserting replica's actor name
/// paired with that replica's counter value at insertion time.
type NodeId = (String, u64);
11
12/// A Replicated Growable Array (RGA) — an ordered sequence CRDT.
13///
14/// RGA supports insert and delete at arbitrary positions while
15/// guaranteeing convergence across replicas. Each element is assigned
16/// a unique identifier `(actor, counter)` which determines causal
17/// ordering. When two replicas concurrently insert at the same
18/// position, the conflict is resolved deterministically by comparing
19/// the unique identifiers, ensuring all replicas converge to the
20/// same sequence after merging.
21///
22/// # Example
23///
24/// ```
25/// use crdt_kit::prelude::*;
26///
27/// let mut list1 = Rga::new("node-1");
28/// list1.insert_at(0, 'H');
29/// list1.insert_at(1, 'i');
30///
31/// let mut list2 = Rga::new("node-2");
32/// list2.insert_at(0, '!');
33///
34/// list1.merge(&list2);
35/// list2.merge(&list1);
36///
37/// // Both replicas converge to the same sequence
38/// let v1: Vec<&char> = list1.iter().collect();
39/// let v2: Vec<&char> = list2.iter().collect();
40/// assert_eq!(v1, v2);
41/// assert_eq!(list1.len(), 3);
42/// ```
43#[derive(Debug, Clone, PartialEq, Eq)]
44#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
45pub struct Rga<T: Clone + Ord> {
46    actor: String,
47    counter: u64,
48    /// Ordered backbone: each node is identified by a unique `(actor, counter)`.
49    /// The value is `(element, parent_id)` where `parent_id` is the id of the
50    /// node after which this node was inserted (`None` for a head insert).
51    nodes: BTreeMap<(String, u64), RgaNode<T>>,
52    /// Set of ids that have been tombstoned (logically deleted).
53    tombstones: BTreeSet<(String, u64)>,
54    /// Cached linear order of node ids. Recomputed on mutation.
55    sequence: Vec<(String, u64)>,
56}
57
/// One element stored in an `Rga`, together with its insertion anchor.
///
/// Deletion is not recorded here: a deleted element keeps its node and its
/// id is added to the owning `Rga`'s tombstone set instead.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
struct RgaNode<T: Clone + Ord> {
    /// The element value supplied by the caller of `insert_at`.
    value: T,
    /// The id of the node after which this node was inserted.
    /// `None` means it was inserted at the head of the list.
    parent: Option<(String, u64)>,
}
66
67impl<T: Clone + Ord> Rga<T> {
68    /// Create a new empty RGA for the given actor.
69    pub fn new(actor: impl Into<String>) -> Self {
70        Self {
71            actor: actor.into(),
72            counter: 0,
73            nodes: BTreeMap::new(),
74            tombstones: BTreeSet::new(),
75            sequence: Vec::new(),
76        }
77    }
78
79    /// Insert a value at the given index in the visible sequence.
80    ///
81    /// # Panics
82    ///
83    /// Panics if `index > self.len()`.
84    pub fn insert_at(&mut self, index: usize, value: T) {
85        let visible = self.visible_sequence();
86        assert!(
87            index <= visible.len(),
88            "index {} out of bounds for length {}",
89            index,
90            visible.len()
91        );
92
93        let parent = if index == 0 {
94            None
95        } else {
96            Some(visible[index - 1].clone())
97        };
98
99        self.counter += 1;
100        let id = (self.actor.clone(), self.counter);
101
102        self.nodes.insert(
103            id,
104            RgaNode {
105                value,
106                parent: parent.clone(),
107            },
108        );
109
110        self.rebuild_sequence();
111    }
112
113    /// Remove the element at the given index from the visible sequence.
114    ///
115    /// Returns the removed value, or `None` if the index is out of bounds.
116    pub fn remove(&mut self, index: usize) -> Option<T> {
117        let visible = self.visible_sequence();
118        if index >= visible.len() {
119            return None;
120        }
121
122        let id = visible[index].clone();
123        self.tombstones.insert(id.clone());
124        self.rebuild_sequence();
125
126        self.nodes.get(&id).map(|node| node.value.clone())
127    }
128
129    /// Get a reference to the element at the given index in the visible sequence.
130    #[must_use]
131    pub fn get(&self, index: usize) -> Option<&T> {
132        let visible = self.visible_sequence();
133        visible
134            .get(index)
135            .and_then(|id| self.nodes.get(id))
136            .map(|node| &node.value)
137    }
138
139    /// Get the number of visible (non-tombstoned) elements.
140    #[must_use]
141    pub fn len(&self) -> usize {
142        self.visible_sequence().len()
143    }
144
145    /// Check if the visible sequence is empty.
146    #[must_use]
147    pub fn is_empty(&self) -> bool {
148        self.len() == 0
149    }
150
151    /// Iterate over the visible elements in order.
152    pub fn iter(&self) -> impl Iterator<Item = &T> {
153        let visible = self.visible_sequence();
154        visible
155            .into_iter()
156            .filter_map(move |id| self.nodes.get(&id).map(|node| &node.value))
157            .collect::<Vec<_>>()
158            .into_iter()
159    }
160
161    /// Get this replica's actor ID.
162    #[must_use]
163    pub fn actor(&self) -> &str {
164        &self.actor
165    }
166
167    /// Collect visible elements into a `Vec`.
168    #[must_use]
169    pub fn to_vec(&self) -> Vec<T> {
170        self.iter().cloned().collect()
171    }
172
173    /// Return the ordered list of ids for visible (non-tombstoned) elements.
174    fn visible_sequence(&self) -> Vec<(String, u64)> {
175        self.sequence
176            .iter()
177            .filter(|id| !self.tombstones.contains(id))
178            .cloned()
179            .collect()
180    }
181
182    /// Rebuild the linearised sequence from the DAG of nodes.
183    ///
184    /// The algorithm works by grouping all nodes by their parent id, then
185    /// performing a depth-first traversal starting from nodes whose parent
186    /// is `None` (i.e. inserted at the head). Among siblings (nodes sharing
187    /// the same parent), we sort by id in *reverse* order so that the most
188    /// recent / highest-priority node appears first — this matches RGA
189    /// semantics where a newer concurrent insert at the same position
190    /// appears before older ones.
191    fn rebuild_sequence(&mut self) {
192        // Group children by parent id.
193        let mut children: ChildrenMap = BTreeMap::new();
194        for (id, node) in &self.nodes {
195            children
196                .entry(node.parent.clone())
197                .or_default()
198                .push(id.clone());
199        }
200
201        // Sort each sibling group so that higher ids come first.
202        // Higher id = larger counter, or same counter but lexicographically
203        // larger actor, which ensures deterministic total ordering.
204        for siblings in children.values_mut() {
205            siblings.sort_by(|a, b| b.cmp(a));
206        }
207
208        // DFS traversal to build the linear sequence.
209        let mut sequence = Vec::with_capacity(self.nodes.len());
210        let mut stack: Vec<(String, u64)> = Vec::new();
211
212        // Push root children (parent == None) onto the stack.
213        if let Some(roots) = children.get(&None) {
214            // Push in reverse so that the highest-priority node is processed first.
215            for id in roots.iter().rev() {
216                stack.push(id.clone());
217            }
218        }
219
220        while let Some(id) = stack.pop() {
221            sequence.push(id.clone());
222            // Push this node's children in reverse order.
223            if let Some(kids) = children.get(&Some(id)) {
224                for kid in kids.iter().rev() {
225                    stack.push(kid.clone());
226                }
227            }
228        }
229
230        self.sequence = sequence;
231    }
232}
233
234impl<T: Clone + Ord> Crdt for Rga<T> {
235    fn merge(&mut self, other: &Self) {
236        // Import all nodes from the other replica that we don't have yet.
237        for (id, node) in &other.nodes {
238            self.nodes.entry(id.clone()).or_insert_with(|| node.clone());
239        }
240
241        // Merge tombstones (union).
242        self.tombstones.extend(other.tombstones.iter().cloned());
243
244        // Update our counter to be at least as high as the other's.
245        self.counter = self.counter.max(other.counter);
246
247        // Rebuild the sequence to reflect the merged state.
248        self.rebuild_sequence();
249    }
250}
251
#[cfg(test)]
mod tests {
    // Unit tests for `Rga`: single-replica editing first, then merge
    // properties (convergence, commutativity, associativity, idempotence).
    //
    // Several tests build a second replica by cloning the first and then
    // overwriting the private `actor` field. The clone keeps the counter, and
    // ids are `(actor, counter)`, so only the actor has to change for the
    // two replicas to generate distinct ids from then on.
    use super::*;

    #[test]
    fn new_rga_is_empty() {
        let rga = Rga::<String>::new("a");
        assert!(rga.is_empty());
        assert_eq!(rga.len(), 0);
        assert_eq!(rga.get(0), None);
    }

    // Despite the name, this builds a two-element list: one head insert
    // followed by an append.
    #[test]
    fn insert_at_head() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 'H');
        rga.insert_at(1, 'i');
        assert_eq!(rga.len(), 2);
        assert_eq!(rga.get(0), Some(&'H'));
        assert_eq!(rga.get(1), Some(&'i'));
    }

    #[test]
    fn insert_at_middle() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 'a');
        rga.insert_at(1, 'c');
        // 'b' and 'c' are both anchored after 'a'; the newer 'b' must come first.
        rga.insert_at(1, 'b');
        assert_eq!(rga.to_vec(), vec!['a', 'b', 'c']);
    }

    #[test]
    fn insert_at_end() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 1);
        rga.insert_at(1, 2);
        rga.insert_at(2, 3);
        assert_eq!(rga.to_vec(), vec![1, 2, 3]);
    }

    // Pins the panic message produced by `insert_at`'s bounds assertion.
    #[test]
    #[should_panic(expected = "index 5 out of bounds")]
    fn insert_out_of_bounds_panics() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 'x');
        rga.insert_at(5, 'y');
    }

    #[test]
    fn remove_element() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 'a');
        rga.insert_at(1, 'b');
        rga.insert_at(2, 'c');

        let removed = rga.remove(1);
        assert_eq!(removed, Some('b'));
        assert_eq!(rga.len(), 2);
        assert_eq!(rga.to_vec(), vec!['a', 'c']);
    }

    // Out-of-range removal is a no-op returning `None`, not a panic.
    #[test]
    fn remove_out_of_bounds_returns_none() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 'a');
        assert_eq!(rga.remove(5), None);
        assert_eq!(rga.len(), 1);
    }

    // Indices passed to `remove` are visible positions, so they shift as
    // earlier elements are tombstoned.
    #[test]
    fn remove_first_and_last() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 'a');
        rga.insert_at(1, 'b');
        rga.insert_at(2, 'c');

        rga.remove(0);
        assert_eq!(rga.to_vec(), vec!['b', 'c']);

        rga.remove(1);
        assert_eq!(rga.to_vec(), vec!['b']);

        rga.remove(0);
        assert!(rga.is_empty());
    }

    #[test]
    fn get_returns_correct_values() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, "hello");
        rga.insert_at(1, "world");
        assert_eq!(rga.get(0), Some(&"hello"));
        assert_eq!(rga.get(1), Some(&"world"));
        assert_eq!(rga.get(2), None);
    }

    // `iter` must skip tombstoned elements.
    #[test]
    fn iterate_elements() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 10);
        rga.insert_at(1, 20);
        rga.insert_at(2, 30);
        rga.remove(1);

        let elems: Vec<&i32> = rga.iter().collect();
        assert_eq!(elems, vec![&10, &30]);
    }

    #[test]
    fn actor_returns_id() {
        let rga = Rga::<i32>::new("node-42");
        assert_eq!(rga.actor(), "node-42");
    }

    // --- Merge tests ---

    #[test]
    fn merge_disjoint_inserts() {
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'x');

        let mut r2 = Rga::new("b");
        r2.insert_at(0, 'y');

        r1.merge(&r2);
        assert_eq!(r1.len(), 2);
        // Both elements present
        let v = r1.to_vec();
        assert!(v.contains(&'x'));
        assert!(v.contains(&'y'));
    }

    #[test]
    fn merge_concurrent_inserts_at_same_position() {
        // Both replicas start empty and insert at position 0.
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'A');

        let mut r2 = Rga::new("b");
        r2.insert_at(0, 'B');

        // Merge into copies so each direction starts from the un-merged state.
        let mut r1_copy = r1.clone();
        let mut r2_copy = r2.clone();

        r1_copy.merge(&r2);
        r2_copy.merge(&r1);

        // Both replicas must converge to the same order.
        assert_eq!(r1_copy.to_vec(), r2_copy.to_vec());
        assert_eq!(r1_copy.len(), 2);
    }

    #[test]
    fn merge_concurrent_inserts_after_shared_prefix() {
        // Both replicas share a prefix and then insert at the same position.
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'H');
        r1.insert_at(1, 'e');

        let mut r2 = r1.clone();
        // Change the actor for r2 so it generates distinct ids.
        r2.actor = "b".to_string();

        // r1 inserts 'X' after 'e'
        r1.insert_at(2, 'X');
        // r2 inserts 'Y' after 'e'
        r2.insert_at(2, 'Y');

        let mut r1_merged = r1.clone();
        r1_merged.merge(&r2);

        let mut r2_merged = r2.clone();
        r2_merged.merge(&r1);

        assert_eq!(r1_merged.to_vec(), r2_merged.to_vec());
        assert_eq!(r1_merged.len(), 4);

        // Prefix is preserved.
        assert_eq!(r1_merged.get(0), Some(&'H'));
        assert_eq!(r1_merged.get(1), Some(&'e'));
    }

    // A tombstone from one replica must survive a merge with a replica that
    // still sees the element as live.
    #[test]
    fn merge_with_deletions() {
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'a');
        r1.insert_at(1, 'b');
        r1.insert_at(2, 'c');

        let mut r2 = r1.clone();
        r2.actor = "b".to_string();

        // r1 removes 'b'
        r1.remove(1);
        // r2 inserts 'd' at end
        r2.insert_at(3, 'd');

        r1.merge(&r2);
        // 'b' should be tombstoned, 'd' should be added
        assert!(!r1.to_vec().contains(&'b'));
        assert!(r1.to_vec().contains(&'d'));
        assert_eq!(r1.len(), 3); // a, c, d
    }

    #[test]
    fn merge_is_commutative() {
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 1);
        r1.insert_at(1, 2);

        let mut r2 = Rga::new("b");
        r2.insert_at(0, 3);
        r2.insert_at(1, 4);

        let mut left = r1.clone();
        left.merge(&r2);

        let mut right = r2.clone();
        right.merge(&r1);

        assert_eq!(left.to_vec(), right.to_vec());
    }

    #[test]
    fn merge_commutativity_with_deletions() {
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'x');
        r1.insert_at(1, 'y');

        let mut r2 = r1.clone();
        r2.actor = "b".to_string();

        r1.remove(0); // remove 'x'
        r2.insert_at(2, 'z');

        let mut left = r1.clone();
        left.merge(&r2);

        let mut right = r2.clone();
        right.merge(&r1);

        assert_eq!(left.to_vec(), right.to_vec());
    }

    #[test]
    fn merge_is_associative() {
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'A');

        let mut r2 = Rga::new("b");
        r2.insert_at(0, 'B');

        let mut r3 = Rga::new("c");
        r3.insert_at(0, 'C');

        // (r1 merge r2) merge r3
        let mut left = r1.clone();
        left.merge(&r2);
        left.merge(&r3);

        // r1 merge (r2 merge r3)
        let mut r2_r3 = r2.clone();
        r2_r3.merge(&r3);
        let mut right = r1.clone();
        right.merge(&r2_r3);

        assert_eq!(left.to_vec(), right.to_vec());
    }

    #[test]
    fn merge_is_idempotent() {
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'x');
        r1.insert_at(1, 'y');

        let mut r2 = Rga::new("b");
        r2.insert_at(0, 'z');

        r1.merge(&r2);
        let after_first = r1.clone();

        r1.merge(&r2);
        assert_eq!(r1.to_vec(), after_first.to_vec());
        // Full structural equality (derived `PartialEq`), which also compares
        // the counter, tombstones and cached sequence.
        assert_eq!(r1, after_first);
    }

    #[test]
    fn merge_self_is_idempotent() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 1);
        rga.insert_at(1, 2);
        rga.remove(0);

        let snapshot = rga.clone();
        rga.merge(&snapshot);

        assert_eq!(rga, snapshot);
    }

    #[test]
    fn causal_ordering_preserved() {
        // Build a sequence on one replica, then merge into another.
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'H');
        r1.insert_at(1, 'e');
        r1.insert_at(2, 'l');
        r1.insert_at(3, 'l');
        r1.insert_at(4, 'o');

        let mut r2 = Rga::new("b");
        r2.merge(&r1);

        assert_eq!(r2.to_vec(), vec!['H', 'e', 'l', 'l', 'o']);
    }

    #[test]
    fn causal_ordering_insert_between() {
        let mut rga = Rga::new("a");
        rga.insert_at(0, 1);
        rga.insert_at(1, 3);
        rga.insert_at(1, 2); // insert 2 between 1 and 3

        assert_eq!(rga.to_vec(), vec![1, 2, 3]);
    }

    #[test]
    fn three_way_merge_convergence() {
        // Three replicas each insert at position 0 concurrently.
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'A');

        let mut r2 = Rga::new("b");
        r2.insert_at(0, 'B');

        let mut r3 = Rga::new("c");
        r3.insert_at(0, 'C');

        let mut m1 = r1.clone();
        m1.merge(&r2);
        m1.merge(&r3);

        let mut m2 = r2.clone();
        m2.merge(&r1);
        m2.merge(&r3);

        let mut m3 = r3.clone();
        m3.merge(&r1);
        m3.merge(&r2);

        assert_eq!(m1.to_vec(), m2.to_vec());
        assert_eq!(m2.to_vec(), m3.to_vec());
        assert_eq!(m1.len(), 3);
    }

    // Tombstones merge by set union, so a double delete is harmless.
    #[test]
    fn concurrent_delete_same_element() {
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 'x');

        let mut r2 = r1.clone();
        r2.actor = "b".to_string();

        // Both replicas delete the same element.
        r1.remove(0);
        r2.remove(0);

        r1.merge(&r2);
        assert!(r1.is_empty());
    }

    #[test]
    fn merge_preserves_existing_order() {
        let mut r1 = Rga::new("a");
        r1.insert_at(0, 1);
        r1.insert_at(1, 2);
        r1.insert_at(2, 3);
        r1.insert_at(3, 4);

        let snapshot = r1.to_vec();

        let mut r2 = Rga::new("b");
        r2.insert_at(0, 10);

        r1.merge(&r2);

        // Original elements should still appear in their original relative order.
        // (All values are distinct, so `position` identifies each uniquely.)
        let merged = r1.to_vec();
        let original_positions: Vec<usize> = snapshot
            .iter()
            .map(|v| merged.iter().position(|x| x == v).unwrap())
            .collect();

        // The original order should be strictly increasing.
        for w in original_positions.windows(2) {
            assert!(w[0] < w[1]);
        }
    }

    #[test]
    fn empty_merge_empty() {
        let mut r1 = Rga::<i32>::new("a");
        let r2 = Rga::<i32>::new("b");
        r1.merge(&r2);
        assert!(r1.is_empty());
    }

    #[test]
    fn merge_into_empty() {
        let mut r1 = Rga::<char>::new("a");
        let mut r2 = Rga::new("b");
        r2.insert_at(0, 'z');

        r1.merge(&r2);
        assert_eq!(r1.to_vec(), vec!['z']);
    }

    #[test]
    fn repeated_insert_remove_cycles() {
        let mut rga = Rga::new("a");
        // Every insert at 0 is a new head node with a larger counter than
        // the previous ones, so the newest value ends up first.
        for i in 0..5 {
            rga.insert_at(0, i);
        }
        // rga is [4, 3, 2, 1, 0]
        assert_eq!(rga.len(), 5);

        // Remove all
        while !rga.is_empty() {
            rga.remove(0);
        }
        assert!(rga.is_empty());

        // Re-insert: the tombstoned nodes remain stored but stay invisible.
        rga.insert_at(0, 99);
        assert_eq!(rga.to_vec(), vec![99]);
    }
}