crdts/map.rs
1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::collections::{BTreeMap, BTreeSet};
4use std::fmt::{self, Debug, Display};
5use std::hash::Hash;
6use std::mem;
7
8use serde::{Deserialize, Serialize};
9
10use crate::ctx::{AddCtx, ReadCtx, RmCtx};
11use crate::{CmRDT, CvRDT, Dot, ResetRemove, VClock};
12
13/// Val Trait alias to reduce redundancy in type decl.
14pub trait Val<A: Ord>: Clone + Default + ResetRemove<A> + CmRDT {}
15
16impl<A, T> Val<A> for T
17where
18 A: Ord,
19 T: Clone + Default + ResetRemove<A> + CmRDT,
20{
21}
22
23/// Map CRDT - Supports Composition of CRDT's with reset-remove semantics.
24///
25/// Reset-remove means that if one replica removes an entry while another
26/// actor concurrently edits that entry, once we sync these two maps, we
27/// will see that the entry is still in the map but all edits seen by the
28/// removing actor will be gone.
29///
30/// See examples/reset_remove.rs for an example of reset-remove semantics
31/// in action.
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33pub struct Map<K: Ord, V: Val<A>, A: Ord + Hash> {
34 // This clock stores the current version of the Map, it should
35 // be greator or equal to all Entry.clock's in the Map.
36 clock: VClock<A>,
37 entries: BTreeMap<K, Entry<V, A>>,
38 deferred: HashMap<VClock<A>, BTreeSet<K>>,
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42struct Entry<V: Val<A>, A: Ord> {
43 // The entry clock tells us which actors edited this entry.
44 clock: VClock<A>,
45
46 // The nested CRDT
47 val: V,
48}
49
50/// Operations which can be applied to the Map CRDT
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
52pub enum Op<K: Ord, V: Val<A>, A: Ord> {
53 /// Remove a key from the map
54 Rm {
55 /// The clock under which we will perform this remove
56 clock: VClock<A>,
57 /// Key to remove
58 keyset: BTreeSet<K>,
59 },
60 /// Update an entry in the map
61 Up {
62 /// Actors version at the time of the update
63 dot: Dot<A>,
64 /// Key of the value to update
65 key: K,
66 /// The operation to apply on the value under `key`
67 op: V::Op,
68 },
69}
70
71impl<V: Val<A>, A: Ord> Default for Entry<V, A> {
72 fn default() -> Self {
73 Self {
74 clock: VClock::default(),
75 val: V::default(),
76 }
77 }
78}
79
80impl<K: Ord, V: Val<A>, A: Ord + Hash> Default for Map<K, V, A> {
81 fn default() -> Self {
82 Self {
83 clock: Default::default(),
84 entries: Default::default(),
85 deferred: Default::default(),
86 }
87 }
88}
89
90impl<K: Ord, V: Val<A>, A: Ord + Hash> ResetRemove<A> for Map<K, V, A> {
91 fn reset_remove(&mut self, clock: &VClock<A>) {
92 self.entries = mem::take(&mut self.entries)
93 .into_iter()
94 .filter_map(|(key, mut entry)| {
95 entry.clock.reset_remove(clock);
96 entry.val.reset_remove(clock);
97 if entry.clock.is_empty() {
98 None // remove this entry since its been forgotten
99 } else {
100 Some((key, entry))
101 }
102 })
103 .collect();
104
105 self.deferred = mem::take(&mut self.deferred)
106 .into_iter()
107 .filter_map(|(mut rm_clock, key)| {
108 rm_clock.reset_remove(clock);
109 if rm_clock.is_empty() {
110 None // this deferred remove has been forgotten
111 } else {
112 Some((rm_clock, key))
113 }
114 })
115 .collect();
116
117 self.clock.reset_remove(clock);
118 }
119}
120
121/// The various validation errors that may occur when using a Map CRDT.
122#[derive(Debug, PartialEq, Eq)]
123pub enum CmRDTValidation<V: CmRDT, A> {
124 /// We are missing dots specified in the DotRange
125 SourceOrder(crate::DotRange<A>),
126
127 /// There is a validation error in the nested CRDT.
128 Value(V::Validation),
129}
130
131impl<V: CmRDT + Debug, A: Debug> Display for CmRDTValidation<V, A> {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 Debug::fmt(&self, f)
134 }
135}
136
137impl<V: CmRDT + Debug, A: Debug> std::error::Error for CmRDTValidation<V, A> {}
138
139/// The various validation errors that may occur when using a Map CRDT.
140#[derive(Debug, PartialEq, Eq)]
141pub enum CvRDTValidation<K, V: CvRDT, A> {
142 /// We've detected that two different members were inserted with the same dot.
143 /// This can break associativity.
144 DoubleSpentDot {
145 /// The dot that was double spent
146 dot: Dot<A>,
147 /// Our member inserted with this dot
148 our_key: K,
149 /// Their member inserted with this dot
150 their_key: K,
151 },
152
153 /// There is a validation error in the nested CRDT.
154 Value(V::Validation),
155}
156
157impl<K: Debug, V: CvRDT + Debug, A: Debug> Display for CvRDTValidation<K, V, A> {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159 Debug::fmt(&self, f)
160 }
161}
162
163impl<K: Debug, V: CvRDT + Debug, A: Debug> std::error::Error for CvRDTValidation<K, V, A> {}
164
165impl<K: Ord, V: Val<A> + Debug, A: Ord + Hash + Clone + Debug> CmRDT for Map<K, V, A> {
166 type Op = Op<K, V, A>;
167 type Validation = CmRDTValidation<V, A>;
168
169 fn validate_op(&self, op: &Self::Op) -> Result<(), Self::Validation> {
170 match op {
171 Op::Rm { .. } => Ok(()),
172 Op::Up { dot, key, op } => {
173 self.clock
174 .validate_op(dot)
175 .map_err(CmRDTValidation::SourceOrder)?;
176 let entry = self.entries.get(key).cloned().unwrap_or_default();
177 entry
178 .clock
179 .validate_op(dot)
180 .map_err(CmRDTValidation::SourceOrder)?;
181 entry.val.validate_op(op).map_err(CmRDTValidation::Value)
182 }
183 }
184 }
185
186 fn apply(&mut self, op: Self::Op) {
187 match op {
188 Op::Rm { clock, keyset } => self.apply_keyset_rm(keyset, clock),
189 Op::Up { dot, key, op } => {
190 if self.clock.get(&dot.actor) >= dot.counter {
191 // we've seen this op already
192 return;
193 }
194
195 let entry = self.entries.entry(key).or_default();
196
197 entry.clock.apply(dot.clone());
198 entry.val.apply(op);
199
200 self.clock.apply(dot);
201 self.apply_deferred();
202 }
203 }
204 }
205}
206
207impl<K: Ord + Clone + Debug, V: Val<A> + CvRDT + Debug, A: Ord + Hash + Clone + Debug> CvRDT
208 for Map<K, V, A>
209{
210 type Validation = CvRDTValidation<K, V, A>;
211
212 fn validate_merge(&self, other: &Self) -> Result<(), Self::Validation> {
213 for (key, entry) in self.entries.iter() {
214 for (other_key, other_entry) in other.entries.iter() {
215 for Dot { actor, counter } in entry.clock.iter() {
216 if other_key != key && other_entry.clock.get(actor) == counter {
217 return Err(CvRDTValidation::DoubleSpentDot {
218 dot: Dot::new(actor.clone(), counter),
219 our_key: key.clone(),
220 their_key: other_key.clone(),
221 });
222 }
223 }
224
225 if key == other_key && entry.clock.concurrent(&other_entry.clock) {
226 entry
227 .val
228 .validate_merge(&other_entry.val)
229 .map_err(CvRDTValidation::Value)?;
230 }
231 }
232 }
233
234 Ok(())
235 }
236
237 fn merge(&mut self, other: Self) {
238 self.entries = mem::take(&mut self.entries)
239 .into_iter()
240 .filter_map(|(key, mut entry)| {
241 if !other.entries.contains_key(&key) {
242 // other doesn't contain this entry because it:
243 // 1. has seen it and dropped it
244 // 2. hasn't seen it
245 if other.clock >= entry.clock {
246 // other has seen this entry and dropped it
247 None
248 } else {
249 // the other map has not seen this version of this
250 // entry, so add it. But first, we have to remove any
251 // information that may have been known at some point
252 // by the other map about this key and was removed.
253 entry.clock.reset_remove(&other.clock);
254 let mut removed_information = other.clock.clone();
255 removed_information.reset_remove(&entry.clock);
256 entry.val.reset_remove(&removed_information);
257 Some((key, entry))
258 }
259 } else {
260 Some((key, entry))
261 }
262 })
263 .collect();
264
265 for (key, mut entry) in other.entries {
266 if let Some(our_entry) = self.entries.get_mut(&key) {
267 // SUBTLE: this entry is present in both maps, BUT that doesn't mean we
268 // shouldn't drop it!
269 // Perfectly possible that an item in both sets should be dropped
270 let mut common = VClock::intersection(&entry.clock, &our_entry.clock);
271 common.merge(entry.clock.clone_without(&self.clock));
272 common.merge(our_entry.clock.clone_without(&other.clock));
273 if common.is_empty() {
274 // both maps had seen each others entry and removed them
275 self.entries.remove(&key).unwrap();
276 } else {
277 // we should not drop, as there is information still tracked in
278 // the common clock.
279 our_entry.val.merge(entry.val);
280
281 let mut information_that_was_deleted = entry.clock.clone();
282 information_that_was_deleted.merge(our_entry.clock.clone());
283 information_that_was_deleted.reset_remove(&common);
284 our_entry.val.reset_remove(&information_that_was_deleted);
285 our_entry.clock = common;
286 }
287 } else {
288 // we don't have this entry, is it because we:
289 // 1. have seen it and dropped it
290 // 2. have not seen it
291 if self.clock >= entry.clock {
292 // We've seen this entry and dropped it, we won't add it back
293 } else {
294 // We have not seen this version of this entry, so we add it.
295 // but first, we have to remove the information on this entry
296 // that we have seen and deleted
297 entry.clock.reset_remove(&self.clock);
298
299 let mut information_we_deleted = self.clock.clone();
300 information_we_deleted.reset_remove(&entry.clock);
301 entry.val.reset_remove(&information_we_deleted);
302 self.entries.insert(key, entry);
303 }
304 }
305 }
306
307 // merge deferred removals
308 for (rm_clock, keys) in other.deferred {
309 self.apply_keyset_rm(keys, rm_clock);
310 }
311
312 self.clock.merge(other.clock);
313
314 self.apply_deferred();
315 }
316}
317
318impl<K: Ord, V: Val<A>, A: Ord + Hash + Clone> Map<K, V, A> {
319 /// Constructs an empty Map
320 pub fn new() -> Self {
321 Default::default()
322 }
323
324 /// Returns true if the map has no entries, false otherwise
325 pub fn is_empty(&self) -> ReadCtx<bool, A> {
326 ReadCtx {
327 add_clock: self.clock.clone(),
328 rm_clock: self.clock.clone(),
329 val: self.entries.is_empty(),
330 }
331 }
332
333 /// Returns the number of entries in the Map
334 pub fn len(&self) -> ReadCtx<usize, A> {
335 ReadCtx {
336 add_clock: self.clock.clone(),
337 rm_clock: self.clock.clone(),
338 val: self.entries.len(),
339 }
340 }
341
342 /// Retrieve value stored under a key
343 pub fn get(&self, key: &K) -> ReadCtx<Option<V>, A> {
344 let add_clock = self.clock.clone();
345 let entry_opt = self.entries.get(key);
346 ReadCtx {
347 add_clock,
348 rm_clock: entry_opt
349 .map(|map_entry| map_entry.clock.clone())
350 .unwrap_or_default(),
351 val: entry_opt.map(|map_entry| map_entry.val.clone()),
352 }
353 }
354
355 /// Update a value under some key.
356 ///
357 /// If the key is not present in the map, the updater will be given the
358 /// result of `V::default()`. The `default` value is used to ensure
359 /// eventual consistency since our `Map`'s values are CRDTs themselves.
360 ///
361 /// The `impl Into<K>` bound provides a nice way of providing an input key that
362 /// can easily convert to the `Map`'s key. For example, we can call this function
363 /// with `"hello": &str` and it can be converted to `String`.
364 pub fn update<F>(&self, key: impl Into<K>, ctx: AddCtx<A>, f: F) -> Op<K, V, A>
365 where
366 F: FnOnce(&V, AddCtx<A>) -> V::Op,
367 {
368 let key = key.into();
369 let dot = ctx.dot.clone();
370 let op = match self.entries.get(&key).map(|e| &e.val) {
371 Some(data) => f(data, ctx),
372 None => f(&V::default(), ctx),
373 };
374
375 Op::Up { dot, key, op }
376 }
377
378 /// Remove an entry from the Map
379 ///
380 /// The `impl Into<K>` bound provides a nice way of providing an input key that
381 /// can easily convert to the `Map`'s key. For example, we can call this function
382 /// with `"hello": &str` and it can be converted to `String`.
383 pub fn rm(&self, key: impl Into<K>, ctx: RmCtx<A>) -> Op<K, V, A> {
384 let mut keyset = BTreeSet::new();
385 keyset.insert(key.into());
386 Op::Rm {
387 clock: ctx.clock,
388 keyset,
389 }
390 }
391
392 /// Retrieve the current read context
393 pub fn read_ctx(&self) -> ReadCtx<(), A> {
394 ReadCtx {
395 add_clock: self.clock.clone(),
396 rm_clock: self.clock.clone(),
397 val: (),
398 }
399 }
400
401 /// apply the pending deferred removes
402 fn apply_deferred(&mut self) {
403 let deferred = mem::take(&mut self.deferred);
404 for (clock, keys) in deferred {
405 self.apply_keyset_rm(keys, clock);
406 }
407 }
408
409 /// Apply a set of key removals given a clock.
410 fn apply_keyset_rm(&mut self, mut keyset: BTreeSet<K>, clock: VClock<A>) {
411 for key in keyset.iter() {
412 if let Some(entry) = self.entries.get_mut(key) {
413 entry.clock.reset_remove(&clock);
414 if entry.clock.is_empty() {
415 // The entry clock says we have no info on this entry.
416 // So remove the entry
417 self.entries.remove(key);
418 } else {
419 // The entry clock is not empty so this means we still
420 // have some information on this entry, keep it.
421 entry.val.reset_remove(&clock);
422 }
423 }
424 }
425
426 // now we need to decide wether we should be keeping this
427 // remove Op around to remove entries we haven't seen yet.
428 match self.clock.partial_cmp(&clock) {
429 None | Some(Ordering::Less) => {
430 // this remove clock has information we don't have,
431 // we need to log this in our deferred remove map, so
432 // that we can delete keys that we haven't seen yet but
433 // have been seen by this clock
434 let deferred_set = self.deferred.entry(clock).or_default();
435 deferred_set.append(&mut keyset);
436 }
437 _ => { /* we've seen all keys this clock has seen */ }
438 }
439 }
440
441 /// Gets an iterator over the keys of the `Map`.
442 ///
443 /// # Examples
444 ///
445 /// ```rust
446 /// use crdts::Map;
447 /// use crdts::MVReg;
448 /// use crdts::CmRDT;
449 ///
450 /// type Actor = &'static str;
451 /// type Key = &'static str;
452 ///
453 /// let actor = "actor";
454 ///
455 /// let mut map: Map<i32, MVReg<Key, Actor>, Actor> = Map::new();
456 ///
457 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
458 /// map.apply(map.update(100, add_ctx, |v, a| v.write("foo", a)));
459 ///
460 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
461 /// map.apply(map.update(50, add_ctx, |v, a| v.write("bar", a)));
462 ///
463 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
464 /// map.apply(map.update(200, add_ctx, |v, a| v.write("baz", a)));
465 ///
466 ///
467 /// let mut keys: Vec<_> = map.keys().map(|key_ctx| *key_ctx.val).collect();
468 ///
469 /// keys.sort();
470 ///
471 /// assert_eq!(keys, &[50, 100, 200]);
472 /// ```
473 pub fn keys(&self) -> impl Iterator<Item = ReadCtx<&K, A>> {
474 self.entries.iter().map(move |(k, v)| ReadCtx {
475 add_clock: self.clock.clone(),
476 rm_clock: v.clock.clone(),
477 val: k,
478 })
479 }
480
481 /// Gets an iterator over the values of the `Map`.
482 ///
483 /// # Examples
484 ///
485 /// ```rust
486 /// use crdts::Map;
487 /// use crdts::MVReg;
488 /// use crdts::CmRDT;
489 ///
490 /// type Actor = &'static str;
491 /// type Key = &'static str;
492 ///
493 /// let actor = "actor";
494 ///
495 /// let mut map: Map<i32, MVReg<Key, Actor>, Actor> = Map::new();
496 ///
497 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
498 /// map.apply(map.update(100, add_ctx, |v, a| v.write("foo", a)));
499 ///
500 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
501 /// map.apply(map.update(50, add_ctx, |v, a| v.write("bar", a)));
502 ///
503 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
504 /// map.apply(map.update(200, add_ctx, |v, a| v.write("baz", a)));
505 ///
506 ///
507 /// let mut values: Vec<_> = map
508 /// .values()
509 /// .map(|val_ctx| val_ctx.val.read().val[0])
510 /// .collect();
511 ///
512 /// values.sort();
513 ///
514 /// assert_eq!(values, &["bar", "baz", "foo"]);
515 /// ```
516 pub fn values(&self) -> impl Iterator<Item = ReadCtx<&V, A>> {
517 self.entries.values().map(move |v| ReadCtx {
518 add_clock: self.clock.clone(),
519 rm_clock: v.clock.clone(),
520 val: &v.val,
521 })
522 }
523
524 /// Gets an iterator over the entries of the `Map`.
525 ///
526 /// # Examples
527 ///
528 /// ```rust
529 /// use crdts::Map;
530 /// use crdts::MVReg;
531 /// use crdts::CmRDT;
532 ///
533 /// type Actor = &'static str;
534 /// type Key = &'static str;
535 ///
536 /// let actor = "actor";
537 ///
538 /// let mut map: Map<i32, MVReg<Key, Actor>, Actor> = Map::new();
539 ///
540 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
541 /// map.apply(map.update(100, add_ctx, |v, a| v.write("foo", a)));
542 ///
543 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
544 /// map.apply(map.update(50, add_ctx, |v, a| v.write("bar", a)));
545 ///
546 /// let add_ctx = map.read_ctx().derive_add_ctx(actor);
547 /// map.apply(map.update(200, add_ctx, |v, a| v.write("baz", a)));
548 ///
549 ///
550 /// let mut items: Vec<_> = map
551 /// .iter()
552 /// .map(|item_ctx| (*item_ctx.val.0, item_ctx.val.1.read().val[0]))
553 /// .collect();
554 ///
555 /// items.sort();
556 ///
557 /// assert_eq!(items, &[(50, "bar"), (100, "foo"), (200, "baz")]);
558 /// ```
559 pub fn iter(&self) -> impl Iterator<Item = ReadCtx<(&K, &V), A>> {
560 self.entries.iter().map(move |(k, v)| ReadCtx {
561 add_clock: self.clock.clone(),
562 rm_clock: v.clock.clone(),
563 val: (k, &v.val),
564 })
565 }
566}
567
#[cfg(test)]
mod test {
    use super::*;
    use crate::mvreg::{self, MVReg};
    use crate::orswot::Orswot;

