datacake_crdt/
orswot.rs

1use std::collections::btree_map::Entry;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt::Debug;
4use std::time::Duration;
5use std::{cmp, mem};
6
7#[cfg(feature = "rkyv-support")]
8use rkyv::{Archive, Deserialize, Serialize};
9
10use crate::timestamp::HLCTimestamp;
11
/// The key type used to identify entries in the set.
pub type Key = u64;
/// A list of keys, each paired with the [HLCTimestamp] of the operation recorded for it.
pub type StateChanges = Vec<(Key, HLCTimestamp)>;
14
/// The period of time to subtract from the safe timestamp cut-off.
///
/// This allows the system to essentially forgive some latency between events.
///
/// This is 1 hour in regular builds and zero when compiled for tests
/// (selected at compile time via `cfg!(test)`).
pub const FORGIVENESS_PERIOD: Duration = if cfg!(test) {
    Duration::from_secs(0)
} else {
    Duration::from_secs(3_600)
};
26
#[cfg(feature = "rkyv")]
#[derive(Debug, thiserror::Error)]
#[error("The set cannot be (de)serialized from the provided set of bytes.")]
/// The set cannot be (de)serialized to or from the byte buffer.
///
/// This is a unit error: it carries no detail about the underlying failure.
pub struct BadState;
32
#[derive(Debug, Clone)]
#[repr(C)]
#[cfg_attr(feature = "rkyv", derive(Serialize, Deserialize, Archive))]
#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))]
/// Tracks the timestamps observed from each node, split across `N` sources.
pub struct NodeVersions<const N: usize> {
    /// For each source (array index), the newest timestamp observed per node id.
    nodes_max_stamps: [BTreeMap<u8, HLCTimestamp>; N],
    /// Per node id, the smallest of that node's newest timestamps across all
    /// sources, moved back by [FORGIVENESS_PERIOD].
    safe_last_stamps: BTreeMap<u8, HLCTimestamp>,
}
41
42impl<const N: usize> Default for NodeVersions<N> {
43    fn default() -> Self {
44        let stamps_template = [(); N];
45
46        Self {
47            nodes_max_stamps: stamps_template.map(|_| BTreeMap::new()),
48            safe_last_stamps: BTreeMap::new(),
49        }
50    }
51}
52
53impl<const N: usize> NodeVersions<N> {
54    /// merges the current node versions with another node versions set.
55    fn merge(&mut self, other: NodeVersions<N>) {
56        let mut nodes = HashSet::new();
57        for (source, other_nodes) in other.nodes_max_stamps.into_iter().enumerate() {
58            let existing_nodes = &mut self.nodes_max_stamps[source];
59            for (node, ts) in other_nodes {
60                nodes.insert(node);
61                match existing_nodes.entry(node) {
62                    Entry::Occupied(mut entry) => {
63                        // We have already observed these events at some point from this node.
64                        // This means we can no longer trust that this key is in fact still valid.
65                        if &ts < entry.get() {
66                            continue;
67                        }
68
69                        entry.insert(ts);
70                    },
71                    Entry::Vacant(v) => {
72                        v.insert(ts);
73                    },
74                }
75            }
76        }
77
78        for node in nodes {
79            self.compute_safe_last_stamp(node);
80        }
81    }
82
83    /// Attempts to update the latest observed timestamp for a given source.
84    fn try_update_max_stamp(&mut self, source: usize, ts: HLCTimestamp) -> bool {
85        match self.nodes_max_stamps[source].entry(ts.node()) {
86            Entry::Occupied(mut entry) => {
87                // We have already observed these events at some point from this node.
88                // This means we can no longer trust that this key is in fact still valid.
89                if &ts < entry.get() {
90                    self.compute_safe_last_stamp(ts.node());
91                    return false;
92                }
93
94                entry.insert(ts);
95            },
96            Entry::Vacant(v) => {
97                v.insert(ts);
98            },
99        }
100
101        self.compute_safe_last_stamp(ts.node());
102
103        true
104    }
105
106    /// Computes the safe observed timestamp based off all known sources.
107    fn compute_safe_last_stamp(&mut self, node: u8) {
108        let min = self
109            .nodes_max_stamps
110            .iter()
111            .map(|stamps| {
112                stamps.get(&node).copied().unwrap_or_else(|| {
113                    HLCTimestamp::new(Duration::from_secs(0), 0, node)
114                })
115            })
116            .min();
117
118        if let Some(min) = min {
119            let ts = HLCTimestamp::new(
120                min.datacake_timestamp().saturating_sub(FORGIVENESS_PERIOD),
121                min.counter(),
122                min.node(),
123            );
124            self.safe_last_stamps.insert(node, ts);
125        }
126    }
127
128    /// Checks if a given timestamp happened before the last observed timestamp.
129    fn is_ts_before_last_observed_event(&self, ts: HLCTimestamp) -> bool {
130        self.safe_last_stamps
131            .get(&ts.node())
132            .map(|v| &ts < v)
133            .unwrap_or_default()
134    }
135}
136
#[derive(Debug, Default, Clone)]
#[repr(C)]
#[cfg_attr(feature = "rkyv", derive(Serialize, Deserialize, Archive))]
#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))]
/// A CRDT which supports purging of deleted entry tombstones.
///
/// This implementation is largely based on the Riak DB implementations
/// of CRDTs. This set in particular is built around the [HLCTimestamp]
/// which uses the uniqueness guarantees provided by the timestamp to
/// resolve conflicts.
///
/// Entries can be marked as deleted via the standard `delete` method
/// which internally marks the key as a tombstone.
/// The tombstones can be purged safely once the set has observed other,
/// newer operations from the original node which the entry is tied to.
/// (This is tracked by checking the `node` field of the timestamp.)
///
/// ## Sources
/// Sources are a way of separating two asynchronous operations which both guarantee the order
/// of the operations is correct, but may not be ordered relative to each other.
///
/// I.e. if we have the following sources:
///
/// Source 1: [1, 2, 4, 5]
/// Source 2: [2, 3, 8]
///
/// If we observe the operations from source `2` before source `1` or vice versa, we may miss
/// operations if we assume one source, this is because of our observer pattern.
///
/// Sources allow us to negate this issue while still keeping the observer pattern.
///
/// ## Last write wins conditions
/// If two operations occur at the same effective time, i.e. the `seconds` and `counter` are the
/// same on two timestamps. The timestamp with the largest `node_id` wins.
///
/// Consistency is not guaranteed in the event that two operations with the same timestamp from the
/// *same node* occur on the same key. This means that the node is not generating its [HLCTimestamp]
/// monotonically which is incorrect.
///
/// The set may converge in its current state with the above situation, but this is not guaranteed
/// or tested against. It is your responsibility to ensure that timestamps from the same node are
/// monotonic (as ensured by [HLCTimestamp]'s `send` method.)
///
///
/// ## Example
/// ```
/// use std::time::Duration;
/// use datacake_crdt::{OrSWotSet, HLCTimestamp};
///
/// let mut node_a = HLCTimestamp::now(0, 0);
///
/// // Simulating a node being slightly ahead.
/// let mut node_b = HLCTimestamp::new(node_a.datacake_timestamp() + Duration::from_secs(5), 0, 1);
///
/// // We have only 1 source.
/// let mut node_a_set = OrSWotSet::<1>::default();
/// let mut node_b_set = OrSWotSet::<1>::default();
///
/// // Insert a new key with a new timestamp in set A.
/// node_a_set.insert(1, node_a.send().unwrap());
///
/// // Insert a new entry in set B.
/// node_b_set.insert(2, node_b.send().unwrap());
///
/// // Set A has key `1` removed.
/// node_a_set.delete(1, node_a.send().unwrap());
///
/// // Merging set B with set A and vice versa.
/// // Our sets are now aligned without conflicts.
/// node_b_set.merge(node_a_set.clone());
/// node_a_set.merge(node_b_set.clone());
///
/// // Set A and B should both see that key `1` has been deleted.
/// assert!(node_a_set.get(&1).is_none(), "Key should be correctly removed.");
/// assert!(node_b_set.get(&1).is_none(), "Key should be correctly removed.");
/// ```
pub struct OrSWotSet<const N: usize = 1> {
    /// Live keys, each with the timestamp of the insert currently held for it.
    entries: BTreeMap<Key, HLCTimestamp>,
    /// Tombstones: deleted keys, each with the timestamp of the delete.
    dead: HashMap<Key, HLCTimestamp>,
    /// Timestamps observed per node and source, used to decide which
    /// operations have already been seen.
    versions: NodeVersions<N>,
}
218
219impl<const N: usize> OrSWotSet<N> {
220    #[cfg(feature = "rkyv")]
221    /// Deserializes a [OrSWotSet] from a array of bytes.
222    pub fn from_bytes(data: &[u8]) -> Result<Self, BadState> {
223        let deserialized = rkyv::from_bytes::<Self>(data).map_err(|_| BadState)?;
224        Ok(deserialized)
225    }
226
227    #[cfg(feature = "rkyv")]
228    /// Serializes the set into a buffer of bytes.
229    pub fn as_bytes(&self) -> Result<Vec<u8>, BadState> {
230        Ok(rkyv::to_bytes::<_, 2048>(self)
231            .map_err(|_| BadState)?
232            .into_vec())
233    }
234
235    /// Calculates the deterministic difference between two sets, returning the
236    /// modified keys and the deleted keys.
237    ///
238    /// This follows the same logic as `set.merge(&other)` but does not modify
239    /// the state of the main set.
240    ///
241    /// NOTE:
242    ///     The difference is *not* the symmetric difference between the two sets.
243    pub fn diff(&self, other: &OrSWotSet<N>) -> (StateChanges, StateChanges) {
244        let mut changes = Vec::new();
245        let mut removals = Vec::new();
246
247        for (key, ts) in other.entries.iter() {
248            self.check_self_then_insert_to(*key, *ts, &mut changes);
249        }
250
251        for (key, ts) in other.dead.iter() {
252            self.check_self_then_insert_to(*key, *ts, &mut removals);
253        }
254
255        (changes, removals)
256    }
257
258    fn check_self_then_insert_to(
259        &self,
260        key: Key,
261        ts: HLCTimestamp,
262        values: &mut Vec<(Key, HLCTimestamp)>,
263    ) {
264        if let Some(existing_insert) = self.entries.get(&key) {
265            if existing_insert < &ts {
266                values.push((key, ts));
267            }
268        } else if let Some(existing_delete) = self.dead.get(&key) {
269            if existing_delete < &ts {
270                values.push((key, ts));
271            }
272        } else if !self.versions.is_ts_before_last_observed_event(ts) {
273            values.push((key, ts))
274        }
275    }
276
277    /// Merges another set with the current set.
278    ///
279    /// In this case any conflicts are deterministically resolved via the key's [HLCTimestamp]
280    /// any deletes are tracked or ignored depending on this timestamp due to the nature
281    /// of the ORSWOT CRDT.
282    pub fn merge(&mut self, other: OrSWotSet<N>) {
283        let base_entries = other.entries.into_iter().map(|(k, ts)| (k, ts, false));
284
285        let remote_versions = other.versions;
286        let mut entries_log = Vec::from_iter(base_entries);
287        entries_log.extend(other.dead.into_iter().map(|(k, ts)| (k, ts, true)));
288
289        // It's important we go in time/event order. Otherwise we may incorrectly merge the set.
290        entries_log.sort_by_key(|v| v.1);
291
292        let mut old_entries = mem::take(&mut self.entries);
293
294        for (key, ts, is_delete) in entries_log {
295            // We've already observed the operation.
296            if is_delete && self.versions.is_ts_before_last_observed_event(ts) {
297                continue;
298            }
299
300            if is_delete {
301                if let Some(entry) = self.entries.remove(&key) {
302                    if ts < entry {
303                        self.entries.insert(key, entry);
304                        continue;
305                    }
306                }
307
308                self.dead
309                    .entry(key)
310                    .and_modify(|v| {
311                        (*v) = cmp::max(*v, ts);
312                    })
313                    .or_insert_with(|| ts);
314
315                continue;
316            }
317
318            let mut timestamp = ts;
319
320            // If our own entry is newer, we use that.
321            if let Some(existing_ts) = old_entries.remove(&key) {
322                timestamp = cmp::max(timestamp, existing_ts);
323            }
324
325            // Have we already marked the document as dead. And if so, is it newer than this op?
326            if let Some(deleted_ts) = self.dead.remove(&key) {
327                if timestamp < deleted_ts {
328                    self.dead.insert(key, deleted_ts);
329                    continue;
330                }
331            }
332
333            self.entries.insert(key, timestamp);
334        }
335
336        // The remaining entries in our map are either entries which we need to remove,
337        // or entries that the other node is currently missing.
338        for (key, ts) in old_entries {
339            // We've observed all the events upto and beyond this timestamp.
340            // We can rely on this check to see if we delete or keep the entry,
341            // as this is a `<` bounds check rather than `<=`. Which means
342            // if the entry happens to have been the most recent event observed, it won't
343            // be `true` and therefore be kept.
344            if remote_versions.is_ts_before_last_observed_event(ts) {
345                continue;
346            }
347
348            if let Some(deleted) = self.dead.remove(&key) {
349                if ts < deleted {
350                    self.dead.insert(key, deleted);
351                    continue;
352                }
353            }
354
355            self.entries.insert(key, ts);
356        }
357
358        self.versions.merge(remote_versions);
359    }
360
361    /// Get an entry from the set.
362    ///
363    /// If the entry exists it's associated [HLCTimestamp] is returned.
364    pub fn get(&self, k: &Key) -> Option<&HLCTimestamp> {
365        self.entries.get(k)
366    }
367
368    /// Purges and returns any safe to remove tombstone markers from the set.
369    ///
370    /// This is useful for conserving memory and preventing an infinitely
371    /// growing tombstone state.
372    pub fn purge_old_deletes(&mut self) -> StateChanges {
373        let mut deleted_keys = vec![];
374        for (k, stamp) in mem::take(&mut self.dead) {
375            if !self.versions.is_ts_before_last_observed_event(stamp) {
376                self.dead.insert(k, stamp);
377            } else {
378                deleted_keys.push((k, stamp));
379            }
380        }
381
382        deleted_keys
383    }
384
385    /// Adds a set of tombstones to the state tombstone list.
386    ///
387    /// This bypasses the observation timestamp check, which is useful
388    /// for replaying purges which may not have been successful.
389    ///
390    /// WARNING:
391    /// If you do not know where to use this, you do not wan't to use this.
392    pub fn add_raw_tombstones(&mut self, tombstones: StateChanges) {
393        for (key, stamp) in tombstones {
394            self.dead.insert(key, stamp);
395        }
396    }
397
398    /// Checks if the given operation will be applied if it is in fact carried out.
399    pub fn will_apply(&self, key: Key, ts: HLCTimestamp) -> bool {
400        if self.versions.is_ts_before_last_observed_event(ts) {
401            return false;
402        }
403
404        if let Some(entry) = self.entries.get(&key) {
405            return entry < &ts;
406        }
407
408        if let Some(entry) = self.dead.get(&key) {
409            return entry < &ts;
410        }
411
412        true
413    }
414
415    /// Insert a key into the set with a given timestamp.
416    ///
417    /// If the set has already observed events from the timestamp's
418    /// node, this operation is ignored. It is otherwise inserted.
419    ///
420    /// Returns if the value has actually been inserted/updated
421    /// in the set. If `false`, the set's state has not changed.
422    pub fn insert(&mut self, k: Key, ts: HLCTimestamp) -> bool {
423        self.insert_with_source(0, k, ts)
424    }
425
426    /// Insert a key into the set with a given timestamp with a source.
427    ///
428    /// If the set has already observed events from the timestamp's
429    /// node, this operation is ignored. It is otherwise inserted.
430    ///
431    /// Returns if the value has actually been inserted/updated
432    /// in the set. If `false`, the set's state has not changed.
433    pub fn insert_with_source(
434        &mut self,
435        source: usize,
436        k: Key,
437        ts: HLCTimestamp,
438    ) -> bool {
439        debug_assert!(source < N);
440
441        let mut has_set = false;
442
443        if !self.versions.try_update_max_stamp(source, ts) {
444            return has_set;
445        }
446
447        if let Some(deleted_ts) = self.dead.remove(&k) {
448            // Our deleted timestamp is newer, so we don't want to adjust our markings.
449            if ts < deleted_ts {
450                self.dead.insert(k, deleted_ts);
451                return has_set;
452            }
453        }
454
455        self.entries
456            .entry(k)
457            .and_modify(|v| {
458                if *v < ts {
459                    has_set = true;
460                    (*v) = ts;
461                }
462            })
463            .or_insert_with(|| {
464                has_set = true;
465                ts
466            });
467
468        has_set
469    }
470
471    /// Attempts to remove a key from the set with a given timestamp.
472    ///
473    /// If the set has already observed events from the timestamp's
474    /// node, this operation is ignored. It is otherwise inserted.
475    ///
476    /// Returns if the value has actually been inserted/updated
477    /// in the set. If `false`, the set's state has not changed.
478    pub fn delete(&mut self, k: Key, ts: HLCTimestamp) -> bool {
479        self.delete_with_source(0, k, ts)
480    }
481
482    /// Attempts to remove a key from the set with a given timestamp.
483    ///
484    /// If the set has already observed events from the timestamp's
485    /// node, this operation is ignored. It is otherwise inserted.
486    ///
487    /// Returns if the value has actually been inserted/updated
488    /// in the set. If `false`, the set's state has not changed.
489    pub fn delete_with_source(
490        &mut self,
491        source: usize,
492        k: Key,
493        ts: HLCTimestamp,
494    ) -> bool {
495        debug_assert!(source < N);
496
497        let mut has_set = false;
498
499        if !self.versions.try_update_max_stamp(source, ts) {
500            return has_set;
501        }
502
503        if let Some(existing_ts) = self.entries.remove(&k) {
504            // Our deleted timestamp is newer, so we don't want to adjust our markings.
505            // Inserts *always* win on conflicting timestamps.
506            if ts <= existing_ts {
507                self.entries.insert(k, existing_ts);
508                return has_set;
509            }
510        }
511
512        self.dead
513            .entry(k)
514            .and_modify(|v| {
515                if *v < ts {
516                    has_set = true;
517                    (*v) = ts;
518                }
519            })
520            .or_insert_with(|| {
521                has_set = true;
522                ts
523            });
524
525        has_set
526    }
527}
528
529#[cfg(test)]
530mod tests {
531    use std::time::Duration;
532
533    use super::*;
534
535    #[test]
536    fn test_op_order() {
537        let mut node_a = HLCTimestamp::now(0, 0);
538        let mut node_b = HLCTimestamp::new(node_a.datacake_timestamp(), 0, 1);
539
540        let mut node_a_set = OrSWotSet::<1>::default();
541
542        let ts_a = node_a.send().unwrap();
543        let ts_b = node_b.send().unwrap();
544
545        node_a_set.insert(1, ts_a);
546        node_a_set.insert(1, ts_b);
547
548        let retrieved = node_a_set.get(&1);
549        assert_eq!(retrieved, Some(&ts_b), "Node B should win the operation.");
550
551        let mut node_a_set = OrSWotSet::<1>::default();
552        let mut node_b_set = OrSWotSet::<1>::default();
553
554        node_a_set.insert(1, ts_a);
555        node_b_set.insert(1, ts_b);
556
557        node_a_set.merge(node_b_set.clone());
558        node_b_set.merge(node_a_set.clone());
559
560        let retrieved = node_a_set.get(&1);
561        assert_eq!(
562            retrieved,
563            Some(&ts_b),
564            "Node B should win the operation after merging set A."
565        );
566
567        let retrieved = node_b_set.get(&1);
568        assert_eq!(
569            retrieved,
570            Some(&ts_b),
571            "Node B should win the operation after merging set B."
572        );
573    }
574
575    #[test]
576    fn test_basic_insert_merge() {
577        let mut node_a = HLCTimestamp::now(0, 0);
578        let mut node_b = HLCTimestamp::new(node_a.datacake_timestamp(), 0, 1);
579
580        // We create our new set for node a.
581        let mut node_a_set = OrSWotSet::<1>::default();
582
583        // We add a new set of entries into our set.
584        node_a_set.insert(1, node_a.send().unwrap());
585        node_a_set.insert(2, node_a.send().unwrap());
586        node_a_set.insert(3, node_a.send().unwrap());
587
588        // We create our new state on node b's side.
589        let mut node_b_set = OrSWotSet::<1>::default();
590
591        // We add a new set of entries into our set.
592        node_b_set.insert(1, node_b.send().unwrap());
593        node_b_set.insert(4, node_b.send().unwrap());
594
595        node_a_set.merge(node_b_set);
596
597        assert!(
598            node_a_set.dead.is_empty(),
599            "Expected no entries to be marked as dead."
600        );
601
602        assert!(
603            node_a_set.entries.get(&1).is_some(),
604            "Expected entry with key 1 to exist."
605        );
606        assert!(
607            node_a_set.entries.get(&2).is_some(),
608            "Expected entry with key 2 to exist."
609        );
610        assert!(
611            node_a_set.entries.get(&3).is_some(),
612            "Expected entry with key 3 to exist."
613        );
614        assert!(
615            node_a_set.entries.get(&4).is_some(),
616            "Expected entry with key 4 to exist."
617        );
618    }
619
620    #[test]
621    fn test_same_time_conflict_convergence() {
622        let mut node_a = HLCTimestamp::now(0, 0);
623        let mut node_b = HLCTimestamp::new(node_a.datacake_timestamp(), 0, 1);
624
625        // We create our new set for node a.
626        let mut node_a_set = OrSWotSet::<1>::default();
627
628        // We add a new set of entries into our set.
629        // It's important that our `3` key is first here, as it means the counter
630        // of the HLC timestamp will mean the delete succeeds.
631        node_a_set.insert(3, node_a.send().unwrap());
632        node_a_set.insert(1, node_a.send().unwrap());
633        node_a_set.insert(2, node_a.send().unwrap());
634
635        // We create our new state on node b's side.
636        let mut node_b_set = OrSWotSet::<1>::default();
637
638        // We add a new set of entries into our set.
639        // These entries effectively happen at the same time as node A in our test, just because
640        // of the execution speed.
641        node_b_set.insert(1, node_b.send().unwrap());
642        node_b_set.delete(3, node_b.send().unwrap());
643
644        // When merged, the set should mark key `3` as deleted
645        // and ignore the insert on the original set.
646        node_a_set.merge(node_b_set.clone());
647
648        assert!(
649            node_a_set.dead.contains_key(&3),
650            "SET A: Expected key 3 to be marked as dead."
651        );
652
653        assert!(
654            node_a_set.entries.get(&1).is_some(),
655            "SET A: Expected entry with key 1 to exist."
656        );
657        assert!(
658            node_a_set.entries.get(&2).is_some(),
659            "SET A: Expected entry with key 2 to exist."
660        );
661        assert!(
662            node_a_set.entries.get(&3).is_none(),
663            "SET A: Expected entry with key 3 to NOT exist."
664        );
665
666        // If we now merge set A with set B. They should align.
667        node_b_set.merge(node_a_set);
668
669        assert!(
670            node_b_set.dead.contains_key(&3),
671            "SET B: Expected key 3 to be marked as dead."
672        );
673
674        assert!(
675            node_b_set.entries.get(&1).is_some(),
676            "SET B: Expected entry with key 1 to exist."
677        );
678        assert!(
679            node_b_set.entries.get(&2).is_some(),
680            "SET B: Expected entry with key 2 to exist."
681        );
682        assert!(
683            node_b_set.entries.get(&3).is_none(),
684            "SET B: Expected entry with key 3 to NOT exist."
685        );
686    }
687
688    #[test]
689    fn test_basic_delete_merge() {
690        let mut node_a = HLCTimestamp::now(0, 0);
691        let mut node_b = HLCTimestamp::new(
692            node_a.datacake_timestamp() + Duration::from_secs(1),
693            0,
694            1,
695        );
696
697        // We create our new set for node a.
698        let mut node_a_set = OrSWotSet::<1>::default();
699
700        // We add a new set of entries into our set.
701        node_a_set.insert(1, node_a.send().unwrap());
702        node_a_set.insert(2, node_a.send().unwrap());
703        node_a_set.insert(3, node_a.send().unwrap());
704
705        // We create our new state on node b's side.
706        let mut node_b_set = OrSWotSet::<1>::default();
707
708        // We add a new set of entries into our set.
709        node_b_set.insert(1, node_b.send().unwrap());
710        node_b_set.delete(3, node_b.send().unwrap());
711
712        node_a_set.merge(node_b_set.clone());
713
714        assert!(
715            node_a_set.dead.contains_key(&3),
716            "Expected key 3 to be marked as dead."
717        );
718
719        assert!(
720            node_a_set.entries.get(&1).is_some(),
721            "Expected entry with key 1 to exist."
722        );
723        assert!(
724            node_a_set.entries.get(&2).is_some(),
725            "Expected entry with key 2 to exist."
726        );
727        assert!(
728            node_a_set.entries.get(&3).is_none(),
729            "Expected entry with key 3 to NOT exist."
730        );
731    }
732
733    #[test]
734    fn test_purge_delete_merge() {
735        let mut node_a = HLCTimestamp::now(0, 0);
736        let mut node_b = HLCTimestamp::new(
737            node_a.datacake_timestamp() + Duration::from_secs(1),
738            0,
739            1,
740        );
741
742        // We create our new set for node a.
743        let mut node_a_set = OrSWotSet::<1>::default();
744
745        // We add a new set of entries into our set.
746        node_a_set.insert(1, node_a.send().unwrap());
747        node_a_set.insert(2, node_a.send().unwrap());
748        node_a_set.insert(3, node_a.send().unwrap());
749
750        // We create our new state on node b's side.
751        let mut node_b_set = OrSWotSet::<1>::default();
752
753        // We add a new set of entries into our set.
754        node_b_set.insert(1, node_b.send().unwrap());
755        node_b_set.delete(3, node_b.send().unwrap());
756
757        node_a_set.merge(node_b_set.clone());
758
759        node_a_set.insert(4, node_a.send().unwrap());
760
761        // We must observe another event from node b.
762        node_b_set.insert(4, node_b.send().unwrap());
763        node_a_set.merge(node_b_set.clone());
764
765        node_a_set.purge_old_deletes();
766
767        assert!(
768            node_a_set.dead.is_empty(),
769            "Expected dead entries to be empty."
770        );
771
772        assert!(
773            node_a_set.entries.get(&1).is_some(),
774            "Expected entry with key 1 to exist."
775        );
776        assert!(
777            node_a_set.entries.get(&2).is_some(),
778            "Expected entry with key 2 to exist."
779        );
780        assert!(
781            node_a_set.entries.get(&3).is_none(),
782            "Expected entry with key 3 to NOT exist."
783        );
784        assert!(
785            node_a_set.entries.get(&4).is_some(),
786            "Expected entry with key 4 to exist."
787        );
788    }
789
790    #[test]
791    fn test_purge_some_entries() {
792        let mut node_a = HLCTimestamp::now(0, 0);
793        let mut node_b = HLCTimestamp::new(
794            node_a.datacake_timestamp() + Duration::from_secs(1),
795            0,
796            1,
797        );
798
799        // We create our new set for node a.
800        let mut node_a_set = OrSWotSet::<1>::default();
801
802        // We add a new set of entries into our set.
803        node_a_set.insert(1, node_a.send().unwrap());
804        node_a_set.insert(2, node_a.send().unwrap());
805        node_a_set.insert(3, node_a.send().unwrap());
806
807        std::thread::sleep(Duration::from_millis(1));
808
809        // We create our new state on node b's side.
810        let mut node_b_set = OrSWotSet::<1>::default();
811
812        // We add a new set of entries into our set.
813        node_b_set.insert(1, node_b.send().unwrap());
814        node_b_set.delete(3, node_b.send().unwrap());
815
816        node_a_set.merge(node_b_set.clone());
817
818        node_a_set.insert(4, node_a.send().unwrap());
819
820        // Delete entry 2 from set a.
821        node_a_set.delete(2, node_a.send().unwrap());
822
823        // 'observe' a new op happening from node a.
824        node_a_set.insert(5, node_a.send().unwrap());
825
826        node_a_set.merge(node_b_set.clone());
827
828        // We expect our deletion of key `2` to be removed, but not key `3`
829        node_a_set.purge_old_deletes();
830
831        node_b_set.merge(node_a_set.clone());
832        node_a_set.purge_old_deletes();
833
834        assert!(
835            node_a_set.dead.get(&3).is_some(),
836            "SET A: Expected key 3 to be left in dead set."
837        );
838        assert!(
839            node_a_set.dead.get(&2).is_none(),
840            "SET A: Expected key 2 to be purged from dead set."
841        );
842
843        assert!(
844            node_a_set.entries.get(&1).is_some(),
845            "SET A: Expected entry with key 1 to exist."
846        );
847        assert!(
848            node_a_set.entries.get(&2).is_none(),
849            "SET A: Expected entry with key 2 to exist."
850        );
851        assert!(
852            node_a_set.entries.get(&3).is_none(),
853            "SET A: Expected entry with key 3 to NOT exist."
854        );
855        assert!(
856            node_a_set.entries.get(&4).is_some(),
857            "SET A: Expected entry with key 4 to exist."
858        );
859
860        assert!(
861            node_b_set.dead.get(&3).is_some(),
862            "SET B: Expected key 3 to be left in dead set."
863        );
864        assert!(
865            node_b_set.dead.get(&2).is_none(),
866            "SET B: Expected key 2 to be purged from dead set."
867        );
868
869        assert!(
870            node_b_set.entries.get(&1).is_some(),
871            "SET B: Expected entry with key 1 to exist."
872        );
873        assert!(
874            node_b_set.entries.get(&2).is_none(),
875            "SET B: Expected entry with key 2 to exist."
876        );
877        assert!(
878            node_b_set.entries.get(&3).is_none(),
879            "SET B: Expected entry with key 3 to NOT exist."
880        );
881        assert!(
882            node_b_set.entries.get(&4).is_some(),
883            "SET B: Expected entry with key 4 to exist."
884        );
885    }
886
887    #[test]
888    fn test_insert_no_op() {
889        let mut node_a = HLCTimestamp::now(0, 0);
890        let old_ts = node_a.send().unwrap();
891
892        // We create our new set for node a.
893        let mut node_a_set = OrSWotSet::<1>::default();
894
895        let did_add = node_a_set.insert(1, node_a.send().unwrap());
896        assert!(did_add, "Expected entry insert to be added.");
897
898        let did_add = node_a_set.insert(1, old_ts);
899        assert!(
900            !did_add,
901            "Expected entry insert with old timestamp to be ignored"
902        );
903    }
904
905    #[test]
906    fn test_delete_no_op() {
907        let mut node_a = HLCTimestamp::now(0, 0);
908        let old_ts = node_a.send().unwrap();
909
910        // We create our new set for node a.
911        let mut node_a_set = OrSWotSet::<1>::default();
912
913        let did_add = node_a_set.insert(1, node_a.send().unwrap());
914        assert!(did_add, "Expected entry insert to be added.");
915
916        let did_add = node_a_set.delete(1, old_ts);
917        assert!(
918            !did_add,
919            "Expected entry delete with old timestamp to be ignored"
920        );
921    }
922
923    #[test]
924    fn test_set_diff() {
925        let mut node_a = HLCTimestamp::now(0, 0);
926        let mut node_b = HLCTimestamp::new(
927            node_a.datacake_timestamp() + Duration::from_secs(5),
928            0,
929            1,
930        );
931
932        let mut node_a_set = OrSWotSet::<1>::default();
933        let mut node_b_set = OrSWotSet::<1>::default();
934
935        let insert_ts_1 = node_a.send().unwrap();
936        node_a_set.insert(1, insert_ts_1);
937
938        let (changed, removed) = OrSWotSet::<1>::default().diff(&node_a_set);
939        assert_eq!(
940            changed,
941            vec![(1, insert_ts_1)],
942            "Expected set diff to contain key `1`."
943        );
944        assert!(
945            removed.is_empty(),
946            "Expected there to be no difference between sets."
947        );
948
949        let delete_ts_3 = node_a.send().unwrap();
950        node_a_set.delete(3, delete_ts_3);
951
952        let insert_ts_2 = node_b.send().unwrap();
953        node_b_set.insert(2, insert_ts_2);
954
955        let (changed, removed) = node_a_set.diff(&node_b_set);
956
957        assert_eq!(
958            changed,
959            vec![(2, insert_ts_2)],
960            "Expected set a to only be marked as missing key `2`"
961        );
962        assert!(
963            removed.is_empty(),
964            "Expected set a to not be missing any delete markers."
965        );
966
967        let (changed, removed) = node_b_set.diff(&node_a_set);
968        assert_eq!(
969            changed,
970            vec![(1, insert_ts_1)],
971            "Expected set b to have key `1` marked as changed."
972        );
973        assert_eq!(
974            removed,
975            vec![(3, delete_ts_3)],
976            "Expected set b to have key `3` marked as deleted."
977        );
978    }
979
980    #[test]
981    fn test_set_diff_with_conflicts() {
982        let mut node_a = HLCTimestamp::now(0, 0);
983        let mut node_b = HLCTimestamp::new(
984            node_a.datacake_timestamp() + Duration::from_secs(5),
985            0,
986            1,
987        );
988
989        let mut node_a_set = OrSWotSet::<1>::default();
990        let mut node_b_set = OrSWotSet::<1>::default();
991
992        // This should get overriden by node b.
993        node_a_set.insert(1, node_a.send().unwrap());
994        node_a_set.insert(2, node_a.send().unwrap());
995
996        std::thread::sleep(Duration::from_millis(500));
997
998        let delete_ts_3 = node_a.send().unwrap();
999        node_a_set.delete(3, delete_ts_3);
1000
1001        let insert_ts_2 = node_b.send().unwrap();
1002        node_b_set.insert(2, insert_ts_2);
1003
1004        let insert_ts_1 = node_b.send().unwrap();
1005        node_b_set.insert(1, insert_ts_1);
1006
1007        let (changed, removed) = node_a_set.diff(&node_b_set);
1008
1009        assert_eq!(
1010            changed,
1011            vec![(1, insert_ts_1), (2, insert_ts_2)],
1012            "Expected set a to be marked as updating keys `1, 2`"
1013        );
1014        assert!(
1015            removed.is_empty(),
1016            "Expected set a to not be missing any delete markers."
1017        );
1018
1019        let (changed, removed) = node_b_set.diff(&node_a_set);
1020        assert_eq!(
1021            changed,
1022            vec![],
1023            "Expected set b to have no changed keys marked."
1024        );
1025        assert_eq!(
1026            removed,
1027            vec![(3, delete_ts_3)],
1028            "Expected set b to have key `3` marked as deleted."
1029        );
1030    }
1031
1032    #[test]
1033    fn test_tie_breakers() {
1034        let node_a = HLCTimestamp::now(0, 0);
1035        let node_b = HLCTimestamp::new(node_a.datacake_timestamp(), 0, 1);
1036
1037        let mut node_a_set = OrSWotSet::<1>::default();
1038        let mut node_b_set = OrSWotSet::<1>::default();
1039
1040        // This delete conflicts with the insert timestamp.
1041        // We expect node with the biggest ID to win.
1042        node_a_set.insert(1, node_a);
1043        node_b_set.delete(1, node_b);
1044
1045        let (changed, removed) = node_a_set.diff(&node_b_set);
1046        assert_eq!(changed, vec![]);
1047        assert_eq!(removed, vec![(1, node_b)]);
1048
1049        let (changed, removed) = node_b_set.diff(&node_a_set);
1050        assert_eq!(changed, vec![]);
1051        assert_eq!(removed, vec![]);
1052
1053        node_a_set.merge(node_b_set.clone());
1054        node_b_set.merge(node_a_set.clone());
1055
1056        assert!(
1057            node_a_set.get(&1).is_none(),
1058            "Set a should no longer have key 1."
1059        );
1060        assert!(node_b_set.get(&1).is_none(), "Set b should not have key 1.");
1061
1062        let (changed, removed) = node_b_set.diff(&node_a_set);
1063        assert_eq!(changed, vec![]);
1064        assert_eq!(removed, vec![]);
1065
1066        let (changed, removed) = node_a_set.diff(&node_b_set);
1067        assert_eq!(changed, vec![]);
1068        assert_eq!(removed, vec![]);
1069
1070        let has_changed = node_a_set.insert(1, node_a);
1071        assert!(!has_changed, "Set a should not insert the value.");
1072        let has_changed = node_b_set.insert(1, node_a);
1073        assert!(
1074            !has_changed,
1075            "Set b should not insert the value with node a's timestamp."
1076        );
1077        let has_changed = node_a_set.insert(1, node_b);
1078        assert!(
1079            has_changed,
1080            "Set a should insert the value with node b's timestamp."
1081        );
1082        let has_changed = node_b_set.insert(1, node_b);
1083        assert!(has_changed, "Set b should insert the value.");
1084
1085        let mut node_a_set = OrSWotSet::<1>::default();
1086        let mut node_b_set = OrSWotSet::<1>::default();
1087
1088        // This delete conflicts with the insert timestamp.
1089        // We expect node with the biggest ID to win.
1090        node_a_set.delete(1, node_a);
1091        node_b_set.insert(1, node_b);
1092
1093        let (changed, removed) = node_a_set.diff(&node_b_set);
1094        assert_eq!(changed, vec![(1, node_b)]);
1095        assert_eq!(removed, vec![]);
1096
1097        let (changed, removed) = node_b_set.diff(&node_a_set);
1098        assert_eq!(changed, vec![]);
1099        assert_eq!(removed, vec![]);
1100
1101        node_a_set.merge(node_b_set.clone());
1102        node_b_set.merge(node_a_set.clone());
1103
1104        assert!(
1105            node_a_set.get(&1).is_some(),
1106            "Set a should no longer have key 1."
1107        );
1108        assert!(node_b_set.get(&1).is_some(), "Set b should not have key 1.");
1109    }
1110
1111    #[test]
1112    fn test_multi_source_handling() {
1113        let mut clock = HLCTimestamp::now(0, 0);
1114        let mut node_set = OrSWotSet::<1>::default();
1115
1116        // A basic example of the purging system.
1117        node_set.insert_with_source(0, 1, clock.send().unwrap());
1118
1119        node_set.delete_with_source(0, 1, clock.send().unwrap());
1120
1121        node_set.insert_with_source(0, 3, clock.send().unwrap());
1122        node_set.insert_with_source(0, 4, clock.send().unwrap());
1123
1124        // Since we're only using one source here, we should be able to safely purge key `1`.
1125        let purged = node_set
1126            .purge_old_deletes()
1127            .into_iter()
1128            .map(|(key, _)| key)
1129            .collect::<Vec<_>>();
1130        assert_eq!(purged, vec![1]);
1131
1132        let mut node_set = OrSWotSet::<2>::default();
1133
1134        // Insert a new entry from source `1` and `0`.
1135        node_set.insert_with_source(0, 1, clock.send().unwrap());
1136        node_set.insert_with_source(1, 2, clock.send().unwrap());
1137
1138        // Delete an entry from the set. (Mark it as a tombstone.)
1139        node_set.delete_with_source(0, 1, clock.send().unwrap());
1140
1141        // Effectively 'observe' a new set of changes.
1142        node_set.insert_with_source(0, 3, clock.send().unwrap());
1143        node_set.insert_with_source(0, 4, clock.send().unwrap());
1144
1145        // No keys should be purged, because source `1` has not changed it's last
1146        // observed timestamp, which means the system cannot guarantee that it is safe
1147        // to remove the entry.
1148        let purged = node_set.purge_old_deletes();
1149        assert!(purged.is_empty());
1150
1151        // Our other source has also now observed a new timestamp.
1152        node_set.insert_with_source(1, 3, clock.send().unwrap());
1153
1154        // We should now have successfully removed the key.
1155        let purged = node_set
1156            .purge_old_deletes()
1157            .into_iter()
1158            .map(|(key, _)| key)
1159            .collect::<Vec<_>>();
1160        assert_eq!(purged, vec![1]);
1161
1162        let old_ts = clock.send().unwrap();
1163        let initial_ts = clock.send().unwrap();
1164
1165        // Deletes from one source shouldn't affect deletes from the other.
1166        assert!(node_set.delete_with_source(0, 4, initial_ts));
1167        assert!(node_set.delete_with_source(1, 3, old_ts));
1168        assert!(!node_set.delete_with_source(0, 3, old_ts));
1169
1170        assert!(node_set.insert_with_source(0, 5, initial_ts));
1171        assert!(node_set.insert_with_source(1, 6, old_ts));
1172        assert!(!node_set.insert_with_source(0, 5, old_ts));
1173
1174        assert!(node_set.insert_with_source(0, 6, initial_ts));
1175        assert!(node_set.delete_with_source(1, 4, clock.send().unwrap()));
1176        assert!(node_set.delete_with_source(0, 3, clock.send().unwrap()));
1177        assert!(!node_set.delete_with_source(1, 4, initial_ts));
1178    }
1179
1180    #[test]
1181    fn test_will_apply() {
1182        let ts = Duration::from_secs(1);
1183        let mut node_set = OrSWotSet::<1>::default();
1184
1185        assert!(node_set.will_apply(1, HLCTimestamp::new(ts, 0, 0)));
1186        node_set.insert(1, HLCTimestamp::new(ts, 0, 0));
1187        assert!(!node_set.will_apply(1, HLCTimestamp::new(ts, 0, 0)));
1188
1189        assert!(node_set.will_apply(3, HLCTimestamp::new(Duration::from_secs(3), 0, 0)));
1190        node_set.delete(3, HLCTimestamp::new(Duration::from_secs(5), 0, 0));
1191        assert!(!node_set.will_apply(3, HLCTimestamp::new(Duration::from_secs(4), 0, 0)));
1192    }
1193}