use crate::vclock::{VClock, VClockDiff};
use num::rational::Ratio;
use num::BigInt;
use serde::{Deserialize, Serialize};
use std::cmp::{Ord, PartialOrd};
use std::collections::BTreeMap;
/// Key under which one list entry is stored.
///
/// The derived `Ord` compares `id` first, then `process`, then `op`
/// (field declaration order), which is the order entries take in the list.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct Key<T> {
/// Position identifier; `ListKey::between` chooses it relative to the neighbouring keys.
id: T,
/// Identifier of the process that created the key.
process: u64,
/// Operation counter value the creating process assigned to the key.
op: u64,
}
impl<T: Clone> Key<T> {
pub fn new<R: Into<T>>(id: R, process: u64, op: u64) -> Self {
Self {
id: id.into(),
process,
op,
}
}
}
/// Key type a `GenericList` can be built on: totally ordered, cloneable, and able to
/// create new keys relative to existing ones.
pub trait ListKey: Sized + Ord + Eq + Clone {
/// Creates the key for operation `op` of `process`, positioned relative to its neighbours.
///
/// `left` and `right` are the keys expected to sit directly before and after the new
/// key; `None` means there is no neighbour on that side.
///
/// # Errors
///
/// Implementations return a `ListError` when they cannot create a key for the
/// requested position.
fn between(
process: u64,
op: u64,
left: Option<&Self>,
right: Option<&Self>,
) -> Result<Self, ListError>;
}
/// Marker for `ListKey` types accepted by `GenericList::insert` (it is that method's bound).
pub trait InsertableKey: ListKey {}
/// Rational keys: a midpoint exists between any two neighbours whose `id`s differ.
impl ListKey for Key<Ratio<BigInt>> {
    /// Picks the new `id` from the neighbours:
    ///
    /// - no neighbours: `0`;
    /// - only a left neighbour: its `id + 1`;
    /// - only a right neighbour: its `id - 1`;
    /// - both: the arithmetic mean of the two `id`s.
    ///
    /// This implementation never returns an error.
    fn between(
        process: u64,
        op: u64,
        left: Option<&Self>,
        right: Option<&Self>,
    ) -> Result<Self, ListError> {
        let id = match (left, right) {
            (None, None) => Ratio::new_raw(BigInt::from(0), BigInt::from(1)),
            (Some(before), None) => &before.id + BigInt::from(1_i64),
            (None, Some(after)) => &after.id - BigInt::from(1_i64),
            (Some(before), Some(after)) => (&before.id + &after.id) / BigInt::from(2_i64),
        };
        Ok(Self { id, process, op })
    }
}
/// Integer keys: new keys can only be created at either end of the existing range.
impl ListKey for Key<i64> {
    /// Picks the new `id` from the neighbours:
    ///
    /// - no neighbours: `0`;
    /// - only a left neighbour: its `id + 1`;
    /// - only a right neighbour: its `id - 1`.
    ///
    /// # Errors
    ///
    /// Returns `ListError::AppendOnly` when both neighbours are present, because no
    /// integer is guaranteed to exist between them.
    fn between(
        process: u64,
        op: u64,
        left: Option<&Self>,
        right: Option<&Self>,
    ) -> Result<Self, ListError> {
        let id: i64 = match (left, right) {
            (Some(_), Some(_)) => return Err(ListError::AppendOnly),
            (Some(before), None) => before.id + 1,
            (None, Some(after)) => after.id - 1,
            (None, None) => 0,
        };
        Ok(Self { id, process, op })
    }
}
// Only the rational key opts in: its `between` handles two neighbours, whereas the
// `i64` key answers that case with `ListError::AppendOnly`.
impl InsertableKey for Key<Ratio<BigInt>> {}
/// Content stored under a list key.
///
/// `Tombstone` and `Empty` are bookkeeping variants: `Value::visible` reports them as
/// hidden, so they never show up when iterating a list.
#[non_exhaustive]
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum Value<Key> {
/// Text payload.
Str(String),
/// Boolean payload.
Bool(bool),
/// Floating-point payload.
Float(f64),
/// Nested sequence of values.
Vec(Vec<Value<Key>>),
/// Deletion marker; carries the key of the entry it removes.
Tombstone(Key),
/// Placeholder without data; `GenericList::insert_key` advances the clock for it but
/// stores nothing.
Empty,
}
// NOTE(review): `Value::Float` holds an `f64`, and `f64::NAN != f64::NAN`, so the
// reflexivity `Eq` promises does not hold for values containing NaN. `Op` derives `Eq`
// and relies on this impl, so it cannot simply be dropped — confirm this is acceptable.
impl<Key: Eq> Eq for Value<Key> {}
impl<Key> From<&str> for Value<Key> {
fn from(value: &str) -> Self {
Value::Str(value.into())
}
}
impl<Key> From<String> for Value<Key> {
fn from(value: String) -> Self {
Value::Str(value)
}
}
impl<Key> From<f64> for Value<Key> {
fn from(value: f64) -> Self {
Value::Float(value)
}
}
impl<Key> From<bool> for Value<Key> {
fn from(value: bool) -> Self {
Value::Bool(value)
}
}
impl<Key> PartialEq<str> for Value<Key> {
fn eq(&self, other: &str) -> bool {
match self {
Self::Str(s) => s.as_str().eq(other),
_ => false,
}
}
}
impl<Key> Value<Key> {
    /// Returns `true` for every variant except `Tombstone` and `Empty`.
    fn visible(&self) -> bool {
        let hidden = matches!(self, Value::Tombstone(_) | Value::Empty);
        !hidden
    }
}
/// Optional callbacks a list invokes; `Hooks::new` leaves both unset.
struct Hooks<Key> {
/// Called by `GenericList::on_update` with the operation a local change produced.
pub update: Option<Box<dyn Fn(&Op<Key>)>>,
/// Called by `GenericList::apply` once a batch has applied at least one operation.
pub apply: Option<Box<dyn Fn()>>,
}
/// Manual `Debug`: boxed closures cannot be printed, so only their presence is shown.
impl<Key> std::fmt::Debug for Hooks<Key> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let update = self.update.as_ref().map_or("None", |_| "Some(..)");
        let apply = self.apply.as_ref().map_or("None", |_| "Some(..)");
        write!(f, "Hooks{{ update: {}, apply: {}}}", update, apply)
    }
}
impl<Key> Hooks<Key> {
fn new() -> Self {
Self {
update: None,
apply: None,
}
}
fn set_on_update(&mut self, hook: Box<dyn Fn(&Op<Key>)>) {
self.update = Some(hook)
}
fn unset_on_update(&mut self) {
self.update = None
}
fn set_on_apply(&mut self, hook: Box<dyn Fn()>) {
self.apply = Some(hook)
}
fn unset_on_apply(&mut self) {
self.apply = None
}
}
/// One list operation: the key it was issued under and the value to store at that key.
///
/// Operations are handed to the update hook, returned by `GenericList::diff`, and
/// consumed by `GenericList::apply`.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct Op<Key> {
/// Key identifying the operation (and the slot it writes to).
pub key: Key,
/// Value written by the operation.
pub value: Value<Key>,
}
/// Errors reported by list operations.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ListError {
/// An operation passed to `GenericList::apply` is not the next one expected for its
/// process (its counter is more than one ahead of the recorded clock).
OutOfOrder {
/// Process that issued the rejected operation.
process: u64,
/// Clock value recorded for that process when the operation was rejected.
current_clock: u64,
/// Counter carried by the rejected operation.
operation_clock: u64,
},
/// The requested index does not refer to a valid position in the list.
OutOfBounds,
/// The key type cannot create a key between two existing keys.
AppendOnly,
}
/// `Display` output is the error's `Debug` representation.
impl std::fmt::Display for ListError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
// Empty body: every `std::error::Error` method keeps its default implementation.
impl std::error::Error for ListError {}
/// List whose entries live in a `BTreeMap` ordered by key, together with the clock and
/// hooks used to exchange operations with other lists.
#[derive(Debug)]
pub struct GenericList<Key> {
/// Identifier of the local process; stamped on every key created locally.
process: u64,
/// Clock advanced once for every key passed to `insert_key`.
vclock: VClock,
/// Every stored entry in key order, tombstones included.
data: BTreeMap<Key, Value<Key>>,
/// Callbacks fired on local updates and after applying operations.
hooks: Hooks<Key>,
}
// SAFETY: NOTE(review) — not established by anything visible in this file. The impl
// covers every `Key`, including non-`Send` ones, and `Hooks` stores `Box<dyn Fn…>`
// callbacks that carry no `Send` bound, so a list holding e.g. an `Rc`-capturing hook
// can be moved to another thread. Confirm the intended invariant or tighten the bounds
// (`Key: Send` plus `+ Send` on the hook types).
unsafe impl<Key> Send for GenericList<Key> {}
impl<T> GenericList<Key<T>>
where
Key<T>: ListKey,
T: Clone,
{
/// Creates an empty list for `process`, with a new clock and no hooks registered.
pub fn new(process: u64) -> Self {
    let vclock = VClock::new();
    let data = BTreeMap::new();
    let hooks = Hooks::new();
    Self {
        process,
        vclock,
        data,
        hooks,
    }
}
/// Registers the callback invoked with each operation produced by a local change,
/// replacing any previously registered one.
pub fn set_on_update(&mut self, hook: Box<dyn Fn(&Op<Key<T>>)>) {
    self.hooks.update = Some(hook);
}
/// Removes the update callback, if one is registered.
pub fn unset_on_update(&mut self) {
    self.hooks.update = None;
}
/// Registers the callback invoked after `apply` has applied at least one operation,
/// replacing any previously registered one.
pub fn set_on_apply(&mut self, hook: Box<dyn Fn()>) {
    self.hooks.apply = Some(hook);
}
/// Removes the apply callback, if one is registered.
pub fn unset_on_apply(&mut self) {
    self.hooks.apply = None;
}
pub fn insert(&mut self, index: usize, value: Value<Key<T>>) -> Result<(), ListError>
where
Key<T>: InsertableKey,
{
self.insert_at(index, value)
}
/// Adds `value` after every stored entry.
///
/// # Errors
///
/// Forwards any error from creating the new key.
pub fn append(&mut self, value: Value<Key<T>>) -> Result<(), ListError> {
    // One past the last stored entry (tombstones count as entries here).
    let end = self.data.len();
    self.insert_at(end, value)
}
/// Adds `value` before every stored entry by inserting at position 0.
///
/// # Errors
///
/// Forwards any error from creating the new key.
pub fn prepend(&mut self, value: Value<Key<T>>) -> Result<(), ListError> {
self.insert_at(0, value)
}
/// Deletes the `index`-th visible element.
///
/// The element is replaced by a tombstone stored under a new key that reuses the
/// element's `id` but carries the local process and its next operation counter. The
/// update hook, if set, receives the tombstone operation before it is stored.
///
/// # Errors
///
/// Returns `ListError::OutOfBounds` when fewer than `index + 1` elements are visible.
pub fn delete(&mut self, index: usize) -> Result<(), ListError> {
    let target = self
        .data
        .iter()
        .filter(|(_, stored)| stored.visible())
        .nth(index)
        .map(|(key, _)| key)
        .ok_or(ListError::OutOfBounds)?;
    let op = self.vclock.next_value(self.process);
    let tombstone_key = Key::new(target.id.clone(), self.process, op);
    let marker = Value::Tombstone(target.clone());
    self.on_update(&tombstone_key, &marker);
    self.insert_key(tombstone_key, marker);
    Ok(())
}
/// Applies a batch of operations in the order given.
///
/// For each operation the clock recorded for its process decides what happens:
///
/// - counter not above the clock: already known, skipped;
/// - counter exactly one above the clock: applied;
/// - anything else: the batch stops with `ListError::OutOfOrder`.
///
/// The apply hook, if set, is called once whenever at least one operation was applied —
/// including when a later operation in the same batch is rejected, since the list has
/// already changed by then.
///
/// # Errors
///
/// Returns `ListError::OutOfOrder` for the first operation that skips ahead. Operations
/// applied before it stay applied.
pub fn apply(&mut self, ops: &[Op<Key<T>>]) -> Result<(), ListError> {
    let mut applied = 0_usize;
    let mut outcome = Ok(());
    for op in ops {
        let current_clock = self.vclock.get_clock(op.key.process);
        if current_clock >= op.key.op {
            // Already seen; nothing to do.
            continue;
        }
        if current_clock + 1 != op.key.op {
            // Stop here, but fall through so observers still hear about the
            // operations that were applied earlier in this batch.
            outcome = Err(ListError::OutOfOrder {
                process: op.key.process,
                current_clock,
                operation_clock: op.key.op,
            });
            break;
        }
        applied += 1;
        self.insert_key(op.key.clone(), op.value.clone());
    }
    if applied > 0 {
        if let Some(hook) = &self.hooks.apply {
            hook();
        }
    }
    outcome
}
/// Collects references to the visible values, in list order.
pub fn to_vec(&self) -> Vec<&Value<Key<T>>> {
    let mut visible = Vec::new();
    visible.extend(self.iter());
    visible
}
/// Iterates over the visible values in key order, skipping tombstones and empty entries.
pub fn iter(&self) -> impl Iterator<Item = &Value<Key<T>>> {
    self.data.values().filter(|stored| stored.visible())
}
/// Returns the list's clock, e.g. to pass to another list's `diff`.
pub fn vclock(&self) -> &VClock {
&self.vclock
}
/// Returns the operations this list holds that a peer whose clock is `other` lacks.
///
/// The result is sorted by operation counter (stable), so operations of the same
/// process appear in the order `apply` requires.
///
/// A deleted entry no longer exists in the list, only its tombstone does. When the
/// peer lacks the original operation that created the entry, an `Empty` operation
/// under the original key is emitted so the peer's clock can still advance past it.
/// This is done whether or not the peer also lacks the tombstone: a peer that received
/// the tombstone before the original insert would otherwise never be able to catch up
/// on that process.
pub fn diff(&self, other: &VClock) -> Vec<Op<Key<T>>> {
    let vdiff: VClockDiff = (self.vclock(), other).into();
    // An operation is missing on the peer when its counter lies in the half-open range
    // `(start, end]` reported for its process.
    let missing = |key: &Key<T>| {
        matches!(
            vdiff.get_range(key.process),
            Some((start, end)) if key.op > start && key.op <= end
        )
    };
    let mut ops: Vec<Op<Key<T>>> = Vec::new();
    for (key, value) in &self.data {
        if let Value::Tombstone(target) = value {
            if missing(target) {
                // Stand-in for the deleted entry's original operation.
                ops.push(Op {
                    key: target.clone(),
                    value: Value::Empty,
                });
            }
        }
        if missing(key) {
            ops.push(Op {
                key: key.clone(),
                value: value.clone(),
            });
        }
    }
    ops.sort_by_key(|op| op.key.op);
    ops
}
/// Number of visible elements (tombstones are not counted).
pub fn size(&self) -> usize {
    self.iter().count()
}
/// Stores `value` under a new key placed at stored position `index`.
///
/// `index` counts every stored entry, tombstones included: `0` means before the first
/// entry and `self.data.len()` means after the last one. The new key is created between
/// the entries at positions `index - 1` and `index`, the update hook (if set) is told
/// about the operation, and the entry is stored.
///
/// # Errors
///
/// Returns `ListError::OutOfBounds` when `index` exceeds the number of stored entries,
/// and forwards any error from `ListKey::between`.
fn insert_at(&mut self, index: usize, value: Value<Key<T>>) -> Result<(), ListError>
where
    Key<T>: ListKey,
{
    if index > self.data.len() {
        return Err(ListError::OutOfBounds);
    }
    let (left, right) = if index == 0 {
        (None, self.data.keys().next())
    } else {
        // Jump to the entry just before the insertion point; the one after it (if any)
        // is the right-hand neighbour.
        let mut neighbours = self.data.keys().skip(index - 1);
        let left = neighbours.next();
        let right = neighbours.next();
        (left, right)
    };
    let op = self.vclock.next_value(self.process);
    let key = Key::between(self.process, op, left, right)?;
    self.on_update(&key, &value);
    self.insert_key(key, value);
    Ok(())
}
/// Records the operation `key` / `value` in the list.
///
/// The clock of `key.process` is always advanced. After that:
///
/// - `Empty` stores nothing;
/// - `Tombstone` first removes the entry it names, then is stored under `key`;
/// - any other value is stored under `key`.
fn insert_key(&mut self, key: Key<T>, value: Value<Key<T>>) {
    self.vclock.inc(key.process);
    if matches!(value, Value::Empty) {
        return;
    }
    if let Value::Tombstone(target) = &value {
        self.data.remove(target);
    }
    self.data.insert(key, value);
}
/// Passes the operation `key` / `value` to the update hook, if one is registered.
///
/// The operation is only cloned when a hook is actually set.
fn on_update(&self, key: &Key<T>, value: &Value<Key<T>>) {
    if let Some(hook) = self.hooks.update.as_ref() {
        let op = Op {
            key: key.clone(),
            value: value.clone(),
        };
        hook(&op);
    }
}
}
/// List keyed by arbitrary-precision rationals; its key type implements `InsertableKey`,
/// so `insert` at any position is available.
pub type List = GenericList<Key<Ratio<BigInt>>>;
/// List keyed by `i64`; its key type does not implement `InsertableKey`, so elements can
/// only be added with `append` and `prepend`.
pub type AppendOnlyList = GenericList<Key<i64>>;