use hashbrown::{HashMap, HashSet};
use std::cmp::Ordering;
use std::fmt::Debug;
use std::hash::Hash;
use std::mem;
use serde::{Deserialize, Serialize};
use crate::ctx::{AddCtx, ReadCtx, RmCtx};
use crate::traits::{Causal, CmRDT, CvRDT};
use crate::vclock::{Actor, Dot, VClock};
/// Bound alias for values that can be stored in an [`Orswot`].
///
/// Any type that is `Debug + Clone + Hash + Eq` is a `Member` automatically
/// through the blanket implementation below.
pub trait Member: Debug + Clone + Hash + Eq {}

impl<T> Member for T where T: Debug + Clone + Hash + Eq {}
/// Add/remove set whose membership is tracked with vector clocks.
///
/// Each present member maps to the clock of the adds that currently keep it
/// in the set; removals whose witnessing clock is not yet covered by the set
/// clock are parked in `deferred` until they can be re-evaluated.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Orswot<M: Member, A: Actor> {
    /// Clock of every dot applied to (or merged into) this set.
    pub(crate) clock: VClock<A>,
    /// Members currently in the set, each with the clock of its surviving adds.
    pub(crate) entries: HashMap<M, VClock<A>>,
    /// Pending removals, keyed by the clock that witnessed them.
    pub(crate) deferred: HashMap<VClock<A>, HashSet<M>>,
}
/// Operations that can be applied to an [`Orswot`] through `CmRDT::apply`.
///
/// Values are produced by `Orswot::add` and `Orswot::rm`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Op<M: Member, A: Actor> {
/// Insert a single member into the set.
Add {
/// Dot (actor + counter) identifying this add.
dot: Dot<A>,
/// The value being added.
member: M,
},
/// Remove one or more members from the set.
Rm {
/// Clock witnessing the removal; it is forgotten from each member's clock.
clock: VClock<A>,
/// The values being removed.
members: HashSet<M>,
},
}
impl<M: Member, A: Actor> Default for Orswot<M, A> {
    /// Returns an empty set; identical to [`Orswot::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<M: Member, A: Actor> CmRDT for Orswot<M, A> {
    type Op = Op<M, A>;

