Skip to main content

crdt_kit/
twop_set.rs

1use std::collections::BTreeSet;
2
3use crate::Crdt;
4
/// A two-phase set (2P-Set).
///
/// Elements can be added and removed, but once an element has been removed
/// it can never be re-added. The set is built from two grow-only sets: one
/// recording additions and one recording removals (tombstones). An element
/// is considered present when it is in the first set and not in the second.
///
/// The struct itself places no bounds on `T`; the `Ord + Clone` requirements
/// are stated on the `impl` blocks that actually need them.
///
/// # Example
///
/// ```
/// use crdt_kit::prelude::*;
///
/// let mut s1 = TwoPSet::new();
/// s1.insert("apple");
/// s1.insert("banana");
/// s1.remove(&"banana");
///
/// assert!(s1.contains(&"apple"));
/// assert!(!s1.contains(&"banana")); // removed
///
/// let mut s2 = TwoPSet::new();
/// s2.insert("banana"); // trying to re-add on another replica
///
/// s1.merge(&s2);
/// assert!(!s1.contains(&"banana")); // still removed (tombstone wins)
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TwoPSet<T> {
    /// Elements that have been inserted (locally or via merge).
    added: BTreeSet<T>,
    /// Tombstones: elements that have been removed and must stay removed.
    removed: BTreeSet<T>,
}
35
36impl<T: Ord + Clone> TwoPSet<T> {
37    /// Create a new empty 2P-Set.
38    #[must_use]
39    pub fn new() -> Self {
40        Self {
41            added: BTreeSet::new(),
42            removed: BTreeSet::new(),
43        }
44    }
45
46    /// Insert an element.
47    ///
48    /// Returns `true` if the element was newly added (not previously
49    /// removed). If the element was already removed, it cannot be re-added
50    /// and this returns `false`.
51    pub fn insert(&mut self, value: T) -> bool {
52        if self.removed.contains(&value) {
53            return false;
54        }
55        self.added.insert(value)
56    }
57
58    /// Remove an element.
59    ///
60    /// The element must have been added first. Once removed, it can never
61    /// be re-added. Returns `true` if the element was present and is now removed.
62    pub fn remove(&mut self, value: &T) -> bool {
63        if self.added.contains(value) && !self.removed.contains(value) {
64            self.removed.insert(value.clone());
65            true
66        } else {
67            false
68        }
69    }
70
71    /// Check if the set contains an element (added and not removed).
72    #[must_use]
73    pub fn contains(&self, value: &T) -> bool {
74        self.added.contains(value) && !self.removed.contains(value)
75    }
76
77    /// Get the number of active elements (added minus removed).
78    #[must_use]
79    pub fn len(&self) -> usize {
80        self.added.difference(&self.removed).count()
81    }
82
83    /// Check if the set has no active elements.
84    #[must_use]
85    pub fn is_empty(&self) -> bool {
86        self.len() == 0
87    }
88
89    /// Iterate over active elements (added and not removed).
90    pub fn iter(&self) -> impl Iterator<Item = &T> {
91        self.added.difference(&self.removed)
92    }
93}
94
95impl<T: Ord + Clone> Default for TwoPSet<T> {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101impl<T: Ord + Clone> Crdt for TwoPSet<T> {
102    fn merge(&mut self, other: &Self) {
103        for elem in &other.added {
104            self.added.insert(elem.clone());
105        }
106        for elem in &other.removed {
107            self.removed.insert(elem.clone());
108        }
109    }
110}
111
#[cfg(test)]
mod tests {
    //! Unit tests for `TwoPSet`: local insert/remove semantics plus the
    //! merge laws (commutative, associative, idempotent).

    use super::*;

    #[test]
    fn new_set_is_empty() {
        let s = TwoPSet::<String>::new();
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
    }

    #[test]
    fn insert_and_contains() {
        let mut s = TwoPSet::new();
        assert!(s.insert("a"));
        assert!(s.contains(&"a"));
        assert!(!s.is_empty());
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn duplicate_insert_returns_false() {
        let mut s = TwoPSet::new();
        assert!(s.insert("a"));
        assert!(!s.insert("a"));
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn remove_element() {
        let mut s = TwoPSet::new();
        s.insert("a");
        assert!(s.remove(&"a"));
        assert!(!s.contains(&"a"));
        assert_eq!(s.len(), 0);
        assert!(s.is_empty());
    }

    #[test]
    fn remove_of_never_added_element_is_rejected() {
        let mut s = TwoPSet::new();
        assert!(!s.remove(&"ghost"));
        // The failed removal must not leave a tombstone behind.
        assert!(s.insert("ghost"));
        assert!(s.contains(&"ghost"));
    }

    #[test]
    fn remove_twice_returns_false() {
        let mut s = TwoPSet::new();
        s.insert("a");
        assert!(s.remove(&"a"));
        assert!(!s.remove(&"a"));
    }

    #[test]
    fn cannot_readd_removed_element() {
        let mut s = TwoPSet::new();
        s.insert("a");
        s.remove(&"a");
        assert!(!s.insert("a")); // returns false
        assert!(!s.contains(&"a"));
    }

    #[test]
    fn remove_wins_on_merge() {
        let mut s1 = TwoPSet::new();
        s1.insert("a");
        s1.remove(&"a");

        let mut s2 = TwoPSet::new();
        s2.insert("a"); // concurrent add

        s1.merge(&s2);
        assert!(!s1.contains(&"a")); // remove wins
        assert!(s1.is_empty());
    }

    #[test]
    fn merge_is_commutative() {
        let mut s1 = TwoPSet::new();
        s1.insert("a");
        s1.insert("b");
        s1.remove(&"a");

        let mut s2 = TwoPSet::new();
        s2.insert("b");
        s2.insert("c");

        let mut left = s1.clone();
        left.merge(&s2);

        let mut right = s2.clone();
        right.merge(&s1);

        assert_eq!(left, right);
    }

    #[test]
    fn merge_is_associative() {
        let mut a = TwoPSet::new();
        a.insert(1);
        a.insert(2);
        a.remove(&1);

        let mut b = TwoPSet::new();
        b.insert(2);
        b.insert(3);
        b.remove(&3);

        let mut c = TwoPSet::new();
        c.insert(1);
        c.insert(4);

        // (a merge b) merge c
        let mut left = a.clone();
        left.merge(&b);
        left.merge(&c);

        // a merge (b merge c)
        let mut bc = b.clone();
        bc.merge(&c);
        let mut right = a.clone();
        right.merge(&bc);

        assert_eq!(left, right);
        let active: Vec<&i32> = left.iter().collect();
        assert_eq!(active, vec![&2, &4]);
    }

    #[test]
    fn merge_is_idempotent() {
        let mut s1 = TwoPSet::new();
        s1.insert("a");

        let mut s2 = TwoPSet::new();
        s2.insert("b");

        s1.merge(&s2);
        let after_first = s1.clone();
        s1.merge(&s2);

        assert_eq!(s1, after_first);
    }

    #[test]
    fn iterate_active_elements() {
        let mut s = TwoPSet::new();
        s.insert(1);
        s.insert(2);
        s.insert(3);
        s.remove(&2);

        let active: Vec<&i32> = s.iter().collect();
        assert_eq!(active, vec![&1, &3]);
    }
}