use alloc::collections::{BTreeMap, BTreeSet};
use alloc::vec::Vec;
use core::fmt;
use crate::{Crdt, DeltaCrdt, NodeId};
/// Errors returned by the index-based operations on [`Rga`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RgaError {
    /// The requested visible index lies outside the sequence.
    IndexOutOfBounds {
        /// The index that was requested.
        index: usize,
        /// The number of visible elements at the time of the call.
        len: usize,
    },
}
/// Human-readable rendering of an [`RgaError`].
impl fmt::Display for RgaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Both payload fields are `usize`, so matching by value simply copies them out.
        match *self {
            Self::IndexOutOfBounds { index, len } => {
                f.write_fmt(format_args!("index {index} out of bounds for length {len}"))
            }
        }
    }
}
// `std::error::Error` is only implemented when the `std` feature is enabled;
// the rest of this file sticks to `core`/`alloc` paths. The impl is empty
// because `RgaError` wraps no underlying error, so the trait's default
// methods are used as-is.
#[cfg(feature = "std")]
impl std::error::Error for RgaError {}
/// One slot of the replicated sequence: a value plus its identity and
/// tombstone flag.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct RgaNode<T: Clone + Ord> {
    /// Identifier of the element: the actor that inserted it and that actor's
    /// counter value at insertion time.
    pub id: (NodeId, u64),
    /// The payload stored in this slot.
    pub value: T,
    /// `true` once the element has been removed. Removed elements stay in the
    /// sequence as tombstones and are hidden from the visible view.
    pub deleted: bool,
}
/// A replicated growable array: an ordered sequence whose replicas can be
/// edited independently and later combined with `merge` or deltas.
///
/// Indices in the public API are *visible* indices, i.e. they skip over
/// tombstoned elements.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Rga<T: Clone + Ord> {
    /// Identity stamped on every element this replica inserts.
    actor: NodeId,
    /// Counter used for the next local insert; bumped on every insert and
    /// raised to the highest known version entry when remote state is applied.
    counter: u64,
    /// Every element ever inserted, tombstones included, in sequence order.
    elements: Vec<RgaNode<T>>,
    /// Highest counter recorded per actor.
    version: BTreeMap<NodeId, u64>,
    /// Cached number of elements in `elements` that are not tombstoned.
    visible_len: usize,
}
impl<T: Clone + Ord> Rga<T> {
    /// Creates an empty sequence owned by `actor`.
    pub fn new(actor: NodeId) -> Self {
        Self {
            actor,
            counter: 0,
            visible_len: 0,
            elements: Vec::new(),
            version: BTreeMap::new(),
        }
    }

    /// Returns a copy of this replica's full state that inserts under
    /// `new_actor` instead of this replica's actor.
    pub fn fork(&self, new_actor: NodeId) -> Self {
        Self {
            actor: new_actor,
            ..self.clone()
        }
    }

    /// Inserts `value` so that it becomes the element at visible position
    /// `index`.
    ///
    /// # Errors
    ///
    /// Returns [`RgaError::IndexOutOfBounds`] when `index` is greater than the
    /// number of visible elements.
    pub fn insert_at(&mut self, index: usize, value: T) -> Result<(), RgaError> {
        let len = self.visible_len;
        if index > len {
            return Err(RgaError::IndexOutOfBounds { index, len });
        }

        // Allocate a fresh identifier for the new element.
        self.counter += 1;
        let stamp = self.counter;

        // Record the new counter in our own version entry (never lowering it).
        let recorded = self.version.entry(self.actor).or_insert(stamp);
        *recorded = (*recorded).max(stamp);

        let raw_index = self.raw_index_for_insert(index);
        self.elements.insert(
            raw_index,
            RgaNode {
                id: (self.actor, stamp),
                value,
                deleted: false,
            },
        );
        self.visible_len += 1;
        Ok(())
    }

    /// Tombstones the element at visible position `index` and returns a clone
    /// of its value.
    ///
    /// # Errors
    ///
    /// Returns [`RgaError::IndexOutOfBounds`] when `index` does not address a
    /// visible element.
    pub fn remove(&mut self, index: usize) -> Result<T, RgaError> {
        let len = self.visible_len;
        if index >= len {
            return Err(RgaError::IndexOutOfBounds { index, len });
        }

        let raw = self.visible_to_raw(index);
        let node = &mut self.elements[raw];
        node.deleted = true;
        let removed = node.value.clone();
        self.visible_len -= 1;
        Ok(removed)
    }

    /// Returns the value at visible position `index`, or `None` when the index
    /// is past the end of the visible sequence.
    #[must_use]
    pub fn get(&self, index: usize) -> Option<&T> {
        if index < self.visible_len {
            let raw = self.visible_to_raw(index);
            Some(&self.elements[raw].value)
        } else {
            None
        }
    }

