use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{self, Debug, Display};
use std::hash::Hash;
use std::mem;
use serde::{Deserialize, Serialize};
use crate::ctx::{AddCtx, ReadCtx, RmCtx};
use crate::{CmRDT, CvRDT, Dot, ResetRemove, VClock};
pub trait Val<A: Ord>: Clone + Default + ResetRemove<A> + CmRDT {}
impl<A, T> Val<A> for T
where
A: Ord,
T: Clone + Default + ResetRemove<A> + CmRDT,
{
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Map<K: Ord, V: Val<A>, A: Ord + Hash> {
clock: VClock<A>,
entries: BTreeMap<K, Entry<V, A>>,
deferred: HashMap<VClock<A>, BTreeSet<K>>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct Entry<V: Val<A>, A: Ord> {
clock: VClock<A>,
val: V,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Op<K: Ord, V: Val<A>, A: Ord> {
Rm {
clock: VClock<A>,
keyset: BTreeSet<K>,
},
Up {
dot: Dot<A>,
key: K,
op: V::Op,
},
}
impl<V: Val<A>, A: Ord> Default for Entry<V, A> {
fn default() -> Self {
Self {
clock: VClock::default(),
val: V::default(),
}
}
}
impl<K: Ord, V: Val<A>, A: Ord + Hash> Default for Map<K, V, A> {
fn default() -> Self {
Self {
clock: Default::default(),
entries: Default::default(),
deferred: Default::default(),
}
}
}
impl<K: Ord, V: Val<A>, A: Ord + Hash> ResetRemove<A> for Map<K, V, A> {
fn reset_remove(&mut self, clock: &VClock<A>) {
self.entries = mem::take(&mut self.entries)
.into_iter()
.filter_map(|(key, mut entry)| {
entry.clock.reset_remove(&clock);
entry.val.reset_remove(&clock);
if entry.clock.is_empty() {
None
} else {
Some((key, entry))
}
})
.collect();
self.deferred = mem::take(&mut self.deferred)
.into_iter()
.filter_map(|(mut rm_clock, key)| {
rm_clock.reset_remove(&clock);
if rm_clock.is_empty() {
None
} else {
Some((rm_clock, key))
}
})
.collect();
self.clock.reset_remove(&clock);
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum CmRDTValidation<V: CmRDT, A> {
SourceOrder(crate::DotRange<A>),
Value(V::Validation),
}
impl<V: CmRDT + Debug, A: Debug> Display for CmRDTValidation<V, A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Debug::fmt(&self, f)
}
}
impl<V: CmRDT + Debug, A: Debug> std::error::Error for CmRDTValidation<V, A> {}
#[derive(Debug, PartialEq, Eq)]
pub enum CvRDTValidation<K, V: CvRDT, A> {
DoubleSpentDot {
dot: Dot<A>,
our_key: K,
their_key: K,
},
Value(V::Validation),
}
impl<K: Debug, V: CvRDT + Debug, A: Debug> Display for CvRDTValidation<K, V, A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Debug::fmt(&self, f)
}
}
impl<K: Debug, V: CvRDT + Debug, A: Debug> std::error::Error for CvRDTValidation<K, V, A> {}
impl<K: Ord, V: Val<A> + Debug, A: Ord + Hash + Clone + Debug> CmRDT for Map<K, V, A> {
type Op = Op<K, V, A>;
type Validation = CmRDTValidation<V, A>;
fn validate_op(&self, op: &Self::Op) -> Result<(), Self::Validation> {
match op {
Op::Rm { .. } => Ok(()),
Op::Up { dot, key, op } => {
self.clock
.validate_op(&dot)
.map_err(CmRDTValidation::SourceOrder)?;
let entry = self.entries.get(&key).cloned().unwrap_or_default();
entry
.clock
.validate_op(&dot)
.map_err(CmRDTValidation::SourceOrder)?;
entry.val.validate_op(&op).map_err(CmRDTValidation::Value)
}
}
}
fn apply(&mut self, op: Self::Op) {
match op {
Op::Rm { clock, keyset } => self.apply_keyset_rm(keyset, clock),
Op::Up { dot, key, op } => {
if self.clock.get(&dot.actor) >= dot.counter {
return;
}
let entry = self.entries.entry(key).or_default();
entry.clock.apply(dot.clone());
entry.val.apply(op);
self.clock.apply(dot);
self.apply_deferred();
}
}
}
}
impl<K: Ord + Clone + Debug, V: Val<A> + CvRDT + Debug, A: Ord + Hash + Clone + Debug> CvRDT
for Map<K, V, A>
{
type Validation = CvRDTValidation<K, V, A>;
fn validate_merge(&self, other: &Self) -> Result<(), Self::Validation> {
for (key, entry) in self.entries.iter() {
for (other_key, other_entry) in other.entries.iter() {
for Dot { actor, counter } in entry.clock.iter() {
if other_key != key && other_entry.clock.get(&actor) == counter {
return Err(CvRDTValidation::DoubleSpentDot {
dot: Dot::new(actor.clone(), counter),
our_key: key.clone(),
their_key: other_key.clone(),
});
}
}
if key == other_key && entry.clock.concurrent(&other_entry.clock) {
entry
.val
.validate_merge(&other_entry.val)
.map_err(CvRDTValidation::Value)?;
}
}
}
Ok(())
}
fn merge(&mut self, other: Self) {
self.entries = mem::take(&mut self.entries)
.into_iter()
.filter_map(|(key, mut entry)| {
if !other.entries.contains_key(&key) {
if other.clock >= entry.clock {
None
} else {
entry.clock.reset_remove(&other.clock);
let mut removed_information = other.clock.clone();
removed_information.reset_remove(&entry.clock);
entry.val.reset_remove(&removed_information);
Some((key, entry))
}
} else {
Some((key, entry))
}
})
.collect();
for (key, mut entry) in other.entries {
if let Some(our_entry) = self.entries.get_mut(&key) {
let mut common = VClock::intersection(&entry.clock, &our_entry.clock);
common.merge(entry.clock.clone_without(&self.clock));
common.merge(our_entry.clock.clone_without(&other.clock));
if common.is_empty() {
self.entries.remove(&key).unwrap();
} else {
our_entry.val.merge(entry.val);
let mut information_that_was_deleted = entry.clock.clone();
information_that_was_deleted.merge(our_entry.clock.clone());
information_that_was_deleted.reset_remove(&common);
our_entry.val.reset_remove(&information_that_was_deleted);
our_entry.clock = common;
}
} else {
if self.clock >= entry.clock {
} else {
entry.clock.reset_remove(&self.clock);
let mut information_we_deleted = self.clock.clone();
information_we_deleted.reset_remove(&entry.clock);
entry.val.reset_remove(&information_we_deleted);
self.entries.insert(key, entry);
}
}
}
for (rm_clock, keys) in other.deferred {
self.apply_keyset_rm(keys, rm_clock);
}
self.clock.merge(other.clock);
self.apply_deferred();
}
}
impl<K: Ord, V: Val<A>, A: Ord + Hash + Clone> Map<K, V, A> {
pub fn new() -> Self {
Default::default()
}
pub fn is_empty(&self) -> ReadCtx<bool, A> {
ReadCtx {
add_clock: self.clock.clone(),
rm_clock: self.clock.clone(),
val: self.entries.is_empty(),
}
}
pub fn len(&self) -> ReadCtx<usize, A> {
ReadCtx {
add_clock: self.clock.clone(),
rm_clock: self.clock.clone(),
val: self.entries.len(),
}
}
pub fn get(&self, key: &K) -> ReadCtx<Option<V>, A> {
let add_clock = self.clock.clone();
let entry_opt = self.entries.get(&key);
ReadCtx {
add_clock,
rm_clock: entry_opt
.map(|map_entry| map_entry.clock.clone())
.unwrap_or_default(),
val: entry_opt.map(|map_entry| map_entry.val.clone()),
}
}
pub fn update<F>(&self, key: impl Into<K>, ctx: AddCtx<A>, f: F) -> Op<K, V, A>
where
F: FnOnce(&V, AddCtx<A>) -> V::Op,
{
let key = key.into();
let dot = ctx.dot.clone();
let op = match self.entries.get(&key).map(|e| &e.val) {
Some(data) => f(&data, ctx),
None => f(&V::default(), ctx),
};
Op::Up { dot, key, op }
}
pub fn rm(&self, key: impl Into<K>, ctx: RmCtx<A>) -> Op<K, V, A> {
let mut keyset = BTreeSet::new();
keyset.insert(key.into());
Op::Rm {
clock: ctx.clock,
keyset,
}
}
pub fn read_ctx(&self) -> ReadCtx<(), A> {
ReadCtx {
add_clock: self.clock.clone(),
rm_clock: self.clock.clone(),
val: (),
}
}
fn apply_deferred(&mut self) {
let deferred = mem::take(&mut self.deferred);
for (clock, keys) in deferred {
self.apply_keyset_rm(keys, clock);
}
}
fn apply_keyset_rm(&mut self, mut keyset: BTreeSet<K>, clock: VClock<A>) {
for key in keyset.iter() {
if let Some(entry) = self.entries.get_mut(&key) {
entry.clock.reset_remove(&clock);
if entry.clock.is_empty() {
self.entries.remove(&key);
} else {
entry.val.reset_remove(&clock);
}
}
}
match self.clock.partial_cmp(&clock) {
None | Some(Ordering::Less) => {
let deferred_set = self.deferred.entry(clock).or_default();
deferred_set.append(&mut keyset);
}
_ => { }
}
}
pub fn keys(&self) -> impl Iterator<Item = ReadCtx<&K, A>> {
self.entries.iter().map(move |(k, v)| ReadCtx {
add_clock: self.clock.clone(),
rm_clock: v.clock.clone(),
val: k,
})
}
pub fn values(&self) -> impl Iterator<Item = ReadCtx<&V, A>> {
self.entries.values().map(move |v| ReadCtx {
add_clock: self.clock.clone(),
rm_clock: v.clock.clone(),
val: &v.val,
})
}
pub fn iter(&self) -> impl Iterator<Item = ReadCtx<(&K, &V), A>> {
self.entries.iter().map(move |(k, v)| ReadCtx {
add_clock: self.clock.clone(),
rm_clock: v.clock.clone(),
val: (k, &v.val),
})
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::mvreg::{self, MVReg};
use crate::orswot::Orswot;
type TestActor = u8;
type TestKey = u8;
type TestVal = MVReg<u8, TestActor>;
type TestMap = Map<TestKey, Map<TestKey, TestVal, TestActor>, TestActor>;
#[test]
fn test_get() {
let mut m: TestMap = Map::new();
assert_eq!(m.get(&0).val, None);
m.clock.apply(m.clock.inc(1));
m.entries.insert(
0,
Entry {
clock: m.clock.clone(),
val: Map::default(),
},
);
assert_eq!(m.get(&0).val, Some(Map::new()));
}
#[test]
fn test_op_exchange_converges_quickcheck1() {
let op_actor1 = Op::Up {
dot: Dot::new(0, 3),
key: 9,
op: Op::Up {
dot: Dot::new(0, 3),
key: 0,
op: mvreg::Op::Put {
clock: Dot::new(0, 3).into(),
val: 0,
},
},
};
let op_1_actor2 = Op::Up {
dot: Dot::new(1, 1),
key: 9,
op: Op::Rm {
clock: Dot::new(1, 1).into(),
keyset: vec![0].into_iter().collect(),
},
};
let op_2_actor2 = Op::Rm {
clock: Dot::new(1, 2).into(),
keyset: vec![9].into_iter().collect(),
};
let mut m1: TestMap = Map::new();
let mut m2: TestMap = Map::new();
m1.apply(op_actor1.clone());
assert_eq!(m1.clock, Dot::new(0, 3).into());
assert_eq!(m1.entries[&9].clock, Dot::new(0, 3).into());
assert_eq!(m1.entries[&9].val.deferred.len(), 0);
m2.apply(op_1_actor2.clone());
m2.apply(op_2_actor2.clone());
assert_eq!(m2.clock, Dot::new(1, 1).into());
assert_eq!(m2.entries.get(&9), None);
assert_eq!(
m2.deferred.get(&Dot::new(1, 2).into()),
Some(&vec![9].into_iter().collect())
);
m1.apply(op_1_actor2);
m1.apply(op_2_actor2);
m2.apply(op_actor1);
assert_eq!(m1, m2);
}
#[test]
fn merge_error() {
let mut m1: Map<u8, Orswot<u8, u8>, u8> = Map {
clock: VClock::from(Dot::new(75, 1)),
entries: BTreeMap::new(),
deferred: HashMap::new(),
};
let mut m2: Map<u8, Orswot<u8, u8>, u8> = Map {
clock: vec![Dot::new(75, 1), Dot::new(93, 1)].into_iter().collect(),
entries: vec![(
101,
Entry {
clock: vec![Dot::new(75, 1), Dot::new(93, 1)].into_iter().collect(),
val: Orswot {
clock: vec![Dot::new(75, 1), Dot::new(93, 1)].into_iter().collect(),
entries: vec![
(1, VClock::from(Dot::new(75, 1))),
(2, VClock::from(Dot::new(93, 1))),
]
.into_iter()
.collect(),
deferred: HashMap::new(),
},
},
)]
.into_iter()
.collect(),
deferred: HashMap::new(),
};
m1.merge(m2.clone());
assert_eq!(
m1,
Map {
clock: vec![Dot::new(75, 1), Dot::new(93, 1)].into_iter().collect(),
entries: vec![(
101,
Entry {
clock: Dot::new(93, 1).into(),
val: Orswot {
clock: vec![Dot::new(93, 1)].into_iter().collect(),
entries: vec![(2, VClock::from(Dot::new(93, 1)))]
.into_iter()
.collect(),
deferred: HashMap::new()
}
}
)]
.into_iter()
.collect(),
deferred: HashMap::new()
}
);
m2.merge(m1.clone());
assert_eq!(m1, m2);
}
}