use crate::{
error::Error,
id::{OpId, ReplicaId},
ost::OrderTree,
version::VersionVector,
};
use smallvec::SmallVec;
use std::collections::HashMap;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Which child list of its parent an item is attached to.
///
/// In document order a node's left-child subtrees come before the node and
/// its right-child subtrees come after it. Items without a parent use the
/// list's root-left / root-right child lists instead, and all root-left
/// subtrees precede all root-right subtrees.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Side {
/// Join the parent's left children (ordered before the parent).
Left,
/// Join the parent's right children (ordered after the parent).
Right,
}
/// A replicated list operation: produced by local edits, returned to the
/// caller, and integrated on other replicas with `List::apply`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ListOp<T> {
/// Creates a new item in the causal tree.
Insert {
/// Id of this op, which is also the id of the created item.
id: OpId,
/// Item the new one is attached to; `None` attaches it to the root child lists.
parent: Option<OpId>,
/// Which child list of `parent` the new item joins.
side: Side,
/// Element payload.
value: T,
},
/// Marks an existing item as deleted (tombstone); the item stays in the tree.
Delete {
/// Id of this delete op.
id: OpId,
/// Id of the item being deleted.
target: OpId,
},
}
impl<T> ListOp<T> {
    /// Returns the unique id of this operation.
    ///
    /// For an insert this is also the id of the item it creates.
    #[must_use]
    pub fn id(&self) -> OpId {
        let (ListOp::Insert { id, .. } | ListOp::Delete { id, .. }) = self;
        *id
    }
}
/// A node of the causal tree: one inserted element plus its tree and
/// document-order links. Deleted items are kept as tombstones.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
struct Item<T> {
/// Id of the insert op that created this item.
id: OpId,
/// Parent item, or `None` for a root child.
parent: Option<OpId>,
/// Which child list of `parent` this item is in.
side: Side,
/// Element payload (kept for tombstones too).
value: T,
/// Ids of the delete ops that targeted this item; non-empty means deleted.
deletes: SmallVec<[OpId; 1]>,
/// Child ids placed before this item in document order, sorted ascending by id.
left_children: Vec<OpId>,
/// Child ids placed after this item in document order, sorted ascending by id.
right_children: Vec<OpId>,
/// Preceding item in document order (tombstones included).
/// `serde(default)`: decodes as `None` when absent from the encoded form.
#[cfg_attr(feature = "serde", serde(default))]
prev_doc: Option<OpId>,
/// Following item in document order (tombstones included).
/// `serde(default)`: decodes as `None` when absent from the encoded form.
#[cfg_attr(feature = "serde", serde(default))]
next_doc: Option<OpId>,
}
impl<T> Item<T> {
    /// True once at least one delete op has targeted this item (it is a tombstone).
    #[inline]
    fn is_deleted(&self) -> bool {
        self.deletes.first().is_some()
    }
}
/// A replicated sequence (list CRDT) of `T` values.
///
/// Elements live in a causal tree of [`Item`]s. Each item is a left or right
/// child of another item or of the root. Deleted items remain as tombstones
/// until [`List::gc`] removes them.
#[derive(Clone, Debug)]
pub struct List<T: Clone> {
// Replica id stamped on every locally generated op id.
replica: ReplicaId,
// Lamport counter: bumped for each local op and raised to the counter of
// every applied remote op.
clock: u64,
// Every known item, tombstones included, keyed by its insert id.
items: HashMap<OpId, Item<T>>,
// Ids of parentless items on the left side, sorted ascending.
root_left_children: Vec<OpId>,
// Ids of parentless items on the right side, sorted ascending.
root_right_children: Vec<OpId>,
// Ids of every op integrated into this replica.
version: VersionVector,
// Integrated ops in local integration order; `compact_log` may drop entries.
log: Vec<ListOp<T>>,
// Order-statistic index over all item ids (tombstones included) in document
// order, with a per-entry visibility flag.
index: OrderTree<OpId>,
}
/// Serialized form of [`List`].
///
/// Items are stored as a flat vector. The order index is not stored; it is
/// rebuilt after deserialization.
#[cfg(feature = "serde")]
#[derive(Serialize, Deserialize)]
struct ListSnapshot<T: Clone> {
replica: ReplicaId,
clock: u64,
items: Vec<Item<T>>,
root_left_children: Vec<OpId>,
root_right_children: Vec<OpId>,
version: VersionVector,
log: Vec<ListOp<T>>,
}
#[cfg(feature = "serde")]
impl<T: Clone + Serialize> Serialize for List<T> {
    /// Serializes the list in the `ListSnapshot` layout without cloning.
    ///
    /// Items are emitted sorted by id, so the encoding does not depend on
    /// `HashMap` iteration order.
    fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
        /// Borrowing mirror of `ListSnapshot`.
        ///
        /// It has the same serialized name, field names and field order, so it
        /// produces exactly the encoding `ListSnapshot` deserializes. It holds
        /// references instead of clones of every value and the whole op log.
        #[derive(Serialize)]
        #[serde(rename = "ListSnapshot")]
        struct SnapshotRef<'a, T> {
            replica: ReplicaId,
            clock: u64,
            items: Vec<&'a Item<T>>,
            root_left_children: &'a [OpId],
            root_right_children: &'a [OpId],
            version: &'a VersionVector,
            log: &'a [ListOp<T>],
        }

        let mut items: Vec<&Item<T>> = self.items.values().collect();
        items.sort_by_key(|item| item.id);
        SnapshotRef {
            replica: self.replica,
            clock: self.clock,
            items,
            root_left_children: &self.root_left_children,
            root_right_children: &self.root_right_children,
            version: &self.version,
            log: &self.log,
        }
        .serialize(ser)
    }
}
#[cfg(feature = "serde")]
impl<'de, T: Clone + Deserialize<'de>> Deserialize<'de> for List<T> {
    /// Decodes a `ListSnapshot` and reconstructs the in-memory list.
    ///
    /// The item vector is re-keyed by id, and the order index (not part of the
    /// snapshot) is rebuilt with `rebuild_index`.
    fn deserialize<D: serde::Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
        let ListSnapshot {
            replica,
            clock,
            items,
            root_left_children,
            root_right_children,
            version,
            log,
        } = ListSnapshot::<T>::deserialize(de)?;
        let by_id: HashMap<OpId, Item<T>> =
            items.into_iter().map(|item| (item.id, item)).collect();
        let mut list = List {
            replica,
            clock,
            items: by_id,
            root_left_children,
            root_right_children,
            version,
            log,
            index: OrderTree::new(),
        };
        list.rebuild_index();
        Ok(list)
    }
}
impl<T: Clone> List<T> {
/// Creates an empty list whose locally generated ops carry `replica`.
///
/// The Lamport clock starts at 0, so the first local op gets counter 1.
#[must_use]
pub fn new(replica: ReplicaId) -> Self {
    let items = HashMap::new();
    let index = OrderTree::new();
    Self {
        replica,
        clock: 0,
        items,
        root_left_children: Vec::new(),
        root_right_children: Vec::new(),
        version: VersionVector::new(),
        log: Vec::new(),
        index,
    }
}
/// Creates an empty list using a replica id from `crate::id::new_replica_id`
/// (presumably random, per the name — confirm in `crate::id`).
#[must_use]
pub fn new_random() -> Self {
    let replica = crate::id::new_replica_id();
    Self::new(replica)
}
/// The replica id stamped on ops generated by this list.
#[must_use]
pub fn replica_id(&self) -> ReplicaId {
    let id = self.replica;
    id
}
/// Number of visible (non-deleted) elements.
///
/// This is the bound used by `try_insert` (`pos <= len`) and `try_delete` (`pos < len`).
#[must_use]
pub fn len(&self) -> usize {
    let index = &self.index;
    index.len()
}
/// True when no visible element remains (tombstones may still exist).
#[must_use]
pub fn is_empty(&self) -> bool {
    let index = &self.index;
    index.is_empty()
}
/// Iterates over the visible values in document order.
pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
    let items = &self.items;
    self.index
        .iter_visible()
        .map(move |op_id| &items[op_id].value)
}
pub fn to_vec(&self) -> Vec<T> {
self.iter().cloned().collect()
}
/// Returns the visible value at index `pos`, or `None` if `pos` is out of range.
pub fn get(&self, pos: usize) -> Option<&T> {
    let id = self.index.at_visible(pos)?;
    Some(&self.items[id].value)
}
/// Returns the id of the item at visible index `pos`, or `None` if out of range.
#[must_use]
pub fn id_at(&self, pos: usize) -> Option<OpId> {
    let id = self.index.at_visible(pos)?;
    Some(*id)
}
/// Ids of all visible items, in document order.
#[must_use]
pub fn op_ids(&self) -> Vec<OpId> {
    Vec::from_iter(self.index.iter_visible().copied())
}
/// Visible index of item `id`, as reported by the order index.
///
/// Presumably `None` for tombstones and unknown ids; confirm against
/// `OrderTree::visible_position_of`.
#[must_use]
pub fn position_of(&self, id: OpId) -> Option<usize> {
    let key = id;
    self.index.visible_position_of(&key)
}
/// "Phantom" visible position of item `id`, as reported by the order index.
///
/// This is presumably the number of visible items before `id`, defined for
/// tombstones too (compare `phantom_positions`). Confirm against
/// `OrderTree::phantom_visible_position_of`.
#[must_use]
pub fn phantom_position_of(&self, id: OpId) -> Option<usize> {
    let key = id;
    self.index.phantom_visible_position_of(&key)
}
pub fn gc(&mut self, frontier: &VersionVector) -> usize {
let eligible: Vec<OpId> = self
.items
.iter()
.filter(|(_, item)| {
item.is_deleted()
&& item.left_children.is_empty()
&& item.right_children.is_empty()
&& frontier.contains(item.id)
&& item.deletes.iter().all(|d| frontier.contains(*d))
})
.map(|(id, _)| *id)
.collect();
let removed = eligible.len();
for id in eligible {
self.remove_item(id);
}
removed
}
/// Unlinks item `id` and drops it.
///
/// The item is removed from:
/// - its parent's (or the root's) child list;
/// - the order index;
/// - the document-order chain (its neighbours are joined directly).
///
/// Only the item's own entry is removed from the child list. The caller must
/// ensure the item has no children, as `gc` does.
///
/// # Panics
/// If `id`, its parent, or its document-order neighbours are missing from `items`.
fn remove_item(&mut self, id: OpId) {
    let victim = self.items.get(&id).expect("remove_item: missing");
    let parent = victim.parent;
    let side = victim.side;
    let prev = victim.prev_doc;
    let next = victim.next_doc;

    let siblings: &mut Vec<OpId> = match parent {
        None => match side {
            Side::Left => &mut self.root_left_children,
            Side::Right => &mut self.root_right_children,
        },
        Some(p) => {
            let owner = self
                .items
                .get_mut(&p)
                .expect("remove_item: parent missing");
            match side {
                Side::Left => &mut owner.left_children,
                Side::Right => &mut owner.right_children,
            }
        }
    };
    siblings.retain(|&sibling| sibling != id);
    self.index.remove(&id);

    if let Some(p) = prev {
        let before = self.items.get_mut(&p).expect("remove_item: prev missing");
        before.next_doc = next;
    }
    if let Some(n) = next {
        let after = self.items.get_mut(&n).expect("remove_item: next missing");
        after.prev_doc = prev;
    }
    self.items.remove(&id);
}
/// Drops every logged op whose id is contained in `frontier`.
///
/// Returns the number of entries removed. Dropped ops are no longer returned
/// by `ops`/`ops_since`, and a peer merging from this list will not receive them.
pub fn compact_log(&mut self, frontier: &VersionVector) -> usize {
    let len_before = self.log.len();
    self.log.retain(|entry| {
        let covered = frontier.contains(entry.id());
        !covered
    });
    len_before - self.log.len()
}
/// Generates, applies and logs the compensating operation for `op` (undo).
///
/// - `ListOp::Insert`: emits a new `ListOp::Delete` that tombstones the
///   inserted item. Returns `None` if that item is not present locally
///   (never received, or already removed by `gc`).
/// - `ListOp::Delete`: the tombstone is not revived. Instead, a fresh
///   `ListOp::Insert` carrying a clone of the deleted value is attached as a
///   left child of the tombstone. It has the highest local id, so it sorts
///   last among those left children, i.e. directly in front of the tombstone
///   in document order at the time of the call. Returns `None` if the target
///   item is not present locally.
///
/// The returned op is already integrated (index, version vector and log are
/// updated); the caller only needs to broadcast it.
///
/// # Panics
/// If the Lamport clock would overflow `u64`.
pub fn apply_inverse(&mut self, op: &ListOp<T>) -> Option<ListOp<T>> {
    match op {
        ListOp::Insert { id: target, .. } => {
            if !self.items.contains_key(target) {
                return None;
            }
            let inverse_id = self.next_op_id();
            let target_item = self.items.get_mut(target).expect("checked above");
            let was_visible = !target_item.is_deleted();
            target_item.deletes.push(inverse_id);
            // Only a visible -> deleted transition changes the index.
            if was_visible {
                self.index.set_visible(target, false);
            }
            let inverse = ListOp::Delete {
                id: inverse_id,
                target: *target,
            };
            self.version.observe(inverse_id);
            self.log.push(inverse.clone());
            Some(inverse)
        }
        ListOp::Delete { target, .. } => {
            let value = self.items.get(target)?.value.clone();
            let new_id = self.next_op_id();
            let new_op = ListOp::Insert {
                id: new_id,
                parent: Some(*target),
                side: Side::Left,
                value: value.clone(),
            };
            self.apply_insert_internal(Item {
                id: new_id,
                parent: Some(*target),
                side: Side::Left,
                value,
                deletes: SmallVec::new(),
                left_children: Vec::new(),
                right_children: Vec::new(),
                prev_doc: None,
                next_doc: None,
            });
            let total_pos = self.compute_total_position_for(new_id);
            self.index.insert_at_total(total_pos, new_id, true);
            self.version.observe(new_id);
            self.log.push(new_op.clone());
            Some(new_op)
        }
    }
}
/// Maps every known item (tombstones included) to the number of visible
/// items that precede it in document order.
#[must_use]
pub fn phantom_positions(&self) -> HashMap<OpId, usize> {
    let mut positions = HashMap::with_capacity(self.items.len());
    let mut seen_visible = 0usize;
    let mut pending: Vec<TraverseFrame> = Vec::with_capacity(64);
    // Seed the stack so root-left subtrees pop first, then root-right ones.
    pending.extend(
        self.root_left_children
            .iter()
            .chain(self.root_right_children.iter())
            .rev()
            .map(|&root| TraverseFrame::EnterNode(root)),
    );
    while let Some(frame) = pending.pop() {
        match frame {
            TraverseFrame::EnterNode(id) => {
                let node = &self.items[&id];
                pending.push(TraverseFrame::EmitAndRight(id));
                pending.extend(
                    node.left_children
                        .iter()
                        .rev()
                        .map(|&child| TraverseFrame::EnterNode(child)),
                );
            }
            TraverseFrame::EmitAndRight(id) => {
                let node = &self.items[&id];
                positions.insert(id, seen_visible);
                seen_visible += usize::from(!node.is_deleted());
                pending.extend(
                    node.right_children
                        .iter()
                        .rev()
                        .map(|&child| TraverseFrame::EnterNode(child)),
                );
            }
        }
    }
    positions
}
/// Advances the Lamport clock and returns a fresh id for this replica.
///
/// The id is not recorded in the version vector or the log.
///
/// # Panics
/// If the clock would overflow `u64`.
pub fn next_op_id(&mut self) -> OpId {
    let counter = self
        .clock
        .checked_add(1)
        .expect("Lamport clock overflow (>2^64 ops)");
    self.clock = counter;
    OpId::new(counter, self.replica)
}
/// Raises the Lamport clock to at least `id.counter`, so later local ids sort after `id`.
pub fn observe_external(&mut self, id: OpId) {
    if id.counter > self.clock {
        self.clock = id.counter;
    }
}
/// True if an item with this id is stored locally, tombstones included.
///
/// Returns false for items removed by `gc`.
#[must_use]
pub fn contains_id(&self, id: OpId) -> bool {
    let items = &self.items;
    items.contains_key(&id)
}
/// `Some(true)` for a live item, `Some(false)` for a tombstone, `None` if unknown.
#[must_use]
pub fn is_visible(&self, id: OpId) -> Option<bool> {
    let item = self.items.get(&id)?;
    Some(!item.is_deleted())
}
/// Panicking variant of [`List::try_insert`].
///
/// # Panics
/// If `pos > self.len()`, or if the Lamport clock overflows.
pub fn insert(&mut self, pos: usize, value: T) -> ListOp<T> {
    match self.try_insert(pos, value) {
        Ok(op) => op,
        Err(e) => panic!("List::insert: {e}"),
    }
}
/// Inserts `value` so that it becomes the visible element at index `pos`.
///
/// Returns the generated `ListOp::Insert`, already integrated locally. The
/// caller is expected to broadcast it to other replicas.
///
/// # Errors
/// `Error::OutOfBounds` when `pos > self.len()`. The list is left untouched.
///
/// # Panics
/// If the Lamport clock would overflow `u64`.
pub fn try_insert(&mut self, pos: usize, value: T) -> Result<ListOp<T>, Error> {
    let len = self.index.len();
    if pos > len {
        return Err(Error::OutOfBounds { pos, len });
    }
    // `pos` only selects the tree anchor; the exact slot in the order index is
    // derived from the document-order chain once the item is linked in.
    let (parent, side) = self.determine_anchor_at(pos);
    let id = self.next_op_id();
    let op = ListOp::Insert {
        id,
        parent,
        side,
        value: value.clone(),
    };
    self.apply_insert_internal(Item {
        id,
        parent,
        side,
        value,
        deletes: SmallVec::new(),
        left_children: Vec::new(),
        right_children: Vec::new(),
        prev_doc: None,
        next_doc: None,
    });
    let total_pos = self.compute_total_position_for(id);
    self.index.insert_at_total(total_pos, id, true);
    self.version.observe(id);
    self.log.push(op.clone());
    Ok(op)
}
/// Panicking variant of [`List::try_delete`].
///
/// # Panics
/// If `pos >= self.len()`, or if the Lamport clock overflows.
pub fn delete(&mut self, pos: usize) -> ListOp<T> {
    match self.try_delete(pos) {
        Ok(op) => op,
        Err(e) => panic!("List::delete: {e}"),
    }
}
/// Tombstones the visible element at index `pos`.
///
/// Returns the generated `ListOp::Delete`, already integrated locally.
///
/// # Errors
/// `Error::OutOfBounds` when `pos >= self.len()`. The list is left untouched.
///
/// # Panics
/// If the Lamport clock would overflow `u64`.
pub fn try_delete(&mut self, pos: usize) -> Result<ListOp<T>, Error> {
    let len = self.index.len();
    if pos >= len {
        return Err(Error::OutOfBounds { pos, len });
    }
    let target = self.index.at_visible(pos).copied().expect("pos < len");
    let id = self.next_op_id();
    self.items
        .get_mut(&target)
        .expect("visible id missing from items")
        .deletes
        .push(id);
    self.index.set_visible(&target, false);
    self.version.observe(id);
    let op = ListOp::Delete { id, target };
    self.log.push(op.clone());
    Ok(op)
}
pub fn apply(&mut self, op: ListOp<T>) -> Result<(), Error> {
let op_id = op.id();
if self.version.contains(op_id) {
return Ok(());
}
match &op {
ListOp::Insert {
id,
parent,
side,
value,
} => {
if let Some(p) = parent {
if !self.items.contains_key(p) {
return Err(Error::MissingParent {
op: *id,
missing: *p,
});
}
}
let item = Item {
id: *id,
parent: *parent,
side: *side,
value: value.clone(),
deletes: SmallVec::new(),
left_children: Vec::new(),
right_children: Vec::new(),
prev_doc: None,
next_doc: None,
};
self.apply_insert_internal(item);
let total_pos = self.compute_total_position_for(*id);
self.index.insert_at_total(total_pos, *id, true);
}
ListOp::Delete { id: _, target } => {
let Some(item) = self.items.get_mut(target) else {
return Err(Error::UnknownTarget {
op: op_id,
target: *target,
});
};
let was_visible = item.deletes.is_empty();
if !item.deletes.contains(&op_id) {
item.deletes.push(op_id);
}
if was_visible {
self.index.set_visible(target, false);
}
}
}
self.version.observe(op_id);
self.clock = self.clock.max(op_id.counter);
self.log.push(op);
Ok(())
}
/// Applies every op in `other`'s log that this replica has not seen yet.
///
/// Ops are applied in ascending id order.
///
/// # Panics
/// If any op fails to apply, e.g. because its parent or target is absent
/// from both lists.
pub fn merge(&mut self, other: &Self) {
    let mut pending: Vec<ListOp<T>> = other
        .log
        .iter()
        .filter(|op| !self.version.contains(op.id()))
        .cloned()
        .collect();
    pending.sort_by_key(|op| op.id());
    for op in pending {
        self.apply(op).expect("corrupt op log in merge source");
    }
}
/// The op log in local integration order (minus anything removed by `compact_log`).
#[must_use]
pub fn ops(&self) -> &[ListOp<T>] {
    self.log.as_slice()
}
/// Logged ops whose ids are not contained in `since`, in log order.
pub fn ops_since<'a>(
    &'a self,
    since: &'a VersionVector,
) -> impl Iterator<Item = &'a ListOp<T>> + 'a {
    self.log.iter().filter(move |entry| {
        let already_seen = since.contains(entry.id());
        !already_seen
    })
}
/// Version vector of every op integrated into this replica.
#[must_use]
pub fn version(&self) -> &VersionVector {
    let vv = &self.version;
    vv
}
/// Chooses the `(parent, side)` anchor for a new local item at visible index `pos`.
///
/// Child lists are sorted by ascending id, and a fresh local id is the largest
/// known. A new child therefore lands last in its list:
/// - empty list: last root-right child;
/// - `pos == 0`: last left child of the first visible item (right before it);
/// - predecessor has right children and a visible successor exists: last left
///   child of the successor (right before it);
/// - otherwise: last right child of the predecessor.
fn determine_anchor_at(&self, pos: usize) -> (Option<OpId>, Side) {
    let len = self.index.len();
    if len == 0 {
        return (None, Side::Right);
    }
    let visible_at = |p: usize| *self.index.at_visible(p).unwrap();
    if pos == 0 {
        return (Some(visible_at(0)), Side::Left);
    }
    let pred = visible_at(pos - 1);
    let pred_has_right = !self.items[&pred].right_children.is_empty();
    match (pred_has_right, pos < len) {
        (true, true) => (Some(visible_at(pos)), Side::Left),
        _ => (Some(pred), Side::Right),
    }
}
/// Total (tombstone-inclusive) index slot for `id`: one past its `prev_doc`, or 0 if it has none.
///
/// # Panics
/// If `id` is unknown or its `prev_doc` is absent from the order index.
fn compute_total_position_for(&self, id: OpId) -> usize {
    self.items[&id].prev_doc.map_or(0, |pred| {
        let pred_rank = self
            .index
            .total_position_of(&pred)
            .expect("prev_doc target must be in OST");
        pred_rank + 1
    })
}
/// Last visible item of the subtree rooted at `root` in document order, if any.
///
/// Walks the subtree in reverse document order with an explicit stack, so deep
/// right-child chains cannot overflow the call stack. Each entry is
/// `(id, expanded)`: `expanded == false` means the node's children still have
/// to be scheduled.
// Not called anywhere in this file; the allow keeps it without a warning.
#[allow(dead_code)]
fn rightmost_visible_in_subtree(&self, root: OpId) -> Option<OpId> {
    let mut work: Vec<(OpId, bool)> = vec![(root, false)];
    while let Some((id, expanded)) = work.pop() {
        let node = &self.items[&id];
        if expanded {
            if !node.is_deleted() {
                return Some(id);
            }
            continue;
        }
        // Pushed in document order, so pops come out right-to-left.
        work.extend(node.left_children.iter().map(|&child| (child, false)));
        work.push((id, true));
        work.extend(node.right_children.iter().map(|&child| (child, false)));
    }
    None
}
fn apply_insert_internal(&mut self, item: Item<T>) {
let id = item.id;
let parent = item.parent;
let side = item.side;
self.items.insert(id, item);
let target_list = match (parent, side) {
(None, Side::Left) => &mut self.root_left_children,
(None, Side::Right) => &mut self.root_right_children,
(Some(parent_id), Side::Left) => {
&mut self
.items
.get_mut(&parent_id)
.expect("apply_insert_internal: parent missing")
.left_children
}
(Some(parent_id), Side::Right) => {
&mut self
.items
.get_mut(&parent_id)
.expect("apply_insert_internal: parent missing")
.right_children
}
};
let pos = target_list.binary_search(&id).unwrap_or_else(|e| e);
target_list.insert(pos, id);
self.link_doc_pointers(id);
}
/// Splices the freshly attached item `c_id` into the `prev_doc`/`next_doc`
/// chain, between the neighbours computed by `find_doc_neighbors`.
fn link_doc_pointers(&mut self, c_id: OpId) {
    let (before, after) = self.find_doc_neighbors(c_id);
    {
        let node = self
            .items
            .get_mut(&c_id)
            .expect("link_doc_pointers: c missing");
        node.prev_doc = before;
        node.next_doc = after;
    }
    if let Some(b) = before {
        let left = self.items.get_mut(&b).expect("prev_doc target missing");
        left.next_doc = Some(c_id);
    }
    if let Some(a) = after {
        let right = self.items.get_mut(&a).expect("next_doc target missing");
        right.prev_doc = Some(c_id);
    }
}
/// The child list selected by `(parent, side)`; `parent == None` selects a root list.
fn children_slice(&self, parent: Option<OpId>, side: Side) -> &[OpId] {
    match parent {
        None => match side {
            Side::Left => &self.root_left_children,
            Side::Right => &self.root_right_children,
        },
        Some(p) => {
            let owner = &self.items[&p];
            match side {
                Side::Left => &owner.left_children,
                Side::Right => &owner.right_children,
            }
        }
    }
}
/// First item in document order of the subtree rooted at `id`: follow the first left child down.
fn subtree_first(&self, mut id: OpId) -> OpId {
    while let Some(&first) = self.items[&id].left_children.first() {
        id = first;
    }
    id
}
/// Last item in document order of the subtree rooted at `id`: follow the last right child down.
fn subtree_last(&self, mut id: OpId) -> OpId {
    while let Some(&last) = self.items[&id].right_children.last() {
        id = last;
    }
    id
}
/// Computes the document-order neighbours `(prev, next)` of the leaf `c_id`,
/// which has just been inserted into its sibling list but not yet linked into
/// the `prev_doc`/`next_doc` chain.
///
/// Document order places left-child subtrees before a node and right-child
/// subtrees after it. Siblings are ordered by their position in the child
/// list. Because `c_id` has no children yet, it occupies a single slot
/// between its neighbouring siblings' subtrees.
fn find_doc_neighbors(&self, c_id: OpId) -> (Option<OpId>, Option<OpId>) {
let item = &self.items[&c_id];
let parent = item.parent;
let side = item.side;
let siblings = self.children_slice(parent, side);
// Index of c among its siblings; panics if it was not inserted first.
let k = siblings
.iter()
.position(|&x| x == c_id)
.expect("c just inserted into siblings");
let n = siblings.len();
let prev = if k + 1 < n {
// c sits right before the next sibling's subtree, so it takes over that
// subtree's first item's (still unchanged) predecessor.
let succ_first = self.subtree_first(siblings[k + 1]);
self.items[&succ_first].prev_doc
} else if k > 0 {
// c is last but not first: it follows the previous sibling's subtree.
Some(self.subtree_last(siblings[k - 1]))
} else {
// c is the only child on this side.
match side {
// Only right child: directly after the parent, or after all
// root-left subtrees at the root.
Side::Right => match parent {
Some(p) => Some(p),
None => self
.root_left_children
.last()
.map(|&id| self.subtree_last(id)),
},
// Only left child: directly before the parent, so it inherits the
// parent's predecessor; at the root it is the very first item.
Side::Left => match parent {
Some(p) => self.items[&p].prev_doc,
None => None,
},
}
};
let next = if k + 1 < n {
// Followed by the first item of the next sibling's subtree.
Some(self.subtree_first(siblings[k + 1]))
} else if k > 0 {
// c is last but not first: it inherits the successor of the previous
// sibling's subtree.
let old_last = self.subtree_last(siblings[k - 1]);
self.items[&old_last].next_doc
} else {
match side {
// Only left child: the parent follows directly; at the root, the
// first root-right subtree follows.
Side::Left => match parent {
Some(p) => Some(p),
None => self
.root_right_children
.first()
.map(|&id| self.subtree_first(id)),
},
// Only right child: the parent previously ended its own subtree, so
// c inherits the parent's successor; at the root it is the very last item.
Side::Right => match parent {
Some(p) => self.items[&p].next_doc,
None => None,
},
}
};
(prev, next)
}
fn rebuild_index(&mut self) {
self.index = OrderTree::new();
let mut stack: Vec<TraverseFrame> = Vec::with_capacity(64);
for &child in self.root_right_children.iter().rev() {
stack.push(TraverseFrame::EnterNode(child));
}
for &child in self.root_left_children.iter().rev() {
stack.push(TraverseFrame::EnterNode(child));
}
let mut total_rank = 0usize;
while let Some(frame) = stack.pop() {
match frame {
TraverseFrame::EnterNode(id) => {
let item = &self.items[&id];
stack.push(TraverseFrame::EmitAndRight(id));
for &child in item.left_children.iter().rev() {
stack.push(TraverseFrame::EnterNode(child));
}
}
TraverseFrame::EmitAndRight(id) => {
let visible = !self.items[&id].is_deleted();
self.index.insert_at_total(total_rank, id, visible);
total_rank += 1;
let item = &self.items[&id];
for &child in item.right_children.iter().rev() {
stack.push(TraverseFrame::EnterNode(child));
}
}
}
}
}
/// Ids of all visible items in document order, computed by walking the tree
/// directly instead of querying the order index.
// Not called anywhere in this file; the allow keeps it without a warning.
#[allow(dead_code)]
fn compute_visible_ids(&self) -> Vec<OpId> {
    let mut visible = Vec::with_capacity(self.items.len());
    let mut pending: Vec<TraverseFrame> = Vec::with_capacity(64);
    // Seed the stack so root-left subtrees pop first, then root-right ones.
    pending.extend(
        self.root_left_children
            .iter()
            .chain(self.root_right_children.iter())
            .rev()
            .map(|&root| TraverseFrame::EnterNode(root)),
    );
    while let Some(frame) = pending.pop() {
        match frame {
            TraverseFrame::EnterNode(id) => {
                let node = &self.items[&id];
                pending.push(TraverseFrame::EmitAndRight(id));
                pending.extend(
                    node.left_children
                        .iter()
                        .rev()
                        .map(|&child| TraverseFrame::EnterNode(child)),
                );
            }
            TraverseFrame::EmitAndRight(id) => {
                let node = &self.items[&id];
                if !node.is_deleted() {
                    visible.push(id);
                }
                pending.extend(
                    node.right_children
                        .iter()
                        .rev()
                        .map(|&child| TraverseFrame::EnterNode(child)),
                );
            }
        }
    }
    visible
}
/// Test-only structural check; panics with a descriptive message on the first
/// violation.
///
/// Verifies that:
/// - every referenced parent exists;
/// - every child list (per item and at the root) is strictly increasing by id;
/// - every logged op is contained in the version vector.
#[cfg(test)]
pub(crate) fn check_invariants(&self) {
    fn strictly_increasing(ids: &[OpId]) -> bool {
        ids.windows(2).all(|pair| pair[0] < pair[1])
    }
    for item in self.items.values() {
        if let Some(parent) = item.parent {
            assert!(
                self.items.contains_key(&parent),
                "item {:?} references missing parent {:?}",
                item.id,
                parent
            );
        }
        assert!(strictly_increasing(&item.left_children), "left_children not sorted");
        assert!(strictly_increasing(&item.right_children), "right_children not sorted");
    }
    assert!(
        strictly_increasing(&self.root_left_children),
        "root_left_children not sorted"
    );
    assert!(
        strictly_increasing(&self.root_right_children),
        "root_right_children not sorted"
    );
    for op in &self.log {
        let id = op.id();
        assert!(
            self.version.contains(id),
            "version vector missing logged op {:?}",
            id
        );
    }
}
}
impl<T: Clone> Default for List<T> {
fn default() -> Self {
Self::new(0)
}
}
/// Explicit-stack frame for the iterative in-order (document-order) tree walks.
enum TraverseFrame {
/// Schedule the node's left subtrees, followed by `EmitAndRight` for the node.
EnterNode(OpId),
/// Emit the node itself, then schedule its right subtrees.
EmitAndRight(OpId),
}
impl std::fmt::Display for List<char> {
    /// Writes the visible characters in document order, with no separators.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use std::fmt::Write as _;
        self.iter().try_for_each(|&ch| f.write_char(ch))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a list on replica 1 by appending each char of `text` in order.
    fn typed(text: &str) -> List<char> {
        let mut list = List::<char>::new(1);
        for (i, c) in text.chars().enumerate() {
            list.insert(i, c);
        }
        list
    }

