#![allow(missing_docs)]
use std::cmp::Ordering;
use std::collections::*;
use serde::{self, Deserialize, Serialize};
use crate::{Actor, Dot};
/// Tracks which operations have been seen from each actor and holds back
/// operations whose causal dependency has not been seen yet.
///
/// `A` is the actor (site) identifier type and `T` is the operation type
/// flowing through the barrier.
#[derive(Debug, Serialize, Deserialize)]
pub struct CausalityBarrier<A: Actor, T: CausalOp<A>> {
/// Per-actor record of which versions have been seen.
peers: HashMap<A, VectorEntry>,
/// Operations that were held back, keyed by the dot they are waiting for
/// (the value returned by their `happens_after`).
pub buffer: HashMap<Dot<A>, T>,
}
/// Logical clock value of a single actor; the `counter` of an operation's
/// dot is passed around as a `LogTime`.
type LogTime = u64;
/// Record of the versions seen from one actor.
///
/// A version `clk` counts as seen when `clk < next_version` and `clk` is not
/// listed in `exceptions`.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct VectorEntry {
/// Exclusive upper bound of the versions that may count as seen.
next_version: LogTime,
/// Versions that do not count as seen even if they are below `next_version`.
exceptions: HashSet<LogTime>,
}
impl VectorEntry {
pub fn new() -> Self {
VectorEntry::default()
}
pub fn increment(&mut self, clk: LogTime) {
match clk.cmp(&self.next_version) {
Ordering::Less => {
self.exceptions.remove(&clk);
}
Ordering::Equal => self.next_version += 1,
Ordering::Greater => (self.next_version + 1..clk).for_each(|i| {
self.exceptions.insert(i);
}),
};
}
pub fn is_ready(&self, clk: LogTime) -> bool {
clk < self.next_version && self.no_exceptions(clk)
}
pub fn diff_from(&self, other: &Self) -> HashSet<LogTime> {
let local_ops =
(other.next_version..self.next_version).filter(|ix: &LogTime| self.no_exceptions(*ix));
let mut local_exceptions = other.exceptions.difference(&self.exceptions).cloned();
local_ops.chain(&mut local_exceptions).collect()
}
fn no_exceptions(&self, clk: LogTime) -> bool {
!self.exceptions.contains(&clk)
}
}
/// An operation that can be passed through a `CausalityBarrier`.
pub trait CausalOp<A> {
/// The dot of the operation that must have been seen before this one is
/// released by the barrier, or `None` when this operation has no such
/// dependency.
fn happens_after(&self) -> Option<Dot<A>>;
/// The dot identifying this operation: its `actor` selects the peer entry
/// and its `counter` is the version recorded for that actor.
fn dot(&self) -> Dot<A>;
}
impl<A: Actor, T: CausalOp<A>> Default for CausalityBarrier<A, T> {
    /// Builds a barrier that knows no peers and buffers no operations.
    ///
    /// Implemented by hand rather than derived so that neither `A` nor `T`
    /// needs to implement `Default`.
    fn default() -> Self {
        let peers = HashMap::default();
        let buffer = HashMap::default();
        Self { peers, buffer }
    }
}
impl<A: Actor, T: CausalOp<A>> CausalityBarrier<A, T> {
pub fn new() -> Self {
CausalityBarrier::default()
}
pub fn ingest(&mut self, op: T) -> Option<T> {
let v = self.peers.entry(op.dot().actor).or_default();
if v.is_ready(op.dot().counter) {
return None;
}
v.increment(op.dot().counter);
match op.happens_after() {
Some(dot) => {
if !self.saw_site_dot(&dot) {
self.buffer.insert(dot, op);
None
} else {
Some(op)
}
}
None => {
match self.buffer.remove(&op.dot()) {
Some(_) => None,
None => Some(op),
}
}
}
}
fn saw_site_dot(&self, dot: &Dot<A>) -> bool {
match self.peers.get(&dot.actor) {
Some(ent) => ent.is_ready(dot.counter),
None => false,
}
}
pub fn expel(&mut self, op: T) -> T {
let v = self.peers.entry(op.dot().actor).or_default();
v.increment(op.dot().counter);
op
}
pub fn diff_from(&self, other: &HashMap<A, VectorEntry>) -> HashMap<A, HashSet<LogTime>> {
let mut ret = HashMap::new();
for (site_id, entry) in self.peers.iter() {
let e_diff = match other.get(site_id) {
Some(remote_entry) => entry.diff_from(remote_entry),
None => (0..entry.next_version).collect(),
};
ret.insert(site_id.clone(), e_diff);
}
ret
}
pub fn vvwe(&self) -> HashMap<A, VectorEntry> {
self.peers.clone()
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// Actor identifier used by the fixtures below.
    type SiteId = u32;

    /// Payload of a test message: an unconstrained insert, or a delete that
    /// refers to the dot `(site, time)` it must come after.
    #[derive(PartialEq, Debug, Hash, Clone)]
    enum Op {
        Insert(u64),
        Delete(SiteId, LogTime),
    }

    /// Test operation stamped with the site that produced it and its version.
    #[derive(PartialEq, Debug, Hash, Clone)]
    pub struct CausalMessage {
        time: LogTime,
        local_id: SiteId,
        op: Op,
    }

    impl CausalOp<SiteId> for CausalMessage {
        fn happens_after(&self) -> Option<Dot<SiteId>> {
            if let Op::Delete(site, time) = self.op {
                Some(Dot::new(site, time))
            } else {
                None
            }
        }

        fn dot(&self) -> Dot<SiteId> {
            Dot::new(self.local_id, self.time)
        }
    }

    /// Builds an `Insert(0)` message produced by `local_id` at `time`.
    fn insert_msg(local_id: SiteId, time: LogTime) -> CausalMessage {
        CausalMessage {
            time,
            local_id,
            op: Op::Insert(0),
        }
    }

    /// Builds a delete message produced by `local_id` at `time` that refers
    /// to the dot `target`.
    fn delete_msg(local_id: SiteId, time: LogTime, target: (SiteId, LogTime)) -> CausalMessage {
        CausalMessage {
            time,
            local_id,
            op: Op::Delete(target.0, target.1),
        }
    }

    /// Builds a `VectorEntry` from its next version and a list of exceptions.
    fn entry(next_version: LogTime, exceptions: &[LogTime]) -> VectorEntry {
        VectorEntry {
            next_version,
            exceptions: exceptions.iter().cloned().collect(),
        }
    }

    /// Collects a slice of versions into a set for comparisons.
    fn versions(items: &[LogTime]) -> HashSet<LogTime> {
        items.iter().cloned().collect()
    }

    #[test]
    fn delete_before_insert() {
        let mut barrier = CausalityBarrier::new();
        let ins = insert_msg(1, 0);
        let del = delete_msg(1, 1, (1, 0));

        let first = barrier.ingest(ins.clone());
        assert_eq!(first, Some(ins));
        let second = barrier.ingest(del.clone());
        assert_eq!(second, Some(del));
    }

    #[test]
    fn out_of_order() {
        let mut barrier = CausalityBarrier::new();
        let ins = insert_msg(1, 0);
        let del = delete_msg(1, 1, (1, 0));

        // The delete arrives first and is held back; the insert then cancels it.
        assert_eq!(barrier.ingest(del), None);
        assert_eq!(barrier.ingest(ins), None);
    }

    #[test]
    fn insert() {
        let mut barrier = CausalityBarrier::new();
        let ins = insert_msg(1, 1);

        let released = barrier.ingest(ins.clone());
        assert_eq!(released, Some(ins));
    }

    #[test]
    fn insert_then_delete() {
        let mut barrier = CausalityBarrier::new();
        let ins = insert_msg(1, 0);
        let del = delete_msg(1, 1, (1, 1));

        let first = barrier.ingest(ins.clone());
        assert_eq!(first, Some(ins));
        let second = barrier.ingest(del.clone());
        assert_eq!(second, Some(del));
    }

    #[test]
    fn delete_before_insert_multiple_sites() {
        let mut barrier = CausalityBarrier::new();
        let del = delete_msg(2, 0, (1, 5));
        let ins = insert_msg(1, 5);

        assert_eq!(barrier.ingest(del), None);
        assert_eq!(barrier.ingest(ins), None);
    }

    #[test]
    fn entry_diff_new_entries() {
        let a = VectorEntry::new();
        let b = entry(10, &[]);

        let c: HashSet<LogTime> = (0..10).collect();
        assert_eq!(b.diff_from(&a), c);
    }

    #[test]
    fn entry_diff_found_exceptions() {
        let a = entry(10, &[1, 2, 3, 4]);
        let b = entry(5, &[]);

        let c = versions(&[1, 2, 3, 4]);
        assert_eq!(b.diff_from(&a), c);
    }

    #[test]
    fn entry_diff_complex() {
        let a = entry(6, &[1, 2, 3, 4]);
        let b = entry(9, &[2, 3, 4]);

        let c = versions(&[1, 6, 7, 8]);
        assert_eq!(b.diff_from(&a), c);
    }
}