use core::cmp::{self, Ordering};
use core::convert::Infallible;
use core::fmt::{self, Debug, Display};
use core::mem;
use std::collections::{btree_map, BTreeMap};
use serde::{Deserialize, Serialize};
use crate::{CmRDT, CvRDT, Dot, DotRange, ResetRemove};
/// A vector clock: a map from each actor to the counter recorded for it.
///
/// Actors that are absent from the map are read as having a counter of
/// zero (see `VClock::get`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct VClock<A: Ord> {
/// Per-actor counters, keyed (and therefore iterated) in actor order.
pub dots: BTreeMap<A, u64>,
}
impl<A: Ord> Default for VClock<A> {
    /// Builds an empty clock in which no actor has a recorded counter.
    fn default() -> Self {
        let dots = BTreeMap::new();
        Self { dots }
    }
}
impl<A: Ord> PartialOrd for VClock<A> {
    /// Compares two clocks under the vector-clock partial order.
    ///
    /// * `Some(Equal)` when both clocks are equal.
    /// * `Some(Greater)` when `self` holds a counter at least as large as
    ///   every counter stored in `other`.
    /// * `Some(Less)` when `other` holds a counter at least as large as
    ///   every counter stored in `self`.
    /// * `None` when neither clock dominates the other.
    fn partial_cmp(&self, other: &VClock<A>) -> Option<Ordering> {
        if self == other {
            return Some(Ordering::Equal);
        }

        // `big` dominates `small` when no counter stored in `small` exceeds
        // the counter `big` reports for the same actor.
        let dominates = |big: &VClock<A>, small: &VClock<A>| {
            small
                .dots
                .iter()
                .all(|(actor, count)| big.get(actor) >= *count)
        };

        if dominates(self, other) {
            Some(Ordering::Greater)
        } else if dominates(other, self) {
            Some(Ordering::Less)
        } else {
            None
        }
    }
}
impl<A: Ord + Display> Display for VClock<A> {
    /// Renders the clock as `<actor:count, actor:count, ...>`, with entries
    /// in the map's actor order. An empty clock renders as `<>`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "<")?;

        let mut entries = self.dots.iter();
        // The first entry has no leading separator; every later one does.
        if let Some((actor, count)) = entries.next() {
            write!(f, "{}:{}", actor, count)?;
            for (actor, count) in entries {
                write!(f, ", ")?;
                write!(f, "{}:{}", actor, count)?;
            }
        }

        write!(f, ">")
    }
}
impl<A: Ord> ResetRemove<A> for VClock<A> {
    /// Forgets every actor whose counter in `self` is less than or equal to
    /// the counter `other` stores for that actor.
    ///
    /// Only actors that appear in `other` are examined; all remaining
    /// entries of `self` are left untouched.
    fn reset_remove(&mut self, other: &Self) {
        for (actor, &their_count) in &other.dots {
            let our_count = self.get(actor);
            if our_count <= their_count {
                self.dots.remove(actor);
            }
        }
    }
}
impl<A: Ord + Clone + Debug> CmRDT for VClock<A> {
    type Op = Dot<A>;
    type Validation = DotRange<A>;

    /// Checks that `dot` does not skip any counter for its actor.
    ///
    /// A dot is accepted when its counter is at most one past the counter
    /// currently recorded for the actor.
    ///
    /// # Errors
    ///
    /// Returns a `DotRange` for the dot's actor whose `counter_range` runs
    /// from the next expected counter up to (but excluding) `dot.counter`,
    /// i.e. the counters that would have been skipped.
    fn validate_op(&self, dot: &Self::Op) -> Result<(), Self::Validation> {
        // Saturate rather than `+ 1`: with a recorded counter of `u64::MAX`
        // a plain addition panics in debug builds and wraps to 0 in release
        // builds, which would report a bogus gap of `0..dot.counter`.
        let next_counter = self.get(&dot.actor).saturating_add(1);
        if dot.counter > next_counter {
            Err(DotRange {
                actor: dot.actor.clone(),
                counter_range: next_counter..dot.counter,
            })
        } else {
            Ok(())
        }
    }

    /// Records `dot` if its counter is greater than the one currently held
    /// for the same actor; otherwise the clock is left unchanged.
    fn apply(&mut self, dot: Self::Op) {
        if self.get(&dot.actor) < dot.counter {
            self.dots.insert(dot.actor, dot.counter);
        }
    }
}
impl<A: Ord + Clone + Debug> CvRDT for VClock<A> {
    type Validation = Infallible;

    /// Merging two clocks is always permitted, so this never fails.
    fn validate_merge(&self, _other: &Self) -> Result<(), Self::Validation> {
        Ok(())
    }

