use hashbrown::HashMap;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::mem;
use serde::{Deserialize, Serialize};
use crate::ctx::{AddCtx, ReadCtx, RmCtx};
use crate::traits::{Causal, CmRDT, CvRDT};
use crate::vclock::{Actor, Dot, VClock};
pub trait Key: Debug + Ord + Clone {}
impl<T: Debug + Ord + Clone> Key for T {}
pub trait Val<A: Actor>: Debug + Default + Clone + Causal<A> + CmRDT + CvRDT {}
impl<A, T> Val<A> for T
where
A: Actor,
T: Debug + Default + Clone + Causal<A> + CmRDT + CvRDT,
{
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Map<K: Key, V: Val<A>, A: Actor> {
clock: VClock<A>,
entries: BTreeMap<K, Entry<V, A>>,
deferred: HashMap<VClock<A>, BTreeSet<K>>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct Entry<V: Val<A>, A: Actor> {
clock: VClock<A>,
val: V,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Op<K: Key, V: Val<A>, A: Actor> {
Nop,
Rm {
clock: VClock<A>,
keyset: BTreeSet<K>,
},
Up {
dot: Dot<A>,
key: K,
op: V::Op,
},
}
impl<V: Val<A>, A: Actor> Default for Entry<V, A> {
fn default() -> Self {
Entry {
clock: VClock::default(),
val: V::default(),
}
}
}
impl<K: Key, V: Val<A>, A: Actor> Default for Map<K, V, A> {
fn default() -> Self {
Map::new()
}
}
impl<K: Key, V: Val<A>, A: Actor> Causal<A> for Map<K, V, A> {
fn forget(&mut self, clock: &VClock<A>) {
self.entries = mem::replace(&mut self.entries, BTreeMap::new())
.into_iter()
.filter_map(|(key, mut entry)| {
entry.clock.forget(&clock);
entry.val.forget(&clock);
if entry.clock.is_empty() {
None
} else {
Some((key, entry))
}
})
.collect();
self.deferred = mem::replace(&mut self.deferred, HashMap::new())
.into_iter()
.filter_map(|(mut rm_clock, key)| {
rm_clock.forget(&clock);
if rm_clock.is_empty() {
None
} else {
Some((rm_clock, key))
}
})
.collect();
self.clock.forget(&clock);
}
}
impl<K: Key, V: Val<A>, A: Actor> CmRDT for Map<K, V, A> {
type Op = Op<K, V, A>;
fn apply(&mut self, op: Self::Op) {
match op {
Op::Nop => { }
Op::Rm { clock, keyset } => self.apply_keyset_rm(keyset, clock),
Op::Up { dot, key, op } => {
if self.clock.get(&dot.actor) >= dot.counter {
return;
}
let entry = self.entries.entry(key).or_default();
entry.clock.apply(dot.clone());
entry.val.apply(op);
self.clock.apply(dot);
self.apply_deferred();
}
}
}
}
impl<K: Key, V: Val<A>, A: Actor> CvRDT for Map<K, V, A> {
fn merge(&mut self, other: Self) {
self.entries = mem::replace(&mut self.entries, BTreeMap::new())
.into_iter()
.filter_map(|(key, mut entry)| {
if !other.entries.contains_key(&key) {
if other.clock >= entry.clock {
None
} else {
entry.clock.forget(&other.clock);
let mut removed_information = other.clock.clone();
removed_information.forget(&entry.clock);
entry.val.forget(&removed_information);
Some((key, entry))
}
} else {
Some((key, entry))
}
})
.collect();
for (key, mut entry) in other.entries {
if let Some(our_entry) = self.entries.get_mut(&key) {
let mut common = VClock::intersection(&entry.clock, &our_entry.clock);
common.merge(entry.clock.clone_without(&self.clock));
common.merge(our_entry.clock.clone_without(&other.clock));
if common.is_empty() {
self.entries.remove(&key).unwrap();
} else {
our_entry.val.merge(entry.val);
let mut information_that_was_deleted = entry.clock.clone();
information_that_was_deleted.merge(our_entry.clock.clone());
information_that_was_deleted.forget(&common);
our_entry.val.forget(&information_that_was_deleted);
our_entry.clock = common;
}
} else {
if self.clock >= entry.clock {
} else {
entry.clock.forget(&self.clock);
let mut information_we_deleted = self.clock.clone();
information_we_deleted.forget(&entry.clock);
entry.val.forget(&information_we_deleted);
self.entries.insert(key, entry);
}
}
}
for (rm_clock, keys) in other.deferred {
self.apply_keyset_rm(keys, rm_clock);
}
self.clock.merge(other.clock);
self.apply_deferred();
}
}
impl<K: Key, V: Val<A>, A: Actor> Map<K, V, A> {
pub fn new() -> Self {
Map {
clock: VClock::new(),
entries: BTreeMap::new(),
deferred: HashMap::new(),
}
}
pub fn is_empty(&self) -> ReadCtx<bool, A> {
ReadCtx {
add_clock: self.clock.clone(),
rm_clock: self.clock.clone(),
val: self.entries.is_empty(),
}
}
pub fn len(&self) -> ReadCtx<usize, A> {
ReadCtx {
add_clock: self.clock.clone(),
rm_clock: self.clock.clone(),
val: self.entries.len(),
}
}
pub fn get(&self, key: &K) -> ReadCtx<Option<V>, A> {
let add_clock = self.clock.clone();
let entry_opt = self.entries.get(&key);
ReadCtx {
add_clock,
rm_clock: entry_opt
.map(|map_entry| map_entry.clock.clone())
.unwrap_or_default(),
val: entry_opt.map(|map_entry| map_entry.val.clone()),
}
}
pub fn update<F, I>(&self, key: I, ctx: AddCtx<A>, f: F) -> Op<K, V, A>
where
F: FnOnce(&V, AddCtx<A>) -> V::Op,
I: Into<K>,
{
let key = key.into();
let dot = ctx.dot.clone();
let op = match self.entries.get(&key).map(|e| &e.val) {
Some(data) => f(&data, ctx),
None => f(&V::default(), ctx),
};
Op::Up { dot, key, op }
}
pub fn rm(&self, key: impl Into<K>, ctx: RmCtx<A>) -> Op<K, V, A> {
let mut keyset = BTreeSet::new();
keyset.insert(key.into());
Op::Rm {
clock: ctx.clock,
keyset: keyset,
}
}
fn apply_deferred(&mut self) {
let deferred = mem::replace(&mut self.deferred, HashMap::new());
for (clock, keys) in deferred {
self.apply_keyset_rm(keys, clock);
}
}
fn apply_keyset_rm(&mut self, mut keyset: BTreeSet<K>, clock: VClock<A>) {
for key in keyset.iter() {
if let Some(entry) = self.entries.get_mut(&key) {
entry.clock.forget(&clock);
if entry.clock.is_empty() {
self.entries.remove(&key);
} else {
entry.val.forget(&clock);
}
}
}
match self.clock.partial_cmp(&clock) {
None | Some(Ordering::Less) => {
let deferred_set = self.deferred.entry(clock).or_default();
deferred_set.append(&mut keyset);
}
_ => { }
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::mvreg::{self, MVReg};
use crate::orswot::Orswot;
type TestActor = u8;
type TestKey = u8;
type TestVal = MVReg<u8, TestActor>;
type TestMap = Map<TestKey, Map<TestKey, TestVal, TestActor>, TestActor>;
#[test]
fn test_get() {
let mut m: TestMap = Map::new();
assert_eq!(m.get(&0).val, None);
m.clock.apply(m.clock.inc(1));
m.entries.insert(
0,
Entry {
clock: m.clock.clone(),
val: Map::default(),
},
);
assert_eq!(m.get(&0).val, Some(Map::new()));
}
#[test]
fn test_op_exchange_converges_quickcheck1() {
let op_actor1 = Op::Up {
dot: Dot::new(0, 3),
key: 9,
op: Op::Up {
dot: Dot::new(0, 3),
key: 0,
op: mvreg::Op::Put {
clock: Dot::new(0, 3).into(),
val: 0,
},
},
};
let op_1_actor2 = Op::Up {
dot: Dot::new(1, 1),
key: 9,
op: Op::Rm {
clock: Dot::new(1, 1).into(),
keyset: vec![0].into_iter().collect(),
},
};
let op_2_actor2 = Op::Rm {
clock: Dot::new(1, 2).into(),
keyset: vec![9].into_iter().collect(),
};
let mut m1: TestMap = Map::new();
let mut m2: TestMap = Map::new();
m1.apply(op_actor1.clone());
assert_eq!(m1.clock, Dot::new(0, 3).into());
assert_eq!(m1.entries[&9].clock, Dot::new(0, 3).into());
assert_eq!(m1.entries[&9].val.deferred.len(), 0);
m2.apply(op_1_actor2.clone());
m2.apply(op_2_actor2.clone());
assert_eq!(m2.clock, Dot::new(1, 1).into());
assert_eq!(m2.entries.get(&9), None);
assert_eq!(
m2.deferred.get(&Dot::new(1, 2).into()),
Some(&vec![9].into_iter().collect())
);
m1.apply(op_1_actor2);
m1.apply(op_2_actor2);
m2.apply(op_actor1);
assert_eq!(m1, m2);
}
#[test]
fn merge_error() {
let mut m1: Map<u8, Orswot<u8, u8>, u8> = Map {
clock: VClock::from(Dot::new(75, 1)),
entries: BTreeMap::new(),
deferred: HashMap::new(),
};
let mut m2: Map<u8, Orswot<u8, u8>, u8> = Map {
clock: vec![Dot::new(75, 1), Dot::new(93, 1)].into_iter().collect(),
entries: vec![(
101,
Entry {
clock: vec![Dot::new(75, 1), Dot::new(93, 1)].into_iter().collect(),
val: Orswot {
clock: vec![Dot::new(75, 1), Dot::new(93, 1)].into_iter().collect(),
entries: vec![
(1, VClock::from(Dot::new(75, 1))),
(2, VClock::from(Dot::new(93, 1))),
]
.into_iter()
.collect(),
deferred: HashMap::new(),
},
},
)]
.into_iter()
.collect(),
deferred: HashMap::new(),
};
m1.merge(m2.clone());
assert_eq!(
m1,
Map {
clock: vec![Dot::new(75, 1), Dot::new(93, 1)].into_iter().collect(),
entries: vec![(
101,
Entry {
clock: Dot::new(93, 1).into(),
val: Orswot {
clock: vec![Dot::new(93, 1)].into_iter().collect(),
entries: vec![(2, VClock::from(Dot::new(93, 1)))]
.into_iter()
.collect(),
deferred: HashMap::new()
}
}
)]
.into_iter()
.collect(),
deferred: HashMap::new()
}
);
m2.merge(m1.clone());
assert_eq!(m1, m2);
}
}