    /// Number of visible (non-tombstoned) elements.
    #[must_use]
    pub fn len(&self) -> usize {
        self.visible_len
    }

    /// `true` when there are no visible elements.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Iterates over the visible values in sequence order.
    pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
        self.elements
            .iter()
            .filter_map(|node| if node.deleted { None } else { Some(&node.value) })
    }

    /// The actor under which this replica inserts elements.
    #[must_use]
    pub fn actor(&self) -> NodeId {
        self.actor
    }

    /// Clones the visible values into a `Vec`, in sequence order.
    #[must_use]
    pub fn to_vec(&self) -> Vec<T> {
        self.iter().cloned().collect::<Vec<_>>()
    }

    /// Translates a visible index into an index into `elements`.
    ///
    /// # Panics
    ///
    /// Panics when fewer than `visible + 1` non-tombstoned elements exist.
    /// Callers in this impl bounds-check against `visible_len` first.
    fn visible_to_raw(&self, visible: usize) -> usize {
        let mut live = self
            .elements
            .iter()
            .enumerate()
            .filter(|(_, node)| !node.deleted);

        match live.nth(visible) {
            Some((raw, _)) => raw,
            None => {
                // Report how many visible elements there actually are.
                let total = self.elements.iter().filter(|node| !node.deleted).count();
                panic!(
                    "visible index {} not found (only {} visible elements)",
                    visible, total
                )
            }
        }
    }

    /// Picks the raw slot for a local insert at `visible_index`: the very
    /// front for index 0, the very end when inserting at or past the visible
    /// length, otherwise the slot of the element currently shown at that index.
    fn raw_index_for_insert(&self, visible_index: usize) -> usize {
        match visible_index {
            0 => 0,
            i if i >= self.visible_len => self.elements.len(),
            i => self.visible_to_raw(i),
        }
    }

    /// Finds where a remote `node` belongs when integrating it after the
    /// element at `after_raw` (or from the front when `None`).
    ///
    /// Scanning starts just past the predecessor and stops at the first
    /// existing element whose `(counter, actor)` key is smaller than the new
    /// node's key; the node goes directly in front of it, or at the end when
    /// no such element exists.
    fn find_insert_position(&self, node: &RgaNode<T>, after_raw: Option<usize>) -> usize {
        let start = after_raw.map_or(0, |idx| idx + 1);
        let new_key = (node.id.1, node.id.0);

        self.elements
            .iter()
            .enumerate()
            .skip(start)
            .find(|(_, existing)| (existing.id.1, existing.id.0) < new_key)
            .map_or(self.elements.len(), |(raw, _)| raw)
    }
}
/// The state one [`Rga`] replica sends to another to bring it up to date.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct RgaDelta<T: Clone + Ord> {
    /// Elements offered to the receiver, in the sender's sequence order.
    /// On application, entries whose id the receiver already holds are not
    /// inserted again; they only help locate where the others belong.
    pub new_elements: Vec<RgaNode<T>>,
    /// Ids of elements the sender has tombstoned. On application, ids the
    /// receiver does not hold are ignored.
    pub tombstoned_ids: Vec<(NodeId, u64)>,
    /// The sender's version map (highest counter per actor) at the time the
    /// delta was produced.
    pub version: BTreeMap<NodeId, u64>,
}
impl<T: Clone + Ord> DeltaCrdt for Rga<T> {
type Delta = RgaDelta<T>;
fn delta(&self, other: &Self) -> RgaDelta<T> {
let new_elements: Vec<_> = self
.elements
.iter()
.filter(|e| {
let actor_max = other.version.get(&e.id.0).copied().unwrap_or(0);
e.id.1 > actor_max
})
.cloned()
.collect();
let tombstoned_ids: Vec<_> = self
.elements
.iter()
.filter(|e| {
e.deleted && {
let actor_max = other.version.get(&e.id.0).copied().unwrap_or(0);
e.id.1 <= actor_max
}
})
.map(|e| e.id)
.collect();
RgaDelta {
new_elements,
tombstoned_ids,
version: self.version.clone(),
}
}
fn apply_delta(&mut self, delta: &RgaDelta<T>) {
let id_index: BTreeMap<(NodeId, u64), usize> = self
.elements
.iter()
.enumerate()
.map(|(i, e)| (e.id, i))
.collect();
for &id in &delta.tombstoned_ids {
if let Some(&raw) = id_index.get(&id) {
if !self.elements[raw].deleted {
self.elements[raw].deleted = true;
self.visible_len -= 1;
}
}
}
let mut known_ids: BTreeSet<(NodeId, u64)> =
self.elements.iter().map(|e| e.id).collect();
for (delta_idx, elem) in delta.new_elements.iter().enumerate() {
if !known_ids.contains(&elem.id) {
let predecessor_raw = if delta_idx == 0 {
None
} else {
(0..delta_idx).rev().find_map(|i| {
self.elements
.iter()
.position(|e| e.id == delta.new_elements[i].id)
})
};
let pos = self.find_insert_position(elem, predecessor_raw);
self.elements.insert(pos, elem.clone());
if !elem.deleted {
self.visible_len += 1;
}
known_ids.insert(elem.id);
}
}
for (&actor, &cnt) in &delta.version {
let entry = self.version.entry(actor).or_insert(0);
*entry = (*entry).max(cnt);
}
if let Some(&max_cnt) = self.version.values().max() {
self.counter = self.counter.max(max_cnt);
}
}
}
impl<T: Clone + Ord> Crdt for Rga<T> {
    /// Folds the full state of `other` into this replica.
    ///
    /// Steps, in order:
    /// 1. Tombstone every locally held element that `other` has deleted.
    /// 2. Walk `other`'s elements in order and integrate each one that is not
    ///    held locally, positioned after the nearest earlier element of
    ///    `other` that is present locally (or from the front when there is
    ///    none).
    /// 3. Raise each version entry to the maximum of both replicas, then raise
    ///    `counter` to the highest version entry.
    fn merge(&mut self, other: &Self) {
        // Snapshot of id -> raw index. Only valid until the first insert
        // below, so it is used for the tombstone pass alone.
        let raw_by_id: BTreeMap<(NodeId, u64), usize> = self
            .elements
            .iter()
            .enumerate()
            .map(|(raw, node)| (node.id, raw))
            .collect();

        for tombstone in other.elements.iter().filter(|node| node.deleted) {
            if let Some(&raw) = raw_by_id.get(&tombstone.id) {
                let local = &mut self.elements[raw];
                if !local.deleted {
                    local.deleted = true;
                    self.visible_len -= 1;
                }
            }
        }

        let mut held: BTreeSet<(NodeId, u64)> =
            self.elements.iter().map(|node| node.id).collect();

        for (idx, incoming) in other.elements.iter().enumerate() {
            // `insert` returns `false` when the id was already present.
            if !held.insert(incoming.id) {
                continue;
            }

            // Positions are looked up live because earlier inserts shift raw
            // indices.
            let predecessor_raw = other.elements[..idx]
                .iter()
                .rev()
                .find_map(|prev| self.elements.iter().position(|e| e.id == prev.id));

            let slot = self.find_insert_position(incoming, predecessor_raw);
            self.elements.insert(slot, incoming.clone());
            if !incoming.deleted {
                self.visible_len += 1;
            }
        }

        for (&actor, &theirs) in &other.version {
            self.version
                .entry(actor)
                .and_modify(|mine| *mine = (*mine).max(theirs))
                .or_insert(theirs);
        }

        // Keep the local counter at or above every counter seen so far.
        let highest = self.version.values().copied().max().unwrap_or(0);
        self.counter = self.counter.max(highest);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Appends each item to the end of `rga`, panicking if an insert fails.
    fn push_all<T: Clone + Ord>(rga: &mut Rga<T>, items: impl IntoIterator<Item = T>) {
        for item in items {
            let end = rga.len();
            rga.insert_at(end, item).unwrap();
        }
    }

    // A freshly created replica has no visible elements.
    #[test]
    fn new_rga_is_empty() {
        let empty: Rga<String> = Rga::new(1);
        assert!(empty.is_empty());
        assert_eq!(empty.len(), 0);
        assert_eq!(empty.get(0), None);
    }

    // Inserting at 0 and then at 1 yields the two values in that order.
    #[test]
    fn insert_at_head() {
        let mut seq = Rga::new(1);
        seq.insert_at(0, 'H').unwrap();
        seq.insert_at(1, 'i').unwrap();

        assert_eq!(seq.len(), 2);
        assert_eq!(seq.get(0), Some(&'H'));
        assert_eq!(seq.get(1), Some(&'i'));
    }

    // Inserting between two existing elements places the value in the middle.
    #[test]
    fn insert_at_middle() {
        let mut seq = Rga::new(1);
        push_all(&mut seq, ['a', 'c']);
        seq.insert_at(1, 'b').unwrap();

        assert_eq!(seq.to_vec(), vec!['a', 'b', 'c']);
    }

    // An index past the end is rejected and reports the current length.
    #[test]
    fn insert_out_of_bounds_returns_error() {
        let mut seq = Rga::new(1);
        seq.insert_at(0, 'x').unwrap();

        let outcome = seq.insert_at(5, 'y');
        assert_eq!(
            outcome,
            Err(RgaError::IndexOutOfBounds { index: 5, len: 1 })
        );
    }

    // Removing returns the value and hides it from the visible sequence.
    #[test]
    fn remove_element() {
        let mut seq = Rga::new(1);
        push_all(&mut seq, ['a', 'b', 'c']);

        let taken = seq.remove(1).unwrap();

        assert_eq!(taken, 'b');
        assert_eq!(seq.len(), 2);
        assert_eq!(seq.to_vec(), vec!['a', 'c']);
    }

    // Merging two unrelated replicas keeps the elements of both.
    #[test]
    fn merge_disjoint_inserts() {
        let mut left = Rga::new(1);
        left.insert_at(0, 'x').unwrap();
        let mut right = Rga::new(2);
        right.insert_at(0, 'y').unwrap();

        left.merge(&right);

        assert_eq!(left.len(), 2);
        let merged = left.to_vec();
        assert!(merged.contains(&'x'));
        assert!(merged.contains(&'y'));
    }

    // Concurrent inserts at the same position converge in both merge orders.
    #[test]
    fn merge_concurrent_inserts_at_same_position() {
        let mut first = Rga::new(1);
        first.insert_at(0, 'A').unwrap();
        let mut second = Rga::new(2);
        second.insert_at(0, 'B').unwrap();

        let mut first_then_second = first.clone();
        first_then_second.merge(&second);
        let mut second_then_first = second.clone();
        second_then_first.merge(&first);

        assert_eq!(first_then_second.to_vec(), second_then_first.to_vec());
        assert_eq!(first_then_second.len(), 2);
    }

    // Concurrent appends after a shared prefix converge and keep the prefix.
    #[test]
    fn merge_concurrent_inserts_after_shared_prefix() {
        let mut origin = Rga::new(1);
        push_all(&mut origin, ['H', 'e']);
        let mut branch = origin.fork(2);

        origin.insert_at(2, 'X').unwrap();
        branch.insert_at(2, 'Y').unwrap();

        let mut origin_merged = origin.clone();
        origin_merged.merge(&branch);
        let mut branch_merged = branch.clone();
        branch_merged.merge(&origin);

        assert_eq!(origin_merged.to_vec(), branch_merged.to_vec());
        assert_eq!(origin_merged.len(), 4);
        assert_eq!(origin_merged.get(0), Some(&'H'));
        assert_eq!(origin_merged.get(1), Some(&'e'));
    }

    // A local delete survives a merge that also brings in a remote insert.
    #[test]
    fn merge_with_deletions() {
        let mut origin = Rga::new(1);
        push_all(&mut origin, ['a', 'b', 'c']);
        let mut branch = origin.fork(2);

        origin.remove(1).unwrap();
        branch.insert_at(3, 'd').unwrap();
        origin.merge(&branch);

        assert!(!origin.to_vec().contains(&'b'));
        assert!(origin.to_vec().contains(&'d'));
        assert_eq!(origin.len(), 3);
    }

    // Merge order does not affect the resulting visible sequence.
    #[test]
    fn merge_is_commutative() {
        let mut first = Rga::new(1);
        push_all(&mut first, [1, 2]);
        let mut second = Rga::new(2);
        push_all(&mut second, [3, 4]);

        let mut first_then_second = first.clone();
        first_then_second.merge(&second);
        let mut second_then_first = second.clone();
        second_then_first.merge(&first);

        assert_eq!(first_then_second.to_vec(), second_then_first.to_vec());
    }

    // Merging the same replica twice leaves the state unchanged.
    #[test]
    fn merge_is_idempotent() {
        let mut target = Rga::new(1);
        push_all(&mut target, ['x', 'y']);
        let mut source = Rga::new(2);
        source.insert_at(0, 'z').unwrap();

        target.merge(&source);
        let snapshot = target.clone();
        target.merge(&source);

        assert_eq!(target, snapshot);
    }

    // Applying a delta yields the same visible sequence as a full merge.
    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut sender = Rga::new(1);
        push_all(&mut sender, ['H', 'i']);
        let mut receiver = Rga::new(2);
        receiver.insert_at(0, '!').unwrap();

        let mut merged = receiver.clone();
        merged.merge(&sender);

        let mut patched = receiver.clone();
        let patch = sender.delta(&receiver);
        patched.apply_delta(&patch);

        assert_eq!(merged.to_vec(), patched.to_vec());
    }

    // A fork evolves independently and can be merged back afterwards.
    #[test]
    fn fork_creates_independent_replica() {
        let mut origin = Rga::new(1);
        push_all(&mut origin, ['x', 'y']);
        let mut branch = origin.fork(2);
        branch.insert_at(2, 'z').unwrap();

        assert_eq!(origin.len(), 2);
        assert_eq!(branch.len(), 3);

        origin.merge(&branch);
        assert_eq!(origin.to_vec(), vec!['x', 'y', 'z']);
    }
}