Skip to main content

crdt_kit/
lww_map.rs

1use alloc::collections::BTreeMap;
2use alloc::vec::Vec;
3
4use crate::clock::HybridTimestamp;
5use crate::{Crdt, DeltaCrdt};
6
7/// A last-writer-wins map (LWW-Map).
8///
9/// Each key maps to a value with a [`HybridTimestamp`]. Concurrent writes
10/// to the same key resolve by keeping the value with the highest timestamp.
11/// Keys can be removed; a remove is stored as a tombstone with a timestamp
12/// so that stale puts don't resurrect deleted keys.
13///
14/// # Example
15///
16/// ```
17/// use crdt_kit::prelude::*;
18/// use crdt_kit::clock::HybridTimestamp;
19///
20/// let ts = |ms, node| HybridTimestamp { physical: ms, logical: 0, node_id: node };
21///
22/// let mut m1 = LWWMap::new();
23/// m1.insert("color", "red", ts(100, 1));
24///
25/// let mut m2 = LWWMap::new();
26/// m2.insert("color", "blue", ts(200, 2));
27///
28/// m1.merge(&m2);
29/// assert_eq!(m1.get(&"color"), Some(&"blue")); // later timestamp wins
30/// ```
31#[derive(Debug, Clone, PartialEq, Eq)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33pub struct LWWMap<K: Ord + Clone, V: Clone> {
34    /// Each key maps to (value, timestamp, alive).
35    /// `alive` is `true` for puts, `false` for removes.
36    entries: BTreeMap<K, Entry<V>>,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
41struct Entry<V: Clone> {
42    value: Option<V>,
43    timestamp: HybridTimestamp,
44    alive: bool,
45}
46
47impl<K: Ord + Clone, V: Clone> LWWMap<K, V> {
48    /// Create a new empty LWW-Map.
49    pub fn new() -> Self {
50        Self {
51            entries: BTreeMap::new(),
52        }
53    }
54
55    /// Insert or update a key-value pair with the given timestamp.
56    ///
57    /// If the key already exists with a newer or equal timestamp, this is a no-op.
58    pub fn insert(&mut self, key: K, value: V, timestamp: HybridTimestamp) {
59        match self.entries.get(&key) {
60            Some(entry) if entry.timestamp >= timestamp => {}
61            _ => {
62                self.entries.insert(
63                    key,
64                    Entry {
65                        value: Some(value),
66                        timestamp,
67                        alive: true,
68                    },
69                );
70            }
71        }
72    }
73
74    /// Remove a key with the given timestamp.
75    ///
76    /// The removal only takes effect if its timestamp is greater than the
77    /// current entry's timestamp. Returns `true` if the key was removed.
78    pub fn remove(&mut self, key: &K, timestamp: HybridTimestamp) -> bool {
79        match self.entries.get(key) {
80            Some(entry) if entry.timestamp >= timestamp => false,
81            _ => {
82                self.entries.insert(
83                    key.clone(),
84                    Entry {
85                        value: None,
86                        timestamp,
87                        alive: false,
88                    },
89                );
90                true
91            }
92        }
93    }
94
95    /// Get the value associated with a key, if it exists and is alive.
96    #[must_use]
97    pub fn get(&self, key: &K) -> Option<&V> {
98        self.entries
99            .get(key)
100            .filter(|e| e.alive)
101            .and_then(|e| e.value.as_ref())
102    }
103
104    /// Check if a key is present and alive in the map.
105    #[must_use]
106    pub fn contains_key(&self, key: &K) -> bool {
107        self.entries.get(key).is_some_and(|e| e.alive)
108    }
109
110    /// Get the number of alive keys.
111    #[must_use]
112    pub fn len(&self) -> usize {
113        self.entries.values().filter(|e| e.alive).count()
114    }
115
116    /// Check if the map has no alive keys.
117    #[must_use]
118    pub fn is_empty(&self) -> bool {
119        self.len() == 0
120    }
121
122    /// Iterate over alive key-value pairs.
123    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
124        self.entries
125            .iter()
126            .filter_map(|(k, e)| {
127                if e.alive {
128                    e.value.as_ref().map(|v| (k, v))
129                } else {
130                    None
131                }
132            })
133    }
134
135    /// Get all alive keys.
136    pub fn keys(&self) -> impl Iterator<Item = &K> {
137        self.iter().map(|(k, _)| k)
138    }
139
140    /// Get all alive values.
141    pub fn values(&self) -> impl Iterator<Item = &V> {
142        self.iter().map(|(_, v)| v)
143    }
144}
145
146impl<K: Ord + Clone, V: Clone> Default for LWWMap<K, V> {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl<K: Ord + Clone, V: Clone> Crdt for LWWMap<K, V> {
153    fn merge(&mut self, other: &Self) {
154        for (key, other_entry) in &other.entries {
155            match self.entries.get(key) {
156                Some(self_entry) if self_entry.timestamp >= other_entry.timestamp => {}
157                _ => {
158                    self.entries.insert(key.clone(), other_entry.clone());
159                }
160            }
161        }
162    }
163}
164
165/// Delta for [`LWWMap`]: entries that are newer in the source.
166#[derive(Debug, Clone, PartialEq, Eq)]
167#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
168pub struct LWWMapDelta<K: Ord + Clone, V: Clone> {
169    entries: Vec<(K, Option<V>, HybridTimestamp, bool)>,
170}
171
172impl<K: Ord + Clone, V: Clone> DeltaCrdt for LWWMap<K, V> {
173    type Delta = LWWMapDelta<K, V>;
174
175    fn delta(&self, other: &Self) -> LWWMapDelta<K, V> {
176        let mut entries = Vec::new();
177        for (key, self_entry) in &self.entries {
178            let dominated = other
179                .entries
180                .get(key)
181                .is_some_and(|oe| oe.timestamp >= self_entry.timestamp);
182            if !dominated {
183                entries.push((
184                    key.clone(),
185                    self_entry.value.clone(),
186                    self_entry.timestamp,
187                    self_entry.alive,
188                ));
189            }
190        }
191        LWWMapDelta { entries }
192    }
193
194    fn apply_delta(&mut self, delta: &LWWMapDelta<K, V>) {
195        for (key, value, timestamp, alive) in &delta.entries {
196            match self.entries.get(key) {
197                Some(entry) if entry.timestamp >= *timestamp => {}
198                _ => {
199                    self.entries.insert(
200                        key.clone(),
201                        Entry {
202                            value: value.clone(),
203                            timestamp: *timestamp,
204                            alive: *alive,
205                        },
206                    );
207                }
208            }
209        }
210    }
211}
212
213#[cfg(test)]
214mod tests {
215    use super::*;
216
217    fn ts(physical: u64, node: u16) -> HybridTimestamp {
218        HybridTimestamp {
219            physical,
220            logical: 0,
221            node_id: node,
222        }
223    }
224
225    #[test]
226    fn new_map_is_empty() {
227        let m = LWWMap::<String, String>::new();
228        assert!(m.is_empty());
229        assert_eq!(m.len(), 0);
230    }
231
232    #[test]
233    fn insert_and_get() {
234        let mut m = LWWMap::new();
235        m.insert("key", "value", ts(1, 1));
236        assert_eq!(m.get(&"key"), Some(&"value"));
237        assert!(m.contains_key(&"key"));
238        assert_eq!(m.len(), 1);
239    }
240
241    #[test]
242    fn later_write_wins() {
243        let mut m = LWWMap::new();
244        m.insert("k", "old", ts(1, 1));
245        m.insert("k", "new", ts(2, 1));
246        assert_eq!(m.get(&"k"), Some(&"new"));
247    }
248
249    #[test]
250    fn stale_write_ignored() {
251        let mut m = LWWMap::new();
252        m.insert("k", "new", ts(2, 1));
253        m.insert("k", "old", ts(1, 1));
254        assert_eq!(m.get(&"k"), Some(&"new"));
255    }
256
257    #[test]
258    fn remove_hides_key() {
259        let mut m = LWWMap::new();
260        m.insert("k", "v", ts(1, 1));
261        assert!(m.remove(&"k", ts(2, 1)));
262        assert!(!m.contains_key(&"k"));
263        assert_eq!(m.get(&"k"), None);
264        assert_eq!(m.len(), 0);
265    }
266
267    #[test]
268    fn stale_remove_ignored() {
269        let mut m = LWWMap::new();
270        m.insert("k", "v", ts(2, 1));
271        assert!(!m.remove(&"k", ts(1, 1)));
272        assert!(m.contains_key(&"k"));
273    }
274
275    #[test]
276    fn insert_after_remove() {
277        let mut m = LWWMap::new();
278        m.insert("k", "v1", ts(1, 1));
279        m.remove(&"k", ts(2, 1));
280        m.insert("k", "v2", ts(3, 1));
281        assert_eq!(m.get(&"k"), Some(&"v2"));
282    }
283
284    #[test]
285    fn merge_later_wins() {
286        let mut m1 = LWWMap::new();
287        m1.insert("k", "old", ts(1, 1));
288
289        let mut m2 = LWWMap::new();
290        m2.insert("k", "new", ts(2, 2));
291
292        m1.merge(&m2);
293        assert_eq!(m1.get(&"k"), Some(&"new"));
294    }
295
296    #[test]
297    fn merge_is_commutative() {
298        let mut m1 = LWWMap::new();
299        m1.insert("a", 1, ts(1, 1));
300        m1.insert("b", 2, ts(2, 1));
301
302        let mut m2 = LWWMap::new();
303        m2.insert("b", 3, ts(3, 2));
304        m2.insert("c", 4, ts(1, 2));
305
306        let mut left = m1.clone();
307        left.merge(&m2);
308
309        let mut right = m2.clone();
310        right.merge(&m1);
311
312        assert_eq!(left, right);
313    }
314
315    #[test]
316    fn merge_is_idempotent() {
317        let mut m1 = LWWMap::new();
318        m1.insert("k", "v", ts(1, 1));
319
320        let m2 = m1.clone();
321        m1.merge(&m2);
322        let after = m1.clone();
323        m1.merge(&m2);
324        assert_eq!(m1, after);
325    }
326
327    #[test]
328    fn merge_propagates_remove() {
329        let mut m1 = LWWMap::new();
330        m1.insert("k", "v", ts(1, 1));
331
332        let mut m2 = m1.clone();
333        m2.remove(&"k", ts(2, 2));
334
335        m1.merge(&m2);
336        assert!(!m1.contains_key(&"k"));
337    }
338
339    #[test]
340    fn delta_apply_equivalent_to_merge() {
341        let mut m1 = LWWMap::new();
342        m1.insert("a", 1, ts(1, 1));
343        m1.insert("b", 2, ts(3, 1));
344
345        let mut m2 = LWWMap::new();
346        m2.insert("b", 3, ts(2, 2));
347        m2.insert("c", 4, ts(1, 2));
348
349        let mut via_merge = m2.clone();
350        via_merge.merge(&m1);
351
352        let mut via_delta = m2.clone();
353        let d = m1.delta(&m2);
354        via_delta.apply_delta(&d);
355
356        assert_eq!(via_merge, via_delta);
357    }
358
359    #[test]
360    fn delta_is_empty_when_dominated() {
361        let mut m1 = LWWMap::new();
362        m1.insert("k", "old", ts(1, 1));
363
364        let mut m2 = LWWMap::new();
365        m2.insert("k", "new", ts(2, 2));
366
367        let d = m1.delta(&m2);
368        assert!(d.entries.is_empty());
369    }
370
371    #[test]
372    fn iterate_alive_entries() {
373        let mut m = LWWMap::new();
374        m.insert("a", 1, ts(1, 1));
375        m.insert("b", 2, ts(2, 1));
376        m.insert("c", 3, ts(3, 1));
377        m.remove(&"b", ts(4, 1));
378
379        let keys: Vec<_> = m.keys().collect();
380        assert_eq!(keys, vec![&"a", &"c"]);
381    }
382}