use num::bigint::BigInt;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use crate::traits::{CmRDT, CvRDT, ResetRemove};
use crate::{Dot, GCounter, VClock};
#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)]
pub struct PNCounter<A: Ord> {
p: GCounter<A>,
n: GCounter<A>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Dir {
Pos,
Neg,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Op<A: Ord> {
pub dot: Dot<A>,
pub dir: Dir,
}
impl<A: Ord> Default for PNCounter<A> {
fn default() -> Self {
Self {
p: Default::default(),
n: Default::default(),
}
}
}
impl<A: Ord + Clone + Debug> CmRDT for PNCounter<A> {
type Op = Op<A>;
type Validation = <GCounter<A> as CmRDT>::Validation;
fn validate_op(&self, op: &Self::Op) -> Result<(), Self::Validation> {
match op {
Op { dot, dir: Dir::Pos } => self.p.validate_op(dot),
Op { dot, dir: Dir::Neg } => self.n.validate_op(dot),
}
}
fn apply(&mut self, op: Self::Op) {
match op {
Op { dot, dir: Dir::Pos } => self.p.apply(dot),
Op { dot, dir: Dir::Neg } => self.n.apply(dot),
}
}
}
impl<A: Ord + Clone + Debug> CvRDT for PNCounter<A> {
type Validation = <GCounter<A> as CvRDT>::Validation;
fn validate_merge(&self, other: &Self) -> Result<(), Self::Validation> {
self.p.validate_merge(&other.p)?;
self.n.validate_merge(&other.n)
}
fn merge(&mut self, other: Self) {
self.p.merge(other.p);
self.n.merge(other.n);
}
}
impl<A: Ord> ResetRemove<A> for PNCounter<A> {
fn reset_remove(&mut self, clock: &VClock<A>) {
self.p.reset_remove(clock);
self.n.reset_remove(clock);
}
}
impl<A: Ord + Clone> PNCounter<A> {
pub fn new() -> Self {
Default::default()
}
pub fn inc(&self, actor: A) -> Op<A> {
Op {
dot: self.p.inc(actor),
dir: Dir::Pos,
}
}
pub fn dec(&self, actor: A) -> Op<A> {
Op {
dot: self.n.inc(actor),
dir: Dir::Neg,
}
}
pub fn inc_many(&self, actor: A, steps: u64) -> Op<A> {
Op {
dot: self.p.inc_many(actor, steps),
dir: Dir::Pos,
}
}
pub fn dec_many(&self, actor: A, steps: u64) -> Op<A> {
Op {
dot: self.n.inc_many(actor, steps),
dir: Dir::Neg,
}
}
pub fn read(&self) -> BigInt {
let p: BigInt = self.p.read().into();
let n: BigInt = self.n.read().into();
p - n
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_basic_by_one() {
let mut a = PNCounter::new();
assert_eq!(a.read(), 0.into());
a.apply(a.inc("A"));
assert_eq!(a.read(), 1.into());
a.apply(a.inc("A"));
assert_eq!(a.read(), 2.into());
a.apply(a.dec("A"));
assert_eq!(a.read(), 1.into());
a.apply(a.inc("A"));
assert_eq!(a.read(), 2.into());
}
#[test]
fn test_basic_by_many() {
let mut a = PNCounter::new();
assert_eq!(a.read(), 0.into());
let steps = 3;
a.apply(a.inc_many("A", steps));
assert_eq!(a.read(), steps.into());
a.apply(a.inc_many("A", steps));
assert_eq!(a.read(), (2 * steps).into());
a.apply(a.dec_many("A", steps));
assert_eq!(a.read(), steps.into());
a.apply(a.inc_many("A", 1));
assert_eq!(a.read(), (1 + steps).into());
}
#[cfg(feature = "quickcheck")]
mod prop_tests {
use super::*;
use std::collections::BTreeSet;
use quickcheck_macros::quickcheck;
const ACTOR_MAX: u8 = 11;
#[quickcheck]
fn prop_merge_converges(op_prims: Vec<(u8, u64, bool)>) -> bool {
let ops: Vec<Op<u8>> = op_prims.into_iter().map(build_op).collect();
let mut results = BTreeSet::new();
for i in 2..ACTOR_MAX {
let mut witnesses: Vec<PNCounter<u8>> = (0..i).map(|_| PNCounter::new()).collect();
for op in ops.iter() {
let index = op.dot.actor as usize % i as usize;
let witness = &mut witnesses[index];
witness.apply(op.clone());
}
let mut merged = PNCounter::new();
for witness in witnesses.iter() {
merged.merge(witness.clone());
}
results.insert(merged.read());
if results.len() > 1 {
println!("opvec: {:?}", ops);
println!("results: {:?}", results);
println!("witnesses: {:?}", &witnesses);
println!("merged: {:?}", merged);
}
}
results.len() == 1
}
fn build_op(prims: (u8, u64, bool)) -> Op<u8> {
let (actor, counter, dir_choice) = prims;
Op {
dot: Dot { actor, counter },
dir: if dir_choice { Dir::Pos } else { Dir::Neg },
}
}
}
}