// convergent 0.1.0
//
// Lightweight, composable CRDTs for decentralized systems
// Documentation
//! Last-Writer-Wins Register (LWW-Register).
//!
//! A register that holds a single value. When two nodes write concurrently,
//! the write with the most recent HLC timestamp wins.
//! Uses [`hlc_id`] for causally-ordered timestamps.

use chrono::Utc;
use hlc_id::clock::HybridLogicalClock;
use hlc_id::id::HLCId;

use crate::Merge;

/// A distributed register where the last writer wins.
///
/// Each write is tagged with an HLC timestamp from [`hlc_id`].
/// On merge, the value with the most recent timestamp is kept.
///
/// The type itself places no bound on `V`; the `Clone` requirement is stated
/// on the `impl` blocks that actually clone the value, so code that merely
/// names `LWWRegister<V>` does not have to repeat it.
///
/// # Example
///
/// ```rust
/// use convergent::{LWWRegister, Merge};
///
/// let mut node_a = LWWRegister::new(1);
/// let mut node_b = LWWRegister::new(2);
///
/// node_a.set("hello");
/// node_b.set("world");
///
/// node_a.merge(&node_b);
/// // The most recent write wins
/// assert!(node_a.get().is_some());
/// ```
#[derive(Clone)]
pub struct LWWRegister<V> {
    /// Value currently held; `None` until a local write or a merge stores one.
    value: Option<V>,
    /// HLC id recorded together with `value`; `None` while the register is empty.
    timestamp: Option<HLCId>,
    /// Clock owned by this register, created from the node id and used to
    /// stamp local writes.
    clock: HybridLogicalClock,
}

impl<V: Clone> LWWRegister<V> {
    /// Create a new empty register for the given node.
    ///
    /// The register starts with no value and no timestamp. `node_id` is
    /// passed to the register's own [`HybridLogicalClock`].
    pub fn new(node_id: u16) -> Self {
        Self {
            value: None,
            timestamp: None,
            clock: HybridLogicalClock::new(node_id),
        }
    }

    /// Write a value to the register, generating a new HLC timestamp.
    ///
    /// The current UTC wall-clock time in milliseconds is fed to the
    /// register's clock to produce the id that is stored alongside `value`.
    pub fn set(&mut self, value: V) {
        // `timestamp_millis` is signed and is negative when the system clock
        // is set before the Unix epoch. A bare `as u64` would wrap such a
        // reading into an enormous physical time, so clamp to zero first;
        // after the clamp the cast is lossless.
        let ts = Utc::now().timestamp_millis().max(0) as u64;
        let id = HLCId::generate(&mut self.clock, ts);
        self.value = Some(value);
        self.timestamp = Some(id);
    }

    /// Read the current value, if any.
    ///
    /// Returns `None` when nothing has been written or merged in yet.
    pub fn get(&self) -> Option<&V> {
        self.value.as_ref()
    }
}

impl<V: Clone> Merge for LWWRegister<V> {
    /// Fold the state of `other` into this register.
    ///
    /// `other`'s value and timestamp are adopted when this register has no
    /// timestamp yet, or when this register's timestamp `is_before` the one
    /// held by `other`. In every other case (including `other` being empty)
    /// this register is left untouched. The local clock is not modified.
    fn merge(&mut self, other: &Self) {
        // Decide first, copy once: both adopting cases perform the same update.
        let adopt_remote = match (self.timestamp.as_ref(), other.timestamp.as_ref()) {
            // Nothing to learn from a register that was never written.
            (_, None) => false,
            // We are empty and the other side is not.
            (None, Some(_)) => true,
            // Both sides were written: take the other side only if ours is older.
            (Some(local), Some(remote)) => local.is_before(remote),
        };

        if adopt_remote {
            self.value = other.value.clone();
            self.timestamp = other.timestamp;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A register that has never been written reads back as `None`.
    #[test]
    fn test_new_register_is_empty() {
        let fresh: LWWRegister<&str> = LWWRegister::new(1);
        assert!(fresh.get().is_none());
    }

    /// A local write is immediately visible through `get`.
    #[test]
    fn test_set_and_get() {
        let mut register = LWWRegister::new(1);
        register.set("hello");
        assert_eq!(register.get().copied(), Some("hello"));
    }

    /// Merging a written register into an empty one adopts the written value.
    #[test]
    fn test_merge_empty_with_value() {
        let mut empty: LWWRegister<&str> = LWWRegister::new(1);
        let mut written = LWWRegister::new(2);
        written.set("hello");

        empty.merge(&written);
        assert_eq!(empty.get().copied(), Some("hello"));
    }

    /// When one write clearly happens after another, the later one survives a merge.
    #[test]
    fn test_last_writer_wins() {
        use std::thread;
        use std::time::Duration;

        let mut earlier = LWWRegister::new(1);
        let mut later = LWWRegister::new(2);

        earlier.set("first");
        // Leave a wall-clock gap so the second write gets a later physical time.
        thread::sleep(Duration::from_millis(2));
        later.set("second");

        earlier.merge(&later);
        assert_eq!(earlier.get().copied(), Some("second"));
    }

    /// Merging the same remote state twice gives the same result as merging it once.
    #[test]
    fn test_merge_is_idempotent() {
        let mut local = LWWRegister::new(1);
        let mut remote = LWWRegister::new(2);

        local.set("hello");
        remote.set("world");

        local.merge(&remote);
        let after_first_merge = local.get().cloned();

        local.merge(&remote);
        let after_second_merge = local.get().cloned();

        assert_eq!(after_second_merge, after_first_merge);
    }
}