1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::vec::Vec;
3
4use crate::{Crdt, DeltaCrdt, NodeId};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
28#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
29pub struct AWMap<K: Ord + Clone, V: Clone + Eq> {
30 actor: NodeId,
31 counter: u64,
32 entries: BTreeMap<K, (V, BTreeSet<(NodeId, u64)>)>,
34 tombstones: BTreeSet<(NodeId, u64)>,
36}
37
38impl<K: Ord + Clone, V: Clone + Eq> AWMap<K, V> {
39 pub fn new(actor: NodeId) -> Self {
41 Self {
42 actor,
43 counter: 0,
44 entries: BTreeMap::new(),
45 tombstones: BTreeSet::new(),
46 }
47 }
48
49 pub fn insert(&mut self, key: K, value: V) {
55 self.counter += 1;
56 let tag = (self.actor, self.counter);
57 let entry = self.entries.entry(key).or_insert_with(|| {
58 (value.clone(), BTreeSet::new())
59 });
60 entry.0 = value;
61 entry.1.insert(tag);
62 }
63
64 pub fn remove(&mut self, key: &K) -> bool {
71 if let Some((_, tags)) = self.entries.remove(key) {
72 self.tombstones.extend(tags);
73 true
74 } else {
75 false
76 }
77 }
78
79 #[must_use]
81 pub fn get(&self, key: &K) -> Option<&V> {
82 self.entries.get(key).map(|(v, _)| v)
83 }
84
85 #[must_use]
87 pub fn contains_key(&self, key: &K) -> bool {
88 self.entries.contains_key(key)
89 }
90
91 #[must_use]
93 pub fn len(&self) -> usize {
94 self.entries.len()
95 }
96
97 #[must_use]
99 pub fn is_empty(&self) -> bool {
100 self.entries.is_empty()
101 }
102
103 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
105 self.entries.iter().map(|(k, (v, _))| (k, v))
106 }
107
108 pub fn keys(&self) -> impl Iterator<Item = &K> {
110 self.entries.keys()
111 }
112
113 pub fn values(&self) -> impl Iterator<Item = &V> {
115 self.entries.values().map(|(v, _)| v)
116 }
117
118 #[must_use]
120 pub fn actor(&self) -> NodeId {
121 self.actor
122 }
123}
124
125impl<K: Ord + Clone, V: Clone + Eq> IntoIterator for AWMap<K, V> {
126 type Item = (K, V);
127 type IntoIter = alloc::vec::IntoIter<(K, V)>;
128
129 fn into_iter(self) -> Self::IntoIter {
130 let items: Vec<(K, V)> = self
131 .entries
132 .into_iter()
133 .map(|(k, (v, _))| (k, v))
134 .collect();
135 items.into_iter()
136 }
137}
138
139impl<K: Ord + Clone, V: Clone + Eq> Crdt for AWMap<K, V> {
140 fn merge(&mut self, other: &Self) {
141 for (key, (other_value, other_tags)) in &other.entries {
143 let entry = self.entries.entry(key.clone()).or_insert_with(|| {
144 (other_value.clone(), BTreeSet::new())
145 });
146 for &tag in other_tags {
147 if !self.tombstones.contains(&tag) {
148 entry.1.insert(tag);
149 }
150 }
151 if let Some(&max_tag) = entry.1.iter().next_back() {
153 if other_tags.contains(&max_tag) {
154 entry.0 = other_value.clone();
155 }
156 }
157 }
158
159 for &tag in &other.tombstones {
161 for (_, (_, tags)) in self.entries.iter_mut() {
162 tags.remove(&tag);
163 }
164 }
165 self.tombstones.extend(&other.tombstones);
166
167 self.entries.retain(|_, (_, tags)| !tags.is_empty());
169
170 for (key, (value, tags)) in self.entries.iter_mut() {
174 if let Some(&max_tag) = tags.iter().next_back() {
175 if let Some((other_value, other_tags)) = other.entries.get(key) {
176 if other_tags.contains(&max_tag) {
177 *value = other_value.clone();
178 }
179 }
180 }
181 }
182
183 self.counter = self.counter.max(other.counter);
184 }
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
189#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
190pub struct AWMapDelta<K: Ord + Clone, V: Clone + Eq> {
191 additions: Vec<(K, V, (NodeId, u64))>,
192 tombstones: BTreeSet<(NodeId, u64)>,
193}
194
195impl<K: Ord + Clone, V: Clone + Eq> DeltaCrdt for AWMap<K, V> {
196 type Delta = AWMapDelta<K, V>;
197
198 fn delta(&self, other: &Self) -> AWMapDelta<K, V> {
199 let mut additions = Vec::new();
200 for (key, (value, self_tags)) in &self.entries {
201 let other_tags = other.entries.get(key).map(|(_, t)| t);
202 for &tag in self_tags {
203 let known = other_tags.is_some_and(|ot| ot.contains(&tag))
204 || other.tombstones.contains(&tag);
205 if !known {
206 additions.push((key.clone(), value.clone(), tag));
207 }
208 }
209 }
210
211 let tombstones: BTreeSet<_> = self
212 .tombstones
213 .difference(&other.tombstones)
214 .copied()
215 .collect();
216
217 AWMapDelta {
218 additions,
219 tombstones,
220 }
221 }
222
223 fn apply_delta(&mut self, delta: &AWMapDelta<K, V>) {
224 for (key, value, tag) in &delta.additions {
225 if !self.tombstones.contains(tag) {
226 let entry = self.entries.entry(key.clone()).or_insert_with(|| {
227 (value.clone(), BTreeSet::new())
228 });
229 entry.1.insert(*tag);
230 if let Some(&max_tag) = entry.1.iter().next_back() {
232 if *tag == max_tag {
233 entry.0 = value.clone();
234 }
235 }
236 }
237 }
238
239 for &tag in &delta.tombstones {
240 for (_, (_, tags)) in self.entries.iter_mut() {
241 tags.remove(&tag);
242 }
243 }
244 self.tombstones.extend(&delta.tombstones);
245
246 self.entries.retain(|_, (_, tags)| !tags.is_empty());
247 }
248}
249
250#[cfg(test)]
251mod tests {
252 use super::*;
253
254 #[test]
255 fn new_map_is_empty() {
256 let m = AWMap::<String, String>::new(1);
257 assert!(m.is_empty());
258 assert_eq!(m.len(), 0);
259 }
260
261 #[test]
262 fn insert_and_get() {
263 let mut m = AWMap::new(1);
264 m.insert("key", "value");
265 assert_eq!(m.get(&"key"), Some(&"value"));
266 assert!(m.contains_key(&"key"));
267 assert_eq!(m.len(), 1);
268 }
269
270 #[test]
271 fn update_value() {
272 let mut m = AWMap::new(1);
273 m.insert("k", "v1");
274 m.insert("k", "v2");
275 assert_eq!(m.get(&"k"), Some(&"v2"));
276 }
277
278 #[test]
279 fn remove_key() {
280 let mut m = AWMap::new(1);
281 m.insert("k", "v");
282 assert!(m.remove(&"k"));
283 assert!(!m.contains_key(&"k"));
284 assert_eq!(m.len(), 0);
285 }
286
287 #[test]
288 fn remove_nonexistent_returns_false() {
289 let mut m = AWMap::<&str, &str>::new(1);
290 assert!(!m.remove(&"k"));
291 }
292
293 #[test]
294 fn readd_after_remove() {
295 let mut m = AWMap::new(1);
296 m.insert("k", "v1");
297 m.remove(&"k");
298 m.insert("k", "v2");
299 assert_eq!(m.get(&"k"), Some(&"v2"));
300 }
301
302 #[test]
303 fn concurrent_add_survives_remove() {
304 let mut m1 = AWMap::new(1);
305 m1.insert("k", "v");
306 m1.remove(&"k");
307
308 let mut m2 = AWMap::new(2);
309 m2.insert("k", "v");
310
311 m1.merge(&m2);
312 assert!(
313 m1.contains_key(&"k"),
314 "Concurrent add should survive remove (add wins)"
315 );
316 }
317
318 #[test]
319 fn merge_combines_entries() {
320 let mut m1 = AWMap::new(1);
321 m1.insert("a", 1);
322
323 let mut m2 = AWMap::new(2);
324 m2.insert("b", 2);
325
326 m1.merge(&m2);
327 assert_eq!(m1.get(&"a"), Some(&1));
328 assert_eq!(m1.get(&"b"), Some(&2));
329 assert_eq!(m1.len(), 2);
330 }
331
332 #[test]
333 fn merge_is_commutative() {
334 let mut m1 = AWMap::new(1);
335 m1.insert("a", 1);
336 m1.insert("b", 2);
337
338 let mut m2 = AWMap::new(2);
339 m2.insert("b", 3);
340 m2.insert("c", 4);
341
342 let mut left = m1.clone();
343 left.merge(&m2);
344 let left_keys: BTreeSet<_> = left.keys().collect();
345
346 let mut right = m2.clone();
347 right.merge(&m1);
348 let right_keys: BTreeSet<_> = right.keys().collect();
349
350 assert_eq!(left_keys, right_keys);
351 }
352
353 #[test]
354 fn merge_is_idempotent() {
355 let mut m1 = AWMap::new(1);
356 m1.insert("k", "v");
357
358 let m2 = m1.clone();
359 m1.merge(&m2);
360 let after = m1.clone();
361 m1.merge(&m2);
362 assert_eq!(m1, after);
363 }
364
365 #[test]
366 fn merge_propagates_remove() {
367 let mut m1 = AWMap::new(1);
368 m1.insert("k", "v");
369
370 let mut m2 = m1.clone();
371 m2.remove(&"k");
372
373 m1.merge(&m2);
374 assert!(!m1.contains_key(&"k"));
375 }
376
377 #[test]
378 fn delta_apply_equivalent_to_merge() {
379 let mut m1 = AWMap::new(1);
380 m1.insert("a", 1);
381 m1.insert("b", 2);
382
383 let mut m2 = AWMap::new(2);
384 m2.insert("b", 3);
385 m2.insert("c", 4);
386
387 let mut via_merge = m2.clone();
388 via_merge.merge(&m1);
389
390 let mut via_delta = m2.clone();
391 let d = m1.delta(&m2);
392 via_delta.apply_delta(&d);
393
394 let merge_keys: BTreeSet<_> = via_merge.keys().collect();
395 let delta_keys: BTreeSet<_> = via_delta.keys().collect();
396 assert_eq!(merge_keys, delta_keys);
397 }
398
399 #[test]
400 fn delta_carries_tombstones() {
401 let mut m1 = AWMap::new(1);
402 m1.insert("k", "v");
403
404 let m2 = m1.clone();
405 m1.remove(&"k");
406
407 let d = m1.delta(&m2);
408 assert!(!d.tombstones.is_empty());
409
410 let mut via_delta = m2.clone();
411 via_delta.apply_delta(&d);
412 assert!(!via_delta.contains_key(&"k"));
413 }
414
415 #[test]
416 fn iterate_entries() {
417 let mut m = AWMap::new(1);
418 m.insert("a", 1);
419 m.insert("b", 2);
420 m.insert("c", 3);
421 m.remove(&"b");
422
423 let keys: Vec<_> = m.keys().collect();
424 assert_eq!(keys, vec![&"a", &"c"]);
425 }
426}