1use std::collections::btree_map::Entry;
2use std::collections::{BTreeMap, HashMap, HashSet};
3use std::fmt::Debug;
4use std::time::Duration;
5use std::{cmp, mem};
6
7#[cfg(feature = "rkyv-support")]
8use rkyv::{Archive, Deserialize, Serialize};
9
10use crate::timestamp::HLCTimestamp;
11
12pub type Key = u64;
13pub type StateChanges = Vec<(Key, HLCTimestamp)>;
14
/// Grace period subtracted from a node's oldest per-source maximum timestamp
/// when its safe timestamp is computed.
///
/// Zero in test builds, one hour (3600 seconds) otherwise.
pub const FORGIVENESS_PERIOD: Duration =
    Duration::from_secs(if cfg!(test) { 0 } else { 3_600 });
26
/// Error returned when a set cannot be serialized to, or deserialized from,
/// a buffer of bytes.
///
/// The underlying serializer error is discarded; this type carries no detail.
#[cfg(feature = "rkyv")]
#[derive(Debug, thiserror::Error)]
#[error("The set cannot be (de)serialized from the provided set of bytes.")]
pub struct BadState;
32
/// Tracks, for each of the `N` sources, the highest timestamp observed per node,
/// and derives a per-node "safe" timestamp from them.
#[derive(Debug, Clone)]
#[repr(C)]
#[cfg_attr(feature = "rkyv", derive(Serialize, Deserialize, Archive))]
#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))]
pub struct NodeVersions<const N: usize> {
    /// Indexed by source: maps a node id to the newest timestamp seen from that
    /// node via that source.
    nodes_max_stamps: [BTreeMap<u8, HLCTimestamp>; N],
    /// Maps a node id to the minimum of its per-source maximum timestamps,
    /// moved back by `FORGIVENESS_PERIOD`.
    safe_last_stamps: BTreeMap<u8, HLCTimestamp>,
}
41
42impl<const N: usize> Default for NodeVersions<N> {
43 fn default() -> Self {
44 let stamps_template = [(); N];
45
46 Self {
47 nodes_max_stamps: stamps_template.map(|_| BTreeMap::new()),
48 safe_last_stamps: BTreeMap::new(),
49 }
50 }
51}
52
53impl<const N: usize> NodeVersions<N> {
54 fn merge(&mut self, other: NodeVersions<N>) {
56 let mut nodes = HashSet::new();
57 for (source, other_nodes) in other.nodes_max_stamps.into_iter().enumerate() {
58 let existing_nodes = &mut self.nodes_max_stamps[source];
59 for (node, ts) in other_nodes {
60 nodes.insert(node);
61 match existing_nodes.entry(node) {
62 Entry::Occupied(mut entry) => {
63 if &ts < entry.get() {
66 continue;
67 }
68
69 entry.insert(ts);
70 },
71 Entry::Vacant(v) => {
72 v.insert(ts);
73 },
74 }
75 }
76 }
77
78 for node in nodes {
79 self.compute_safe_last_stamp(node);
80 }
81 }
82
83 fn try_update_max_stamp(&mut self, source: usize, ts: HLCTimestamp) -> bool {
85 match self.nodes_max_stamps[source].entry(ts.node()) {
86 Entry::Occupied(mut entry) => {
87 if &ts < entry.get() {
90 self.compute_safe_last_stamp(ts.node());
91 return false;
92 }
93
94 entry.insert(ts);
95 },
96 Entry::Vacant(v) => {
97 v.insert(ts);
98 },
99 }
100
101 self.compute_safe_last_stamp(ts.node());
102
103 true
104 }
105
106 fn compute_safe_last_stamp(&mut self, node: u8) {
108 let min = self
109 .nodes_max_stamps
110 .iter()
111 .map(|stamps| {
112 stamps.get(&node).copied().unwrap_or_else(|| {
113 HLCTimestamp::new(Duration::from_secs(0), 0, node)
114 })
115 })
116 .min();
117
118 if let Some(min) = min {
119 let ts = HLCTimestamp::new(
120 min.datacake_timestamp().saturating_sub(FORGIVENESS_PERIOD),
121 min.counter(),
122 min.node(),
123 );
124 self.safe_last_stamps.insert(node, ts);
125 }
126 }
127
128 fn is_ts_before_last_observed_event(&self, ts: HLCTimestamp) -> bool {
130 self.safe_last_stamps
131 .get(&ts.node())
132 .map(|v| &ts < v)
133 .unwrap_or_default()
134 }
135}
136
/// A set CRDT keyed by [`Key`], where each key carries the timestamp of its
/// latest insert or delete and conflicts are resolved by comparing timestamps.
///
/// `N` is the number of sources whose observed timestamps are tracked
/// independently (defaults to 1).
#[derive(Debug, Default, Clone)]
#[repr(C)]
#[cfg_attr(feature = "rkyv", derive(Serialize, Deserialize, Archive))]
#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))]
pub struct OrSWotSet<const N: usize = 1> {
    /// Live keys and the timestamp of the insert that put them there.
    entries: BTreeMap<Key, HLCTimestamp>,
    /// Tombstones: deleted keys and the timestamp of the delete.
    dead: HashMap<Key, HLCTimestamp>,
    /// Per-source record of the newest timestamps observed from each node.
    versions: NodeVersions<N>,
}
218
219impl<const N: usize> OrSWotSet<N> {
/// Deserializes a set from `data` using `rkyv`.
///
/// # Errors
/// Returns [`BadState`] if `rkyv::from_bytes` fails; the underlying error is discarded.
#[cfg(feature = "rkyv")]
pub fn from_bytes(data: &[u8]) -> Result<Self, BadState> {
    rkyv::from_bytes::<Self>(data).map_err(|_| BadState)
}
226
/// Serializes the set into a freshly allocated byte vector using `rkyv`.
///
/// # Errors
/// Returns [`BadState`] if `rkyv::to_bytes` fails; the underlying error is discarded.
#[cfg(feature = "rkyv")]
pub fn as_bytes(&self) -> Result<Vec<u8>, BadState> {
    match rkyv::to_bytes::<_, 2048>(self) {
        Ok(buffer) => Ok(buffer.into_vec()),
        Err(_) => Err(BadState),
    }
}
234
235 pub fn diff(&self, other: &OrSWotSet<N>) -> (StateChanges, StateChanges) {
244 let mut changes = Vec::new();
245 let mut removals = Vec::new();
246
247 for (key, ts) in other.entries.iter() {
248 self.check_self_then_insert_to(*key, *ts, &mut changes);
249 }
250
251 for (key, ts) in other.dead.iter() {
252 self.check_self_then_insert_to(*key, *ts, &mut removals);
253 }
254
255 (changes, removals)
256 }
257
258 fn check_self_then_insert_to(
259 &self,
260 key: Key,
261 ts: HLCTimestamp,
262 values: &mut Vec<(Key, HLCTimestamp)>,
263 ) {
264 if let Some(existing_insert) = self.entries.get(&key) {
265 if existing_insert < &ts {
266 values.push((key, ts));
267 }
268 } else if let Some(existing_delete) = self.dead.get(&key) {
269 if existing_delete < &ts {
270 values.push((key, ts));
271 }
272 } else if !self.versions.is_ts_before_last_observed_event(ts) {
273 values.push((key, ts))
274 }
275 }
276
/// Merges another set into this one, consuming `other`.
///
/// The remote inserts and deletes are replayed in timestamp order against the
/// local state, the local entries the remote did not mention are then
/// reconciled against the resulting tombstones, and finally the remote's
/// version tracking is merged into the local one.
pub fn merge(&mut self, other: OrSWotSet<N>) {
    // Tag each remote operation: `false` for an insert, `true` for a delete.
    let base_entries = other.entries.into_iter().map(|(k, ts)| (k, ts, false));

    let remote_versions = other.versions;
    let mut entries_log = Vec::from_iter(base_entries);
    entries_log.extend(other.dead.into_iter().map(|(k, ts)| (k, ts, true)));

    // Replay the remote operations in timestamp order.
    entries_log.sort_by_key(|v| v.1);

    // Local entries are moved aside; `self.entries` is rebuilt from scratch below.
    let mut old_entries = mem::take(&mut self.entries);

    for (key, ts, is_delete) in entries_log {
        // Skip remote deletes that are older than the local safe timestamp of
        // the node that issued them.
        if is_delete && self.versions.is_ts_before_last_observed_event(ts) {
            continue;
        }

        if is_delete {
            // Only entries re-added to `self.entries` earlier in this loop are
            // visible here; local entries still parked in `old_entries` are
            // reconciled against the tombstone in the final loop.
            if let Some(entry) = self.entries.remove(&key) {
                if ts < entry {
                    // The entry is newer than the delete: put it back.
                    self.entries.insert(key, entry);
                    continue;
                }
            }

            // Record the tombstone, keeping the newest delete timestamp.
            self.dead
                .entry(key)
                .and_modify(|v| {
                    (*v) = cmp::max(*v, ts);
                })
                .or_insert_with(|| ts);

            continue;
        }

        let mut timestamp = ts;

        // If the key also exists locally, the newer of the two inserts wins.
        if let Some(existing_ts) = old_entries.remove(&key) {
            timestamp = cmp::max(timestamp, existing_ts);
        }

        // A strictly newer tombstone beats the insert; otherwise the tombstone is dropped.
        if let Some(deleted_ts) = self.dead.remove(&key) {
            if timestamp < deleted_ts {
                self.dead.insert(key, deleted_ts);
                continue;
            }
        }

        self.entries.insert(key, timestamp);
    }

    // What remains in `old_entries` are local entries the remote set holds no insert for.
    for (key, ts) in old_entries {
        // Entries older than the remote's safe timestamp are dropped.
        // NOTE(review): presumably because the remote may already have purged the
        // matching tombstone -- confirm against the intended protocol.
        if remote_versions.is_ts_before_last_observed_event(ts) {
            continue;
        }

        // A strictly newer tombstone beats the local entry.
        if let Some(deleted) = self.dead.remove(&key) {
            if ts < deleted {
                self.dead.insert(key, deleted);
                continue;
            }
        }

        self.entries.insert(key, ts);
    }

    self.versions.merge(remote_versions);
}
360
/// Returns the insert timestamp of `k` if the key is currently live in the set,
/// or `None` otherwise (including when the key has been deleted).
pub fn get(&self, k: &Key) -> Option<&HLCTimestamp> {
    self.entries.get(k)
}
367
368 pub fn purge_old_deletes(&mut self) -> StateChanges {
373 let mut deleted_keys = vec![];
374 for (k, stamp) in mem::take(&mut self.dead) {
375 if !self.versions.is_ts_before_last_observed_event(stamp) {
376 self.dead.insert(k, stamp);
377 } else {
378 deleted_keys.push((k, stamp));
379 }
380 }
381
382 deleted_keys
383 }
384
385 pub fn add_raw_tombstones(&mut self, tombstones: StateChanges) {
393 for (key, stamp) in tombstones {
394 self.dead.insert(key, stamp);
395 }
396 }
397
398 pub fn will_apply(&self, key: Key, ts: HLCTimestamp) -> bool {
400 if self.versions.is_ts_before_last_observed_event(ts) {
401 return false;
402 }
403
404 if let Some(entry) = self.entries.get(&key) {
405 return entry < &ts;
406 }
407
408 if let Some(entry) = self.dead.get(&key) {
409 return entry < &ts;
410 }
411
412 true
413 }
414
/// Inserts `k` with timestamp `ts`, attributing the operation to source `0`.
///
/// Returns `true` if the set changed. See [`Self::insert_with_source`].
pub fn insert(&mut self, k: Key, ts: HLCTimestamp) -> bool {
    self.insert_with_source(0, k, ts)
}
425
426 pub fn insert_with_source(
434 &mut self,
435 source: usize,
436 k: Key,
437 ts: HLCTimestamp,
438 ) -> bool {
439 debug_assert!(source < N);
440
441 let mut has_set = false;
442
443 if !self.versions.try_update_max_stamp(source, ts) {
444 return has_set;
445 }
446
447 if let Some(deleted_ts) = self.dead.remove(&k) {
448 if ts < deleted_ts {
450 self.dead.insert(k, deleted_ts);
451 return has_set;
452 }
453 }
454
455 self.entries
456 .entry(k)
457 .and_modify(|v| {
458 if *v < ts {
459 has_set = true;
460 (*v) = ts;
461 }
462 })
463 .or_insert_with(|| {
464 has_set = true;
465 ts
466 });
467
468 has_set
469 }
470
/// Deletes `k` with timestamp `ts`, attributing the operation to source `0`.
///
/// Returns `true` if a tombstone was added or advanced. See [`Self::delete_with_source`].
pub fn delete(&mut self, k: Key, ts: HLCTimestamp) -> bool {
    self.delete_with_source(0, k, ts)
}
481
482 pub fn delete_with_source(
490 &mut self,
491 source: usize,
492 k: Key,
493 ts: HLCTimestamp,
494 ) -> bool {
495 debug_assert!(source < N);
496
497 let mut has_set = false;
498
499 if !self.versions.try_update_max_stamp(source, ts) {
500 return has_set;
501 }
502
503 if let Some(existing_ts) = self.entries.remove(&k) {
504 if ts <= existing_ts {
507 self.entries.insert(k, existing_ts);
508 return has_set;
509 }
510 }
511
512 self.dead
513 .entry(k)
514 .and_modify(|v| {
515 if *v < ts {
516 has_set = true;
517 (*v) = ts;
518 }
519 })
520 .or_insert_with(|| {
521 has_set = true;
522 ts
523 });
524
525 has_set
526 }
527}
528
529#[cfg(test)]
530mod tests {
531 use std::time::Duration;
532
533 use super::*;
534
/// Two inserts of the same key from clocks sharing the same base time: the
/// stamp from node B (node id 1) is expected to win, both when applied to a
/// single set and after merging two sets in both directions.
#[test]
fn test_op_order() {
    let mut node_a = HLCTimestamp::now(0, 0);
    // Node B starts from node A's timestamp but uses node id 1.
    let mut node_b = HLCTimestamp::new(node_a.datacake_timestamp(), 0, 1);

    let mut node_a_set = OrSWotSet::<1>::default();

    let ts_a = node_a.send().unwrap();
    let ts_b = node_b.send().unwrap();

    // Apply both inserts to the same set.
    node_a_set.insert(1, ts_a);
    node_a_set.insert(1, ts_b);

    let retrieved = node_a_set.get(&1);
    assert_eq!(retrieved, Some(&ts_b), "Node B should win the operation.");

    // Apply the inserts to separate sets, then merge in both directions.
    let mut node_a_set = OrSWotSet::<1>::default();
    let mut node_b_set = OrSWotSet::<1>::default();

    node_a_set.insert(1, ts_a);
    node_b_set.insert(1, ts_b);

    node_a_set.merge(node_b_set.clone());
    node_b_set.merge(node_a_set.clone());

    let retrieved = node_a_set.get(&1);
    assert_eq!(
        retrieved,
        Some(&ts_b),
        "Node B should win the operation after merging set A."
    );

    let retrieved = node_b_set.get(&1);
    assert_eq!(
        retrieved,
        Some(&ts_b),
        "Node B should win the operation after merging set B."
    );
}
574
/// Merging a set that only contains inserts yields the union of keys and no tombstones.
#[test]
fn test_basic_insert_merge() {
    let mut node_a = HLCTimestamp::now(0, 0);
    let mut node_b = HLCTimestamp::new(node_a.datacake_timestamp(), 0, 1);

    let mut node_a_set = OrSWotSet::<1>::default();

    // Set A holds keys 1, 2 and 3.
    node_a_set.insert(1, node_a.send().unwrap());
    node_a_set.insert(2, node_a.send().unwrap());
    node_a_set.insert(3, node_a.send().unwrap());

    let mut node_b_set = OrSWotSet::<1>::default();

    // Set B holds keys 1 and 4; key 1 overlaps with set A.
    node_b_set.insert(1, node_b.send().unwrap());
    node_b_set.insert(4, node_b.send().unwrap());

    node_a_set.merge(node_b_set);

    assert!(
        node_a_set.dead.is_empty(),
        "Expected no entries to be marked as dead."
    );

    assert!(
        node_a_set.entries.get(&1).is_some(),
        "Expected entry with key 1 to exist."
    );
    assert!(
        node_a_set.entries.get(&2).is_some(),
        "Expected entry with key 2 to exist."
    );
    assert!(
        node_a_set.entries.get(&3).is_some(),
        "Expected entry with key 3 to exist."
    );
    assert!(
        node_a_set.entries.get(&4).is_some(),
        "Expected entry with key 4 to exist."
    );
}
619
/// An insert on set A and a delete on set B for the same key, issued from
/// clocks sharing the same base time, must converge to "deleted" on both sets.
#[test]
fn test_same_time_conflict_convergence() {
    let mut node_a = HLCTimestamp::now(0, 0);
    let mut node_b = HLCTimestamp::new(node_a.datacake_timestamp(), 0, 1);

    let mut node_a_set = OrSWotSet::<1>::default();

    // Key 3 is inserted first so it carries node A's earliest stamp.
    node_a_set.insert(3, node_a.send().unwrap());
    node_a_set.insert(1, node_a.send().unwrap());
    node_a_set.insert(2, node_a.send().unwrap());

    let mut node_b_set = OrSWotSet::<1>::default();

    // Node B inserts key 1 and deletes key 3, conflicting with set A's insert.
    node_b_set.insert(1, node_b.send().unwrap());
    node_b_set.delete(3, node_b.send().unwrap());

    node_a_set.merge(node_b_set.clone());

    assert!(
        node_a_set.dead.contains_key(&3),
        "SET A: Expected key 3 to be marked as dead."
    );

    assert!(
        node_a_set.entries.get(&1).is_some(),
        "SET A: Expected entry with key 1 to exist."
    );
    assert!(
        node_a_set.entries.get(&2).is_some(),
        "SET A: Expected entry with key 2 to exist."
    );
    assert!(
        node_a_set.entries.get(&3).is_none(),
        "SET A: Expected entry with key 3 to NOT exist."
    );

    // Merging in the other direction must give set B the same view.
    node_b_set.merge(node_a_set);

    assert!(
        node_b_set.dead.contains_key(&3),
        "SET B: Expected key 3 to be marked as dead."
    );

    assert!(
        node_b_set.entries.get(&1).is_some(),
        "SET B: Expected entry with key 1 to exist."
    );
    assert!(
        node_b_set.entries.get(&2).is_some(),
        "SET B: Expected entry with key 2 to exist."
    );
    assert!(
        node_b_set.entries.get(&3).is_none(),
        "SET B: Expected entry with key 3 to NOT exist."
    );
}
687
/// A delete issued one second after the insert removes the key when merged.
#[test]
fn test_basic_delete_merge() {
    let mut node_a = HLCTimestamp::now(0, 0);
    // Node B's clock starts one second ahead of node A's.
    let mut node_b = HLCTimestamp::new(
        node_a.datacake_timestamp() + Duration::from_secs(1),
        0,
        1,
    );

    let mut node_a_set = OrSWotSet::<1>::default();

    node_a_set.insert(1, node_a.send().unwrap());
    node_a_set.insert(2, node_a.send().unwrap());
    node_a_set.insert(3, node_a.send().unwrap());

    let mut node_b_set = OrSWotSet::<1>::default();

    // Node B re-inserts key 1 and deletes key 3 with later stamps.
    node_b_set.insert(1, node_b.send().unwrap());
    node_b_set.delete(3, node_b.send().unwrap());

    node_a_set.merge(node_b_set.clone());

    assert!(
        node_a_set.dead.contains_key(&3),
        "Expected key 3 to be marked as dead."
    );

    assert!(
        node_a_set.entries.get(&1).is_some(),
        "Expected entry with key 1 to exist."
    );
    assert!(
        node_a_set.entries.get(&2).is_some(),
        "Expected entry with key 2 to exist."
    );
    assert!(
        node_a_set.entries.get(&3).is_none(),
        "Expected entry with key 3 to NOT exist."
    );
}
732
/// After both nodes have issued operations newer than a merged delete, purging
/// removes the tombstone and the deleted key stays absent.
#[test]
fn test_purge_delete_merge() {
    let mut node_a = HLCTimestamp::now(0, 0);
    // Node B's clock starts one second ahead of node A's.
    let mut node_b = HLCTimestamp::new(
        node_a.datacake_timestamp() + Duration::from_secs(1),
        0,
        1,
    );

    let mut node_a_set = OrSWotSet::<1>::default();

    node_a_set.insert(1, node_a.send().unwrap());
    node_a_set.insert(2, node_a.send().unwrap());
    node_a_set.insert(3, node_a.send().unwrap());

    let mut node_b_set = OrSWotSet::<1>::default();

    node_b_set.insert(1, node_b.send().unwrap());
    node_b_set.delete(3, node_b.send().unwrap());

    node_a_set.merge(node_b_set.clone());

    // Both nodes issue a further insert, then set A merges set B again so that
    // it observes node B's newer stamp.
    node_a_set.insert(4, node_a.send().unwrap());

    node_b_set.insert(4, node_b.send().unwrap());
    node_a_set.merge(node_b_set.clone());

    node_a_set.purge_old_deletes();

    assert!(
        node_a_set.dead.is_empty(),
        "Expected dead entries to be empty."
    );

    assert!(
        node_a_set.entries.get(&1).is_some(),
        "Expected entry with key 1 to exist."
    );
    assert!(
        node_a_set.entries.get(&2).is_some(),
        "Expected entry with key 2 to exist."
    );
    assert!(
        node_a_set.entries.get(&3).is_none(),
        "Expected entry with key 3 to NOT exist."
    );
    assert!(
        node_a_set.entries.get(&4).is_some(),
        "Expected entry with key 4 to exist."
    );
}
789
/// Purging must only drop tombstones that are older than the safe timestamp of
/// the node that issued them: the tombstone for key 3 (from node B) is expected
/// to be retained while the tombstone for key 2 (from node A) is purged, and
/// both sets must agree afterwards.
#[test]
fn test_purge_some_entries() {
    let mut node_a = HLCTimestamp::now(0, 0);
    // Node B's clock starts one second ahead of node A's.
    let mut node_b = HLCTimestamp::new(
        node_a.datacake_timestamp() + Duration::from_secs(1),
        0,
        1,
    );

    let mut node_a_set = OrSWotSet::<1>::default();

    node_a_set.insert(1, node_a.send().unwrap());
    node_a_set.insert(2, node_a.send().unwrap());
    node_a_set.insert(3, node_a.send().unwrap());

    std::thread::sleep(Duration::from_millis(1));

    let mut node_b_set = OrSWotSet::<1>::default();

    node_b_set.insert(1, node_b.send().unwrap());
    node_b_set.delete(3, node_b.send().unwrap());

    node_a_set.merge(node_b_set.clone());

    node_a_set.insert(4, node_a.send().unwrap());

    // Node A deletes key 2, then issues a newer operation of its own.
    node_a_set.delete(2, node_a.send().unwrap());

    node_a_set.insert(5, node_a.send().unwrap());

    node_a_set.merge(node_b_set.clone());

    node_a_set.purge_old_deletes();

    node_b_set.merge(node_a_set.clone());
    node_a_set.purge_old_deletes();

    assert!(
        node_a_set.dead.get(&3).is_some(),
        "SET A: Expected key 3 to be left in dead set."
    );
    assert!(
        node_a_set.dead.get(&2).is_none(),
        "SET A: Expected key 2 to be purged from dead set."
    );

    assert!(
        node_a_set.entries.get(&1).is_some(),
        "SET A: Expected entry with key 1 to exist."
    );
    // Fixed message: the assertion checks for absence, not presence.
    assert!(
        node_a_set.entries.get(&2).is_none(),
        "SET A: Expected entry with key 2 to NOT exist."
    );
    assert!(
        node_a_set.entries.get(&3).is_none(),
        "SET A: Expected entry with key 3 to NOT exist."
    );
    assert!(
        node_a_set.entries.get(&4).is_some(),
        "SET A: Expected entry with key 4 to exist."
    );

    assert!(
        node_b_set.dead.get(&3).is_some(),
        "SET B: Expected key 3 to be left in dead set."
    );
    assert!(
        node_b_set.dead.get(&2).is_none(),
        "SET B: Expected key 2 to be purged from dead set."
    );

    assert!(
        node_b_set.entries.get(&1).is_some(),
        "SET B: Expected entry with key 1 to exist."
    );
    // Fixed message: the assertion checks for absence, not presence.
    assert!(
        node_b_set.entries.get(&2).is_none(),
        "SET B: Expected entry with key 2 to NOT exist."
    );
    assert!(
        node_b_set.entries.get(&3).is_none(),
        "SET B: Expected entry with key 3 to NOT exist."
    );
    assert!(
        node_b_set.entries.get(&4).is_some(),
        "SET B: Expected entry with key 4 to exist."
    );
}
886
/// An insert carrying an older timestamp than one already applied is rejected.
#[test]
fn test_insert_no_op() {
    let mut node_a = HLCTimestamp::now(0, 0);
    // Taken before the stamp used for the first insert, so it is the older one.
    let old_ts = node_a.send().unwrap();

    let mut node_a_set = OrSWotSet::<1>::default();

    let did_add = node_a_set.insert(1, node_a.send().unwrap());
    assert!(did_add, "Expected entry insert to be added.");

    let did_add = node_a_set.insert(1, old_ts);
    assert!(
        !did_add,
        "Expected entry insert with old timestamp to be ignored"
    );
}
904
/// A delete carrying an older timestamp than the existing insert is rejected.
#[test]
fn test_delete_no_op() {
    let mut node_a = HLCTimestamp::now(0, 0);
    // Taken before the stamp used for the insert, so it is the older one.
    let old_ts = node_a.send().unwrap();

    let mut node_a_set = OrSWotSet::<1>::default();

    let did_add = node_a_set.insert(1, node_a.send().unwrap());
    assert!(did_add, "Expected entry insert to be added.");

    let did_add = node_a_set.delete(1, old_ts);
    assert!(
        !did_add,
        "Expected entry delete with old timestamp to be ignored"
    );
}
922
/// `diff` reports the inserts and deletes held by the other set that the
/// calling set is missing, when the two sets touch disjoint keys.
#[test]
fn test_set_diff() {
    let mut node_a = HLCTimestamp::now(0, 0);
    // Node B's clock starts five seconds ahead of node A's.
    let mut node_b = HLCTimestamp::new(
        node_a.datacake_timestamp() + Duration::from_secs(5),
        0,
        1,
    );

    let mut node_a_set = OrSWotSet::<1>::default();
    let mut node_b_set = OrSWotSet::<1>::default();

    let insert_ts_1 = node_a.send().unwrap();
    node_a_set.insert(1, insert_ts_1);

    // An empty set is missing everything set A has.
    let (changed, removed) = OrSWotSet::<1>::default().diff(&node_a_set);
    assert_eq!(
        changed,
        vec![(1, insert_ts_1)],
        "Expected set diff to contain key `1`."
    );
    assert!(
        removed.is_empty(),
        "Expected there to be no difference between sets."
    );

    let delete_ts_3 = node_a.send().unwrap();
    node_a_set.delete(3, delete_ts_3);

    let insert_ts_2 = node_b.send().unwrap();
    node_b_set.insert(2, insert_ts_2);

    // What set A is missing from set B.
    let (changed, removed) = node_a_set.diff(&node_b_set);

    assert_eq!(
        changed,
        vec![(2, insert_ts_2)],
        "Expected set a to only be marked as missing key `2`"
    );
    assert!(
        removed.is_empty(),
        "Expected set a to not be missing any delete markers."
    );

    // What set B is missing from set A.
    let (changed, removed) = node_b_set.diff(&node_a_set);
    assert_eq!(
        changed,
        vec![(1, insert_ts_1)],
        "Expected set b to have key `1` marked as changed."
    );
    assert_eq!(
        removed,
        vec![(3, delete_ts_3)],
        "Expected set b to have key `3` marked as deleted."
    );
}
979
/// `diff` when both sets hold the same keys: only the side with the older
/// timestamps is told about changes.
#[test]
fn test_set_diff_with_conflicts() {
    let mut node_a = HLCTimestamp::now(0, 0);
    // Node B's clock starts five seconds ahead of node A's.
    let mut node_b = HLCTimestamp::new(
        node_a.datacake_timestamp() + Duration::from_secs(5),
        0,
        1,
    );

    let mut node_a_set = OrSWotSet::<1>::default();
    let mut node_b_set = OrSWotSet::<1>::default();

    node_a_set.insert(1, node_a.send().unwrap());
    node_a_set.insert(2, node_a.send().unwrap());

    std::thread::sleep(Duration::from_millis(500));

    let delete_ts_3 = node_a.send().unwrap();
    node_a_set.delete(3, delete_ts_3);

    // Node B inserts the same keys 1 and 2 with its own (later) stamps.
    let insert_ts_2 = node_b.send().unwrap();
    node_b_set.insert(2, insert_ts_2);

    let insert_ts_1 = node_b.send().unwrap();
    node_b_set.insert(1, insert_ts_1);

    let (changed, removed) = node_a_set.diff(&node_b_set);

    assert_eq!(
        changed,
        vec![(1, insert_ts_1), (2, insert_ts_2)],
        "Expected set a to be marked as updating keys `1, 2`"
    );
    assert!(
        removed.is_empty(),
        "Expected set a to not be missing any delete markers."
    );

    // Set B already holds newer inserts, so only the delete of key 3 is reported.
    let (changed, removed) = node_b_set.diff(&node_a_set);
    assert_eq!(
        changed,
        vec![],
        "Expected set b to have no changed keys marked."
    );
    assert_eq!(
        removed,
        vec![(3, delete_ts_3)],
        "Expected set b to have key `3` marked as deleted."
    );
}
1031
/// Conflicting insert/delete pairs whose timestamps differ only by node id:
/// the operation stamped by node B (node id 1) is expected to win in `diff`,
/// `merge` and subsequent inserts, whichever of the two it is.
#[test]
fn test_tie_breakers() {
    let node_a = HLCTimestamp::now(0, 0);
    // Same time and counter as node A; only the node id differs.
    let node_b = HLCTimestamp::new(node_a.datacake_timestamp(), 0, 1);

    let mut node_a_set = OrSWotSet::<1>::default();
    let mut node_b_set = OrSWotSet::<1>::default();

    // Case 1: node A inserts, node B deletes.
    node_a_set.insert(1, node_a);
    node_b_set.delete(1, node_b);

    let (changed, removed) = node_a_set.diff(&node_b_set);
    assert_eq!(changed, vec![]);
    assert_eq!(removed, vec![(1, node_b)]);

    let (changed, removed) = node_b_set.diff(&node_a_set);
    assert_eq!(changed, vec![]);
    assert_eq!(removed, vec![]);

    node_a_set.merge(node_b_set.clone());
    node_b_set.merge(node_a_set.clone());

    assert!(
        node_a_set.get(&1).is_none(),
        "Set a should no longer have key 1."
    );
    assert!(node_b_set.get(&1).is_none(), "Set b should not have key 1.");

    // Once merged, neither set has anything to tell the other.
    let (changed, removed) = node_b_set.diff(&node_a_set);
    assert_eq!(changed, vec![]);
    assert_eq!(removed, vec![]);

    let (changed, removed) = node_a_set.diff(&node_b_set);
    assert_eq!(changed, vec![]);
    assert_eq!(removed, vec![]);

    // Re-inserting with node A's stamp loses to the tombstone; node B's stamp is accepted.
    let has_changed = node_a_set.insert(1, node_a);
    assert!(!has_changed, "Set a should not insert the value.");
    let has_changed = node_b_set.insert(1, node_a);
    assert!(
        !has_changed,
        "Set b should not insert the value with node a's timestamp."
    );
    let has_changed = node_a_set.insert(1, node_b);
    assert!(
        has_changed,
        "Set a should insert the value with node b's timestamp."
    );
    let has_changed = node_b_set.insert(1, node_b);
    assert!(has_changed, "Set b should insert the value.");

    // Case 2: node A deletes, node B inserts.
    let mut node_a_set = OrSWotSet::<1>::default();
    let mut node_b_set = OrSWotSet::<1>::default();

    node_a_set.delete(1, node_a);
    node_b_set.insert(1, node_b);

    let (changed, removed) = node_a_set.diff(&node_b_set);
    assert_eq!(changed, vec![(1, node_b)]);
    assert_eq!(removed, vec![]);

    let (changed, removed) = node_b_set.diff(&node_a_set);
    assert_eq!(changed, vec![]);
    assert_eq!(removed, vec![]);

    node_a_set.merge(node_b_set.clone());
    node_b_set.merge(node_a_set.clone());

    // Fixed messages: these assertions check that the key IS present.
    assert!(
        node_a_set.get(&1).is_some(),
        "Set a should have key 1."
    );
    assert!(node_b_set.get(&1).is_some(), "Set b should have key 1.");
}
1110
/// Exercises per-source version tracking: with one source a tombstone becomes
/// purgeable as soon as that source has seen newer operations; with two
/// sources it only becomes purgeable once both sources have moved past it.
#[test]
fn test_multi_source_handling() {
    let mut clock = HLCTimestamp::now(0, 0);
    let mut node_set = OrSWotSet::<1>::default();

    // Single source: insert then delete key 1.
    node_set.insert_with_source(0, 1, clock.send().unwrap());

    node_set.delete_with_source(0, 1, clock.send().unwrap());

    // Newer operations from the same source move its latest stamp past the delete.
    node_set.insert_with_source(0, 3, clock.send().unwrap());
    node_set.insert_with_source(0, 4, clock.send().unwrap());

    let purged = node_set
        .purge_old_deletes()
        .into_iter()
        .map(|(key, _)| key)
        .collect::<Vec<_>>();
    assert_eq!(purged, vec![1]);

    // Two sources sharing the same clock (and therefore the same node id).
    let mut node_set = OrSWotSet::<2>::default();

    node_set.insert_with_source(0, 1, clock.send().unwrap());
    node_set.insert_with_source(1, 2, clock.send().unwrap());

    node_set.delete_with_source(0, 1, clock.send().unwrap());

    node_set.insert_with_source(0, 3, clock.send().unwrap());
    node_set.insert_with_source(0, 4, clock.send().unwrap());

    // Source 1 has not yet observed anything newer than the delete, so nothing is purged.
    let purged = node_set.purge_old_deletes();
    assert!(purged.is_empty());

    // Source 1 catches up; the tombstone for key 1 can now be purged.
    node_set.insert_with_source(1, 3, clock.send().unwrap());

    let purged = node_set
        .purge_old_deletes()
        .into_iter()
        .map(|(key, _)| key)
        .collect::<Vec<_>>();
    assert_eq!(purged, vec![1]);

    // `old_ts` is issued before `initial_ts`, so it is the older of the two.
    let old_ts = clock.send().unwrap();
    let initial_ts = clock.send().unwrap();

    // Each source rejects stamps older than the newest one it has already recorded.
    assert!(node_set.delete_with_source(0, 4, initial_ts));
    assert!(node_set.delete_with_source(1, 3, old_ts));
    assert!(!node_set.delete_with_source(0, 3, old_ts));

    assert!(node_set.insert_with_source(0, 5, initial_ts));
    assert!(node_set.insert_with_source(1, 6, old_ts));
    assert!(!node_set.insert_with_source(0, 5, old_ts));

    assert!(node_set.insert_with_source(0, 6, initial_ts));
    assert!(node_set.delete_with_source(1, 4, clock.send().unwrap()));
    assert!(node_set.delete_with_source(0, 3, clock.send().unwrap()));
    assert!(!node_set.delete_with_source(1, 4, initial_ts));
}
1179
/// `will_apply` accepts operations on unknown keys and rejects operations that
/// are not strictly newer than the existing entry or tombstone.
#[test]
fn test_will_apply() {
    let ts = Duration::from_secs(1);
    let mut node_set = OrSWotSet::<1>::default();

    // Unknown key: accepted. After the insert, the identical stamp is rejected.
    assert!(node_set.will_apply(1, HLCTimestamp::new(ts, 0, 0)));
    node_set.insert(1, HLCTimestamp::new(ts, 0, 0));
    assert!(!node_set.will_apply(1, HLCTimestamp::new(ts, 0, 0)));

    // Once key 3 is deleted at t=5s, an operation stamped t=4s is rejected.
    assert!(node_set.will_apply(3, HLCTimestamp::new(Duration::from_secs(3), 0, 0)));
    node_set.delete(3, HLCTimestamp::new(Duration::from_secs(5), 0, 0));
    assert!(!node_set.will_apply(3, HLCTimestamp::new(Duration::from_secs(4), 0, 0)));
}
1193}