Skip to main content

crdt_kit/
twop_set.rs

1use alloc::collections::BTreeSet;
2
3use crate::{Crdt, DeltaCrdt};
4
/// A two-phase set (2P-Set).
///
/// Elements can be added and removed, but once removed, they cannot be
/// re-added. This is implemented with two grow-only sets: one for additions
/// and one for removals (tombstones). An element is *active* when it is in
/// the addition set and has no tombstone.
///
/// # Example
///
/// ```
/// use crdt_kit::prelude::*;
///
/// let mut s1 = TwoPSet::new();
/// s1.insert("apple");
/// s1.insert("banana");
/// s1.remove(&"banana");
///
/// assert!(s1.contains(&"apple"));
/// assert!(!s1.contains(&"banana")); // removed
///
/// let mut s2 = TwoPSet::new();
/// s2.insert("banana"); // trying to re-add on another replica
///
/// s1.merge(&s2);
/// assert!(!s1.contains(&"banana")); // still removed (tombstone wins)
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TwoPSet<T: Ord + Clone> {
    /// Elements that have been inserted, including ones removed later.
    added: BTreeSet<T>,
    /// Tombstones: elements that have been removed.
    removed: BTreeSet<T>,
}

impl<T: Ord + Clone> TwoPSet<T> {
    /// Create a new empty 2P-Set.
    #[must_use]
    pub fn new() -> Self {
        Self {
            added: BTreeSet::new(),
            removed: BTreeSet::new(),
        }
    }

    /// Insert an element.
    ///
    /// Returns `true` only if the element was newly added. Returns `false`
    /// when the element is already present, or when it has been removed
    /// before (a removed element cannot be re-added).
    pub fn insert(&mut self, value: T) -> bool {
        // A tombstone is permanent: never touch `added` for a removed value.
        !self.removed.contains(&value) && self.added.insert(value)
    }

    /// Remove an element.
    ///
    /// The element must have been added first. Once removed, it can never
    /// be re-added. Returns `true` if the element was active and is now
    /// removed; returns `false` (and records nothing) otherwise.
    pub fn remove(&mut self, value: &T) -> bool {
        if !self.contains(value) {
            return false;
        }
        // `contains` guarantees there is no tombstone yet, so this yields `true`.
        self.removed.insert(value.clone())
    }

    /// Check if the set contains an element (added and not removed).
    #[must_use]
    pub fn contains(&self, value: &T) -> bool {
        self.added.contains(value) && !self.removed.contains(value)
    }

    /// Get the number of active elements (added and not removed).
    ///
    /// The count is computed by iterating over the active elements; it is
    /// not cached.
    #[must_use]
    pub fn len(&self) -> usize {
        self.iter().count()
    }