    /// Applies every dot of `other` to `self`, consuming `other`.
    fn merge(&mut self, other: Self) {
        for incoming in other {
            self.apply(incoming);
        }
    }
}
impl<A: Ord> VClock<A> {
pub fn new() -> Self {
Default::default()
}
pub fn clone_without(&self, base_clock: &VClock<A>) -> VClock<A>
where
A: Clone,
{
let mut cloned = self.clone();
cloned.reset_remove(base_clock);
cloned
}
pub fn inc(&self, actor: A) -> Dot<A>
where
A: Clone,
{
self.dot(actor).inc()
}
pub fn get(&self, actor: &A) -> u64 {
self.dots.get(actor).cloned().unwrap_or(0)
}
pub fn dot(&self, actor: A) -> Dot<A> {
let counter = self.get(&actor);
Dot::new(actor, counter)
}
pub fn concurrent(&self, other: &VClock<A>) -> bool {
self.partial_cmp(other).is_none()
}
pub fn is_empty(&self) -> bool {
self.dots.is_empty()
}
pub fn intersection(left: &VClock<A>, right: &VClock<A>) -> VClock<A>
where
A: Clone,
{
let mut dots = BTreeMap::new();
for (left_actor, left_counter) in left.dots.iter() {
let right_counter = right.get(left_actor);
if right_counter == *left_counter {
dots.insert(left_actor.clone(), *left_counter);
}
}
Self { dots }
}
pub fn glb(&mut self, other: &Self) {
self.dots = mem::take(&mut self.dots)
.into_iter()
.filter_map(|(actor, count)| {
let min_count = cmp::min(count, other.get(&actor));
match min_count {
0 => None,
_ => Some((actor, min_count)),
}
})
.collect();
}
pub fn iter(&self) -> impl Iterator<Item = Dot<&A>> {
self.dots.iter().map(|(a, c)| Dot {
actor: a,
counter: *c,
})
}
}
/// Owning iterator over the entries of a `VClock`, yielding each entry as a
/// `Dot<A>`. Created by `VClock::into_iter`.
pub struct IntoIter<A: Ord> {
// Underlying owning iterator over the clock's `(actor, counter)` map.
btree_iter: btree_map::IntoIter<A, u64>,
}
impl<A: Ord> std::iter::Iterator for IntoIter<A> {
    type Item = Dot<A>;

    /// Yields the next `(actor, counter)` entry of the map as a dot, or
    /// `None` once the map is exhausted.
    fn next(&mut self) -> Option<Dot<A>> {
        let (actor, counter) = self.btree_iter.next()?;
        Some(Dot::new(actor, counter))
    }
}
impl<A: Ord> std::iter::IntoIterator for VClock<A> {
    type Item = Dot<A>;
    type IntoIter = IntoIter<A>;

    /// Consumes the clock and iterates over its entries as owned dots.
    fn into_iter(self) -> Self::IntoIter {
        let btree_iter = self.dots.into_iter();
        IntoIter { btree_iter }
    }
}
impl<A: Ord + Clone + Debug> std::iter::FromIterator<Dot<A>> for VClock<A> {
fn from_iter<I: IntoIterator<Item = Dot<A>>>(iter: I) -> Self {
let mut clock = VClock::default();
for dot in iter {
clock.apply(dot);
}
clock
}
}
impl<A: Ord + Clone + Debug> From<Dot<A>> for VClock<A> {
fn from(dot: Dot<A>) -> Self {
let mut clock = VClock::default();
clock.apply(dot);
clock
}
}
#[cfg(feature = "quickcheck")]
use quickcheck::{Arbitrary, Gen};
#[cfg(feature = "quickcheck")]
impl<A: Ord + Clone + Debug + Arbitrary> Arbitrary for VClock<A> {
    /// Generates a clock by applying between 0 and 9 arbitrary dots to an
    /// empty clock.
    fn arbitrary(g: &mut Gen) -> Self {
        // Draw the count first, then the dots, one at a time and in order.
        let dot_count = u8::arbitrary(g) % 10;
        (0..dot_count).map(|_| Dot::<A>::arbitrary(g)).collect()
    }

    /// Produces smaller candidate clocks.
    ///
    /// For every dot in the clock, the candidates are: the clock with that
    /// dot removed and each shrunk variant of the dot applied instead,
    /// followed by the clock with that dot removed altogether.
    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
        let mut candidates = Vec::new();
        for dot in self.clone() {
            let remainder: Self = self
                .clone()
                .into_iter()
                .filter(|other| *other != dot)
                .collect();
            for smaller in dot.shrink() {
                let mut candidate = remainder.clone();
                candidate.apply(smaller);
                candidates.push(candidate);
            }
            candidates.push(remainder);
        }
        Box::new(candidates.into_iter())
    }
}