    type TestActor = u8;
    type TestKey = u8;
    type TestVal = MVReg<u8, TestActor>;
    type TestMap = Map<TestKey, Map<TestKey, TestVal, TestActor>, TestActor>;

    /// Builds a vector clock from exactly the given dots, in order.
    fn clock_of(dots: &[Dot<u8>]) -> VClock<u8> {
        dots.iter().cloned().collect()
    }

    #[test]
    fn test_get() {
        let mut map: TestMap = Map::new();

        assert_eq!(map.get(&0).val, None);

        let dot = map.clock.inc(1);
        map.clock.apply(dot);

        let entry = Entry {
            clock: map.clock.clone(),
            val: Map::default(),
        };
        map.entries.insert(0, entry);

        assert_eq!(map.get(&0).val, Some(Map::new()));
    }

    #[test]
    fn test_op_exchange_converges_quickcheck1() {
        // Actor 0 writes into the nested map stored under key 9.
        let op_actor1 = Op::Up {
            dot: Dot::new(0, 3),
            key: 9,
            op: Op::Up {
                dot: Dot::new(0, 3),
                key: 0,
                op: mvreg::Op::Put {
                    clock: Dot::new(0, 3).into(),
                    val: 0,
                },
            },
        };
        // Actor 1 removes nested key 0 under key 9...
        let op_1_actor2 = Op::Up {
            dot: Dot::new(1, 1),
            key: 9,
            op: Op::Rm {
                clock: Dot::new(1, 1).into(),
                keyset: std::iter::once(0).collect(),
            },
        };
        // ...and then removes key 9 itself.
        let op_2_actor2 = Op::Rm {
            clock: Dot::new(1, 2).into(),
            keyset: std::iter::once(9).collect(),
        };

        let mut m1: TestMap = Map::new();
        let mut m2: TestMap = Map::new();

        m1.apply(op_actor1.clone());
        assert_eq!(m1.clock, Dot::new(0, 3).into());
        assert_eq!(m1.entries[&9].clock, Dot::new(0, 3).into());
        assert_eq!(m1.entries[&9].val.deferred.len(), 0);

        m2.apply(op_1_actor2.clone());
        m2.apply(op_2_actor2.clone());
        assert_eq!(m2.clock, Dot::new(1, 1).into());
        assert_eq!(m2.entries.get(&9), None);
        let expected_deferred: BTreeSet<u8> = std::iter::once(9).collect();
        assert_eq!(
            m2.deferred.get(&Dot::new(1, 2).into()),
            Some(&expected_deferred)
        );

        // m1 <- m2
        m1.apply(op_1_actor2);
        m1.apply(op_2_actor2);

        // m2 <- m1
        m2.apply(op_actor1);

        // Both delivery orders must converge: m1 <- m2 == m2 <- m1.
        assert_eq!(m1, m2);
    }

