1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::vec::Vec;
3use core::fmt;
4
5use crate::{Crdt, DeltaCrdt, NodeId};
6
7#[derive(Debug, Clone, PartialEq, Eq)]
9pub enum RgaError {
10 IndexOutOfBounds {
12 index: usize,
14 len: usize,
16 },
17}
18
19impl fmt::Display for RgaError {
20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21 match self {
22 Self::IndexOutOfBounds { index, len } => {
23 write!(f, "index {index} out of bounds for length {len}")
24 }
25 }
26 }
27}
28
29#[cfg(feature = "std")]
30impl std::error::Error for RgaError {}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
34#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
35pub struct RgaNode<T: Clone + Ord> {
36 pub id: (NodeId, u64),
38 pub value: T,
40 pub deleted: bool,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
73#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
74pub struct Rga<T: Clone + Ord> {
75 actor: NodeId,
76 counter: u64,
77 elements: Vec<RgaNode<T>>,
79 version: BTreeMap<NodeId, u64>,
81 visible_len: usize,
83}
84
85impl<T: Clone + Ord> Rga<T> {
86 pub fn new(actor: NodeId) -> Self {
88 Self {
89 actor,
90 counter: 0,
91 elements: Vec::new(),
92 version: BTreeMap::new(),
93 visible_len: 0,
94 }
95 }
96
97 pub fn fork(&self, new_actor: NodeId) -> Self {
99 Self {
100 actor: new_actor,
101 counter: self.counter,
102 elements: self.elements.clone(),
103 version: self.version.clone(),
104 visible_len: self.visible_len,
105 }
106 }
107
108 pub fn insert_at(&mut self, index: usize, value: T) -> Result<(), RgaError> {
110 if index > self.visible_len {
111 return Err(RgaError::IndexOutOfBounds {
112 index,
113 len: self.visible_len,
114 });
115 }
116
117 self.counter += 1;
118 let id = (self.actor, self.counter);
119 self.version
120 .entry(self.actor)
121 .and_modify(|c| *c = (*c).max(self.counter))
122 .or_insert(self.counter);
123
124 let node = RgaNode {
125 id,
126 value,
127 deleted: false,
128 };
129
130 let raw_index = self.raw_index_for_insert(index);
131 self.elements.insert(raw_index, node);
132 self.visible_len += 1;
133 Ok(())
134 }
135
136 pub fn remove(&mut self, index: usize) -> Result<T, RgaError> {
138 if index >= self.visible_len {
139 return Err(RgaError::IndexOutOfBounds {
140 index,
141 len: self.visible_len,
142 });
143 }
144 let raw = self.visible_to_raw(index);
145 self.elements[raw].deleted = true;
146 self.visible_len -= 1;
147 Ok(self.elements[raw].value.clone())
148 }
149
150 #[must_use]
152 pub fn get(&self, index: usize) -> Option<&T> {
153 if index >= self.visible_len {
154 return None;
155 }
156 let raw = self.visible_to_raw(index);
157 Some(&self.elements[raw].value)
158 }
159
160 #[must_use]
162 pub fn len(&self) -> usize {
163 self.visible_len
164 }
165
166 #[must_use]
168 pub fn is_empty(&self) -> bool {
169 self.visible_len == 0
170 }
171
172 pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
174 self.elements
175 .iter()
176 .filter(|n| !n.deleted)
177 .map(|n| &n.value)
178 }
179
180 #[must_use]
182 pub fn actor(&self) -> NodeId {
183 self.actor
184 }
185
186 #[must_use]
188 pub fn to_vec(&self) -> Vec<T> {
189 self.iter().cloned().collect()
190 }
191
192 fn visible_to_raw(&self, visible: usize) -> usize {
195 let mut seen = 0;
196 for (raw, node) in self.elements.iter().enumerate() {
197 if !node.deleted {
198 if seen == visible {
199 return raw;
200 }
201 seen += 1;
202 }
203 }
204 panic!(
205 "visible index {} not found (only {} visible elements)",
206 visible, seen
207 );
208 }
209
210 fn raw_index_for_insert(&self, visible_index: usize) -> usize {
211 if visible_index == 0 {
212 return 0;
213 }
214 if visible_index >= self.visible_len {
215 return self.elements.len();
216 }
217 self.visible_to_raw(visible_index)
218 }
219
220 fn find_insert_position(&self, node: &RgaNode<T>, after_raw: Option<usize>) -> usize {
221 let start = match after_raw {
222 Some(idx) => idx + 1,
223 None => 0,
224 };
225
226 let new_key = (node.id.1, node.id.0); for i in start..self.elements.len() {
229 let existing = &self.elements[i];
230 let existing_key = (existing.id.1, existing.id.0);
231 if existing_key < new_key {
232 return i;
233 }
234 }
235
236 self.elements.len()
237 }
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
242#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
243pub struct RgaDelta<T: Clone + Ord> {
244 pub new_elements: Vec<RgaNode<T>>,
246 pub tombstoned_ids: Vec<(NodeId, u64)>,
248 pub version: BTreeMap<NodeId, u64>,
250}
251
252impl<T: Clone + Ord> DeltaCrdt for Rga<T> {
253 type Delta = RgaDelta<T>;
254
255 fn delta(&self, other: &Self) -> RgaDelta<T> {
256 let new_elements: Vec<_> = self
257 .elements
258 .iter()
259 .filter(|e| {
260 let actor_max = other.version.get(&e.id.0).copied().unwrap_or(0);
261 e.id.1 > actor_max
262 })
263 .cloned()
264 .collect();
265
266 let tombstoned_ids: Vec<_> = self
267 .elements
268 .iter()
269 .filter(|e| {
270 e.deleted && {
271 let actor_max = other.version.get(&e.id.0).copied().unwrap_or(0);
272 e.id.1 <= actor_max
273 }
274 })
275 .map(|e| e.id)
276 .collect();
277
278 RgaDelta {
279 new_elements,
280 tombstoned_ids,
281 version: self.version.clone(),
282 }
283 }
284
285 fn apply_delta(&mut self, delta: &RgaDelta<T>) {
286 let id_index: BTreeMap<(NodeId, u64), usize> = self
288 .elements
289 .iter()
290 .enumerate()
291 .map(|(i, e)| (e.id, i))
292 .collect();
293
294 for &id in &delta.tombstoned_ids {
295 if let Some(&raw) = id_index.get(&id) {
296 if !self.elements[raw].deleted {
297 self.elements[raw].deleted = true;
298 self.visible_len -= 1;
299 }
300 }
301 }
302
303 let mut known_ids: BTreeSet<(NodeId, u64)> =
307 self.elements.iter().map(|e| e.id).collect();
308
309 for (delta_idx, elem) in delta.new_elements.iter().enumerate() {
310 if !known_ids.contains(&elem.id) {
311 let predecessor_raw = if delta_idx == 0 {
312 None
313 } else {
314 (0..delta_idx).rev().find_map(|i| {
315 self.elements
316 .iter()
317 .position(|e| e.id == delta.new_elements[i].id)
318 })
319 };
320
321 let pos = self.find_insert_position(elem, predecessor_raw);
322 self.elements.insert(pos, elem.clone());
323 if !elem.deleted {
324 self.visible_len += 1;
325 }
326 known_ids.insert(elem.id);
327 }
328 }
329
330 for (&actor, &cnt) in &delta.version {
331 let entry = self.version.entry(actor).or_insert(0);
332 *entry = (*entry).max(cnt);
333 }
334
335 if let Some(&max_cnt) = self.version.values().max() {
336 self.counter = self.counter.max(max_cnt);
337 }
338 }
339}
340
341impl<T: Clone + Ord> Crdt for Rga<T> {
342 fn merge(&mut self, other: &Self) {
343 let id_index: BTreeMap<(NodeId, u64), usize> = self
345 .elements
346 .iter()
347 .enumerate()
348 .map(|(i, e)| (e.id, i))
349 .collect();
350
351 for other_elem in &other.elements {
352 if other_elem.deleted {
353 if let Some(&raw) = id_index.get(&other_elem.id) {
354 if !self.elements[raw].deleted {
355 self.elements[raw].deleted = true;
356 self.visible_len -= 1;
357 }
358 }
359 }
360 }
361
362 let mut known_ids: BTreeSet<(NodeId, u64)> =
366 self.elements.iter().map(|e| e.id).collect();
367
368 for (other_idx, other_elem) in other.elements.iter().enumerate() {
369 if !known_ids.contains(&other_elem.id) {
370 let predecessor_raw = if other_idx == 0 {
371 None
372 } else {
373 (0..other_idx).rev().find_map(|i| {
374 self.elements
375 .iter()
376 .position(|e| e.id == other.elements[i].id)
377 })
378 };
379
380 let pos = self.find_insert_position(other_elem, predecessor_raw);
381 self.elements.insert(pos, other_elem.clone());
382 if !other_elem.deleted {
383 self.visible_len += 1;
384 }
385 known_ids.insert(other_elem.id);
386 }
387 }
388
389 for (&actor, &cnt) in &other.version {
390 let entry = self.version.entry(actor).or_insert(0);
391 *entry = (*entry).max(cnt);
392 }
393
394 if let Some(&max_cnt) = self.version.values().max() {
395 self.counter = self.counter.max(max_cnt);
396 }
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 use super::*;
403
404 #[test]
405 fn new_rga_is_empty() {
406 let rga = Rga::<String>::new(1);
407 assert!(rga.is_empty());
408 assert_eq!(rga.len(), 0);
409 assert_eq!(rga.get(0), None);
410 }
411
412 #[test]
413 fn insert_at_head() {
414 let mut rga = Rga::new(1);
415 rga.insert_at(0, 'H').unwrap();
416 rga.insert_at(1, 'i').unwrap();
417 assert_eq!(rga.len(), 2);
418 assert_eq!(rga.get(0), Some(&'H'));
419 assert_eq!(rga.get(1), Some(&'i'));
420 }
421
422 #[test]
423 fn insert_at_middle() {
424 let mut rga = Rga::new(1);
425 rga.insert_at(0, 'a').unwrap();
426 rga.insert_at(1, 'c').unwrap();
427 rga.insert_at(1, 'b').unwrap();
428 assert_eq!(rga.to_vec(), vec!['a', 'b', 'c']);
429 }
430
431 #[test]
432 fn insert_out_of_bounds_returns_error() {
433 let mut rga = Rga::new(1);
434 rga.insert_at(0, 'x').unwrap();
435 let err = rga.insert_at(5, 'y');
436 assert_eq!(
437 err,
438 Err(RgaError::IndexOutOfBounds { index: 5, len: 1 })
439 );
440 }
441
442 #[test]
443 fn remove_element() {
444 let mut rga = Rga::new(1);
445 rga.insert_at(0, 'a').unwrap();
446 rga.insert_at(1, 'b').unwrap();
447 rga.insert_at(2, 'c').unwrap();
448
449 let removed = rga.remove(1).unwrap();
450 assert_eq!(removed, 'b');
451 assert_eq!(rga.len(), 2);
452 assert_eq!(rga.to_vec(), vec!['a', 'c']);
453 }
454
455 #[test]
456 fn merge_disjoint_inserts() {
457 let mut r1 = Rga::new(1);
458 r1.insert_at(0, 'x').unwrap();
459
460 let mut r2 = Rga::new(2);
461 r2.insert_at(0, 'y').unwrap();
462
463 r1.merge(&r2);
464 assert_eq!(r1.len(), 2);
465 let v = r1.to_vec();
466 assert!(v.contains(&'x'));
467 assert!(v.contains(&'y'));
468 }
469
470 #[test]
471 fn merge_concurrent_inserts_at_same_position() {
472 let mut r1 = Rga::new(1);
473 r1.insert_at(0, 'A').unwrap();
474
475 let mut r2 = Rga::new(2);
476 r2.insert_at(0, 'B').unwrap();
477
478 let mut r1_copy = r1.clone();
479 let mut r2_copy = r2.clone();
480
481 r1_copy.merge(&r2);
482 r2_copy.merge(&r1);
483
484 assert_eq!(r1_copy.to_vec(), r2_copy.to_vec());
485 assert_eq!(r1_copy.len(), 2);
486 }
487
488 #[test]
489 fn merge_concurrent_inserts_after_shared_prefix() {
490 let mut r1 = Rga::new(1);
491 r1.insert_at(0, 'H').unwrap();
492 r1.insert_at(1, 'e').unwrap();
493
494 let mut r2 = r1.fork(2);
495
496 r1.insert_at(2, 'X').unwrap();
497 r2.insert_at(2, 'Y').unwrap();
498
499 let mut r1_merged = r1.clone();
500 r1_merged.merge(&r2);
501
502 let mut r2_merged = r2.clone();
503 r2_merged.merge(&r1);
504
505 assert_eq!(r1_merged.to_vec(), r2_merged.to_vec());
506 assert_eq!(r1_merged.len(), 4);
507 assert_eq!(r1_merged.get(0), Some(&'H'));
508 assert_eq!(r1_merged.get(1), Some(&'e'));
509 }
510
511 #[test]
512 fn merge_with_deletions() {
513 let mut r1 = Rga::new(1);
514 r1.insert_at(0, 'a').unwrap();
515 r1.insert_at(1, 'b').unwrap();
516 r1.insert_at(2, 'c').unwrap();
517
518 let mut r2 = r1.fork(2);
519
520 r1.remove(1).unwrap();
521 r2.insert_at(3, 'd').unwrap();
522
523 r1.merge(&r2);
524 assert!(!r1.to_vec().contains(&'b'));
525 assert!(r1.to_vec().contains(&'d'));
526 assert_eq!(r1.len(), 3);
527 }
528
529 #[test]
530 fn merge_is_commutative() {
531 let mut r1 = Rga::new(1);
532 r1.insert_at(0, 1).unwrap();
533 r1.insert_at(1, 2).unwrap();
534
535 let mut r2 = Rga::new(2);
536 r2.insert_at(0, 3).unwrap();
537 r2.insert_at(1, 4).unwrap();
538
539 let mut left = r1.clone();
540 left.merge(&r2);
541
542 let mut right = r2.clone();
543 right.merge(&r1);
544
545 assert_eq!(left.to_vec(), right.to_vec());
546 }
547
548 #[test]
549 fn merge_is_idempotent() {
550 let mut r1 = Rga::new(1);
551 r1.insert_at(0, 'x').unwrap();
552 r1.insert_at(1, 'y').unwrap();
553
554 let mut r2 = Rga::new(2);
555 r2.insert_at(0, 'z').unwrap();
556
557 r1.merge(&r2);
558 let after_first = r1.clone();
559 r1.merge(&r2);
560 assert_eq!(r1, after_first);
561 }
562
563 #[test]
564 fn delta_apply_equivalent_to_merge() {
565 let mut r1 = Rga::new(1);
566 r1.insert_at(0, 'H').unwrap();
567 r1.insert_at(1, 'i').unwrap();
568
569 let mut r2 = Rga::new(2);
570 r2.insert_at(0, '!').unwrap();
571
572 let mut via_merge = r2.clone();
573 via_merge.merge(&r1);
574
575 let mut via_delta = r2.clone();
576 let d = r1.delta(&r2);
577 via_delta.apply_delta(&d);
578
579 assert_eq!(via_merge.to_vec(), via_delta.to_vec());
580 }
581
582 #[test]
583 fn fork_creates_independent_replica() {
584 let mut r1 = Rga::new(1);
585 r1.insert_at(0, 'x').unwrap();
586 r1.insert_at(1, 'y').unwrap();
587
588 let mut r2 = r1.fork(2);
589 r2.insert_at(2, 'z').unwrap();
590
591 assert_eq!(r1.len(), 2);
592 assert_eq!(r2.len(), 3);
593
594 r1.merge(&r2);
595 assert_eq!(r1.to_vec(), vec!['x', 'y', 'z']);
596 }
597}