Skip to main content

oxide_mesh/
crdt.rs

//! Tiny CRDT primitives for federated state.
//!
//! Three building blocks ship today, all eventually-consistent under the
//! standard CRDT assumptions (commutative + idempotent merges):
//!
//! * [`GSet`] — grow-only set. Adds win; deletes are out of scope.
//! * [`LwwRegister`] — last-write-wins register. Conflicts are resolved by
//!   comparing `(timestamp, author)`, so concurrent writes with the same
//!   timestamp are still deterministic.
//! * [`PnCounter`] — positive/negative counter. Each peer tracks its own
//!   increments and decrements; the value is `sum(p) - sum(n)`.
//!
//! All three are `Serialize`/`Deserialize` so they ride [`PeerMessage`]
//! payloads naturally.
15
16use std::collections::{BTreeSet, HashMap};
17
18use serde::{Deserialize, Serialize};
19
20use crate::message::PeerId;
21
/// Grow-only set.
///
/// A thin wrapper around a [`BTreeSet`]. The methods on this type only ever
/// add elements (`add`, `merge`); none of them removes one. Because the
/// storage is a `BTreeSet`, iterating `items` yields elements in ascending
/// `Ord` order.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct GSet<T: Ord> {
    /// Underlying storage. Public so callers can iterate cheaply.
    pub items: BTreeSet<T>,
}
28
29impl<T: Ord + Clone> GSet<T> {
30    /// Build an empty set.
31    pub fn new() -> Self {
32        Self {
33            items: BTreeSet::new(),
34        }
35    }
36
37    /// Insert one element.
38    pub fn add(&mut self, item: T) {
39        self.items.insert(item);
40    }
41
42    /// Merge another set in-place. The CRDT merge is the set union.
43    pub fn merge(&mut self, other: &Self) {
44        for v in &other.items {
45            self.items.insert(v.clone());
46        }
47    }
48
49    /// Element count.
50    pub fn len(&self) -> usize {
51        self.items.len()
52    }
53
54    /// `true` when empty.
55    pub fn is_empty(&self) -> bool {
56        self.items.is_empty()
57    }
58}
59
/// Last-write-wins register.
///
/// Holds one value together with the `(timestamp, author)` pair of the write
/// that produced it. `write` and `merge` replace all three fields together,
/// and only when the incoming pair compares strictly greater than the stored
/// one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LwwRegister<V> {
    /// Most-recent value.
    pub value: V,
    /// Logical timestamp (caller-supplied — typically wall-clock micros).
    pub timestamp: u64,
    /// Peer id that wrote `value`. Used as a deterministic tie-breaker.
    pub author: PeerId,
}
70
71impl<V: Clone> LwwRegister<V> {
72    /// Build a register seeded with `value` written by `author` at
73    /// `timestamp`.
74    pub fn new(value: V, author: impl Into<PeerId>, timestamp: u64) -> Self {
75        Self {
76            value,
77            timestamp,
78            author: author.into(),
79        }
80    }
81
82    /// Submit a write; later timestamps win, ties broken by peer id.
83    pub fn write(&mut self, value: V, author: impl Into<PeerId>, timestamp: u64) {
84        let author = author.into();
85        if (timestamp, author.as_str()) > (self.timestamp, self.author.as_str()) {
86            self.value = value;
87            self.timestamp = timestamp;
88            self.author = author;
89        }
90    }
91
92    /// Merge another register in-place. Picks the higher
93    /// `(timestamp, author)` pair.
94    pub fn merge(&mut self, other: &Self) {
95        if (other.timestamp, other.author.as_str()) > (self.timestamp, self.author.as_str()) {
96            self.value = other.value.clone();
97            self.timestamp = other.timestamp;
98            self.author = other.author.clone();
99        }
100    }
101}
102
/// PN-counter (positive/negative counter).
///
/// Increments and decrements are tallied in two separate maps, each keyed by
/// the peer the change is attributed to. Both maps hold unsigned totals; a
/// decrement is recorded in `n` as its magnitude.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PnCounter {
    /// Per-peer positive increments.
    pub p: HashMap<PeerId, u64>,
    /// Per-peer negative decrements.
    pub n: HashMap<PeerId, u64>,
}
111
112impl PnCounter {
113    /// Build an empty counter.
114    pub fn new() -> Self {
115        Self::default()
116    }
117
118    /// Increment by `amount` (signed) attributed to `peer`. Each peer is the
119    /// authoritative writer for its own slot so concurrent writes commute.
120    pub fn change(&mut self, peer: impl Into<PeerId>, amount: i64) {
121        let peer = peer.into();
122        if amount >= 0 {
123            *self.p.entry(peer).or_default() += amount as u64;
124        } else {
125            *self.n.entry(peer).or_default() += amount.unsigned_abs();
126        }
127    }
128
129    /// Current value.
130    pub fn value(&self) -> i64 {
131        let pos: u64 = self.p.values().sum();
132        let neg: u64 = self.n.values().sum();
133        pos as i64 - neg as i64
134    }
135
136    /// Merge another counter in-place. Each (peer, side) slot takes the max
137    /// of the two values — the standard PN-counter merge.
138    pub fn merge(&mut self, other: &Self) {
139        for (k, v) in &other.p {
140            let slot = self.p.entry(k.clone()).or_default();
141            if *v > *slot {
142                *slot = *v;
143            }
144        }
145        for (k, v) in &other.n {
146            let slot = self.n.entry(k.clone()).or_default();
147            if *v > *slot {
148                *slot = *v;
149            }
150        }
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// G-set merge must yield the same union whichever side merges, and
    /// re-merging an already-merged state must change nothing.
    #[test]
    fn gset_merge_is_commutative_and_idempotent() {
        let mut a: GSet<i32> = GSet::new();
        let mut b: GSet<i32> = GSet::new();
        assert!(a.is_empty());
        a.add(1);
        a.add(2);
        b.add(2);
        b.add(3);

        let mut ab = a.clone();
        ab.merge(&b);
        let mut ba = b.clone();
        ba.merge(&a);
        assert_eq!(ab.items, ba.items);
        assert_eq!(ab.items, [1, 2, 3].into_iter().collect());
        assert_eq!(ab.len(), 3);

        // Idempotent.
        let before = ab.clone();
        ab.merge(&before);
        assert_eq!(ab.items, before.items);
    }

    /// Direct writes: later timestamps win, equal timestamps fall back to
    /// the author comparison, and losing writes leave the register alone.
    #[test]
    fn lww_register_resolves_by_timestamp_then_author() {
        let mut reg = LwwRegister::new("v1".to_string(), "a", 10);
        reg.write("v2".to_string(), "b", 5);
        assert_eq!(reg.value, "v1", "older write must lose");
        assert_eq!(reg.timestamp, 10);
        reg.write("v3".to_string(), "c", 10);
        assert_eq!(reg.value, "v3", "tie broken by author > a");
        reg.write("v0".to_string(), "a", 10);
        assert_eq!(reg.value, "v3", "tie with a smaller author must lose");
        reg.write("v4".to_string(), "z", 20);
        assert_eq!(reg.value, "v4");
        assert_eq!(reg.timestamp, 20);
    }

    /// Merging must converge to the same state from either direction, be
    /// idempotent, and apply the author tie-break just like `write` does.
    #[test]
    fn lww_merge_converges_in_both_directions() {
        let r1: LwwRegister<String> = LwwRegister::new("x".to_string(), "a", 1u64);
        let r2: LwwRegister<String> = LwwRegister::new("y".to_string(), "b", 2u64);

        let mut left = r1.clone();
        left.merge(&r2);
        let mut right = r2.clone();
        right.merge(&r1);
        assert_eq!(left.value, "y");
        assert_eq!(right.value, "y");
        assert_eq!(left.timestamp, 2);
        assert_eq!(right.timestamp, 2);
        assert!(left.author.as_str() == right.author.as_str());

        // Idempotent.
        let snapshot = left.clone();
        left.merge(&snapshot);
        assert_eq!(left.value, "y");
        assert_eq!(left.timestamp, 2);

        // Equal timestamps: the greater author wins regardless of direction.
        let from_a: LwwRegister<String> = LwwRegister::new("from-a".to_string(), "a", 7u64);
        let from_b: LwwRegister<String> = LwwRegister::new("from-b".to_string(), "b", 7u64);
        let mut a_then_b = from_a.clone();
        a_then_b.merge(&from_b);
        let mut b_then_a = from_b.clone();
        b_then_a.merge(&from_a);
        assert_eq!(a_then_b.value, "from-b");
        assert_eq!(b_then_a.value, "from-b");
    }

    /// PN-counter merge takes the per-slot maximum and must give the same
    /// value whichever replica merges the other.
    #[test]
    fn pn_counter_commutes_and_merges_via_max() {
        let mut c = PnCounter::new();
        assert_eq!(c.value(), 0);
        c.change("a", 5);
        c.change("b", 3);
        c.change("a", -2);
        assert_eq!(c.value(), 6);

        let mut other = PnCounter::new();
        other.change("a", 10);

        // `a` positive slot was 5, now max(5, 10) = 10. b stays at 3. a
        // negative slot stays at 2. Final = 10 + 3 - 2 = 11.
        let mut c_then_other = c.clone();
        c_then_other.merge(&other);
        assert_eq!(c_then_other.value(), 11);

        // Commutative: merging in the opposite direction agrees.
        let mut other_then_c = other.clone();
        other_then_c.merge(&c);
        assert_eq!(other_then_c.value(), 11);

        // Idempotent.
        let snapshot = c_then_other.clone();
        c_then_other.merge(&snapshot);
        assert_eq!(c_then_other.value(), 11);
    }
}