Skip to main content

logicaffeine_data/crdt/sequence/
yata.rs

//! YATA (Yet Another Transformation Approach) CRDT.
//!
//! A sequence CRDT optimized for collaborative text editing.
//! Each item records its left and right neighbours at insertion time
//! (origin-left / origin-right) to handle concurrent insertions.
5
6use crate::crdt::causal::VClock;
7use crate::crdt::delta::DeltaCrdt;
8use crate::crdt::replica::{generate_replica_id, ReplicaId};
9use crate::crdt::Merge;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use std::cmp::Ordering;
13
14/// Delta for YATA synchronization.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct YATADelta<T> {
17    pub items: Vec<YataItem<T>>,
18    pub clock: u64,
19}
20
21/// Unique identifier for a YATA item.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct YataId {
24    /// Logical clock
25    pub clock: u64,
26    /// Replica that created this item
27    pub replica: ReplicaId,
28}
29
30impl YataId {
31    fn new(clock: u64, replica: ReplicaId) -> Self {
32        Self { clock, replica }
33    }
34}
35
36impl Ord for YataId {
37    fn cmp(&self, other: &Self) -> Ordering {
38        match self.clock.cmp(&other.clock) {
39            Ordering::Equal => self.replica.cmp(&other.replica),
40            ord => ord,
41        }
42    }
43}
44
45impl PartialOrd for YataId {
46    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
47        Some(self.cmp(other))
48    }
49}
50
51/// A YATA item.
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
53pub struct YataItem<T> {
54    pub id: YataId,
55    pub value: T,
56    pub deleted: bool,
57    /// The item this was inserted to the left of (origin)
58    pub origin_left: Option<YataId>,
59    /// The item this was inserted to the right of (for tie-breaking)
60    pub origin_right: Option<YataId>,
61}
62
63/// YATA sequence CRDT.
64///
65/// Better handling of interleaving for collaborative text editing
66/// compared to RGA.
67#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
68pub struct YATA<T> {
69    items: Vec<YataItem<T>>,
70    clock: u64,
71    replica_id: ReplicaId,
72}
73
74impl<T: Clone + PartialEq> YATA<T> {
75    /// Create a new empty YATA sequence.
76    pub fn new(replica_id: ReplicaId) -> Self {
77        Self {
78            items: Vec::new(),
79            clock: 0,
80            replica_id,
81        }
82    }
83
84    /// Create with random replica ID.
85    pub fn new_random() -> Self {
86        Self::new(generate_replica_id())
87    }
88
89    /// Append a value to the end.
90    pub fn append(&mut self, value: T) {
91        self.clock += 1;
92        let id = YataId::new(self.clock, self.replica_id);
93        let origin_left = self.last_visible_id();
94
95        self.items.push(YataItem {
96            id,
97            value,
98            deleted: false,
99            origin_left,
100            origin_right: None,
101        });
102    }
103
104    /// Insert a value after the element at the given index.
105    pub fn insert_after(&mut self, index: usize, value: T) {
106        let origin_left = self.visible_id_at(index);
107        let origin_right = self.visible_id_at(index + 1);
108
109        self.clock += 1;
110        let id = YataId::new(self.clock, self.replica_id);
111
112        self.items.push(YataItem {
113            id,
114            value,
115            deleted: false,
116            origin_left,
117            origin_right,
118        });
119    }
120
121    /// Insert a value before the element at the given index.
122    pub fn insert_before(&mut self, index: usize, value: T) {
123        if index == 0 {
124            self.clock += 1;
125            let id = YataId::new(self.clock, self.replica_id);
126            let origin_right = self.visible_id_at(0);
127
128            self.items.push(YataItem {
129                id,
130                value,
131                deleted: false,
132                origin_left: None,
133                origin_right,
134            });
135        } else {
136            self.insert_after(index - 1, value);
137        }
138    }
139
140    /// Remove the element at the given index.
141    pub fn remove(&mut self, index: usize) {
142        if let Some(id) = self.visible_id_at(index) {
143            if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
144                item.deleted = true;
145            }
146        }
147    }
148
149    /// Get the element at the given index.
150    pub fn get(&self, index: usize) -> Option<&T> {
151        self.visible_items().nth(index).map(|i| &i.value)
152    }
153
154    /// Get the number of visible elements.
155    pub fn len(&self) -> usize {
156        self.visible_items().count()
157    }
158
159    /// Check if empty.
160    pub fn is_empty(&self) -> bool {
161        self.len() == 0
162    }
163
164    /// Convert to vector.
165    pub fn to_vec(&self) -> Vec<T> {
166        self.visible_items().map(|i| i.value.clone()).collect()
167    }
168
169    /// Iterate over visible elements.
170    pub fn iter(&self) -> impl Iterator<Item = &T> {
171        self.visible_items().map(|i| &i.value)
172    }
173
174    /// Get sorted visible items.
175    fn visible_items(&self) -> impl Iterator<Item = &YataItem<T>> {
176        self.sorted_items().into_iter().filter(|i| !i.deleted)
177    }
178
179    /// Sort items according to YATA rules.
180    fn sorted_items(&self) -> Vec<&YataItem<T>> {
181        let mut result: Vec<&YataItem<T>> = Vec::new();
182
183        // Start with items that have no left origin
184        let mut to_process: Vec<&YataItem<T>> = self
185            .items
186            .iter()
187            .filter(|i| i.origin_left.is_none())
188            .collect();
189
190        // Sort ascending so pop() gives higher IDs first (later inserts appear first)
191        to_process.sort_by(|a, b| a.id.cmp(&b.id));
192
193        while let Some(item) = to_process.pop() {
194            result.push(item);
195
196            // Find items whose origin_left is this item
197            let mut followers: Vec<&YataItem<T>> = self
198                .items
199                .iter()
200                .filter(|i| i.origin_left == Some(item.id))
201                .collect();
202
203            // Sort ascending so pop() gives higher IDs first
204            followers.sort_by(|a, b| a.id.cmp(&b.id));
205
206            to_process.extend(followers);
207        }
208
209        result
210    }
211
212    fn last_visible_id(&self) -> Option<YataId> {
213        self.sorted_items()
214            .into_iter()
215            .filter(|i| !i.deleted)
216            .last()
217            .map(|i| i.id)
218    }
219
220    fn visible_id_at(&self, index: usize) -> Option<YataId> {
221        self.visible_items().nth(index).map(|i| i.id)
222    }
223}
224
225impl<T: Clone + PartialEq> Merge for YATA<T> {
226    fn merge(&mut self, other: &Self) {
227        self.clock = self.clock.max(other.clock);
228
229        for other_item in &other.items {
230            let exists = self.items.iter().any(|i| i.id == other_item.id);
231            if !exists {
232                self.items.push(other_item.clone());
233            } else if let Some(my_item) = self.items.iter_mut().find(|i| i.id == other_item.id) {
234                if other_item.deleted {
235                    my_item.deleted = true;
236                }
237            }
238        }
239    }
240}
241
242impl<T: Clone + PartialEq + Serialize + DeserializeOwned + Send + 'static> DeltaCrdt for YATA<T> {
243    type Delta = YATADelta<T>;
244
245    fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
246        let current = self.version();
247        if since.dominates(&current) {
248            return None;
249        }
250
251        Some(YATADelta {
252            items: self.items.clone(),
253            clock: self.clock,
254        })
255    }
256
257    fn apply_delta(&mut self, delta: &Self::Delta) {
258        self.clock = self.clock.max(delta.clock);
259
260        for delta_item in &delta.items {
261            let exists = self.items.iter().any(|i| i.id == delta_item.id);
262            if !exists {
263                self.items.push(delta_item.clone());
264            } else if let Some(my_item) = self.items.iter_mut().find(|i| i.id == delta_item.id) {
265                if delta_item.deleted {
266                    my_item.deleted = true;
267                }
268            }
269        }
270    }
271
272    fn version(&self) -> VClock {
273        let mut clock = VClock::new();
274        for item in &self.items {
275            let current = clock.get(item.id.replica);
276            if item.id.clock > current {
277                for _ in current..item.id.clock {
278                    clock.increment(item.id.replica);
279                }
280            }
281        }
282        clock
283    }
284}
285
286impl<T: Clone + PartialEq> Default for YATA<T> {
287    fn default() -> Self {
288        Self::new_random()
289    }
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Appending on a single replica keeps insertion order.
    #[test]
    fn test_yata_append() {
        let mut seq = YATA::<char>::new(1);
        seq.append('a');
        seq.append('b');

        let contents = seq.to_vec();
        assert_eq!(contents, vec!['a', 'b']);
    }

    /// Two replicas that append concurrently and then merge in both
    /// directions end up with the same visible sequence.
    #[test]
    fn test_yata_concurrent() {
        let mut left = YATA::<char>::new(1);
        let mut right = YATA::<char>::new(2);

        left.append('A');
        right.append('B');

        left.merge(&right);
        right.merge(&left);

        assert_eq!(left.to_vec(), right.to_vec());
    }
}