    #[test]
    fn merge_error() {
        type SetMap = Map<u8, Orswot<u8, u8>, u8>;

        let mut m1: SetMap = Map {
            clock: VClock::from(Dot::new(75, 1)),
            entries: BTreeMap::new(),
            deferred: HashMap::new(),
        };

        let mut m2: SetMap = Map {
            clock: clock_of(&[Dot::new(75, 1), Dot::new(93, 1)]),
            entries: std::iter::once((
                101,
                Entry {
                    clock: clock_of(&[Dot::new(75, 1), Dot::new(93, 1)]),
                    val: Orswot {
                        clock: clock_of(&[Dot::new(75, 1), Dot::new(93, 1)]),
                        entries: vec![
                            (1, VClock::from(Dot::new(75, 1))),
                            (2, VClock::from(Dot::new(93, 1))),
                        ]
                        .into_iter()
                        .collect(),
                        deferred: HashMap::new(),
                    },
                },
            ))
            .collect(),
            deferred: HashMap::new(),
        };

        let expected: SetMap = Map {
            clock: clock_of(&[Dot::new(75, 1), Dot::new(93, 1)]),
            entries: std::iter::once((
                101,
                Entry {
                    clock: Dot::new(93, 1).into(),
                    val: Orswot {
                        clock: clock_of(&[Dot::new(93, 1)]),
                        entries: vec![(2, VClock::from(Dot::new(93, 1)))]
                            .into_iter()
                            .collect(),
                        deferred: HashMap::new(),
                    },
                },
            ))
            .collect(),
            deferred: HashMap::new(),
        };

        m1.merge(m2.clone());
        assert_eq!(m1, expected);

        m2.merge(m1.clone());
        assert_eq!(m1, m2);
    }
}
713}