    /// Check if the set has no active elements.
    ///
    /// Stops at the first active element instead of counting all of them.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.iter().next().is_none()
    }

    /// Iterate over active elements (added and not removed), in ascending
    /// order.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.added.difference(&self.removed)
    }
}
95
96impl<T: Ord + Clone> Default for TwoPSet<T> {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102impl<T: Ord + Clone> IntoIterator for TwoPSet<T> {
103    type Item = T;
104    type IntoIter = alloc::vec::IntoIter<T>;
105
106    fn into_iter(self) -> Self::IntoIter {
107        let active: alloc::vec::Vec<T> = self.added.difference(&self.removed).cloned().collect();
108        active.into_iter()
109    }
110}
111
/// Delta for [`TwoPSet`]: the additions and removals (tombstones) that one
/// replica holds and another replica is missing.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TwoPSetDelta<T>
where
    T: Ord + Clone,
{
    /// Added elements that the other replica does not have yet.
    pub added: BTreeSet<T>,
    /// Removed elements (tombstones) that the other replica does not know
    /// about yet.
    pub removed: BTreeSet<T>,
}
121
122impl<T: Ord + Clone> DeltaCrdt for TwoPSet<T> {
123    type Delta = TwoPSetDelta<T>;
124
125    fn delta(&self, other: &Self) -> TwoPSetDelta<T> {
126        TwoPSetDelta {
127            added: self.added.difference(&other.added).cloned().collect(),
128            removed: self.removed.difference(&other.removed).cloned().collect(),
129        }
130    }
131
132    fn apply_delta(&mut self, delta: &TwoPSetDelta<T>) {
133        for elem in &delta.added {
134            self.added.insert(elem.clone());
135        }
136        for elem in &delta.removed {
137            self.removed.insert(elem.clone());
138        }
139    }
140}
141
142impl<T: Ord + Clone> Crdt for TwoPSet<T> {
143    fn merge(&mut self, other: &Self) {
144        for elem in &other.added {
145            self.added.insert(elem.clone());
146        }
147        for elem in &other.removed {
148            self.removed.insert(elem.clone());
149        }
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed set has no active elements.
    #[test]
    fn new_set_is_empty() {
        let s = TwoPSet::<String>::new();
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
    }

    /// `Default` produces the same value as `new`.
    #[test]
    fn default_equals_new() {
        let s: TwoPSet<i32> = TwoPSet::default();
        assert!(s.is_empty());
        assert_eq!(s, TwoPSet::new());
    }

    #[test]
    fn insert_and_contains() {
        let mut s = TwoPSet::new();
        s.insert("a");
        assert!(s.contains(&"a"));
        assert_eq!(s.len(), 1);
    }

    /// A second insert of the same element reports `false` and changes nothing.
    #[test]
    fn insert_duplicate_returns_false() {
        let mut s = TwoPSet::new();
        assert!(s.insert("a"));
        assert!(!s.insert("a"));
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn remove_element() {
        let mut s = TwoPSet::new();
        s.insert("a");
        assert!(s.remove(&"a"));
        assert!(!s.contains(&"a"));
        assert_eq!(s.len(), 0);
    }

    /// Removing something that was never added leaves no tombstone behind,
    /// so the element can still be inserted afterwards.
    #[test]
    fn remove_absent_element_is_noop() {
        let mut s = TwoPSet::new();
        assert!(!s.remove(&"a"));
        assert!(s.insert("a"));
        assert!(s.contains(&"a"));
    }

    #[test]
    fn cannot_readd_removed_element() {
        let mut s = TwoPSet::new();
        s.insert("a");
        s.remove(&"a");
        assert!(!s.insert("a")); // returns false
        assert!(!s.contains(&"a"));
    }

    /// Tombstoned elements do not count as content.
    #[test]
    fn empty_after_removing_everything() {
        let mut s = TwoPSet::new();
        s.insert(1);
        s.insert(2);
        assert!(!s.is_empty());
        s.remove(&1);
        s.remove(&2);
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
    }

    #[test]
    fn remove_wins_on_merge() {
        let mut s1 = TwoPSet::new();
        s1.insert("a");
        s1.remove(&"a");

        let mut s2 = TwoPSet::new();
        s2.insert("a"); // concurrent add

        s1.merge(&s2);
        assert!(!s1.contains(&"a")); // remove wins
    }

    #[test]
    fn merge_is_commutative() {
        let mut s1 = TwoPSet::new();
        s1.insert("a");
        s1.insert("b");
        s1.remove(&"a");

        let mut s2 = TwoPSet::new();
        s2.insert("b");
        s2.insert("c");

        let mut left = s1.clone();
        left.merge(&s2);

        let mut right = s2.clone();
        right.merge(&s1);

        assert_eq!(left, right);
    }

    #[test]
    fn merge_is_idempotent() {
        let mut s1 = TwoPSet::new();
        s1.insert("a");

        let mut s2 = TwoPSet::new();
        s2.insert("b");

        s1.merge(&s2);
        let after_first = s1.clone();
        s1.merge(&s2);

        assert_eq!(s1, after_first);
    }

    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut s1 = TwoPSet::new();
        s1.insert("a");
        s1.insert("b");
        s1.remove(&"a");

        let mut s2 = TwoPSet::new();
        s2.insert("b");
        s2.insert("c");

        let mut full = s2.clone();
        full.merge(&s1);

        let mut via_delta = s2.clone();
        let d = s1.delta(&s2);
        via_delta.apply_delta(&d);

        assert_eq!(full, via_delta);
    }

    #[test]
    fn delta_carries_removals() {
        let mut s1 = TwoPSet::new();
        s1.insert("x");
        s1.remove(&"x");

        let mut s2 = TwoPSet::new();
        s2.insert("x");

        let d = s1.delta(&s2);
        assert!(!d.removed.is_empty());

        s2.apply_delta(&d);
        assert!(!s2.contains(&"x"));
    }

    #[test]
    fn iterate_active_elements() {
        let mut s = TwoPSet::new();
        s.insert(1);
        s.insert(2);
        s.insert(3);
        s.remove(&2);

        // Compare iterators directly so the test does not depend on `Vec` /
        // `vec!` being in scope.
        assert!(s.iter().eq([1, 3].iter()));
    }

    /// Consuming the set yields the same active elements, by value.
    #[test]
    fn into_iter_yields_active_elements() {
        let mut s = TwoPSet::new();
        s.insert(1);
        s.insert(2);
        s.insert(3);
        s.remove(&2);

        assert!(s.into_iter().eq([1, 3].iter().copied()));
    }
}