    /// Applies a single operation to this set.
    ///
    /// * `Op::Rm` is handed to `apply_rm` unchanged.
    /// * `Op::Add` is ignored when the set clock already covers the dot;
    ///   otherwise the dot is recorded on both the member's clock and the set
    ///   clock, and the deferred removals are re-evaluated.
    fn apply(&mut self, op: Self::Op) {
        let (dot, member) = match op {
            Op::Rm { clock, members } => return self.apply_rm(members, clock),
            Op::Add { dot, member } => (dot, member),
        };

        // The set clock has already seen this dot: nothing to do.
        if self.clock.get(&dot.actor) >= dot.counter {
            return;
        }

        self.entries.entry(member).or_default().apply(dot.clone());
        self.clock.apply(dot);

        // The clock just advanced, so parked removals get another chance.
        self.apply_deferred();
    }
}
impl<M: Member, A: Actor> CvRDT for Orswot<M, A> {
/// Merges the full state of `other` into `self`.
///
/// The merge runs in four ordered steps:
/// 1. filter our own entries against what `other` knows,
/// 2. fold `other`'s entries into ours,
/// 3. replay `other`'s deferred removals,
/// 4. merge the set clocks and re-evaluate all deferred removals.
///
/// Clock comparisons here are on a partial order, so `a >= b` being false
/// does not imply `a < b`.
fn merge(&mut self, other: Self) {
// Step 1: rebuild our entry map, deciding the fate of each entry that
// `other` does not hold. Entries present on both sides pass through
// untouched and are reconciled in step 2.
self.entries = mem::replace(&mut self.entries, HashMap::new())
.into_iter()
.filter_map(|(entry, mut clock)| {
if !other.entries.contains_key(&entry) {
// `other` lacks this entry: either it removed it or never saw it.
if other.clock >= clock {
// `other`'s clock covers every dot of this entry, so it
// must have seen and removed it: drop the entry.
None
} else {
// `other` has not seen all of this entry's dots: keep it,
// after forgetting `other.clock` from its clock
// (presumably this strips the dots `other` already covers —
// see `VClock::forget`).
clock.forget(&other.clock);
Some((entry, clock))
}
} else {
Some((entry, clock))
}
})
.collect();
// Step 2: walk `other`'s entries.
for (entry, mut clock) in other.entries {
if let Some(our_clock) = self.entries.get_mut(&entry) {
// Present on both sides — which does not by itself mean it survives.
// The new clock is the intersection of both entry clocks, merged with
// `clone_without` of each side against the opposite set clock
// (presumably the dots the opposite side has not seen — confirm
// against `VClock::clone_without`).
let mut common = VClock::intersection(&clock, &our_clock);
common.merge(clock.clone_without(&self.clock));
common.merge(our_clock.clone_without(&other.clock));
if common.is_empty() {
// Nothing keeps the entry alive. The unwrap cannot fail because
// `get_mut` just found this key.
self.entries.remove(&entry).unwrap();
} else {
*our_clock = common;
}
} else {
// We do not hold this entry: either we removed it or never saw it.
if self.clock >= clock {
// Our clock covers every dot of the entry: we had it and removed it,
// so it stays removed.
} else {
// We have not seen all of its dots: adopt it, after forgetting our
// own clock from its clock.
clock.forget(&self.clock);
self.entries.insert(entry, clock);
}
}
}
// Step 3: replay `other`'s deferred removals. This happens before the
// clock merge below, so `apply_rm` compares each removal clock against our
// pre-merge clock when deciding whether to keep it deferred.
for (rm_clock, members) in other.deferred {
self.apply_rm(members, rm_clock);
}
// Step 4: adopt `other`'s clock, then give every deferred removal another
// pass now that the set clock may have advanced.
self.clock.merge(other.clock);
self.apply_deferred();
}
}
impl<M: Member, A: Actor> Causal<A> for Orswot<M, A> {
fn forget(&mut self, clock: &VClock<A>) {
self.clock.forget(&clock);
self.entries = self
.entries
.clone()
.into_iter()
.filter_map(|(val, mut val_clock)| {
val_clock.forget(&clock);
if val_clock.is_empty() {
None
} else {
Some((val, val_clock))
}
})
.collect();
self.deferred = self
.deferred
.clone()
.into_iter()
.filter_map(|(mut vclock, deferred)| {
vclock.forget(&clock);
if vclock.is_empty() {
None
} else {
Some((vclock, deferred))
}
})
.collect();
}
}
impl<M: Member, A: Actor> Orswot<M, A> {
    /// Creates an empty set: empty clock, no entries, no deferred removals.
    pub fn new() -> Self {
        Self {
            clock: VClock::new(),
            entries: HashMap::new(),
            deferred: HashMap::new(),
        }
    }

    /// Builds an [`Op::Add`] for `member`, stamped with the dot carried by `ctx`.
    ///
    /// The set itself is not modified; pass the returned op to `apply`.
    pub fn add(&self, member: M, ctx: AddCtx<A>) -> Op<M, A> {
        Op::Add {
            member,
            dot: ctx.dot,
        }
    }

    /// Builds an [`Op::Rm`] removing the single `member`, witnessed by the
    /// clock carried by `ctx`.
    ///
    /// The set itself is not modified; pass the returned op to `apply`.
    pub fn rm(&self, member: M, ctx: RmCtx<A>) -> Op<M, A> {
        Op::Rm {
            clock: ctx.clock,
            members: std::iter::once(member).collect(),
        }
    }

    /// Removes `members` as witnessed by `clock`.
    ///
    /// `clock` is forgotten from the clock of every listed member that is
    /// present; a member whose clock ends up empty is dropped. When `clock` is
    /// greater than or concurrent with the set clock, the removal is also
    /// recorded in `deferred` (unioned with any removal already stored under
    /// the same clock) so it can be re-applied later.
    fn apply_rm(&mut self, members: HashSet<M>, clock: VClock<A>) {
        for member in &members {
            let emptied = match self.entries.get_mut(member) {
                Some(member_clock) => {
                    member_clock.forget(&clock);
                    member_clock.is_empty()
                }
                None => false,
            };
            if emptied {
                self.entries.remove(member);
            }
        }

        let must_defer = match clock.partial_cmp(&self.clock) {
            // The set clock already covers the removal clock.
            Some(Ordering::Less) | Some(Ordering::Equal) => false,
            // The removal clock is ahead of, or concurrent with, the set clock.
            None | Some(Ordering::Greater) => true,
        };
        if must_defer {
            self.deferred.entry(clock).or_default().extend(members);
        }
    }

