Skip to main content

abyo_crdt/
map.rs

1//! Map CRDT — Last-Write-Wins keyed dictionary.
2//!
3//! Each `(key, value)` pair carries a Lamport-style timestamp. On merge, the entry
4//! with the highest timestamp wins per key. A delete is a tombstone with its
5//! own timestamp; if its timestamp dominates the latest set, the key is
6//! considered absent.
7//!
8//! ## Convergence
9//!
10//! Per-key resolution uses the `OpId` total order (counter, then replica),
11//! which is a strictly monotonic Lamport clock. Concurrent set+set on the
12//! same key resolves deterministically; concurrent set+remove resolves to
13//! whichever has the higher `OpId`.
14//!
15//! ## Quick start
16//!
17//! ```
18//! use abyo_crdt::Map;
19//!
20//! let mut alice: Map<String, i32> = Map::new(1);
21//! let mut bob: Map<String, i32> = Map::new(2);
22//!
23//! alice.set("score".to_string(), 10);
24//! bob.merge(&alice);
25//!
26//! // Concurrent edits to the same key.
27//! alice.set("score".to_string(), 20);
28//! bob.set("score".to_string(), 99);
29//!
30//! alice.merge(&bob);
31//! bob.merge(&alice);
32//!
33//! // Both replicas converge — Lamport tiebreaker decides which value wins.
34//! assert_eq!(alice.get(&"score".to_string()), bob.get(&"score".to_string()));
35//! ```
36
37use crate::{
38    error::Error,
39    id::{OpId, ReplicaId},
40    version::VersionVector,
41};
42use std::collections::HashMap;
43use std::hash::Hash;
44
45#[cfg(feature = "serde")]
46use serde::{Deserialize, Serialize};
47
48// ---------------------------------------------------------------------------
49// Public op type
50// ---------------------------------------------------------------------------
51
/// A single [`Map`] CRDT operation.
///
/// Ops are produced by [`Map::set`] / [`Map::remove`] and consumed by
/// [`Map::apply`]. Every op carries the [`OpId`] that identifies it and that
/// decides last-write-wins conflicts on its key.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MapOp<K, V> {
    /// Set `key` to `value`.
    Set {
        /// Op id (also serves as the Lamport-style timestamp for conflict resolution).
        id: OpId,
        /// Key being set.
        key: K,
        /// Value being written.
        value: V,
    },
    /// Remove `key` (recorded as a tombstone so the removal can replicate).
    Remove {
        /// Op id (also the timestamp the tombstone competes with).
        id: OpId,
        /// Key being removed.
        key: K,
    },
}
73
74impl<K, V> MapOp<K, V> {
75    /// The id of this op.
76    #[must_use]
77    pub fn id(&self) -> OpId {
78        match self {
79            MapOp::Set { id, .. } | MapOp::Remove { id, .. } => *id,
80        }
81    }
82
83    /// The key this op affects.
84    #[must_use]
85    pub fn key(&self) -> &K {
86        match self {
87            MapOp::Set { key, .. } | MapOp::Remove { key, .. } => key,
88        }
89    }
90}
91
92// ---------------------------------------------------------------------------
93// Internal entry
94// ---------------------------------------------------------------------------
95
/// Per-key state stored in [`Map`]: the id of the op that currently wins for
/// the key, plus the value that op wrote (or `None` for a tombstone).
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
struct Entry<V> {
    /// Op id of the op that "wrote" this entry. The op with the highest
    /// `op_id` for a given key wins.
    op_id: OpId,
    /// `Some(v)` if the latest op was a set; `None` if it was a remove.
    value: Option<V>,
}
105
106// ---------------------------------------------------------------------------
107// Map CRDT
108// ---------------------------------------------------------------------------
109
/// LWW-Map CRDT. See the module docs for semantics.
#[derive(Clone, Debug)]
pub struct Map<K: Eq + Hash + Clone, V: Clone> {
    /// Id of the replica that owns this instance; stamped on every local op.
    replica: ReplicaId,
    /// Lamport counter: incremented for each local op and raised to the
    /// counter of every remote op that gets applied.
    clock: u64,
    /// Winning entry per key. Removed keys stay here as tombstones
    /// (`value == None`).
    entries: HashMap<K, Entry<V>>,
    /// Every op this replica has generated or applied, in observation order.
    log: Vec<MapOp<K, V>>,
    /// Record of the op ids observed so far; used to skip ops already seen.
    version: VersionVector,
}
119
120impl<K: Eq + Hash + Clone, V: Clone> Map<K, V> {
121    /// Create an empty map for the given replica.
122    #[must_use]
123    pub fn new(replica: ReplicaId) -> Self {
124        Self {
125            replica,
126            clock: 0,
127            entries: HashMap::new(),
128            log: Vec::new(),
129            version: VersionVector::new(),
130        }
131    }
132
133    /// Create a new instance with a random [`ReplicaId`] from OS entropy.
134    /// See [`crate::new_replica_id`].
135    #[must_use]
136    pub fn new_random() -> Self {
137        Self::new(crate::id::new_replica_id())
138    }
139
140    /// This replica's id.
141    #[must_use]
142    pub fn replica_id(&self) -> ReplicaId {
143        self.replica
144    }
145
146    /// Number of visible (non-tombstoned) entries.
147    #[must_use]
148    pub fn len(&self) -> usize {
149        self.entries.values().filter(|e| e.value.is_some()).count()
150    }
151
152    /// Are there any visible entries?
153    #[must_use]
154    pub fn is_empty(&self) -> bool {
155        self.len() == 0
156    }
157
158    /// Look up the value for `key`, or `None` if absent / tombstoned.
159    pub fn get(&self, key: &K) -> Option<&V> {
160        self.entries.get(key).and_then(|e| e.value.as_ref())
161    }
162
163    /// Iterate visible `(key, value)` pairs.
164    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> + '_ {
165        self.entries
166            .iter()
167            .filter_map(|(k, e)| e.value.as_ref().map(|v| (k, v)))
168    }
169
170    /// Iterate visible keys.
171    pub fn keys(&self) -> impl Iterator<Item = &K> + '_ {
172        self.iter().map(|(k, _)| k)
173    }
174
175    /// Iterate visible values.
176    pub fn values(&self) -> impl Iterator<Item = &V> + '_ {
177        self.iter().map(|(_, v)| v)
178    }
179
180    /// Does the map contain `key`?
181    pub fn contains_key(&self, key: &K) -> bool {
182        self.get(key).is_some()
183    }
184
185    /// Set `key` to `value`. Returns the generated [`MapOp`].
186    pub fn set(&mut self, key: K, value: V) -> MapOp<K, V> {
187        self.clock = self
188            .clock
189            .checked_add(1)
190            .expect("Lamport clock overflow (>2^64 ops)");
191        let id = OpId::new(self.clock, self.replica);
192        let op = MapOp::Set {
193            id,
194            key: key.clone(),
195            value: value.clone(),
196        };
197        self.upsert(id, key, Some(value));
198        self.version.observe(id);
199        self.log.push(op.clone());
200        op
201    }
202
203    /// Remove `key`. Always emits an op (even if the key was absent), so the
204    /// remove can be replicated.
205    pub fn remove(&mut self, key: K) -> MapOp<K, V> {
206        self.clock = self
207            .clock
208            .checked_add(1)
209            .expect("Lamport clock overflow (>2^64 ops)");
210        let id = OpId::new(self.clock, self.replica);
211        let op = MapOp::Remove {
212            id,
213            key: key.clone(),
214        };
215        self.upsert(id, key, None);
216        self.version.observe(id);
217        self.log.push(op.clone());
218        op
219    }
220
221    /// Apply a remote operation. Idempotent.
222    pub fn apply(&mut self, op: MapOp<K, V>) -> Result<(), Error> {
223        let op_id = op.id();
224        if self.version.contains(op_id) {
225            return Ok(());
226        }
227        match &op {
228            MapOp::Set { id, key, value } => {
229                self.upsert(*id, key.clone(), Some(value.clone()));
230            }
231            MapOp::Remove { id, key } => {
232                self.upsert(*id, key.clone(), None);
233            }
234        }
235        self.version.observe(op_id);
236        self.clock = self.clock.max(op_id.counter);
237        self.log.push(op);
238        Ok(())
239    }
240
241    /// Merge all of `other`'s state into `self`. Equivalent to applying
242    /// every op in `other.log` that we haven't seen, in `OpId` order.
243    pub fn merge(&mut self, other: &Self) {
244        let mut to_apply: Vec<&MapOp<K, V>> = other
245            .log
246            .iter()
247            .filter(|op| !self.version.contains(op.id()))
248            .collect();
249        to_apply.sort_by_key(|op| op.id());
250        for op in to_apply {
251            // Apply can never fail in a valid log replay.
252            self.apply(op.clone())
253                .expect("corrupt op log in merge source");
254        }
255    }
256
257    /// All ops observed by this replica, in observation order.
258    #[must_use]
259    pub fn ops(&self) -> &[MapOp<K, V>] {
260        &self.log
261    }
262
263    /// Iterate over ops not yet seen by `since`.
264    pub fn ops_since<'a>(
265        &'a self,
266        since: &'a VersionVector,
267    ) -> impl Iterator<Item = &'a MapOp<K, V>> + 'a {
268        self.log.iter().filter(move |op| !since.contains(op.id()))
269    }
270
271    /// This replica's current version vector.
272    #[must_use]
273    pub fn version(&self) -> &VersionVector {
274        &self.version
275    }
276
277    // -----------------------------------------------------------------------
278    // Internals
279    // -----------------------------------------------------------------------
280
281    /// LWW write: only updates the entry if `id` dominates the existing one.
282    fn upsert(&mut self, id: OpId, key: K, value: Option<V>) {
283        match self.entries.get_mut(&key) {
284            Some(entry) if id <= entry.op_id => {
285                // Existing entry has a higher (or equal) op_id — keep it.
286                // Equality only happens when the same op id arrives twice,
287                // which `apply` already guards via the version-vector check;
288                // included here for defense in depth.
289            }
290            Some(entry) => {
291                entry.op_id = id;
292                entry.value = value;
293            }
294            None => {
295                self.entries.insert(key, Entry { op_id: id, value });
296            }
297        }
298    }
299}
300
/// The default map is empty and owned by replica id `0`.
///
/// NOTE(review): every default-constructed map shares replica id `0`, so two
/// of them would presumably mint colliding op ids — prefer [`Map::new`] or
/// [`Map::new_random`] for maps that replicate; confirm intended use.
impl<K: Eq + Hash + Clone, V: Clone> Default for Map<K, V> {
    /// Equivalent to `Map::new(0)`.
    fn default() -> Self {
        Self::new(0)
    }
}
306
307// ---------------------------------------------------------------------------
308// Serde
309// ---------------------------------------------------------------------------
310//
311// HashMap<K, _> with a non-string K (e.g. `(u64, u64)`) won't survive a JSON
312// round-trip. We serialize via a Vec<(K, Entry<V>)> snapshot.
313
/// Serialization shape of a [`Map`]: identical to the map's fields, except
/// that `entries` is a list of `(key, entry)` pairs rather than a `HashMap`.
#[cfg(feature = "serde")]
#[derive(Serialize, Deserialize)]
struct MapSnapshot<K, V> {
    // Owning replica id.
    replica: ReplicaId,
    // Lamport counter at snapshot time.
    clock: u64,
    // Per-key entries (tombstones included), flattened into pairs.
    entries: Vec<(K, Entry<V>)>,
    // Version vector at snapshot time.
    version: VersionVector,
    // Full op log at snapshot time.
    log: Vec<MapOp<K, V>>,
}
323
#[cfg(feature = "serde")]
impl<K, V> Serialize for Map<K, V>
where
    K: Eq + Hash + Clone + Serialize,
    V: Clone + Serialize,
{
    /// Serialize the map as a `MapSnapshot`-shaped struct.
    ///
    /// The data is written through a borrowed view, so no key, value, or log
    /// entry is cloned. Field names, field order, and the struct name match
    /// `MapSnapshot`, which is what `Deserialize` reads back. The order of
    /// `entries` follows `HashMap` iteration and is unspecified.
    fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
        /// Borrowed twin of `MapSnapshot`; renamed so formats that record
        /// the struct name emit the same name as before.
        #[derive(Serialize)]
        #[serde(rename = "MapSnapshot")]
        struct BorrowedSnapshot<'a, K, V> {
            replica: ReplicaId,
            clock: u64,
            entries: Vec<(&'a K, &'a Entry<V>)>,
            version: &'a VersionVector,
            log: &'a [MapOp<K, V>],
        }

        let snap = BorrowedSnapshot {
            replica: self.replica,
            clock: self.clock,
            // Only references are collected; the entries themselves stay put.
            entries: self.entries.iter().collect(),
            version: &self.version,
            log: self.log.as_slice(),
        };
        snap.serialize(ser)
    }
}
346
#[cfg(feature = "serde")]
impl<'de, K, V> Deserialize<'de> for Map<K, V>
where
    K: Eq + Hash + Clone + Deserialize<'de>,
    V: Clone + Deserialize<'de>,
{
    /// Read a `MapSnapshot` and rebuild the map from it, turning the list of
    /// `(key, entry)` pairs back into the keyed entry table.
    fn deserialize<D: serde::Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
        let MapSnapshot {
            replica,
            clock,
            entries: pairs,
            version,
            log,
        } = MapSnapshot::<K, V>::deserialize(de)?;

        let entries = pairs.into_iter().collect();
        Ok(Self {
            replica,
            clock,
            entries,
            version,
            log,
        })
    }
}
364
365// ---------------------------------------------------------------------------
366// Tests
367// ---------------------------------------------------------------------------
368
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created map exposes nothing.
    #[test]
    fn empty_map() {
        let map: Map<String, i32> = Map::new(1);
        assert!(map.is_empty());
        assert_eq!(map.len(), 0);
        assert_eq!(map.get(&String::from("k")), None);
    }

    /// Values written under distinct keys can be read back.
    #[test]
    fn set_and_get() {
        let mut map: Map<String, i32> = Map::new(1);
        map.set(String::from("a"), 1);
        map.set(String::from("b"), 2);

        assert_eq!(map.get(&String::from("a")), Some(&1));
        assert_eq!(map.get(&String::from("b")), Some(&2));
        assert_eq!(map.len(), 2);
    }

    /// Repeated local writes to one key keep only the last value.
    #[test]
    fn overwrite_in_one_replica() {
        let mut map: Map<&'static str, i32> = Map::new(1);
        map.set("a", 1);
        map.set("a", 2);
        map.set("a", 3);

        assert_eq!(map.get(&"a"), Some(&3));
        assert_eq!(map.len(), 1);
    }

    /// A remove hides the key from lookups and from the visible count.
    #[test]
    fn remove_drops_value() {
        let mut map: Map<&'static str, i32> = Map::new(1);
        map.set("a", 1);
        map.remove("a");

        assert!(!map.contains_key(&"a"));
        assert_eq!(map.len(), 0);
    }

    /// Two replicas writing the same key concurrently converge on one value.
    #[test]
    fn concurrent_set_lww_resolution() {
        let mut left: Map<&'static str, i32> = Map::new(1);
        let mut right: Map<&'static str, i32> = Map::new(2);

        left.set("k", 100);
        right.set("k", 200);

        // Merge in both directions from the pre-merge states.
        let mut left_merged = left.clone();
        left_merged.merge(&right);
        let mut right_merged = right.clone();
        right_merged.merge(&left);

        // Both converge.
        assert_eq!(left_merged.get(&"k"), right_merged.get(&"k"));
        // Both ops have counter=1, so the replica id breaks the tie:
        // replica 2 > replica 1, hence the value written by `right` wins.
        assert_eq!(left_merged.get(&"k"), Some(&200));
    }

    /// A set with a higher op id survives a concurrent remove.
    #[test]
    fn set_beats_concurrent_remove_with_higher_id() {
        let mut left: Map<&'static str, i32> = Map::new(1);
        let mut right: Map<&'static str, i32> = Map::new(2);
        left.set("k", 1);
        right.merge(&left);

        left.remove("k"); // counter=2, replica=1
        right.set("k", 99); // counter=2, replica=2

        let mut left_merged = left.clone();
        left_merged.merge(&right);
        let mut right_merged = right.clone();
        right_merged.merge(&left);

        assert_eq!(left_merged.get(&"k"), right_merged.get(&"k"));
        // The set carries the higher OpId (replica 2 > replica 1) and wins.
        assert_eq!(left_merged.get(&"k"), Some(&99));
    }

    /// Re-applying ops that were already applied changes nothing.
    #[test]
    fn idempotent_apply() {
        let mut source: Map<&'static str, i32> = Map::new(1);
        let first = source.set("k", 1);
        let second = source.set("j", 2);

        let mut target: Map<&'static str, i32> = Map::new(2);
        target.apply(first.clone()).unwrap();
        target.apply(second.clone()).unwrap();
        // Second delivery of the same ops.
        target.apply(first).unwrap();
        target.apply(second).unwrap();

        assert_eq!(target.len(), 2);
        assert_eq!(target.get(&"k"), Some(&1));
        assert_eq!(target.get(&"j"), Some(&2));
    }

    /// `ops_since` yields only the ops missing from the given version.
    #[test]
    fn ops_since_returns_only_unseen() {
        let mut map: Map<&'static str, i32> = Map::new(1);
        map.set("k", 1);
        let checkpoint = map.version().clone();
        map.set("j", 2);

        let unseen: Vec<&MapOp<&'static str, i32>> = map.ops_since(&checkpoint).collect();
        assert_eq!(unseen.len(), 1);
    }
}