Skip to main content

mdcs_db/
rga_list.rs

1//! RGA List - Replicated Growable Array for ordered sequences.
2//!
3//! RGA provides a CRDT list that supports:
4//! - Insert at any position
5//! - Delete at any position
6//! - Move elements (delete + insert)
7//!
8//! Uses unique IDs to maintain consistent ordering across replicas.
9
10use mdcs_core::lattice::Lattice;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use ulid::Ulid;
14
15/// Unique identifier for a list element.
16#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct ListId {
18    /// The replica that created this element.
19    pub replica: String,
20    /// Sequence number within that replica.
21    pub seq: u64,
22    /// Unique identifier for disambiguation.
23    pub ulid: Ulid,
24}
25
26impl ListId {
27    pub fn new(replica: impl Into<String>, seq: u64) -> Self {
28        Self {
29            replica: replica.into(),
30            seq,
31            ulid: Ulid::new(),
32        }
33    }
34
35    /// Create a genesis ID (for the virtual head).
36    pub fn genesis() -> Self {
37        Self {
38            replica: "".to_string(),
39            seq: 0,
40            ulid: Ulid::nil(),
41        }
42    }
43}
44
45impl PartialOrd for ListId {
46    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
47        Some(self.cmp(other))
48    }
49}
50
51impl Ord for ListId {
52    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
53        // Higher sequence = later in causal order
54        // Tie-break on replica ID, then ULID
55        self.seq
56            .cmp(&other.seq)
57            .then_with(|| self.replica.cmp(&other.replica))
58            .then_with(|| self.ulid.cmp(&other.ulid))
59    }
60}
61
62/// A node in the RGA list.
63#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
64pub struct ListNode<T> {
65    /// The unique ID of this node.
66    pub id: ListId,
67    /// The value stored (None if deleted - tombstone).
68    pub value: Option<T>,
69    /// The ID of the element this was inserted after.
70    pub origin: ListId,
71    /// Whether this node is deleted (tombstone).
72    pub deleted: bool,
73}
74
75impl<T> ListNode<T> {
76    pub fn new(id: ListId, value: T, origin: ListId) -> Self {
77        Self {
78            id,
79            value: Some(value),
80            origin,
81            deleted: false,
82        }
83    }
84}
85
86/// Delta for RGA list operations.
87#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
88pub struct RGAListDelta<T: Clone + PartialEq> {
89    /// Nodes to insert.
90    pub inserts: Vec<ListNode<T>>,
91    /// IDs of nodes to delete.
92    pub deletes: Vec<ListId>,
93}
94
95impl<T: Clone + PartialEq> RGAListDelta<T> {
96    pub fn new() -> Self {
97        Self {
98            inserts: Vec::new(),
99            deletes: Vec::new(),
100        }
101    }
102
103    pub fn is_empty(&self) -> bool {
104        self.inserts.is_empty() && self.deletes.is_empty()
105    }
106}
107
108impl<T: Clone + PartialEq> Default for RGAListDelta<T> {
109    fn default() -> Self {
110        Self::new()
111    }
112}
113
114/// Replicated Growable Array - an ordered list CRDT.
115///
116/// Supports insert, delete, and move operations with
117/// deterministic conflict resolution.
118#[derive(Clone, Debug, Serialize, Deserialize)]
119pub struct RGAList<T: Clone + PartialEq> {
120    /// All nodes indexed by their ID.
121    nodes: HashMap<ListId, ListNode<T>>,
122    /// Children of each node (for ordering).
123    /// Maps origin -> list of children sorted by ID.
124    children: HashMap<ListId, Vec<ListId>>,
125    /// The replica ID for this instance.
126    replica_id: String,
127    /// Sequence counter for generating IDs.
128    seq: u64,
129    /// Pending delta for replication.
130    #[serde(skip)]
131    pending_delta: Option<RGAListDelta<T>>,
132}
133
134impl<T: Clone + PartialEq> RGAList<T> {
135    /// Create a new empty RGA list.
136    pub fn new(replica_id: impl Into<String>) -> Self {
137        let replica_id = replica_id.into();
138        let mut list = Self {
139            nodes: HashMap::new(),
140            children: HashMap::new(),
141            replica_id,
142            seq: 0,
143            pending_delta: None,
144        };
145
146        // Insert virtual head node
147        let genesis = ListId::genesis();
148        list.children.insert(genesis, Vec::new());
149
150        list
151    }
152
153    /// Get the replica ID.
154    pub fn replica_id(&self) -> &str {
155        &self.replica_id
156    }
157
158    /// Generate a new unique ID.
159    fn next_id(&mut self) -> ListId {
160        self.seq += 1;
161        ListId::new(&self.replica_id, self.seq)
162    }
163
164    /// Insert a value at the given index.
165    pub fn insert(&mut self, index: usize, value: T) {
166        let origin = self
167            .id_at_index(index.saturating_sub(1))
168            .unwrap_or(ListId::genesis());
169        self.insert_after(&origin, value);
170    }
171
172    /// Insert a value after the given origin ID.
173    pub fn insert_after(&mut self, origin: &ListId, value: T) {
174        let id = self.next_id();
175        let node = ListNode::new(id.clone(), value, origin.clone());
176
177        self.integrate_node(node.clone());
178
179        // Record delta
180        let delta = self.pending_delta.get_or_insert_with(RGAListDelta::new);
181        delta.inserts.push(node);
182    }
183
184    /// Insert at the beginning.
185    pub fn push_front(&mut self, value: T) {
186        self.insert(0, value);
187    }
188
189    /// Insert at the end.
190    pub fn push_back(&mut self, value: T) {
191        let len = self.len();
192        self.insert(len, value);
193    }
194
195    /// Delete the element at the given index.
196    pub fn delete(&mut self, index: usize) -> Option<T> {
197        let id = self.id_at_index(index)?;
198        self.delete_by_id(&id)
199    }
200
201    /// Delete an element by its ID.
202    pub fn delete_by_id(&mut self, id: &ListId) -> Option<T> {
203        if let Some(node) = self.nodes.get_mut(id) {
204            if !node.deleted {
205                node.deleted = true;
206                let value = node.value.take();
207
208                // Record delta
209                let delta = self.pending_delta.get_or_insert_with(RGAListDelta::new);
210                delta.deletes.push(id.clone());
211
212                return value;
213            }
214        }
215        None
216    }
217
218    /// Move an element from one index to another.
219    pub fn move_element(&mut self, from: usize, to: usize) -> bool {
220        if let Some(value) = self.delete(from) {
221            // Adjust target index if moving forward
222            let adjusted_to = if to > from { to - 1 } else { to };
223            self.insert(adjusted_to, value);
224            true
225        } else {
226            false
227        }
228    }
229
230    /// Get the element at the given index.
231    pub fn get(&self, index: usize) -> Option<&T> {
232        let id = self.id_at_index(index)?;
233        self.nodes.get(&id).and_then(|n| n.value.as_ref())
234    }
235
236    /// Get a mutable reference to the element at the given index.
237    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
238        let id = self.id_at_index(index)?;
239        self.nodes.get_mut(&id).and_then(|n| n.value.as_mut())
240    }
241
242    /// Get the number of non-deleted elements.
243    pub fn len(&self) -> usize {
244        self.nodes.values().filter(|n| !n.deleted).count()
245    }
246
247    /// Check if the list is empty.
248    pub fn is_empty(&self) -> bool {
249        self.len() == 0
250    }
251
252    /// Iterate over values in order.
253    pub fn iter(&self) -> impl Iterator<Item = &T> {
254        self.iter_nodes()
255            .filter(|n| !n.deleted)
256            .filter_map(|n| n.value.as_ref())
257    }
258
259    /// Iterate over (index, value) pairs.
260    pub fn iter_indexed(&self) -> impl Iterator<Item = (usize, &T)> {
261        self.iter().enumerate()
262    }
263
264    /// Convert to a Vec.
265    pub fn to_vec(&self) -> Vec<T> {
266        self.iter().cloned().collect()
267    }
268
269    /// Get the ID at a given visible index.
270    fn id_at_index(&self, index: usize) -> Option<ListId> {
271        self.iter_nodes()
272            .filter(|n| !n.deleted)
273            .nth(index)
274            .map(|n| n.id.clone())
275    }
276
277    /// Get the visible index for an ID.
278    pub fn index_of_id(&self, id: &ListId) -> Option<usize> {
279        self.iter_nodes()
280            .filter(|n| !n.deleted)
281            .position(|n| &n.id == id)
282    }
283
284    /// Iterate over all nodes in order (including tombstones).
285    fn iter_nodes(&self) -> impl Iterator<Item = &ListNode<T>> {
286        RGAIterator {
287            list: self,
288            stack: vec![ListId::genesis()],
289            visited: std::collections::HashSet::new(),
290        }
291    }
292
293    /// Integrate a node into the list.
294    fn integrate_node(&mut self, node: ListNode<T>) {
295        let id = node.id.clone();
296        let origin = node.origin.clone();
297
298        // Add to nodes map
299        self.nodes.insert(id.clone(), node);
300
301        // Add to children of origin, maintaining sort order
302        let children = self.children.entry(origin).or_default();
303
304        // Find insertion position (maintain descending order by ID for RGA)
305        let pos = children
306            .iter()
307            .position(|c| c < &id)
308            .unwrap_or(children.len());
309        children.insert(pos, id.clone());
310
311        // Ensure this node has a children entry
312        self.children.entry(id).or_default();
313    }
314
315    /// Take the pending delta.
316    pub fn take_delta(&mut self) -> Option<RGAListDelta<T>> {
317        self.pending_delta.take()
318    }
319
320    /// Apply a delta from another replica.
321    pub fn apply_delta(&mut self, delta: &RGAListDelta<T>) {
322        // Apply inserts
323        for node in &delta.inserts {
324            if !self.nodes.contains_key(&node.id) {
325                self.integrate_node(node.clone());
326            }
327        }
328
329        // Apply deletes
330        for id in &delta.deletes {
331            if let Some(node) = self.nodes.get_mut(id) {
332                node.deleted = true;
333                node.value = None;
334            }
335        }
336    }
337}
338
339/// Iterator for traversing the RGA list in order.
340struct RGAIterator<'a, T: Clone + PartialEq> {
341    list: &'a RGAList<T>,
342    stack: Vec<ListId>,
343    visited: std::collections::HashSet<ListId>,
344}
345
346impl<'a, T: Clone + PartialEq> Iterator for RGAIterator<'a, T> {
347    type Item = &'a ListNode<T>;
348
349    fn next(&mut self) -> Option<Self::Item> {
350        while let Some(id) = self.stack.pop() {
351            if self.visited.contains(&id) {
352                continue;
353            }
354            self.visited.insert(id.clone());
355
356            // Push children in reverse order (so first child is processed first)
357            if let Some(children) = self.list.children.get(&id) {
358                for child in children.iter().rev() {
359                    if !self.visited.contains(child) {
360                        self.stack.push(child.clone());
361                    }
362                }
363            }
364
365            // Return the node (skip genesis)
366            if id != ListId::genesis() {
367                if let Some(node) = self.list.nodes.get(&id) {
368                    return Some(node);
369                }
370            }
371        }
372        None
373    }
374}
375
376impl<T: Clone + PartialEq> PartialEq for RGAList<T> {
377    fn eq(&self, other: &Self) -> bool {
378        // Compare visible content
379        self.to_vec() == other.to_vec()
380    }
381}
382
383impl<T: Clone + PartialEq> Lattice for RGAList<T> {
384    fn bottom() -> Self {
385        Self::new("")
386    }
387
388    fn join(&self, other: &Self) -> Self {
389        let mut result = self.clone();
390
391        // Merge all nodes from other
392        for (id, node) in &other.nodes {
393            if let Some(existing) = result.nodes.get_mut(id) {
394                // If deleted in either, mark as deleted
395                if node.deleted {
396                    existing.deleted = true;
397                    existing.value = None;
398                }
399            } else {
400                // Add new node
401                result.integrate_node(node.clone());
402            }
403        }
404
405        result
406    }
407}
408
409impl<T: Clone + PartialEq> Default for RGAList<T> {
410    fn default() -> Self {
411        Self::new("")
412    }
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a list on replica "r1" by appending `items` in order.
    fn list_of(items: &[i32]) -> RGAList<i32> {
        let mut list = RGAList::new("r1");
        for &item in items {
            list.push_back(item);
        }
        list
    }

    /// Appended values are retrievable by index in insertion order.
    #[test]
    fn test_basic_operations() {
        let expected = ["a", "b", "c"];
        let mut list: RGAList<String> = RGAList::new("r1");
        for s in expected.iter() {
            list.push_back(s.to_string());
        }

        assert_eq!(list.len(), 3);
        for (i, want) in expected.iter().enumerate() {
            assert_eq!(list.get(i), Some(&want.to_string()));
        }
    }

    /// Inserting at an interior index places the value between its neighbours.
    #[test]
    fn test_insert_at_index() {
        let mut list = list_of(&[1, 3]);
        list.insert(1, 2);

        assert_eq!(list.to_vec(), vec![1, 2, 3]);
    }

    /// Deleting by index returns the removed value and hides it from the list.
    #[test]
    fn test_delete() {
        let mut list = list_of(&[1, 2, 3]);

        assert_eq!(list.delete(1), Some(2));
        assert_eq!(list.to_vec(), vec![1, 3]);
    }

    /// Two replicas that insert concurrently converge after swapping deltas.
    #[test]
    fn test_concurrent_inserts() {
        let mut list1: RGAList<&str> = RGAList::new("r1");
        let mut list2: RGAList<&str> = RGAList::new("r2");

        // Shared starting point: both replicas know "a".
        list1.push_back("a");
        let seed = list1.take_delta().unwrap();
        list2.apply_delta(&seed);

        // Each replica appends its own value without seeing the other's.
        list1.push_back("b");
        list2.push_back("c");

        // Swap the resulting deltas.
        let delta1 = list1.take_delta().unwrap();
        let delta2 = list2.take_delta().unwrap();
        list1.apply_delta(&delta2);
        list2.apply_delta(&delta1);

        assert_eq!(list1.to_vec(), list2.to_vec());
    }

    /// Moving the first element forward places it before the old target.
    #[test]
    fn test_move_element() {
        let mut list = list_of(&[1, 2, 3]);

        list.move_element(0, 2);
        assert_eq!(list.to_vec(), vec![2, 1, 3]);
    }

    /// Joining two independent replicas keeps the elements of both.
    #[test]
    fn test_lattice_join() {
        let mut list1: RGAList<i32> = RGAList::new("r1");
        let mut list2: RGAList<i32> = RGAList::new("r2");
        list1.push_back(1);
        list2.push_back(2);

        let merged = list1.join(&list2);

        assert_eq!(merged.len(), 2);
    }

    /// `iter` yields the values in list order.
    #[test]
    fn test_iter() {
        let list = list_of(&[1, 2, 3]);

        let collected: Vec<i32> = list.iter().cloned().collect();
        assert_eq!(collected, vec![1, 2, 3]);
    }
}