    /// Reports whether `member` is currently in the set.
    ///
    /// The returned context carries the set clock as `add_clock` and the
    /// member's own clock as `rm_clock` (a default clock when the member is
    /// absent).
    pub fn contains(&self, member: &M) -> ReadCtx<bool, A> {
        let (present, member_clock) = match self.entries.get(member) {
            Some(found) => (true, found.clone()),
            None => (false, VClock::default()),
        };
        ReadCtx {
            add_clock: self.clock.clone(),
            rm_clock: member_clock,
            val: present,
        }
    }

    /// Returns a snapshot of all members currently in the set.
    ///
    /// Both `add_clock` and `rm_clock` of the returned context are copies of
    /// the set clock.
    pub fn read(&self) -> ReadCtx<HashSet<M>, A> {
        let snapshot: HashSet<M> = self.entries.keys().cloned().collect();
        ReadCtx {
            add_clock: self.clock.clone(),
            rm_clock: self.clock.clone(),
            val: snapshot,
        }
    }

    /// Re-runs every deferred removal through `apply_rm`.
    ///
    /// The deferred map is emptied first; `apply_rm` re-inserts whichever
    /// removals still need to wait.
    fn apply_deferred(&mut self) {
        let pending = mem::replace(&mut self.deferred, HashMap::new());
        pending
            .into_iter()
            .for_each(|(rm_clock, members)| self.apply_rm(members, rm_clock));
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    extern crate rand;

    /// Deferred removals held by a replica must still be present after it is
    /// merged into a fresh set, including after a further merge with an empty set.
    #[test]
    fn ensure_deferred_merges() {
        let mut first = Orswot::new();
        let mut second = Orswot::new();

        let add_ctx = second.read().derive_add_ctx("A");
        let add_op = second.add("element 1", add_ctx);
        second.apply(add_op);

        // Removal witnessed by a clock `second` has not caught up with.
        let rm_op = second.rm(
            "element 1",
            RmCtx {
                clock: Dot::new("A", 4).into(),
            },
        );
        second.apply(rm_op);

        let add_ctx = first.read().derive_add_ctx("B");
        let add_op = first.add("element 4", add_ctx);
        first.apply(add_op);

        // Removal of a member nobody has added, from an actor nobody has seen.
        let rm_op = second.rm(
            "element 9",
            RmCtx {
                clock: Dot::new("C", 4).into(),
            },
        );
        second.apply(rm_op);

        let mut merged = Orswot::new();
        merged.merge(first);
        merged.merge(second);
        merged.merge(Orswot::new());

        assert_eq!(merged.deferred.len(), 2);
    }

    /// A deferred removal must travel through an intermediate replica and
    /// still take effect on the replica that holds the member.
    #[test]
    fn preserve_deferred_across_merges() {
        let mut origin = Orswot::new();
        let mut remover = origin.clone();
        let mut relay = origin.clone();

        let add_ctx = origin.read().derive_add_ctx("A");
        let add_op = origin.add(5, add_ctx);
        origin.apply(add_op);

        let mut witness = VClock::new();
        witness.apply(Dot::new("A", 3));
        witness.apply(Dot::new("B", 8));

        let rm_op = remover.rm(5, RmCtx { clock: witness });
        remover.apply(rm_op);
        assert_eq!(remover.deferred.len(), 1);

        relay.merge(remover);
        assert_eq!(relay.deferred.len(), 1);

        origin.merge(relay);
        assert!(origin.read().val.is_empty());
    }

    /// A member added and removed on two replicas must stay removed after the
    /// replicas are merged, even when an older snapshot that still holds the
    /// member is merged in afterwards.
    #[test]
    fn test_present_but_removed() {
        let mut left = Orswot::new();
        let mut right = Orswot::new();

        let add_ctx = left.read().derive_add_ctx("A");
        let add_op = left.add(0, add_ctx);
        left.apply(add_op);

        // Snapshot taken while the member is still present on `left`.
        let stale = left.clone();

        let rm_ctx = left.contains(&0).derive_rm_ctx();
        let rm_op = left.rm(0, rm_ctx);
        left.apply(rm_op);
        assert_eq!(left.deferred.len(), 0);

        let add_ctx = right.read().derive_add_ctx("B");
        let add_op = right.add(0, add_ctx);
        right.apply(add_op);

        left.merge(right.clone());

        let rm_ctx = right.contains(&0).derive_rm_ctx();
        let rm_op = right.rm(0, rm_ctx);
        right.apply(rm_op);

        left.merge(right);
        left.merge(stale);

        assert!(left.read().val.is_empty());
    }
}