1use alloc::collections::BTreeMap;
2use alloc::vec::Vec;
3
4use crate::clock::HybridTimestamp;
5use crate::{Crdt, DeltaCrdt};
6
7#[derive(Debug, Clone, PartialEq, Eq)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33pub struct LWWMap<K: Ord + Clone, V: Clone> {
34 entries: BTreeMap<K, Entry<V>>,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
41struct Entry<V: Clone> {
42 value: Option<V>,
43 timestamp: HybridTimestamp,
44 alive: bool,
45}
46
47impl<K: Ord + Clone, V: Clone> LWWMap<K, V> {
48 pub fn new() -> Self {
50 Self {
51 entries: BTreeMap::new(),
52 }
53 }
54
55 pub fn insert(&mut self, key: K, value: V, timestamp: HybridTimestamp) {
59 match self.entries.get(&key) {
60 Some(entry) if entry.timestamp >= timestamp => {}
61 _ => {
62 self.entries.insert(
63 key,
64 Entry {
65 value: Some(value),
66 timestamp,
67 alive: true,
68 },
69 );
70 }
71 }
72 }
73
74 pub fn remove(&mut self, key: &K, timestamp: HybridTimestamp) -> bool {
79 match self.entries.get(key) {
80 Some(entry) if entry.timestamp >= timestamp => false,
81 _ => {
82 self.entries.insert(
83 key.clone(),
84 Entry {
85 value: None,
86 timestamp,
87 alive: false,
88 },
89 );
90 true
91 }
92 }
93 }
94
95 #[must_use]
97 pub fn get(&self, key: &K) -> Option<&V> {
98 self.entries
99 .get(key)
100 .filter(|e| e.alive)
101 .and_then(|e| e.value.as_ref())
102 }
103
104 #[must_use]
106 pub fn contains_key(&self, key: &K) -> bool {
107 self.entries.get(key).is_some_and(|e| e.alive)
108 }
109
110 #[must_use]
112 pub fn len(&self) -> usize {
113 self.entries.values().filter(|e| e.alive).count()
114 }
115
116 #[must_use]
118 pub fn is_empty(&self) -> bool {
119 self.len() == 0
120 }
121
122 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
124 self.entries
125 .iter()
126 .filter_map(|(k, e)| {
127 if e.alive {
128 e.value.as_ref().map(|v| (k, v))
129 } else {
130 None
131 }
132 })
133 }
134
135 pub fn keys(&self) -> impl Iterator<Item = &K> {
137 self.iter().map(|(k, _)| k)
138 }
139
140 pub fn values(&self) -> impl Iterator<Item = &V> {
142 self.iter().map(|(_, v)| v)
143 }
144}
145
146impl<K: Ord + Clone, V: Clone> Default for LWWMap<K, V> {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl<K: Ord + Clone, V: Clone> Crdt for LWWMap<K, V> {
153 fn merge(&mut self, other: &Self) {
154 for (key, other_entry) in &other.entries {
155 match self.entries.get(key) {
156 Some(self_entry) if self_entry.timestamp >= other_entry.timestamp => {}
157 _ => {
158 self.entries.insert(key.clone(), other_entry.clone());
159 }
160 }
161 }
162 }
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
167#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
168pub struct LWWMapDelta<K: Ord + Clone, V: Clone> {
169 entries: Vec<(K, Option<V>, HybridTimestamp, bool)>,
170}
171
172impl<K: Ord + Clone, V: Clone> DeltaCrdt for LWWMap<K, V> {
173 type Delta = LWWMapDelta<K, V>;
174
175 fn delta(&self, other: &Self) -> LWWMapDelta<K, V> {
176 let mut entries = Vec::new();
177 for (key, self_entry) in &self.entries {
178 let dominated = other
179 .entries
180 .get(key)
181 .is_some_and(|oe| oe.timestamp >= self_entry.timestamp);
182 if !dominated {
183 entries.push((
184 key.clone(),
185 self_entry.value.clone(),
186 self_entry.timestamp,
187 self_entry.alive,
188 ));
189 }
190 }
191 LWWMapDelta { entries }
192 }
193
194 fn apply_delta(&mut self, delta: &LWWMapDelta<K, V>) {
195 for (key, value, timestamp, alive) in &delta.entries {
196 match self.entries.get(key) {
197 Some(entry) if entry.timestamp >= *timestamp => {}
198 _ => {
199 self.entries.insert(
200 key.clone(),
201 Entry {
202 value: value.clone(),
203 timestamp: *timestamp,
204 alive: *alive,
205 },
206 );
207 }
208 }
209 }
210 }
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a timestamp at `physical` time on node `node`, with the
    /// logical counter set to zero.
    fn ts(physical: u64, node: u16) -> HybridTimestamp {
        HybridTimestamp {
            physical,
            logical: 0,
            node_id: node,
        }
    }

    /// A freshly created map reports no entries.
    #[test]
    fn new_map_is_empty() {
        let map = LWWMap::<String, String>::new();
        assert!(map.is_empty());
        assert_eq!(map.len(), 0);
    }

    /// An inserted value is visible through every read accessor.
    #[test]
    fn insert_and_get() {
        let mut map = LWWMap::new();
        map.insert("key", "value", ts(1, 1));
        assert_eq!(map.get(&"key"), Some(&"value"));
        assert!(map.contains_key(&"key"));
        assert_eq!(map.len(), 1);
    }

    /// A write with a newer timestamp replaces the older one.
    #[test]
    fn later_write_wins() {
        let mut map = LWWMap::new();
        map.insert("k", "old", ts(1, 1));
        map.insert("k", "new", ts(2, 1));
        assert_eq!(map.get(&"k"), Some(&"new"));
    }

    /// A write with an older timestamp leaves the stored value untouched.
    #[test]
    fn stale_write_ignored() {
        let mut map = LWWMap::new();
        map.insert("k", "new", ts(2, 1));
        map.insert("k", "old", ts(1, 1));
        assert_eq!(map.get(&"k"), Some(&"new"));
    }

    /// A newer removal hides the key from every read accessor.
    #[test]
    fn remove_hides_key() {
        let mut map = LWWMap::new();
        map.insert("k", "v", ts(1, 1));
        assert!(map.remove(&"k", ts(2, 1)));
        assert!(!map.contains_key(&"k"));
        assert_eq!(map.get(&"k"), None);
        assert_eq!(map.len(), 0);
    }

    /// A removal older than the stored write is rejected.
    #[test]
    fn stale_remove_ignored() {
        let mut map = LWWMap::new();
        map.insert("k", "v", ts(2, 1));
        assert!(!map.remove(&"k", ts(1, 1)));
        assert!(map.contains_key(&"k"));
    }

    /// A key can be written again after it was removed.
    #[test]
    fn insert_after_remove() {
        let mut map = LWWMap::new();
        map.insert("k", "v1", ts(1, 1));
        map.remove(&"k", ts(2, 1));
        map.insert("k", "v2", ts(3, 1));
        assert_eq!(map.get(&"k"), Some(&"v2"));
    }

    /// Merging takes the entry with the newer timestamp.
    #[test]
    fn merge_later_wins() {
        let mut local = LWWMap::new();
        local.insert("k", "old", ts(1, 1));

        let mut remote = LWWMap::new();
        remote.insert("k", "new", ts(2, 2));

        local.merge(&remote);
        assert_eq!(local.get(&"k"), Some(&"new"));
    }

    /// Merging in either direction yields the same state.
    #[test]
    fn merge_is_commutative() {
        let mut first = LWWMap::new();
        first.insert("a", 1, ts(1, 1));
        first.insert("b", 2, ts(2, 1));

        let mut second = LWWMap::new();
        second.insert("b", 3, ts(3, 2));
        second.insert("c", 4, ts(1, 2));

        let mut first_then_second = first.clone();
        first_then_second.merge(&second);

        let mut second_then_first = second.clone();
        second_then_first.merge(&first);

        assert_eq!(first_then_second, second_then_first);
    }

    /// Merging the same state twice changes nothing the second time.
    #[test]
    fn merge_is_idempotent() {
        let mut target = LWWMap::new();
        target.insert("k", "v", ts(1, 1));

        let source = target.clone();
        target.merge(&source);
        let after_first_merge = target.clone();
        target.merge(&source);
        assert_eq!(target, after_first_merge);
    }

    /// A removal made on one replica reaches the other through a merge.
    #[test]
    fn merge_propagates_remove() {
        let mut local = LWWMap::new();
        local.insert("k", "v", ts(1, 1));

        let mut remote = local.clone();
        remote.remove(&"k", ts(2, 2));

        local.merge(&remote);
        assert!(!local.contains_key(&"k"));
    }

    /// Applying a delta gives the same result as a full merge.
    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut source = LWWMap::new();
        source.insert("a", 1, ts(1, 1));
        source.insert("b", 2, ts(3, 1));

        let mut target = LWWMap::new();
        target.insert("b", 3, ts(2, 2));
        target.insert("c", 4, ts(1, 2));

        let mut via_merge = target.clone();
        via_merge.merge(&source);

        let mut via_delta = target.clone();
        let delta = source.delta(&target);
        via_delta.apply_delta(&delta);

        assert_eq!(via_merge, via_delta);
    }

    /// No delta records are produced when the other side is newer.
    #[test]
    fn delta_is_empty_when_dominated() {
        let mut older = LWWMap::new();
        older.insert("k", "old", ts(1, 1));

        let mut newer = LWWMap::new();
        newer.insert("k", "new", ts(2, 2));

        let delta = older.delta(&newer);
        assert!(delta.entries.is_empty());
    }

    /// Iteration yields live keys only, in key order.
    #[test]
    fn iterate_alive_entries() {
        let mut map = LWWMap::new();
        map.insert("a", 1, ts(1, 1));
        map.insert("b", 2, ts(2, 1));
        map.insert("c", 3, ts(3, 1));
        map.remove(&"b", ts(4, 1));

        let live_keys: Vec<_> = map.keys().collect();
        assert_eq!(live_keys, vec![&"a", &"c"]);
    }
}