use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::mem;
use serde::{Deserialize, Serialize};
use crate::ctx::{AddCtx, ReadCtx, RmCtx};
use crate::quickcheck::{Arbitrary, Gen};
use crate::{CmRDT, CvRDT, Dot, ResetRemove, VClock};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Orswot<M: Hash + Eq, A: Ord + Hash> {
pub(crate) clock: VClock<A>,
pub(crate) entries: HashMap<M, VClock<A>>,
pub(crate) deferred: HashMap<VClock<A>, HashSet<M>>,
}
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Op<M, A: Ord> {
Add {
dot: Dot<A>,
members: Vec<M>,
},
Rm {
clock: VClock<A>,
members: Vec<M>,
},
}
impl<M: Hash + Eq, A: Ord + Hash> Default for Orswot<M, A> {
fn default() -> Self {
Orswot {
clock: Default::default(),
entries: Default::default(),
deferred: Default::default(),
}
}
}
impl<M: Hash + Clone + Eq, A: Ord + Hash + Clone + Debug> CmRDT for Orswot<M, A> {
type Op = Op<M, A>;
type Validation = <VClock<A> as CmRDT>::Validation;
fn validate_op(&self, op: &Self::Op) -> Result<(), Self::Validation> {
match op {
Op::Add { dot, .. } => self.clock.validate_op(dot),
Op::Rm { .. } => Ok(()),
}
}
fn apply(&mut self, op: Self::Op) {
match op {
Op::Add { dot, members } => {
if self.clock.get(&dot.actor) >= dot.counter {
return;
}
for member in members {
let member_vclock = self.entries.entry(member).or_default();
member_vclock.apply(dot.clone());
}
self.clock.apply(dot);
self.apply_deferred();
}
Op::Rm { clock, members } => {
self.apply_rm(members.into_iter().collect(), clock);
}
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum Validation<M, A> {
DoubleSpentDot {
dot: Dot<A>,
our_member: M,
their_member: M,
},
}
impl<M: Debug, A: Debug> Display for Validation<M, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self, f)
}
}
impl<M: Debug, A: Debug> std::error::Error for Validation<M, A> {}
impl<M: Hash + Eq + Clone + Debug, A: Ord + Hash + Clone + Debug> CvRDT for Orswot<M, A> {
type Validation = Validation<M, A>;
fn validate_merge(&self, other: &Self) -> Result<(), Self::Validation> {
for (member, clock) in self.entries.iter() {
for (other_member, other_clock) in other.entries.iter() {
for Dot { actor, counter } in clock.iter() {
if other_member != member && other_clock.get(&actor) == counter {
return Err(Validation::DoubleSpentDot {
dot: Dot::new(actor.clone(), counter),
our_member: member.clone(),
their_member: other_member.clone(),
});
}
}
}
}
Ok(())
}
fn merge(&mut self, other: Self) {
self.entries = mem::take(&mut self.entries)
.into_iter()
.filter_map(|(entry, mut clock)| {
if !other.entries.contains_key(&entry) {
if other.clock >= clock {
None
} else {
clock.reset_remove(&other.clock);
Some((entry, clock))
}
} else {
Some((entry, clock))
}
})
.collect();
for (entry, mut clock) in other.entries {
if let Some(our_clock) = self.entries.get_mut(&entry) {
let mut common = VClock::intersection(&clock, &our_clock);
common.merge(clock.clone_without(&self.clock));
common.merge(our_clock.clone_without(&other.clock));
if common.is_empty() {
self.entries.remove(&entry).unwrap();
} else {
*our_clock = common;
}
} else {
if self.clock >= clock {
} else {
clock.reset_remove(&self.clock);
self.entries.insert(entry, clock);
}
}
}
for (rm_clock, members) in other.deferred {
self.apply_rm(members, rm_clock);
}
self.clock.merge(other.clock);
self.apply_deferred();
}
}
impl<M: Hash + Clone + Eq, A: Ord + Hash> ResetRemove<A> for Orswot<M, A> {
fn reset_remove(&mut self, clock: &VClock<A>) {
self.clock.reset_remove(&clock);
self.entries = mem::take(&mut self.entries)
.into_iter()
.filter_map(|(val, mut val_clock)| {
val_clock.reset_remove(&clock);
if val_clock.is_empty() {
None
} else {
Some((val, val_clock))
}
})
.collect();
self.deferred = mem::take(&mut self.deferred)
.into_iter()
.filter_map(|(mut vclock, deferred)| {
vclock.reset_remove(&clock);
if vclock.is_empty() {
None
} else {
Some((vclock, deferred))
}
})
.collect();
}
}
impl<M: Hash + Clone + Eq, A: Ord + Hash + Clone> Orswot<M, A> {
pub fn new() -> Self {
Default::default()
}
pub fn clock(&self) -> VClock<A> {
self.clock.clone()
}
pub fn add(&self, member: M, ctx: AddCtx<A>) -> Op<M, A> {
Op::Add {
dot: ctx.dot,
members: std::iter::once(member).collect(),
}
}
pub fn add_all<I: IntoIterator<Item = M>>(&self, members: I, ctx: AddCtx<A>) -> Op<M, A> {
Op::Add {
dot: ctx.dot,
members: members.into_iter().collect(),
}
}
pub fn rm(&self, member: M, ctx: RmCtx<A>) -> Op<M, A> {
Op::Rm {
clock: ctx.clock,
members: std::iter::once(member).collect(),
}
}
pub fn rm_all<I: IntoIterator<Item = M>>(&self, members: I, ctx: RmCtx<A>) -> Op<M, A> {
Op::Rm {
clock: ctx.clock,
members: members.into_iter().collect(),
}
}
fn apply_rm(&mut self, members: HashSet<M>, clock: VClock<A>) {
for member in members.iter() {
if let Some(member_clock) = self.entries.get_mut(&member) {
member_clock.reset_remove(&clock);
if member_clock.is_empty() {
self.entries.remove(&member);
}
}
}
match clock.partial_cmp(&self.clock) {
None | Some(Ordering::Greater) => {
if let Some(existing_deferred) = self.deferred.get_mut(&clock) {
existing_deferred.extend(members);
} else {
self.deferred.insert(clock, members);
}
}
_ => { }
}
}
pub fn contains(&self, member: &M) -> ReadCtx<bool, A> {
let member_clock_opt = self.entries.get(&member);
let exists = member_clock_opt.is_some();
ReadCtx {
add_clock: self.clock.clone(),
rm_clock: member_clock_opt.cloned().unwrap_or_default(),
val: exists,
}
}
pub fn iter(&self) -> impl Iterator<Item = ReadCtx<&M, A>> {
self.entries.iter().map(move |(m, clock)| ReadCtx {
add_clock: self.clock.clone(),
rm_clock: clock.clone(),
val: m,
})
}
pub fn read(&self) -> ReadCtx<HashSet<M>, A> {
ReadCtx {
add_clock: self.clock.clone(),
rm_clock: self.clock.clone(),
val: self.entries.keys().cloned().collect(),
}
}
pub fn read_ctx(&self) -> ReadCtx<(), A> {
ReadCtx {
add_clock: self.clock.clone(),
rm_clock: self.clock.clone(),
val: (),
}
}
fn apply_deferred(&mut self) {
let deferred = mem::take(&mut self.deferred);
for (clock, entries) in deferred.into_iter() {
self.apply_rm(entries, clock)
}
}
}
impl<A: Ord + Hash + Arbitrary + Debug, M: Hash + Eq + Arbitrary> Arbitrary for Op<M, A> {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let dot = Dot::arbitrary(g);
let clock = VClock::arbitrary(g);
let mut members_set = HashSet::new();
for _ in 0..u8::arbitrary(g) % 10 {
members_set.insert(M::arbitrary(g));
}
let members: Vec<_> = members_set.into_iter().collect();
match u8::arbitrary(g) % 2 {
0 => Op::Add { members, dot },
1 => Op::Rm { members, clock },
_ => panic!("tried to generate invalid op"),
}
}
fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
let mut shrunk_ops = Vec::new();
match self {
Op::Add { members, dot } => {
for (i, _m) in members.iter().enumerate() {
let mut shrunk_members = members.clone();
shrunk_members.remove(i);
shrunk_ops.push(Op::Add {
members: shrunk_members,
dot: dot.clone(),
});
}
dot.shrink().for_each(|shrunk_dot| {
shrunk_ops.push(Op::Add {
members: members.clone(),
dot: shrunk_dot,
})
});
}
Op::Rm { members, clock } => {
for (i, _m) in members.iter().enumerate() {
let mut shrunk_members = members.clone();
shrunk_members.remove(i);
shrunk_ops.push(Op::Rm {
members: shrunk_members,
clock: clock.clone(),
});
}
clock.shrink().for_each(|shrunk_clock| {
shrunk_ops.push(Op::Rm {
members: members.clone(),
clock: shrunk_clock,
})
});
}
}
Box::new(shrunk_ops.into_iter())
}
}
impl<M: Debug, A: Ord + Hash + Debug> Debug for Op<M, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Op::Add { dot, members } => write!(f, "Add({:?}, {:?})", dot, members),
Op::Rm { clock, members } => write!(f, "Rm({:?}, {:?})", clock, members),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
extern crate rand;
#[test]
fn ensure_deferred_merges() {
let mut a = Orswot::new();
let mut b = Orswot::new();
b.apply(b.add("element 1", b.read().derive_add_ctx("A")));
b.apply(b.rm(
"element 1",
RmCtx {
clock: Dot::new("A", 4).into(),
},
));
a.apply(a.add("element 4", a.read().derive_add_ctx("B")));
b.apply(b.rm(
"element 9",
RmCtx {
clock: Dot::new("C", 4).into(),
},
));
let mut merged = Orswot::new();
merged.merge(a);
merged.merge(b);
merged.merge(Orswot::new());
assert_eq!(merged.deferred.len(), 2);
}
#[test]
fn preserve_deferred_across_merges() {
let mut a = Orswot::new();
let mut b = a.clone();
let mut c = a.clone();
a.apply(a.add(5, a.read().derive_add_ctx("A")));
let mut vc = VClock::new();
vc.apply(Dot::new("A", 3));
vc.apply(Dot::new("B", 8));
b.apply(b.rm(5, RmCtx { clock: vc }));
assert_eq!(b.deferred.len(), 1);
c.merge(b);
assert_eq!(c.deferred.len(), 1);
a.merge(c);
assert!(a.read().val.is_empty());
}
#[test]
fn test_present_but_removed() {
let mut a = Orswot::new();
let mut b = Orswot::new();
a.apply(a.add(0, a.read().derive_add_ctx("A")));
let c = a.clone();
a.apply(a.rm(0, a.contains(&0).derive_rm_ctx()));
assert_eq!(a.deferred.len(), 0);
b.apply(b.add(0, b.read().derive_add_ctx("B")));
a.merge(b.clone());
b.apply(b.rm(0, b.contains(&0).derive_rm_ctx()));
a.merge(b);
a.merge(c);
assert!(a.read().val.is_empty());
}
}