convergent 0.1.0

Lightweight, composable CRDTs for decentralized systems
Documentation
//! Grow-only counter (G-Counter).
//!
//! Each node maintains its own counter. The total value is the sum
//! of all node counters. Merging takes the maximum of each node's count,
//! which guarantees convergence without coordination.

use std::collections::HashMap;
use std::hash::Hash;

use crate::Merge;

/// A grow-only distributed counter.
///
/// Each replica is identified by a `node_id` of type `K`.
/// Increments are local; the global value is obtained by summing
/// all per-node counts after merging.
///
/// # Example
///
/// ```rust
/// use convergent::{GCounter, Merge};
///
/// let mut node_a = GCounter::new("a");
/// let mut node_b = GCounter::new("b");
///
/// node_a.increment(3);
/// node_b.increment(5);
///
/// node_a.merge(&node_b);
/// assert_eq!(node_a.value(), 8);
/// ```

#[derive(Clone)]
pub struct GCounter<K: Eq + Hash + Clone> {
    node_id: K,
    counts: HashMap<K, u64>,
}

impl<K: Eq + Hash + Clone> GCounter<K> {
    /// Create a new G-Counter for the given node.
    pub fn new(node_id: K) -> Self {
        Self {
            node_id,
            counts: HashMap::new(),
        }
    }

    /// Increment this node's counter by `amount`.
    pub fn increment(&mut self, amount: u64) {
        let entry = self.counts.entry(self.node_id.clone()).or_insert(0);
        *entry += amount;
    }

    /// Return the total value across all known nodes.
    pub fn value(&self) -> u64 {
        self.counts.values().sum()
    }
}

impl<K: Eq + Hash + Clone> Merge for GCounter<K> {
    /// Merge another G-Counter into this one.
    ///
    /// For each node, the maximum count is kept.
    /// This operation is commutative, associative, and idempotent.
    fn merge(&mut self, other: &Self) {
        for (node, &count) in &other.counts {
            let entry = self.counts.entry(node.clone()).or_insert(0);
            *entry = (*entry).max(count);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_counter_is_zero() {
        let counter: GCounter<&str> = GCounter::new("a");
        assert_eq!(counter.value(), 0);
    }

    #[test]
    fn test_increment() {
        let mut counter = GCounter::new("a");
        counter.increment(5);
        assert_eq!(counter.value(), 5);
        counter.increment(3);
        assert_eq!(counter.value(), 8);
    }

    #[test]
    fn test_merge_two_nodes() {
        let mut a = GCounter::new("a");
        let mut b = GCounter::new("b");

        a.increment(3);
        b.increment(7);

        a.merge(&b);
        assert_eq!(a.value(), 10);
    }

    #[test]
    fn test_merge_is_commutative() {
        let mut a = GCounter::new("a");
        let mut b = GCounter::new("b");

        a.increment(3);
        b.increment(7);

        let mut a2 = a.clone();
        let mut b2 = b.clone();

        a2.merge(&b);
        b2.merge(&a);
        assert_eq!(a2.value(), b2.value());
    }

    #[test]
    fn test_merge_is_idempotent() {
        let mut a = GCounter::new("a");
        let mut b = GCounter::new("b");

        a.increment(3);
        b.increment(7);

        a.merge(&b);
        let value_after_first = a.value();
        a.merge(&b);
        assert_eq!(a.value(), value_after_first);
    }

    #[test]
    fn test_merge_is_associative() {
        let mut a = GCounter::new("a");
        let mut b = GCounter::new("b");
        let mut c = GCounter::new("c");

        a.increment(1);
        b.increment(2);
        c.increment(3);

        // (a merge b) merge c
        let mut ab_c = a.clone();
        ab_c.merge(&b);
        ab_c.merge(&c);

        // a merge (b merge c)
        let mut a_bc = a.clone();
        let mut bc = b.clone();
        bc.merge(&c);
        a_bc.merge(&bc);

        assert_eq!(ab_c.value(), a_bc.value());
    }
}