    #[test]
    fn empty() {
        let list = List::<char>::new(1);
        assert!(list.is_empty());
        assert_eq!(list.len(), 0);
        assert_eq!(list.to_vec(), Vec::<char>::new());
    }

    #[test]
    fn single_insert() {
        let list = typed("a");
        assert_eq!(list.to_vec(), vec!['a']);
        list.check_invariants();
    }

    #[test]
    fn append() {
        let list = typed("Hello");
        assert_eq!(list.to_string(), "Hello");
        list.check_invariants();
    }

    #[test]
    fn insert_at_beginning() {
        let mut list = List::<char>::new(1);
        for c in ['b', 'a'] {
            list.insert(0, c);
        }
        assert_eq!(list.to_string(), "ab");
        list.check_invariants();
    }

    #[test]
    fn insert_in_middle() {
        let mut list = typed("ac");
        list.insert(1, 'b');
        assert_eq!(list.to_string(), "abc");
        list.check_invariants();
    }

    #[test]
    fn delete_works() {
        let mut list = typed("Hello");
        list.delete(0);
        assert_eq!(list.to_string(), "ello");
        list.delete(3);
        assert_eq!(list.to_string(), "ell");
        list.check_invariants();
    }

    #[test]
    fn insert_after_delete_at_end() {
        let mut list = typed("abc");
        list.delete(2);
        assert_eq!(list.to_string(), "ab");
        list.insert(2, 'X');
        assert_eq!(list.to_string(), "abX");
        list.check_invariants();
    }
}