// convergent 0.1.0
//
// Lightweight, composable CRDTs for decentralized systems
// Documentation
//! Positive-Negative counter (PN-Counter).
//!
//! A counter that supports both increment and decrement operations.
//! Internally composed of two G-Counters: one for increments, one for decrements.
//! The value is the difference between the two.

use crate::gcounter::GCounter;
use crate::Merge;

/// A distributed counter that can move both up and down.
///
/// Two [`GCounter`]s are held internally: `p` records every increment and
/// `n` records every decrement. The observable value is `p - n`.
///
/// # Example
///
/// ```rust
/// use convergent::PNCounter;
///
/// let mut counter = PNCounter::new("a");
/// counter.increment(10);
/// counter.decrement(3);
/// assert_eq!(counter.value(), 7);
/// ```
#[derive(Clone)]
pub struct PNCounter<K>
where
    K: Eq + std::hash::Hash + Clone,
{
    /// Tracks increments.
    p: GCounter<K>,
    /// Tracks decrements.
    n: GCounter<K>,
}

impl<K: Eq + std::hash::Hash + Clone> PNCounter<K> {
    /// Create a new PN-Counter for the given node.
    ///
    /// Both internal G-Counters are constructed from the same `node_id`
    /// (one of them receives a clone).
    pub fn new(node_id: K) -> Self {
        let n = GCounter::new(node_id.clone());
        Self {
            p: GCounter::new(node_id),
            n,
        }
    }

    /// Increment the counter by `amount`.
    ///
    /// The amount is added to the increment-side counter `p`.
    pub fn increment(&mut self, amount: u64) {
        self.p.increment(amount);
    }

    /// Decrement the counter by `amount`.
    ///
    /// A decrement is recorded as an *increment* of the decrement-side
    /// counter `n`; neither internal counter is ever reduced.
    pub fn decrement(&mut self, amount: u64) {
        self.n.increment(amount);
    }

    /// Return the current value (can be negative).
    ///
    /// The value is the increment total minus the decrement total. If that
    /// difference does not fit in an `i64`, the result saturates at
    /// `i64::MAX` or `i64::MIN` instead of wrapping around or panicking.
    pub fn value(&self) -> i64 {
        // Subtract in i128: each total fits losslessly, so the difference
        // cannot overflow here. Casting each side to i64 first would wrap
        // totals above i64::MAX to negative numbers, and the i64 subtraction
        // itself could overflow.
        let difference = i128::from(self.p.value()) - i128::from(self.n.value());
        let bounded = difference.clamp(i128::from(i64::MIN), i128::from(i64::MAX));
        // Lossless: `bounded` was just clamped into the i64 range.
        bounded as i64
    }
}

impl<K: Eq + std::hash::Hash + Clone> Merge for PNCounter<K> {
    /// Fold the state of `other` into `self`.
    ///
    /// The increment side and the decrement side are merged separately, each
    /// through the `merge` of its own G-Counter. `other` is left untouched.
    fn merge(&mut self, other: &Self) {
        let Self {
            p: their_increments,
            n: their_decrements,
        } = other;

        self.p.merge(their_increments);
        self.n.merge(their_decrements);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A single node: increments minus decrements gives the value.
    #[test]
    fn test_increment_and_decrement() {
        let mut pn = PNCounter::new("a");
        pn.increment(10);
        pn.decrement(3);

        let observed = pn.value();
        assert_eq!(observed, 7);
    }

    /// More decrements than increments yields a negative value.
    #[test]
    fn test_can_go_negative() {
        let mut pn = PNCounter::new("a");
        pn.increment(3);
        pn.decrement(5);

        let observed = pn.value();
        assert_eq!(observed, -2);
    }

    /// Merging a second node's state adds its contributions to both sides.
    #[test]
    fn test_merge_two_nodes() {
        let mut left = PNCounter::new("a");
        left.increment(10);
        left.decrement(2);

        let mut right = PNCounter::new("b");
        right.increment(5);
        right.decrement(1);

        left.merge(&right);

        // (10 + 5) - (2 + 1)
        assert_eq!(left.value(), 12);
    }

    /// a ⊔ b and b ⊔ a report the same value.
    #[test]
    fn test_merge_is_commutative() {
        let mut left = PNCounter::new("a");
        left.increment(5);
        left.decrement(2);

        let mut right = PNCounter::new("b");
        right.increment(3);

        let mut left_then_right = left.clone();
        left_then_right.merge(&right);

        let mut right_then_left = right.clone();
        right_then_left.merge(&left);

        assert_eq!(left_then_right.value(), right_then_left.value());
    }

    /// Merging the same state twice has no further effect on the value.
    #[test]
    fn test_merge_is_idempotent() {
        let mut target = PNCounter::new("a");
        target.increment(5);

        let mut incoming = PNCounter::new("b");
        incoming.decrement(3);

        target.merge(&incoming);
        let value_after_first = target.value();

        target.merge(&incoming);
        let value_after_second = target.value();

        assert_eq!(value_after_second, value_after_first);
    }

    /// (a ⊔ b) ⊔ c and a ⊔ (b ⊔ c) report the same value.
    #[test]
    fn test_merge_is_associative() {
        let mut first = PNCounter::new("a");
        first.increment(5);

        let mut second = PNCounter::new("b");
        second.decrement(2);

        let mut third = PNCounter::new("c");
        third.increment(3);
        third.decrement(1);

        // Left-grouped: (first ⊔ second) ⊔ third
        let mut left_grouped = first.clone();
        left_grouped.merge(&second);
        left_grouped.merge(&third);

        // Right-grouped: first ⊔ (second ⊔ third)
        let mut second_third = second.clone();
        second_third.merge(&third);
        let mut right_grouped = first.clone();
        right_grouped.merge(&second_third);

        assert_eq!(left_grouped.value(), right_grouped.value());
    }
}