Skip to main content

crdt_kit/
aw_map.rs

1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::vec::Vec;
3
4use crate::{Crdt, DeltaCrdt, NodeId};
5
6/// An add-wins map (AW-Map).
7///
8/// A key-value map where each key is tracked with OR-Set semantics: concurrent
9/// add and remove of the same key resolves in favor of add. Values are updated
10/// using a causal version vector per key.
11///
12/// # Example
13///
14/// ```
15/// use crdt_kit::prelude::*;
16///
17/// let mut m1 = AWMap::new(1);
18/// m1.insert("color", "red");
19///
20/// let mut m2 = AWMap::new(2);
21/// m2.insert("color", "blue");
22///
23/// m1.merge(&m2);
24/// // Both adds are concurrent — the latest by tag ordering wins
25/// assert!(m1.contains_key(&"color"));
26/// ```
27#[derive(Debug, Clone, PartialEq, Eq)]
28#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
29pub struct AWMap<K: Ord + Clone, V: Clone + Eq> {
30    actor: NodeId,
31    counter: u64,
32    /// key -> (value, set of unique tags)
33    entries: BTreeMap<K, (V, BTreeSet<(NodeId, u64)>)>,
34    /// Tombstones: tags that have been removed
35    tombstones: BTreeSet<(NodeId, u64)>,
36}
37
38impl<K: Ord + Clone, V: Clone + Eq> AWMap<K, V> {
39    /// Create a new empty AW-Map for the given node.
40    pub fn new(actor: NodeId) -> Self {
41        Self {
42            actor,
43            counter: 0,
44            entries: BTreeMap::new(),
45            tombstones: BTreeSet::new(),
46        }
47    }
48
49    /// Insert or update a key-value pair.
50    ///
51    /// Generates a unique tag for this write. If the key already exists,
52    /// the old tags are kept (they accumulate until a remove clears them).
53    /// The value is updated to the new value.
54    pub fn insert(&mut self, key: K, value: V) {
55        self.counter += 1;
56        let tag = (self.actor, self.counter);
57        let entry = self.entries.entry(key).or_insert_with(|| {
58            (value.clone(), BTreeSet::new())
59        });
60        entry.0 = value;
61        entry.1.insert(tag);
62    }
63
64    /// Remove a key from the map.
65    ///
66    /// Only removes the tags that this replica has observed. Concurrent
67    /// inserts on other replicas will survive the merge (add wins).
68    ///
69    /// Returns `true` if the key was present and removed.
70    pub fn remove(&mut self, key: &K) -> bool {
71        if let Some((_, tags)) = self.entries.remove(key) {
72            self.tombstones.extend(tags);
73            true
74        } else {
75            false
76        }
77    }
78
79    /// Get the value associated with a key, if present.
80    #[must_use]
81    pub fn get(&self, key: &K) -> Option<&V> {
82        self.entries.get(key).map(|(v, _)| v)
83    }
84
85    /// Check if a key is present in the map.
86    #[must_use]
87    pub fn contains_key(&self, key: &K) -> bool {
88        self.entries.contains_key(key)
89    }
90
91    /// Get the number of keys in the map.
92    #[must_use]
93    pub fn len(&self) -> usize {
94        self.entries.len()
95    }
96
97    /// Check if the map is empty.
98    #[must_use]
99    pub fn is_empty(&self) -> bool {
100        self.entries.is_empty()
101    }
102
103    /// Iterate over key-value pairs.
104    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
105        self.entries.iter().map(|(k, (v, _))| (k, v))
106    }
107
108    /// Get all keys.
109    pub fn keys(&self) -> impl Iterator<Item = &K> {
110        self.entries.keys()
111    }
112
113    /// Get all values.
114    pub fn values(&self) -> impl Iterator<Item = &V> {
115        self.entries.values().map(|(v, _)| v)
116    }
117
118    /// Get this replica's node ID.
119    #[must_use]
120    pub fn actor(&self) -> NodeId {
121        self.actor
122    }
123}
124
125impl<K: Ord + Clone, V: Clone + Eq> IntoIterator for AWMap<K, V> {
126    type Item = (K, V);
127    type IntoIter = alloc::vec::IntoIter<(K, V)>;
128
129    fn into_iter(self) -> Self::IntoIter {
130        let items: Vec<(K, V)> = self
131            .entries
132            .into_iter()
133            .map(|(k, (v, _))| (k, v))
134            .collect();
135        items.into_iter()
136    }
137}
138
139impl<K: Ord + Clone, V: Clone + Eq> Crdt for AWMap<K, V> {
140    fn merge(&mut self, other: &Self) {
141        // Add entries from other that we don't have tombstoned
142        for (key, (other_value, other_tags)) in &other.entries {
143            let entry = self.entries.entry(key.clone()).or_insert_with(|| {
144                (other_value.clone(), BTreeSet::new())
145            });
146            for &tag in other_tags {
147                if !self.tombstones.contains(&tag) {
148                    entry.1.insert(tag);
149                }
150            }
151            // Use the value from the highest tag for determinism
152            if let Some(&max_tag) = entry.1.iter().next_back() {
153                if other_tags.contains(&max_tag) {
154                    entry.0 = other_value.clone();
155                }
156            }
157        }
158
159        // Apply other's tombstones
160        for &tag in &other.tombstones {
161            for (_, (_, tags)) in self.entries.iter_mut() {
162                tags.remove(&tag);
163            }
164        }
165        self.tombstones.extend(&other.tombstones);
166
167        // Remove entries with no live tags
168        self.entries.retain(|_, (_, tags)| !tags.is_empty());
169
170        // Resolve value for each remaining key: value comes from the max tag owner
171        // We need to reconcile values when both sides contributed tags
172        // Use the value from the side that has the globally highest tag
173        for (key, (value, tags)) in self.entries.iter_mut() {
174            if let Some(&max_tag) = tags.iter().next_back() {
175                if let Some((other_value, other_tags)) = other.entries.get(key) {
176                    if other_tags.contains(&max_tag) {
177                        *value = other_value.clone();
178                    }
179                }
180            }
181        }
182
183        self.counter = self.counter.max(other.counter);
184    }
185}
186
187/// Delta for [`AWMap`]: new tags and tombstones since another state.
188#[derive(Debug, Clone, PartialEq, Eq)]
189#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
190pub struct AWMapDelta<K: Ord + Clone, V: Clone + Eq> {
191    additions: Vec<(K, V, (NodeId, u64))>,
192    tombstones: BTreeSet<(NodeId, u64)>,
193}
194
195impl<K: Ord + Clone, V: Clone + Eq> DeltaCrdt for AWMap<K, V> {
196    type Delta = AWMapDelta<K, V>;
197
198    fn delta(&self, other: &Self) -> AWMapDelta<K, V> {
199        let mut additions = Vec::new();
200        for (key, (value, self_tags)) in &self.entries {
201            let other_tags = other.entries.get(key).map(|(_, t)| t);
202            for &tag in self_tags {
203                let known = other_tags.is_some_and(|ot| ot.contains(&tag))
204                    || other.tombstones.contains(&tag);
205                if !known {
206                    additions.push((key.clone(), value.clone(), tag));
207                }
208            }
209        }
210
211        let tombstones: BTreeSet<_> = self
212            .tombstones
213            .difference(&other.tombstones)
214            .copied()
215            .collect();
216
217        AWMapDelta {
218            additions,
219            tombstones,
220        }
221    }
222
223    fn apply_delta(&mut self, delta: &AWMapDelta<K, V>) {
224        for (key, value, tag) in &delta.additions {
225            if !self.tombstones.contains(tag) {
226                let entry = self.entries.entry(key.clone()).or_insert_with(|| {
227                    (value.clone(), BTreeSet::new())
228                });
229                entry.1.insert(*tag);
230                // Update value if this is the new max tag
231                if let Some(&max_tag) = entry.1.iter().next_back() {
232                    if *tag == max_tag {
233                        entry.0 = value.clone();
234                    }
235                }
236            }
237        }
238
239        for &tag in &delta.tombstones {
240            for (_, (_, tags)) in self.entries.iter_mut() {
241                tags.remove(&tag);
242            }
243        }
244        self.tombstones.extend(&delta.tombstones);
245
246        self.entries.retain(|_, (_, tags)| !tags.is_empty());
247    }
248}
249
// Unit tests for `AWMap`: local map operations, merge laws, and delta sync.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_map_is_empty() {
        let m = AWMap::<String, String>::new(1);
        assert!(m.is_empty());
        assert_eq!(m.len(), 0);
    }

    #[test]
    fn insert_and_get() {
        let mut m = AWMap::new(1);
        m.insert("key", "value");
        assert_eq!(m.get(&"key"), Some(&"value"));
        assert!(m.contains_key(&"key"));
        assert_eq!(m.len(), 1);
    }

    #[test]
    fn update_value() {
        let mut m = AWMap::new(1);
        m.insert("k", "v1");
        m.insert("k", "v2");
        assert_eq!(m.get(&"k"), Some(&"v2"));
    }

    #[test]
    fn remove_key() {
        let mut m = AWMap::new(1);
        m.insert("k", "v");
        assert!(m.remove(&"k"));
        assert!(!m.contains_key(&"k"));
        assert_eq!(m.len(), 0);
    }

    #[test]
    fn remove_nonexistent_returns_false() {
        let mut m = AWMap::<&str, &str>::new(1);
        assert!(!m.remove(&"k"));
    }

    #[test]
    fn readd_after_remove() {
        let mut m = AWMap::new(1);
        m.insert("k", "v1");
        m.remove(&"k");
        m.insert("k", "v2");
        assert_eq!(m.get(&"k"), Some(&"v2"));
    }

    #[test]
    fn concurrent_add_survives_remove() {
        let mut m1 = AWMap::new(1);
        m1.insert("k", "v");
        m1.remove(&"k");

        let mut m2 = AWMap::new(2);
        m2.insert("k", "v");

        m1.merge(&m2);
        assert!(
            m1.contains_key(&"k"),
            "Concurrent add should survive remove (add wins)"
        );
    }

    #[test]
    fn merge_combines_entries() {
        let mut m1 = AWMap::new(1);
        m1.insert("a", 1);

        let mut m2 = AWMap::new(2);
        m2.insert("b", 2);

        m1.merge(&m2);
        assert_eq!(m1.get(&"a"), Some(&1));
        assert_eq!(m1.get(&"b"), Some(&2));
        assert_eq!(m1.len(), 2);
    }

    #[test]
    fn merge_is_commutative() {
        let mut m1 = AWMap::new(1);
        m1.insert("a", 1);
        m1.insert("b", 2);

        let mut m2 = AWMap::new(2);
        m2.insert("b", 3);
        m2.insert("c", 4);

        let mut left = m1.clone();
        left.merge(&m2);

        let mut right = m2.clone();
        right.merge(&m1);

        // Compare values as well as keys: both merge orders must agree on the
        // value chosen for the conflicting key "b".
        let left_pairs: Vec<_> = left.iter().collect();
        let right_pairs: Vec<_> = right.iter().collect();
        assert_eq!(left_pairs, right_pairs);
    }

    #[test]
    fn merge_is_idempotent() {
        let mut m1 = AWMap::new(1);
        m1.insert("k", "v");

        let m2 = m1.clone();
        m1.merge(&m2);
        let after = m1.clone();
        m1.merge(&m2);
        assert_eq!(m1, after);
    }

    #[test]
    fn merge_propagates_remove() {
        let mut m1 = AWMap::new(1);
        m1.insert("k", "v");

        let mut m2 = m1.clone();
        m2.remove(&"k");

        m1.merge(&m2);
        assert!(!m1.contains_key(&"k"));
    }

    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut m1 = AWMap::new(1);
        m1.insert("a", 1);
        m1.insert("b", 2);

        let mut m2 = AWMap::new(2);
        m2.insert("b", 3);
        m2.insert("c", 4);

        let mut via_merge = m2.clone();
        via_merge.merge(&m1);

        let mut via_delta = m2.clone();
        let d = m1.delta(&m2);
        via_delta.apply_delta(&d);

        // Both sync paths must expose the same keys *and* the same values.
        let merge_pairs: Vec<_> = via_merge.iter().collect();
        let delta_pairs: Vec<_> = via_delta.iter().collect();
        assert_eq!(merge_pairs, delta_pairs);
    }

    #[test]
    fn delta_carries_tombstones() {
        let mut m1 = AWMap::new(1);
        m1.insert("k", "v");

        let m2 = m1.clone();
        m1.remove(&"k");

        let d = m1.delta(&m2);
        assert!(!d.tombstones.is_empty());

        let mut via_delta = m2.clone();
        via_delta.apply_delta(&d);
        assert!(!via_delta.contains_key(&"k"));
    }

    #[test]
    fn iterate_entries() {
        let mut m = AWMap::new(1);
        m.insert("a", 1);
        m.insert("b", 2);
        m.insert("c", 3);
        m.remove(&"b");

        let keys: Vec<_> = m.keys().collect();
        assert_eq!(keys, vec![&"a", &"c"]);
    }
}