logicaffeine_data/crdt/sequence/
yata.rs1use crate::crdt::causal::VClock;
7use crate::crdt::delta::DeltaCrdt;
8use crate::crdt::replica::{generate_replica_id, ReplicaId};
9use crate::crdt::Merge;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use std::cmp::Ordering;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct YATADelta<T> {
17 pub items: Vec<YataItem<T>>,
18 pub clock: u64,
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct YataId {
24 pub clock: u64,
26 pub replica: ReplicaId,
28}
29
30impl YataId {
31 fn new(clock: u64, replica: ReplicaId) -> Self {
32 Self { clock, replica }
33 }
34}
35
36impl Ord for YataId {
37 fn cmp(&self, other: &Self) -> Ordering {
38 match self.clock.cmp(&other.clock) {
39 Ordering::Equal => self.replica.cmp(&other.replica),
40 ord => ord,
41 }
42 }
43}
44
45impl PartialOrd for YataId {
46 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
47 Some(self.cmp(other))
48 }
49}
50
51#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
53pub struct YataItem<T> {
54 pub id: YataId,
55 pub value: T,
56 pub deleted: bool,
57 pub origin_left: Option<YataId>,
59 pub origin_right: Option<YataId>,
61}
62
63#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
68pub struct YATA<T> {
69 items: Vec<YataItem<T>>,
70 clock: u64,
71 replica_id: ReplicaId,
72}
73
74impl<T: Clone + PartialEq> YATA<T> {
75 pub fn new(replica_id: ReplicaId) -> Self {
77 Self {
78 items: Vec::new(),
79 clock: 0,
80 replica_id,
81 }
82 }
83
84 pub fn new_random() -> Self {
86 Self::new(generate_replica_id())
87 }
88
89 pub fn append(&mut self, value: T) {
91 self.clock += 1;
92 let id = YataId::new(self.clock, self.replica_id);
93 let origin_left = self.last_visible_id();
94
95 self.items.push(YataItem {
96 id,
97 value,
98 deleted: false,
99 origin_left,
100 origin_right: None,
101 });
102 }
103
104 pub fn insert_after(&mut self, index: usize, value: T) {
106 let origin_left = self.visible_id_at(index);
107 let origin_right = self.visible_id_at(index + 1);
108
109 self.clock += 1;
110 let id = YataId::new(self.clock, self.replica_id);
111
112 self.items.push(YataItem {
113 id,
114 value,
115 deleted: false,
116 origin_left,
117 origin_right,
118 });
119 }
120
121 pub fn insert_before(&mut self, index: usize, value: T) {
123 if index == 0 {
124 self.clock += 1;
125 let id = YataId::new(self.clock, self.replica_id);
126 let origin_right = self.visible_id_at(0);
127
128 self.items.push(YataItem {
129 id,
130 value,
131 deleted: false,
132 origin_left: None,
133 origin_right,
134 });
135 } else {
136 self.insert_after(index - 1, value);
137 }
138 }
139
140 pub fn remove(&mut self, index: usize) {
142 if let Some(id) = self.visible_id_at(index) {
143 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
144 item.deleted = true;
145 }
146 }
147 }
148
149 pub fn get(&self, index: usize) -> Option<&T> {
151 self.visible_items().nth(index).map(|i| &i.value)
152 }
153
154 pub fn len(&self) -> usize {
156 self.visible_items().count()
157 }
158
159 pub fn is_empty(&self) -> bool {
161 self.len() == 0
162 }
163
164 pub fn to_vec(&self) -> Vec<T> {
166 self.visible_items().map(|i| i.value.clone()).collect()
167 }
168
169 pub fn iter(&self) -> impl Iterator<Item = &T> {
171 self.visible_items().map(|i| &i.value)
172 }
173
174 fn visible_items(&self) -> impl Iterator<Item = &YataItem<T>> {
176 self.sorted_items().into_iter().filter(|i| !i.deleted)
177 }
178
179 fn sorted_items(&self) -> Vec<&YataItem<T>> {
181 let mut result: Vec<&YataItem<T>> = Vec::new();
182
183 let mut to_process: Vec<&YataItem<T>> = self
185 .items
186 .iter()
187 .filter(|i| i.origin_left.is_none())
188 .collect();
189
190 to_process.sort_by(|a, b| a.id.cmp(&b.id));
192
193 while let Some(item) = to_process.pop() {
194 result.push(item);
195
196 let mut followers: Vec<&YataItem<T>> = self
198 .items
199 .iter()
200 .filter(|i| i.origin_left == Some(item.id))
201 .collect();
202
203 followers.sort_by(|a, b| a.id.cmp(&b.id));
205
206 to_process.extend(followers);
207 }
208
209 result
210 }
211
212 fn last_visible_id(&self) -> Option<YataId> {
213 self.sorted_items()
214 .into_iter()
215 .filter(|i| !i.deleted)
216 .last()
217 .map(|i| i.id)
218 }
219
220 fn visible_id_at(&self, index: usize) -> Option<YataId> {
221 self.visible_items().nth(index).map(|i| i.id)
222 }
223}
224
225impl<T: Clone + PartialEq> Merge for YATA<T> {
226 fn merge(&mut self, other: &Self) {
227 self.clock = self.clock.max(other.clock);
228
229 for other_item in &other.items {
230 let exists = self.items.iter().any(|i| i.id == other_item.id);
231 if !exists {
232 self.items.push(other_item.clone());
233 } else if let Some(my_item) = self.items.iter_mut().find(|i| i.id == other_item.id) {
234 if other_item.deleted {
235 my_item.deleted = true;
236 }
237 }
238 }
239 }
240}
241
242impl<T: Clone + PartialEq + Serialize + DeserializeOwned + Send + 'static> DeltaCrdt for YATA<T> {
243 type Delta = YATADelta<T>;
244
245 fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
246 let current = self.version();
247 if since.dominates(¤t) {
248 return None;
249 }
250
251 Some(YATADelta {
252 items: self.items.clone(),
253 clock: self.clock,
254 })
255 }
256
257 fn apply_delta(&mut self, delta: &Self::Delta) {
258 self.clock = self.clock.max(delta.clock);
259
260 for delta_item in &delta.items {
261 let exists = self.items.iter().any(|i| i.id == delta_item.id);
262 if !exists {
263 self.items.push(delta_item.clone());
264 } else if let Some(my_item) = self.items.iter_mut().find(|i| i.id == delta_item.id) {
265 if delta_item.deleted {
266 my_item.deleted = true;
267 }
268 }
269 }
270 }
271
272 fn version(&self) -> VClock {
273 let mut clock = VClock::new();
274 for item in &self.items {
275 let current = clock.get(item.id.replica);
276 if item.id.clock > current {
277 for _ in current..item.id.clock {
278 clock.increment(item.id.replica);
279 }
280 }
281 }
282 clock
283 }
284}
285
286impl<T: Clone + PartialEq> Default for YATA<T> {
287 fn default() -> Self {
288 Self::new_random()
289 }
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Sequential appends on one replica read back in insertion order.
    #[test]
    fn test_yata_append() {
        let mut sequence = YATA::<char>::new(1);
        sequence.append('a');
        sequence.append('b');

        let expected = vec!['a', 'b'];
        assert_eq!(sequence.to_vec(), expected);
    }

    /// Two replicas that append concurrently agree on the order once each
    /// has merged the other's state.
    #[test]
    fn test_yata_concurrent() {
        let mut left = YATA::<char>::new(1);
        let mut right = YATA::<char>::new(2);

        left.append('A');
        right.append('B');

        left.merge(&right);
        right.merge(&left);

        let left_view = left.to_vec();
        let right_view = right.to_vec();
        assert_eq!(left_view, right_view);
    }
}