Skip to main content

atomr_distributed_data/
sets.rs

1//! Grow-only and observed-remove sets. akka.net: `GSet`, `ORSet`.
2
3use std::collections::{HashMap, HashSet};
4use std::hash::Hash;
5
6use serde::{Deserialize, Serialize};
7
8use crate::traits::CrdtMerge;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct GSet<T>
12where
13    T: Eq + Hash + Clone,
14{
15    items: HashSet<T>,
16}
17
18impl<T: Eq + Hash + Clone> Default for GSet<T> {
19    fn default() -> Self {
20        Self { items: HashSet::new() }
21    }
22}
23
24impl<T: Eq + Hash + Clone> GSet<T> {
25    pub fn new() -> Self {
26        Self::default()
27    }
28
29    pub fn add(&mut self, item: T) {
30        self.items.insert(item);
31    }
32
33    pub fn contains(&self, item: &T) -> bool {
34        self.items.contains(item)
35    }
36
37    pub fn iter(&self) -> impl Iterator<Item = &T> {
38        self.items.iter()
39    }
40
41    pub fn len(&self) -> usize {
42        self.items.len()
43    }
44
45    pub fn is_empty(&self) -> bool {
46        self.items.is_empty()
47    }
48}
49
50impl<T: Eq + Hash + Clone> CrdtMerge for GSet<T> {
51    fn merge(&mut self, other: &Self) {
52        for item in &other.items {
53            self.items.insert(item.clone());
54        }
55    }
56}
57
58/// Observed-remove set. Each addition gets a unique tag; a removal retains
59/// all tags seen at that moment. Merge takes the union of (item, tag) pairs
60/// minus tombstones.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct OrSet<T>
63where
64    T: Eq + Hash + Clone,
65{
66    adds: HashMap<T, HashSet<u64>>,
67    removes: HashMap<T, HashSet<u64>>,
68    counter: u64,
69}
70
71impl<T: Eq + Hash + Clone> Default for OrSet<T> {
72    fn default() -> Self {
73        Self { adds: HashMap::new(), removes: HashMap::new(), counter: 0 }
74    }
75}
76
77impl<T: Eq + Hash + Clone> OrSet<T> {
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    pub fn add(&mut self, item: T) {
83        self.counter += 1;
84        self.adds.entry(item).or_default().insert(self.counter);
85    }
86
87    pub fn remove(&mut self, item: &T) {
88        if let Some(tags) = self.adds.get(item).cloned() {
89            self.removes.entry(item.clone()).or_default().extend(tags);
90        }
91    }
92
93    pub fn contains(&self, item: &T) -> bool {
94        match (self.adds.get(item), self.removes.get(item)) {
95            (Some(a), Some(r)) => a.difference(r).next().is_some(),
96            (Some(a), None) => !a.is_empty(),
97            _ => false,
98        }
99    }
100}
101
102impl<T: Eq + Hash + Clone> CrdtMerge for OrSet<T> {
103    fn merge(&mut self, other: &Self) {
104        for (k, v) in &other.adds {
105            self.adds.entry(k.clone()).or_default().extend(v.iter().copied());
106        }
107        for (k, v) in &other.removes {
108            self.removes.entry(k.clone()).or_default().extend(v.iter().copied());
109        }
110        self.counter = self.counter.max(other.counter);
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    /// Merging two grow-only sets yields their union.
    #[test]
    fn gset_merges_union() {
        let mut left = GSet::<i32>::new();
        left.add(1);

        let mut right = GSet::<i32>::new();
        right.add(2);

        left.merge(&right);
        assert_eq!(left.len(), 2);
    }

    /// An element is present after `add` and absent after `remove`.
    #[test]
    fn orset_add_then_remove() {
        let mut set = OrSet::<&'static str>::new();

        set.add("x");
        assert!(set.contains(&"x"));

        set.remove(&"x");
        assert!(!set.contains(&"x"));
    }

    /// A re-add on one replica survives merging with a replica that removed
    /// the element after seeing only the first add.
    #[test]
    fn orset_merge_preserves_re_add_after_concurrent_remove() {
        let mut local = OrSet::<&'static str>::new();
        local.add("x");

        // The remote replica forks here and removes what it has observed.
        let mut remote = local.clone();
        remote.remove(&"x");

        // Meanwhile the local replica adds the element again.
        local.add("x");

        local.merge(&remote);
        assert!(local.contains(&"x"));
    }
}