Skip to main content

crdt_kit/
twop_set.rs

1use alloc::collections::BTreeSet;
2
3use crate::Crdt;
4
5/// A two-phase set (2P-Set).
6///
7/// Elements can be added and removed, but once removed, they cannot be
8/// re-added. This is implemented with two G-Sets: one for additions
9/// and one for removals (tombstones).
10///
11/// # Example
12///
13/// ```
14/// use crdt_kit::prelude::*;
15///
16/// let mut s1 = TwoPSet::new();
17/// s1.insert("apple");
18/// s1.insert("banana");
19/// s1.remove(&"banana");
20///
21/// assert!(s1.contains(&"apple"));
22/// assert!(!s1.contains(&"banana")); // removed
23///
24/// let mut s2 = TwoPSet::new();
25/// s2.insert("banana"); // trying to re-add on another replica
26///
27/// s1.merge(&s2);
28/// assert!(!s1.contains(&"banana")); // still removed (tombstone wins)
29/// ```
30#[derive(Debug, Clone, PartialEq, Eq)]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32pub struct TwoPSet<T: Ord + Clone> {
33    added: BTreeSet<T>,
34    removed: BTreeSet<T>,
35}
36
37impl<T: Ord + Clone> TwoPSet<T> {
38    /// Create a new empty 2P-Set.
39    #[must_use]
40    pub fn new() -> Self {
41        Self {
42            added: BTreeSet::new(),
43            removed: BTreeSet::new(),
44        }
45    }
46
47    /// Insert an element.
48    ///
49    /// Returns `true` if the element was newly added (not previously
50    /// removed). If the element was already removed, it cannot be re-added
51    /// and this returns `false`.
52    pub fn insert(&mut self, value: T) -> bool {
53        if self.removed.contains(&value) {
54            return false;
55        }
56        self.added.insert(value)
57    }
58
59    /// Remove an element.
60    ///
61    /// The element must have been added first. Once removed, it can never
62    /// be re-added. Returns `true` if the element was present and is now removed.
63    pub fn remove(&mut self, value: &T) -> bool {
64        if self.added.contains(value) && !self.removed.contains(value) {
65            self.removed.insert(value.clone());
66            true
67        } else {
68            false
69        }
70    }
71
72    /// Check if the set contains an element (added and not removed).
73    #[must_use]
74    pub fn contains(&self, value: &T) -> bool {
75        self.added.contains(value) && !self.removed.contains(value)
76    }
77
78    /// Get the number of active elements (added minus removed).
79    #[must_use]
80    pub fn len(&self) -> usize {
81        self.added.difference(&self.removed).count()
82    }
83
84    /// Check if the set has no active elements.
85    #[must_use]
86    pub fn is_empty(&self) -> bool {
87        self.len() == 0
88    }
89
90    /// Iterate over active elements (added and not removed).
91    pub fn iter(&self) -> impl Iterator<Item = &T> {
92        self.added.difference(&self.removed)
93    }
94}
95
96impl<T: Ord + Clone> Default for TwoPSet<T> {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102impl<T: Ord + Clone> IntoIterator for TwoPSet<T> {
103    type Item = T;
104    type IntoIter = alloc::vec::IntoIter<T>;
105
106    fn into_iter(self) -> Self::IntoIter {
107        let active: alloc::vec::Vec<T> = self.added.difference(&self.removed).cloned().collect();
108        active.into_iter()
109    }
110}
111
112impl<T: Ord + Clone> Crdt for TwoPSet<T> {
113    fn merge(&mut self, other: &Self) {
114        for elem in &other.added {
115            self.added.insert(elem.clone());
116        }
117        for elem in &other.removed {
118            self.removed.insert(elem.clone());
119        }
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126
127    #[test]
128    fn new_set_is_empty() {
129        let s = TwoPSet::<String>::new();
130        assert!(s.is_empty());
131    }
132
133    #[test]
134    fn insert_and_contains() {
135        let mut s = TwoPSet::new();
136        s.insert("a");
137        assert!(s.contains(&"a"));
138        assert_eq!(s.len(), 1);
139    }
140
141    #[test]
142    fn remove_element() {
143        let mut s = TwoPSet::new();
144        s.insert("a");
145        assert!(s.remove(&"a"));
146        assert!(!s.contains(&"a"));
147        assert_eq!(s.len(), 0);
148    }
149
150    #[test]
151    fn cannot_readd_removed_element() {
152        let mut s = TwoPSet::new();
153        s.insert("a");
154        s.remove(&"a");
155        assert!(!s.insert("a")); // returns false
156        assert!(!s.contains(&"a"));
157    }
158
159    #[test]
160    fn remove_wins_on_merge() {
161        let mut s1 = TwoPSet::new();
162        s1.insert("a");
163        s1.remove(&"a");
164
165        let mut s2 = TwoPSet::new();
166        s2.insert("a"); // concurrent add
167
168        s1.merge(&s2);
169        assert!(!s1.contains(&"a")); // remove wins
170    }
171
172    #[test]
173    fn merge_is_commutative() {
174        let mut s1 = TwoPSet::new();
175        s1.insert("a");
176        s1.insert("b");
177        s1.remove(&"a");
178
179        let mut s2 = TwoPSet::new();
180        s2.insert("b");
181        s2.insert("c");
182
183        let mut left = s1.clone();
184        left.merge(&s2);
185
186        let mut right = s2.clone();
187        right.merge(&s1);
188
189        assert_eq!(left, right);
190    }
191
192    #[test]
193    fn merge_is_idempotent() {
194        let mut s1 = TwoPSet::new();
195        s1.insert("a");
196
197        let mut s2 = TwoPSet::new();
198        s2.insert("b");
199
200        s1.merge(&s2);
201        let after_first = s1.clone();
202        s1.merge(&s2);
203
204        assert_eq!(s1, after_first);
205    }
206
207    #[test]
208    fn iterate_active_elements() {
209        let mut s = TwoPSet::new();
210        s.insert(1);
211        s.insert(2);
212        s.insert(3);
213        s.remove(&2);
214
215        let active: Vec<&i32> = s.iter().collect();
216        assert_eq!(active, vec![&1, &3]);
217    }
218}