Skip to main content

crdt_kit/
rga.rs

1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::vec::Vec;
3use core::fmt;
4
5use crate::{Crdt, DeltaCrdt, NodeId};
6
7/// Error type for RGA operations.
///
/// Returned by the index-based operations `Rga::insert_at` and `Rga::remove`.
8#[derive(Debug, Clone, PartialEq, Eq)]
9pub enum RgaError {
10    /// Index is out of bounds for the current visible sequence length.
    ///
    /// `insert_at` reports this when `index > len` (inserting at `len`
    /// appends); `remove` reports it when `index >= len`.
11    IndexOutOfBounds {
12        /// The index that was requested.
13        index: usize,
14        /// The current length of the visible sequence.
15        len: usize,
16    },
17}
18
19impl fmt::Display for RgaError {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        match self {
22            Self::IndexOutOfBounds { index, len } => {
23                write!(f, "index {index} out of bounds for length {len}")
24            }
25        }
26    }
27}
28
// Only compiled when the `std` feature is enabled. The impl body is empty,
// so every `Error` method keeps its default implementation.
29#[cfg(feature = "std")]
30impl std::error::Error for RgaError {}
31
32/// A single node in the RGA sequence.
///
/// None of the operations in this file remove a node from an `Rga`;
/// deletion only sets [`RgaNode::deleted`].
33#[derive(Debug, Clone, PartialEq, Eq)]
34#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
35pub struct RgaNode<T: Clone + Ord> {
36    /// Unique identifier: (actor, counter).
    ///
    /// Locally created nodes get the creating replica's actor id and its
    /// freshly incremented counter.
37    pub id: (NodeId, u64),
38    /// The element value.
39    pub value: T,
40    /// Whether this element has been tombstoned (logically deleted).
    ///
    /// Tombstoned nodes are skipped by `Rga::iter` and are not counted by
    /// `Rga::len`.
41    pub deleted: bool,
42}
43
44/// A Replicated Growable Array (RGA) — an ordered sequence CRDT.
45///
46/// RGA supports insert and delete at arbitrary positions while
47/// guaranteeing convergence across replicas. Each element is assigned
48/// a unique identifier `(actor, counter)` which determines causal
49/// ordering.
///
/// Removed elements are kept as tombstones, so the internal storage only
/// grows; `len`, `get` and `iter` consider the visible elements only.
50///
51/// # Example
52///
53/// ```
54/// use crdt_kit::prelude::*;
55///
56/// let mut list1 = Rga::new(1);
57/// list1.insert_at(0, 'H').unwrap();
58/// list1.insert_at(1, 'i').unwrap();
59///
60/// let mut list2 = Rga::new(2);
61/// list2.insert_at(0, '!').unwrap();
62///
63/// list1.merge(&list2);
64/// list2.merge(&list1);
65///
66/// // Both replicas converge to the same sequence
67/// let v1: Vec<&char> = list1.iter().collect();
68/// let v2: Vec<&char> = list2.iter().collect();
69/// assert_eq!(v1, v2);
70/// assert_eq!(list1.len(), 3);
71/// ```
72#[derive(Debug, Clone, PartialEq, Eq)]
73#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
74pub struct Rga<T: Clone + Ord> {
    /// Node id of this replica; the first component of every id it creates.
75    actor: NodeId,
    /// Counter used for the most recent locally created id. `merge` and
    /// `apply_delta` raise it to at least the largest counter recorded in
    /// `version`.
76    counter: u64,
77    /// Flat ordered sequence of elements (including tombstones).
78    elements: Vec<RgaNode<T>>,
79    /// Version vector: max counter observed per actor.
80    version: BTreeMap<NodeId, u64>,
81    /// Cached count of visible (non-tombstoned) elements.
    ///
    /// Must stay equal to the number of entries in `elements` with
    /// `deleted == false`; the index-based operations rely on it for their
    /// bounds checks.
82    visible_len: usize,
83}
84
85impl<T: Clone + Ord> Rga<T> {
86    /// Create a new empty RGA for the given node.
87    pub fn new(actor: NodeId) -> Self {
88        Self {
89            actor,
90            counter: 0,
91            elements: Vec::new(),
92            version: BTreeMap::new(),
93            visible_len: 0,
94        }
95    }
96
97    /// Create a fork of this replica with a different node ID.
98    pub fn fork(&self, new_actor: NodeId) -> Self {
99        Self {
100            actor: new_actor,
101            counter: self.counter,
102            elements: self.elements.clone(),
103            version: self.version.clone(),
104            visible_len: self.visible_len,
105        }
106    }
107
108    /// Insert a value at the given index in the visible sequence.
109    pub fn insert_at(&mut self, index: usize, value: T) -> Result<(), RgaError> {
110        if index > self.visible_len {
111            return Err(RgaError::IndexOutOfBounds {
112                index,
113                len: self.visible_len,
114            });
115        }
116
117        self.counter += 1;
118        let id = (self.actor, self.counter);
119        self.version
120            .entry(self.actor)
121            .and_modify(|c| *c = (*c).max(self.counter))
122            .or_insert(self.counter);
123
124        let node = RgaNode {
125            id,
126            value,
127            deleted: false,
128        };
129
130        let raw_index = self.raw_index_for_insert(index);
131        self.elements.insert(raw_index, node);
132        self.visible_len += 1;
133        Ok(())
134    }
135
136    /// Remove the element at the given index from the visible sequence.
137    pub fn remove(&mut self, index: usize) -> Result<T, RgaError> {
138        if index >= self.visible_len {
139            return Err(RgaError::IndexOutOfBounds {
140                index,
141                len: self.visible_len,
142            });
143        }
144        let raw = self.visible_to_raw(index);
145        self.elements[raw].deleted = true;
146        self.visible_len -= 1;
147        Ok(self.elements[raw].value.clone())
148    }
149
150    /// Get a reference to the element at the given index in the visible sequence.
151    #[must_use]
152    pub fn get(&self, index: usize) -> Option<&T> {
153        if index >= self.visible_len {
154            return None;
155        }
156        let raw = self.visible_to_raw(index);
157        Some(&self.elements[raw].value)
158    }
159
160    /// Get the number of visible (non-tombstoned) elements.
161    #[must_use]
162    pub fn len(&self) -> usize {
163        self.visible_len
164    }
165
166    /// Check if the visible sequence is empty.
167    #[must_use]
168    pub fn is_empty(&self) -> bool {
169        self.visible_len == 0
170    }
171
172    /// Iterate over the visible elements in order.
173    pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
174        self.elements
175            .iter()
176            .filter(|n| !n.deleted)
177            .map(|n| &n.value)
178    }
179
180    /// Get this replica's node ID.
181    #[must_use]
182    pub fn actor(&self) -> NodeId {
183        self.actor
184    }
185
186    /// Collect visible elements into a `Vec`.
187    #[must_use]
188    pub fn to_vec(&self) -> Vec<T> {
189        self.iter().cloned().collect()
190    }
191
192    // ---- internal helpers ----
193
194    fn visible_to_raw(&self, visible: usize) -> usize {
195        let mut seen = 0;
196        for (raw, node) in self.elements.iter().enumerate() {
197            if !node.deleted {
198                if seen == visible {
199                    return raw;
200                }
201                seen += 1;
202            }
203        }
204        panic!(
205            "visible index {} not found (only {} visible elements)",
206            visible, seen
207        );
208    }
209
210    fn raw_index_for_insert(&self, visible_index: usize) -> usize {
211        if visible_index == 0 {
212            return 0;
213        }
214        if visible_index >= self.visible_len {
215            return self.elements.len();
216        }
217        self.visible_to_raw(visible_index)
218    }
219
220    fn find_insert_position(&self, node: &RgaNode<T>, after_raw: Option<usize>) -> usize {
221        let start = match after_raw {
222            Some(idx) => idx + 1,
223            None => 0,
224        };
225
226        let new_key = (node.id.1, node.id.0); // (counter, actor)
227
228        for i in start..self.elements.len() {
229            let existing = &self.elements[i];
230            let existing_key = (existing.id.1, existing.id.0);
231            if existing_key < new_key {
232                return i;
233            }
234        }
235
236        self.elements.len()
237    }
238}
239
240/// Delta for [`Rga`]: elements and tombstones that the other replica is missing.
///
/// Produced by `Rga::delta` and consumed by `Rga::apply_delta`.
241#[derive(Debug, Clone, PartialEq, Eq)]
242#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
243pub struct RgaDelta<T: Clone + Ord> {
244    /// Elements that the other replica doesn't have yet.
    ///
    /// Kept in the source's sequence order. `apply_delta` skips any entry
    /// whose id is already present on the receiving side.
245    pub new_elements: Vec<RgaNode<T>>,
246    /// IDs of elements that are deleted in source but not in other.
    ///
    /// NOTE(review): `delta` selects these by comparing the source's
    /// tombstones against the other replica's version vector only, so ids the
    /// receiver has already tombstoned can appear here; `apply_delta` leaves
    /// those untouched.
247    pub tombstoned_ids: Vec<(NodeId, u64)>,
248    /// Version vector of the source.
249    pub version: BTreeMap<NodeId, u64>,
250}
251
252impl<T: Clone + Ord> DeltaCrdt for Rga<T> {
253    type Delta = RgaDelta<T>;
254
255    fn delta(&self, other: &Self) -> RgaDelta<T> {
256        let new_elements: Vec<_> = self
257            .elements
258            .iter()
259            .filter(|e| {
260                let actor_max = other.version.get(&e.id.0).copied().unwrap_or(0);
261                e.id.1 > actor_max
262            })
263            .cloned()
264            .collect();
265
266        let tombstoned_ids: Vec<_> = self
267            .elements
268            .iter()
269            .filter(|e| {
270                e.deleted && {
271                    let actor_max = other.version.get(&e.id.0).copied().unwrap_or(0);
272                    e.id.1 <= actor_max
273                }
274            })
275            .map(|e| e.id)
276            .collect();
277
278        RgaDelta {
279            new_elements,
280            tombstoned_ids,
281            version: self.version.clone(),
282        }
283    }
284
285    fn apply_delta(&mut self, delta: &RgaDelta<T>) {
286        // Phase 1: Apply tombstones using a pre-built index (no shifts yet).
287        let id_index: BTreeMap<(NodeId, u64), usize> = self
288            .elements
289            .iter()
290            .enumerate()
291            .map(|(i, e)| (e.id, i))
292            .collect();
293
294        for &id in &delta.tombstoned_ids {
295            if let Some(&raw) = id_index.get(&id) {
296                if !self.elements[raw].deleted {
297                    self.elements[raw].deleted = true;
298                    self.visible_len -= 1;
299                }
300            }
301        }
302
303        // Phase 2: Insert new elements. Use a set for dedup; find
304        // predecessor positions by scanning self.elements directly,
305        // avoiding the O(k) index-shift loop per insertion.
306        let mut known_ids: BTreeSet<(NodeId, u64)> =
307            self.elements.iter().map(|e| e.id).collect();
308
309        for (delta_idx, elem) in delta.new_elements.iter().enumerate() {
310            if !known_ids.contains(&elem.id) {
311                let predecessor_raw = if delta_idx == 0 {
312                    None
313                } else {
314                    (0..delta_idx).rev().find_map(|i| {
315                        self.elements
316                            .iter()
317                            .position(|e| e.id == delta.new_elements[i].id)
318                    })
319                };
320
321                let pos = self.find_insert_position(elem, predecessor_raw);
322                self.elements.insert(pos, elem.clone());
323                if !elem.deleted {
324                    self.visible_len += 1;
325                }
326                known_ids.insert(elem.id);
327            }
328        }
329
330        for (&actor, &cnt) in &delta.version {
331            let entry = self.version.entry(actor).or_insert(0);
332            *entry = (*entry).max(cnt);
333        }
334
335        if let Some(&max_cnt) = self.version.values().max() {
336            self.counter = self.counter.max(max_cnt);
337        }
338    }
339}
340
341impl<T: Clone + Ord> Crdt for Rga<T> {
342    fn merge(&mut self, other: &Self) {
343        // Phase 1: Apply tombstones using a pre-built index (no shifts yet).
344        let id_index: BTreeMap<(NodeId, u64), usize> = self
345            .elements
346            .iter()
347            .enumerate()
348            .map(|(i, e)| (e.id, i))
349            .collect();
350
351        for other_elem in &other.elements {
352            if other_elem.deleted {
353                if let Some(&raw) = id_index.get(&other_elem.id) {
354                    if !self.elements[raw].deleted {
355                        self.elements[raw].deleted = true;
356                        self.visible_len -= 1;
357                    }
358                }
359            }
360        }
361
362        // Phase 2: Insert new elements. Use a set for dedup; find
363        // predecessor positions by scanning self.elements directly,
364        // avoiding the O(k) index-shift loop per insertion.
365        let mut known_ids: BTreeSet<(NodeId, u64)> =
366            self.elements.iter().map(|e| e.id).collect();
367
368        for (other_idx, other_elem) in other.elements.iter().enumerate() {
369            if !known_ids.contains(&other_elem.id) {
370                let predecessor_raw = if other_idx == 0 {
371                    None
372                } else {
373                    (0..other_idx).rev().find_map(|i| {
374                        self.elements
375                            .iter()
376                            .position(|e| e.id == other.elements[i].id)
377                    })
378                };
379
380                let pos = self.find_insert_position(other_elem, predecessor_raw);
381                self.elements.insert(pos, other_elem.clone());
382                if !other_elem.deleted {
383                    self.visible_len += 1;
384                }
385                known_ids.insert(other_elem.id);
386            }
387        }
388
389        for (&actor, &cnt) in &other.version {
390            let entry = self.version.entry(actor).or_insert(0);
391            *entry = (*entry).max(cnt);
392        }
393
394        if let Some(&max_cnt) = self.version.values().max() {
395            self.counter = self.counter.max(max_cnt);
396        }
397    }
398}
399
// Unit tests for `Rga`: local insert/remove behaviour, merge convergence
// properties, delta application and forking.
400#[cfg(test)]
401mod tests {
402    use super::*;
403
    // A freshly created replica has no visible elements.
404    #[test]
405    fn new_rga_is_empty() {
406        let rga = Rga::<String>::new(1);
407        assert!(rga.is_empty());
408        assert_eq!(rga.len(), 0);
409        assert_eq!(rga.get(0), None);
410    }
411
    // Inserting at index 0 and then at the end keeps the elements in that order.
412    #[test]
413    fn insert_at_head() {
414        let mut rga = Rga::new(1);
415        rga.insert_at(0, 'H').unwrap();
416        rga.insert_at(1, 'i').unwrap();
417        assert_eq!(rga.len(), 2);
418        assert_eq!(rga.get(0), Some(&'H'));
419        assert_eq!(rga.get(1), Some(&'i'));
420    }
421
    // Inserting between two existing elements places the value at that index.
422    #[test]
423    fn insert_at_middle() {
424        let mut rga = Rga::new(1);
425        rga.insert_at(0, 'a').unwrap();
426        rga.insert_at(1, 'c').unwrap();
427        rga.insert_at(1, 'b').unwrap();
428        assert_eq!(rga.to_vec(), vec!['a', 'b', 'c']);
429    }
430
    // An index past the end is rejected and the error reports the current length.
431    #[test]
432    fn insert_out_of_bounds_returns_error() {
433        let mut rga = Rga::new(1);
434        rga.insert_at(0, 'x').unwrap();
435        let err = rga.insert_at(5, 'y');
436        assert_eq!(
437            err,
438            Err(RgaError::IndexOutOfBounds { index: 5, len: 1 })
439        );
440    }
441
    // Removing returns the value and hides it from the visible sequence.
442    #[test]
443    fn remove_element() {
444        let mut rga = Rga::new(1);
445        rga.insert_at(0, 'a').unwrap();
446        rga.insert_at(1, 'b').unwrap();
447        rga.insert_at(2, 'c').unwrap();
448
449        let removed = rga.remove(1).unwrap();
450        assert_eq!(removed, 'b');
451        assert_eq!(rga.len(), 2);
452        assert_eq!(rga.to_vec(), vec!['a', 'c']);
453    }
454
    // Merging unrelated inserts keeps every element; the order is not asserted.
455    #[test]
456    fn merge_disjoint_inserts() {
457        let mut r1 = Rga::new(1);
458        r1.insert_at(0, 'x').unwrap();
459
460        let mut r2 = Rga::new(2);
461        r2.insert_at(0, 'y').unwrap();
462
463        r1.merge(&r2);
464        assert_eq!(r1.len(), 2);
465        let v = r1.to_vec();
466        assert!(v.contains(&'x'));
467        assert!(v.contains(&'y'));
468    }
469
    // Concurrent inserts at the head converge regardless of merge direction.
470    #[test]
471    fn merge_concurrent_inserts_at_same_position() {
472        let mut r1 = Rga::new(1);
473        r1.insert_at(0, 'A').unwrap();
474
475        let mut r2 = Rga::new(2);
476        r2.insert_at(0, 'B').unwrap();
477
478        let mut r1_copy = r1.clone();
479        let mut r2_copy = r2.clone();
480
481        r1_copy.merge(&r2);
482        r2_copy.merge(&r1);
483
484        assert_eq!(r1_copy.to_vec(), r2_copy.to_vec());
485        assert_eq!(r1_copy.len(), 2);
486    }
487
    // Concurrent appends after a shared prefix converge and leave the prefix
    // in place.
488    #[test]
489    fn merge_concurrent_inserts_after_shared_prefix() {
490        let mut r1 = Rga::new(1);
491        r1.insert_at(0, 'H').unwrap();
492        r1.insert_at(1, 'e').unwrap();
493
494        let mut r2 = r1.fork(2);
495
496        r1.insert_at(2, 'X').unwrap();
497        r2.insert_at(2, 'Y').unwrap();
498
499        let mut r1_merged = r1.clone();
500        r1_merged.merge(&r2);
501
502        let mut r2_merged = r2.clone();
503        r2_merged.merge(&r1);
504
505        assert_eq!(r1_merged.to_vec(), r2_merged.to_vec());
506        assert_eq!(r1_merged.len(), 4);
507        assert_eq!(r1_merged.get(0), Some(&'H'));
508        assert_eq!(r1_merged.get(1), Some(&'e'));
509    }
510
    // A local removal survives a merge that brings in a concurrent insert.
511    #[test]
512    fn merge_with_deletions() {
513        let mut r1 = Rga::new(1);
514        r1.insert_at(0, 'a').unwrap();
515        r1.insert_at(1, 'b').unwrap();
516        r1.insert_at(2, 'c').unwrap();
517
518        let mut r2 = r1.fork(2);
519
520        r1.remove(1).unwrap();
521        r2.insert_at(3, 'd').unwrap();
522
523        r1.merge(&r2);
524        assert!(!r1.to_vec().contains(&'b'));
525        assert!(r1.to_vec().contains(&'d'));
526        assert_eq!(r1.len(), 3);
527    }
528
    // Merging A into B and B into A yields the same visible sequence.
529    #[test]
530    fn merge_is_commutative() {
531        let mut r1 = Rga::new(1);
532        r1.insert_at(0, 1).unwrap();
533        r1.insert_at(1, 2).unwrap();
534
535        let mut r2 = Rga::new(2);
536        r2.insert_at(0, 3).unwrap();
537        r2.insert_at(1, 4).unwrap();
538
539        let mut left = r1.clone();
540        left.merge(&r2);
541
542        let mut right = r2.clone();
543        right.merge(&r1);
544
545        assert_eq!(left.to_vec(), right.to_vec());
546    }
547
    // Merging the same replica twice leaves the whole state unchanged
    // (compared with full struct equality, not just the visible values).
548    #[test]
549    fn merge_is_idempotent() {
550        let mut r1 = Rga::new(1);
551        r1.insert_at(0, 'x').unwrap();
552        r1.insert_at(1, 'y').unwrap();
553
554        let mut r2 = Rga::new(2);
555        r2.insert_at(0, 'z').unwrap();
556
557        r1.merge(&r2);
558        let after_first = r1.clone();
559        r1.merge(&r2);
560        assert_eq!(r1, after_first);
561    }
562
    // Applying a delta yields the same visible sequence as a full merge.
    // NOTE(review): the two replicas share no history here, so a delta whose
    // new elements sit after elements the receiver already holds is not
    // exercised by this test.
563    #[test]
564    fn delta_apply_equivalent_to_merge() {
565        let mut r1 = Rga::new(1);
566        r1.insert_at(0, 'H').unwrap();
567        r1.insert_at(1, 'i').unwrap();
568
569        let mut r2 = Rga::new(2);
570        r2.insert_at(0, '!').unwrap();
571
572        let mut via_merge = r2.clone();
573        via_merge.merge(&r1);
574
575        let mut via_delta = r2.clone();
576        let d = r1.delta(&r2);
577        via_delta.apply_delta(&d);
578
579        assert_eq!(via_merge.to_vec(), via_delta.to_vec());
580    }
581
    // Edits on a fork do not affect the original until the two are merged.
582    #[test]
583    fn fork_creates_independent_replica() {
584        let mut r1 = Rga::new(1);
585        r1.insert_at(0, 'x').unwrap();
586        r1.insert_at(1, 'y').unwrap();
587
588        let mut r2 = r1.fork(2);
589        r2.insert_at(2, 'z').unwrap();
590
591        assert_eq!(r1.len(), 2);
592        assert_eq!(r2.len(), 3);
593
594        r1.merge(&r2);
595        assert_eq!(r1.to_vec(), vec!['x', 'y', 'z']);
596    }
597}