Skip to main content

logicaffeine_data/crdt/sequence/
rga.rs

1//! RGA (Replicated Growable Array) CRDT.
2//!
3//! A sequence CRDT suitable for collaborative lists.
4//! Uses timestamps and replica IDs to order concurrent insertions.
5
6use crate::crdt::causal::VClock;
7use crate::crdt::delta::DeltaCrdt;
8use crate::crdt::replica::{generate_replica_id, ReplicaId};
9use crate::crdt::Merge;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use std::cmp::Ordering;
13use std::collections::HashMap;
14
15/// Delta for RGA synchronization.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RGADelta<T> {
18    pub nodes: Vec<RgaNode<T>>,
19    pub timestamp: u64,
20}
21
22/// Unique identifier for a node in the RGA.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
24pub struct RgaId {
25    /// Logical timestamp
26    pub timestamp: u64,
27    /// Replica that created this node
28    pub replica: ReplicaId,
29}
30
31impl RgaId {
32    fn new(timestamp: u64, replica: ReplicaId) -> Self {
33        Self { timestamp, replica }
34    }
35}
36
37impl Ord for RgaId {
38    fn cmp(&self, other: &Self) -> Ordering {
39        // Higher timestamp wins, then higher replica ID as tiebreaker
40        match self.timestamp.cmp(&other.timestamp) {
41            Ordering::Equal => self.replica.cmp(&other.replica),
42            ord => ord,
43        }
44    }
45}
46
47impl PartialOrd for RgaId {
48    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
49        Some(self.cmp(other))
50    }
51}
52
53/// A node in the RGA linked structure.
54#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
55pub struct RgaNode<T> {
56    pub id: RgaId,
57    pub value: T,
58    pub deleted: bool,
59    /// ID of the node this was inserted after (None for head)
60    pub parent: Option<RgaId>,
61}
62
63/// Replicated Growable Array - a sequence CRDT.
64///
65/// Supports append, insert, and remove operations with deterministic
66/// conflict resolution for concurrent operations.
67#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
68pub struct RGA<T> {
69    /// All nodes (including deleted ones for tombstones)
70    nodes: Vec<RgaNode<T>>,
71    /// Current logical timestamp
72    timestamp: u64,
73    /// This replica's ID
74    replica_id: ReplicaId,
75}
76
77impl<T: Clone + PartialEq> RGA<T> {
78    /// Create a new empty RGA.
79    pub fn new(replica_id: ReplicaId) -> Self {
80        Self {
81            nodes: Vec::new(),
82            timestamp: 0,
83            replica_id,
84        }
85    }
86
87    /// Create a new RGA with a random replica ID.
88    pub fn new_random() -> Self {
89        Self::new(generate_replica_id())
90    }
91
92    /// Append a value to the end of the sequence.
93    pub fn append(&mut self, value: T) {
94        self.timestamp += 1;
95        let id = RgaId::new(self.timestamp, self.replica_id);
96        let parent = self.last_visible_id();
97
98        self.nodes.push(RgaNode {
99            id,
100            value,
101            deleted: false,
102            parent,
103        });
104    }
105
106    /// Insert a value before the element at the given index.
107    pub fn insert_before(&mut self, index: usize, value: T) {
108        if index == 0 {
109            // Insert at head
110            self.timestamp += 1;
111            let id = RgaId::new(self.timestamp, self.replica_id);
112            self.nodes.push(RgaNode {
113                id,
114                value,
115                deleted: false,
116                parent: None,
117            });
118        } else {
119            // Insert after the element before index
120            self.insert_after(index - 1, value);
121        }
122    }
123
124    /// Insert a value after the element at the given index.
125    pub fn insert_after(&mut self, index: usize, value: T) {
126        let parent_id = self.visible_id_at(index);
127        self.timestamp += 1;
128        let id = RgaId::new(self.timestamp, self.replica_id);
129
130        self.nodes.push(RgaNode {
131            id,
132            value,
133            deleted: false,
134            parent: parent_id,
135        });
136    }
137
138    /// Remove the element at the given index (tombstone deletion).
139    pub fn remove(&mut self, index: usize) {
140        if let Some(id) = self.visible_id_at(index) {
141            if let Some(node) = self.nodes.iter_mut().find(|n| n.id == id) {
142                node.deleted = true;
143            }
144        }
145    }
146
147    /// Get the element at the given index.
148    pub fn get(&self, index: usize) -> Option<&T> {
149        self.visible_nodes().nth(index).map(|n| &n.value)
150    }
151
152    /// Get the number of visible elements.
153    pub fn len(&self) -> usize {
154        self.visible_nodes().count()
155    }
156
157    /// Check if the sequence is empty.
158    pub fn is_empty(&self) -> bool {
159        self.len() == 0
160    }
161
162    /// Convert to a vector of values.
163    pub fn to_vec(&self) -> Vec<T> {
164        self.visible_nodes().map(|n| n.value.clone()).collect()
165    }
166
167    /// Iterate over visible elements.
168    pub fn iter(&self) -> impl Iterator<Item = &T> {
169        self.visible_nodes().map(|n| &n.value)
170    }
171
172    /// Get sorted visible nodes in document order.
173    fn visible_nodes(&self) -> impl Iterator<Item = &RgaNode<T>> {
174        self.sorted_nodes()
175            .into_iter()
176            .filter(|n| !n.deleted)
177    }
178
179    /// Get all nodes sorted in document order.
180    fn sorted_nodes(&self) -> Vec<&RgaNode<T>> {
181        // Build a map from parent ID to children
182        let mut children_map: HashMap<Option<RgaId>, Vec<&RgaNode<T>>> = HashMap::new();
183        for node in &self.nodes {
184            children_map.entry(node.parent).or_default().push(node);
185        }
186
187        // Sort children at each level - higher ID (later insert) comes first
188        for children in children_map.values_mut() {
189            children.sort_by(|a, b| b.id.cmp(&a.id));
190        }
191
192        // DFS traversal starting from root (parent = None)
193        let mut result: Vec<&RgaNode<T>> = Vec::new();
194        let mut stack: Vec<&RgaNode<T>> = Vec::new();
195
196        // Start with head nodes in reverse order
197        if let Some(heads) = children_map.get(&None) {
198            for node in heads.iter().rev() {
199                stack.push(node);
200            }
201        }
202
203        while let Some(node) = stack.pop() {
204            result.push(node);
205
206            // Add children in reverse order
207            if let Some(children) = children_map.get(&Some(node.id)) {
208                for child in children.iter().rev() {
209                    stack.push(child);
210                }
211            }
212        }
213
214        result
215    }
216
217    /// Get the ID of the last visible node.
218    fn last_visible_id(&self) -> Option<RgaId> {
219        self.sorted_nodes()
220            .into_iter()
221            .filter(|n| !n.deleted)
222            .last()
223            .map(|n| n.id)
224    }
225
226    /// Get the ID of the visible node at the given index.
227    fn visible_id_at(&self, index: usize) -> Option<RgaId> {
228        self.visible_nodes().nth(index).map(|n| n.id)
229    }
230}
231
232impl<T: Clone + PartialEq> Merge for RGA<T> {
233    fn merge(&mut self, other: &Self) {
234        // Update our timestamp to be at least as high as other's
235        self.timestamp = self.timestamp.max(other.timestamp);
236
237        // Add nodes from other that we don't have
238        for other_node in &other.nodes {
239            let exists = self.nodes.iter().any(|n| n.id == other_node.id);
240            if !exists {
241                self.nodes.push(other_node.clone());
242            } else {
243                // If we have the node, merge deleted status
244                if let Some(my_node) = self.nodes.iter_mut().find(|n| n.id == other_node.id) {
245                    if other_node.deleted {
246                        my_node.deleted = true;
247                    }
248                }
249            }
250        }
251    }
252}
253
254impl<T: Clone + PartialEq + Serialize + DeserializeOwned + Send + 'static> DeltaCrdt for RGA<T> {
255    type Delta = RGADelta<T>;
256
257    fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
258        let current = self.version();
259        if since.dominates(&current) {
260            return None;
261        }
262
263        // Return full state as delta
264        Some(RGADelta {
265            nodes: self.nodes.clone(),
266            timestamp: self.timestamp,
267        })
268    }
269
270    fn apply_delta(&mut self, delta: &Self::Delta) {
271        self.timestamp = self.timestamp.max(delta.timestamp);
272
273        for delta_node in &delta.nodes {
274            let exists = self.nodes.iter().any(|n| n.id == delta_node.id);
275            if !exists {
276                self.nodes.push(delta_node.clone());
277            } else if let Some(my_node) = self.nodes.iter_mut().find(|n| n.id == delta_node.id) {
278                if delta_node.deleted {
279                    my_node.deleted = true;
280                }
281            }
282        }
283    }
284
285    fn version(&self) -> VClock {
286        // Build VClock from the max timestamp per replica
287        let mut clock = VClock::new();
288        for node in &self.nodes {
289            let current = clock.get(node.id.replica);
290            if node.id.timestamp > current {
291                // Set to the max timestamp seen for this replica
292                for _ in current..node.id.timestamp {
293                    clock.increment(node.id.replica);
294                }
295            }
296        }
297        clock
298    }
299}
300
301impl<T: Clone + PartialEq> Default for RGA<T> {
302    fn default() -> Self {
303        Self::new_random()
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_rga_append() {
        let mut seq: RGA<String> = RGA::new(1);
        seq.append("a".to_string());
        seq.append("b".to_string());
        assert_eq!(seq.to_vec(), vec!["a", "b"]);
    }

    #[test]
    fn test_rga_insert_before() {
        let mut seq: RGA<String> = RGA::new(1);
        seq.append("b".to_string());
        seq.insert_before(0, "a".to_string());
        assert_eq!(seq.to_vec(), vec!["a", "b"]);
    }

    #[test]
    fn test_rga_insert_before_middle() {
        let mut seq: RGA<String> = RGA::new(1);
        seq.append("a".to_string());
        seq.append("c".to_string());
        seq.insert_before(1, "b".to_string());
        assert_eq!(seq.to_vec(), vec!["a", "b", "c"]);
    }

    #[test]
    fn test_rga_insert_after_middle() {
        let mut seq: RGA<String> = RGA::new(1);
        seq.append("a".to_string());
        seq.append("c".to_string());
        seq.insert_after(0, "b".to_string());
        assert_eq!(seq.to_vec(), vec!["a", "b", "c"]);
    }

    #[test]
    fn test_rga_get_len_is_empty() {
        let mut seq: RGA<String> = RGA::new(1);
        assert!(seq.is_empty());
        assert_eq!(seq.len(), 0);
        seq.append("a".to_string());
        seq.append("b".to_string());
        assert!(!seq.is_empty());
        assert_eq!(seq.len(), 2);
        assert_eq!(seq.get(0).map(String::as_str), Some("a"));
        assert_eq!(seq.get(1).map(String::as_str), Some("b"));
        assert_eq!(seq.get(2), None);
    }

    #[test]
    fn test_rga_remove() {
        let mut seq: RGA<String> = RGA::new(1);
        seq.append("a".to_string());
        seq.append("b".to_string());
        seq.remove(0);
        assert_eq!(seq.to_vec(), vec!["b"]);
    }

    #[test]
    fn test_rga_remove_out_of_range_is_noop() {
        let mut seq: RGA<String> = RGA::new(1);
        seq.append("a".to_string());
        seq.remove(5);
        assert_eq!(seq.to_vec(), vec!["a"]);
    }

    #[test]
    fn test_rga_concurrent_append() {
        let mut a: RGA<String> = RGA::new(1);
        let mut b: RGA<String> = RGA::new(2);

        a.append("from-a".to_string());
        b.append("from-b".to_string());

        a.merge(&b);
        b.merge(&a);

        assert_eq!(a.to_vec(), b.to_vec());
        // Equal timestamps: the higher replica ID sorts first among head nodes.
        assert_eq!(a.to_vec(), vec!["from-b", "from-a"]);
    }

    #[test]
    fn test_rga_concurrent_insert_after_same_parent_converges() {
        let mut a: RGA<String> = RGA::new(1);
        a.append("x".to_string());
        let mut b: RGA<String> = RGA::new(2);
        b.merge(&a);

        a.insert_after(0, "a1".to_string());
        b.insert_after(0, "b1".to_string());

        a.merge(&b);
        b.merge(&a);

        assert_eq!(a.to_vec(), b.to_vec());
        assert_eq!(a.to_vec(), vec!["x", "b1", "a1"]);
    }

    #[test]
    fn test_rga_merge_propagates_deletion() {
        let mut a: RGA<String> = RGA::new(1);
        a.append("x".to_string());
        a.append("y".to_string());
        let mut b: RGA<String> = RGA::new(2);
        b.merge(&a);
        assert_eq!(b.to_vec(), vec!["x", "y"]);

        a.remove(0);
        b.merge(&a);
        assert_eq!(b.to_vec(), vec!["y"]);
    }

    #[test]
    fn test_rga_merge_idempotent() {
        let mut a: RGA<String> = RGA::new(1);
        a.append("x".to_string());
        a.append("y".to_string());
        a.remove(0);
        let snapshot = a.clone();
        a.merge(&snapshot);
        assert_eq!(a, snapshot);
    }

    #[test]
    fn test_rga_id_ordering() {
        assert!(RgaId::new(1, 5) < RgaId::new(2, 0));
        assert!(RgaId::new(2, 1) < RgaId::new(2, 2));
        assert_eq!(RgaId::new(3, 3).cmp(&RgaId::new(3, 3)), Ordering::Equal);
    }
}