convergent 0.1.0

Lightweight, composable CRDTs for decentralized systems
Documentation
//! Observed-Remove Set (OR-Set).
//!
//! A set that supports both insert and remove operations across
//! distributed nodes. Each insertion is tagged with a unique HLC identifier.
//! Removal only affects tags that have been locally observed,
//! allowing concurrent additions to survive.
use chrono::Utc;
use hlc_id::{clock::HybridLogicalClock, id::HLCId};
use std::collections::{HashMap, HashSet};

use crate::Merge;

/// A distributed set with observed-remove semantics.
///
/// Each insertion generates a unique tag via [`hlc_id`].
/// Removing an element only removes the tags seen locally.
/// Unobserved tags from other nodes survive the removal after merge.
///
/// # Example
///
/// ```rust
/// use convergent::{ORSet, Merge};
///
/// let mut node_a = ORSet::new(1);
/// let mut node_b = ORSet::new(2);
///
/// node_a.insert("apple");
/// node_b.insert("banana");
///
/// node_a.merge(&node_b);
/// assert!(node_a.contains(&"apple"));
/// assert!(node_a.contains(&"banana"));
/// ```
#[derive(Clone)]
pub struct ORSet<V: Eq + std::hash::Hash + Clone> {
    /// Live insertion tags, keyed by the value they were attached to.
    /// A value is a member of the set while it has at least one live tag.
    elements: HashMap<V, HashSet<u128>>,
    /// Tags that have been removed, locally or on a replica merged into this one.
    /// They are kept so that a later merge with a replica that still holds one
    /// of these tags cannot bring the removed element back.
    /// Invariant: no tag is present in both `elements` and `tombstones`.
    tombstones: HashSet<u128>,
    /// Clock used to mint a fresh tag for every insertion.
    clock: HybridLogicalClock,
}

// Constructor and local operations for the OR-Set.
impl<V: Eq + std::hash::Hash + Clone> ORSet<V> {
    /// Create a new empty OR-Set for the given node.
    pub fn new(node_id: u16) -> Self {
        Self {
            elements: HashMap::new(),
            tombstones: HashSet::new(),
            clock: HybridLogicalClock::new(node_id),
        }
    }

    /// Insert an element into the set, tagging it with a freshly generated HLC id.
    ///
    /// Inserting a value that is already present adds one more tag to it.
    pub fn insert(&mut self, value: V) {
        // Clamp before casting: a negative reading (system clock set before the
        // Unix epoch) would otherwise wrap into a huge `u64` timestamp.
        let ts = Utc::now().timestamp_millis().max(0) as u64;
        let tag = HLCId::generate(&mut self.clock, ts);
        self.elements
            .entry(value)
            .or_default()
            .insert(tag.to_u128());
    }

    /// Remove an element from the set.
    ///
    /// Every tag currently observed locally for `value` is moved into the
    /// tombstone set. Tags for the same value that this replica has not yet
    /// seen are unaffected, so a concurrent insertion on another node survives
    /// a later merge. Removing an absent value is a no-op.
    ///
    /// Tombstones are never discarded, so memory grows with the number of
    /// removed tags.
    pub fn remove(&mut self, value: &V) {
        if let Some(observed) = self.elements.remove(value) {
            self.tombstones.extend(observed);
        }
    }

    /// Check if an element is in the set, i.e. has at least one live tag.
    pub fn contains(&self, value: &V) -> bool {
        self.elements
            .get(value)
            .map_or(false, |tags| !tags.is_empty())
    }

    /// Return all current values in the set.
    pub fn values(&self) -> HashSet<V> {
        self.elements
            .iter()
            .filter(|(_, tags)| !tags.is_empty())
            .map(|(value, _)| value.clone())
            .collect()
    }
}

impl<V: Eq + std::hash::Hash + Clone> Merge for ORSet<V> {
    /// Merge another OR-Set into this one.
    ///
    /// Tombstones and tags are both unioned, then every tombstoned tag is
    /// dropped from the live tags. A removal seen by either replica therefore
    /// wins over the tags it observed, while unobserved additions survive.
    /// `other` is not modified.
    fn merge(&mut self, other: &Self) {
        // Borrow the two fields separately so the tombstone set can be read
        // while the element map is being mutated.
        let Self {
            elements,
            tombstones,
            ..
        } = self;

        tombstones.extend(other.tombstones.iter().copied());

        for (value, other_tags) in &other.elements {
            elements
                .entry(value.clone())
                .or_default()
                .extend(other_tags.iter().copied());
        }

        // Purge removed tags (ours that `other` tombstoned, and theirs that we
        // tombstoned) and drop values left without any live tag.
        let removed = &*tombstones;
        elements.retain(|_, tags| {
            tags.retain(|tag| !removed.contains(tag));
            !tags.is_empty()
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed set reports no members.
    #[test]
    fn test_new_set_is_empty() {
        let set: ORSet<&str> = ORSet::new(1);
        assert!(!set.contains(&"hello"));
        assert!(set.values().is_empty());
    }

    /// An inserted value is a member; other values are not.
    #[test]
    fn test_insert_and_contains() {
        let mut set = ORSet::new(1);
        set.insert("hello");
        assert!(set.contains(&"hello"));
        assert!(!set.contains(&"world"));
    }

    /// Removing a locally inserted value makes it disappear.
    #[test]
    fn test_remove() {
        let mut set = ORSet::new(1);
        set.insert("hello");
        set.remove(&"hello");
        assert!(!set.contains(&"hello"));
        assert!(set.values().is_empty());
    }

    /// An insertion the removing node has not observed survives the merge.
    #[test]
    fn test_add_wins_over_remove() {
        let mut a = ORSet::new(1);
        let mut b = ORSet::new(2);

        a.insert("hello");
        b.merge(&a);

        a.remove(&"hello");
        // NOTE(review): presumably here so that b's tag gets a later wall-clock
        // timestamp than a's; confirm whether the test actually depends on it.
        std::thread::sleep(std::time::Duration::from_millis(2));
        b.insert("hello");
        a.merge(&b);
        assert!(a.contains(&"hello"));
    }

    /// Merging in either direction yields the same membership.
    #[test]
    fn test_merge_is_commutative() {
        let mut a = ORSet::new(1);
        let mut b = ORSet::new(2);

        a.insert("x");
        b.insert("y");

        let mut a2 = a.clone();
        let mut b2 = b.clone();

        a2.merge(&b);
        b2.merge(&a);

        // Compare the whole value sets, and pin the expected contents so the
        // test cannot pass when both merges lose the same elements.
        assert_eq!(a2.values(), b2.values());
        assert_eq!(a2.values().len(), 2);
        assert!(a2.contains(&"x") && a2.contains(&"y"));
        assert!(b2.contains(&"x") && b2.contains(&"y"));
    }

    /// Merging the same replica twice changes nothing the second time.
    #[test]
    fn test_merge_is_idempotent() {
        let mut a = ORSet::new(1);
        let mut b = ORSet::new(2);

        a.insert("x");
        b.insert("y");

        a.merge(&b);
        let after_first = a.values();
        a.merge(&b);

        // Compare contents, not just the count.
        assert_eq!(a.values(), after_first);
        assert!(a.contains(&"x") && a.contains(&"y"));
    }
}