Skip to main content

crdt_kit/
or_set.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use crate::Crdt;
4
5/// An observed-remove set (OR-Set), also known as an add-wins set.
6///
7/// Unlike the 2P-Set, elements can be freely added and removed, and
8/// re-added after removal. Each add operation generates a unique tag.
9/// Remove only removes the tags that the remover has observed, so
10/// concurrent adds are preserved.
11///
12/// # Example
13///
14/// ```
15/// use crdt_kit::prelude::*;
16///
17/// let mut s1 = ORSet::new("node-1");
18/// s1.insert("apple");
19/// s1.insert("banana");
20/// s1.remove(&"banana");
21///
22/// let mut s2 = ORSet::new("node-2");
23/// s2.insert("banana"); // concurrent add
24///
25/// s1.merge(&s2);
26/// // banana is present because s2's add was concurrent with s1's remove
27/// assert!(s1.contains(&"banana"));
28/// assert!(s1.contains(&"apple"));
29/// ```
30#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct ORSet<T: Ord + Clone> {
32    actor: String,
33    counter: u64,
34    /// element -> set of unique tags (actor, counter)
35    elements: BTreeMap<T, BTreeSet<(String, u64)>>,
36    /// Tombstones: tags that have been removed
37    tombstones: BTreeSet<(String, u64)>,
38}
39
40impl<T: Ord + Clone> ORSet<T> {
41    /// Create a new empty OR-Set for the given actor.
42    pub fn new(actor: impl Into<String>) -> Self {
43        Self {
44            actor: actor.into(),
45            counter: 0,
46            elements: BTreeMap::new(),
47            tombstones: BTreeSet::new(),
48        }
49    }
50
51    /// Insert an element into the set.
52    ///
53    /// Generates a unique tag for this insertion. Even if the element
54    /// was previously removed, this new tag allows it to be re-added.
55    pub fn insert(&mut self, value: T) {
56        self.counter += 1;
57        let tag = (self.actor.clone(), self.counter);
58        self.elements.entry(value).or_default().insert(tag);
59    }
60
61    /// Remove an element from the set.
62    ///
63    /// Only removes the tags that this replica has observed. Concurrent
64    /// adds on other replicas will survive the merge.
65    ///
66    /// Returns `true` if the element was present and removed.
67    pub fn remove(&mut self, value: &T) -> bool {
68        if let Some(tags) = self.elements.remove(value) {
69            self.tombstones.extend(tags);
70            true
71        } else {
72            false
73        }
74    }
75
76    /// Check if the set contains an element.
77    #[must_use]
78    pub fn contains(&self, value: &T) -> bool {
79        self.elements
80            .get(value)
81            .is_some_and(|tags| !tags.is_empty())
82    }
83
84    /// Get the number of distinct elements in the set.
85    #[must_use]
86    pub fn len(&self) -> usize {
87        self.elements
88            .values()
89            .filter(|tags| !tags.is_empty())
90            .count()
91    }
92
93    /// Check if the set is empty.
94    #[must_use]
95    pub fn is_empty(&self) -> bool {
96        self.len() == 0
97    }
98
99    /// Iterate over the elements in the set.
100    pub fn iter(&self) -> impl Iterator<Item = &T> {
101        self.elements
102            .iter()
103            .filter(|(_, tags)| !tags.is_empty())
104            .map(|(v, _)| v)
105    }
106
107    /// Get this replica's actor ID.
108    #[must_use]
109    pub fn actor(&self) -> &str {
110        &self.actor
111    }
112}
113
114impl<T: Ord + Clone> Crdt for ORSet<T> {
115    fn merge(&mut self, other: &Self) {
116        // Merge all elements and their tags
117        for (value, other_tags) in &other.elements {
118            let self_tags = self.elements.entry(value.clone()).or_default();
119            for tag in other_tags {
120                // Only add tag if it's not in our tombstones
121                if !self.tombstones.contains(tag) {
122                    self_tags.insert(tag.clone());
123                }
124            }
125        }
126
127        // Apply other's tombstones to our elements
128        for tag in &other.tombstones {
129            for tags in self.elements.values_mut() {
130                tags.remove(tag);
131            }
132        }
133
134        // Merge tombstones
135        self.tombstones.extend(other.tombstones.iter().cloned());
136
137        // Clean up empty tag sets
138        self.elements.retain(|_, tags| !tags.is_empty());
139
140        // Update counter to be at least as high as the other
141        self.counter = self.counter.max(other.counter);
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148
149    #[test]
150    fn new_set_is_empty() {
151        let s = ORSet::<String>::new("a");
152        assert!(s.is_empty());
153        assert_eq!(s.len(), 0);
154    }
155
156    #[test]
157    fn insert_and_contains() {
158        let mut s = ORSet::new("a");
159        s.insert("x");
160        assert!(s.contains(&"x"));
161        assert_eq!(s.len(), 1);
162    }
163
164    #[test]
165    fn remove_element() {
166        let mut s = ORSet::new("a");
167        s.insert("x");
168        assert!(s.remove(&"x"));
169        assert!(!s.contains(&"x"));
170        assert_eq!(s.len(), 0);
171    }
172
173    #[test]
174    fn can_readd_after_remove() {
175        let mut s = ORSet::new("a");
176        s.insert("x");
177        s.remove(&"x");
178        assert!(!s.contains(&"x"));
179
180        s.insert("x");
181        assert!(s.contains(&"x"));
182    }
183
184    #[test]
185    fn concurrent_add_survives_remove() {
186        let mut s1 = ORSet::new("a");
187        s1.insert("x");
188
189        // s1 removes x
190        s1.remove(&"x");
191
192        // s2 concurrently adds x (new unique tag from different replica)
193        let mut s2 = ORSet::new("b");
194        s2.insert("x");
195
196        s1.merge(&s2);
197        // s2's add should survive because s1 didn't observe that tag
198        assert!(s1.contains(&"x"));
199    }
200
201    #[test]
202    fn merge_is_commutative() {
203        let mut s1 = ORSet::new("a");
204        s1.insert("x");
205        s1.insert("y");
206
207        let mut s2 = ORSet::new("b");
208        s2.insert("y");
209        s2.insert("z");
210
211        let mut left = s1.clone();
212        left.merge(&s2);
213
214        let mut right = s2.clone();
215        right.merge(&s1);
216
217        let left_elems: BTreeSet<_> = left.iter().collect();
218        let right_elems: BTreeSet<_> = right.iter().collect();
219        assert_eq!(left_elems, right_elems);
220    }
221
222    #[test]
223    fn merge_is_idempotent() {
224        let mut s1 = ORSet::new("a");
225        s1.insert("x");
226
227        let mut s2 = ORSet::new("b");
228        s2.insert("y");
229
230        s1.merge(&s2);
231        let after_first = s1.clone();
232        s1.merge(&s2);
233
234        assert_eq!(s1, after_first);
235    }
236
237    #[test]
238    fn add_wins_semantics() {
239        // Simulate: s1 has "x" and removes it, s2 adds "x" concurrently
240        let mut s1 = ORSet::new("a");
241        s1.insert("x");
242        s1.remove(&"x");
243
244        // Different node adds "x" concurrently (new unique tag)
245        let mut s2 = ORSet::new("b");
246        s2.insert("x");
247
248        s1.merge(&s2);
249        // Add wins: "x" should be present because of s2_new's concurrent add
250        assert!(s1.contains(&"x"));
251    }
252
253    #[test]
254    fn remove_nonexistent_returns_false() {
255        let mut s = ORSet::<&str>::new("a");
256        assert!(!s.remove(&"x"));
257    }
258
259    #[test]
260    fn iterate_elements() {
261        let mut s = ORSet::new("a");
262        s.insert(1);
263        s.insert(2);
264        s.insert(3);
265        s.remove(&2);
266
267        let elems: Vec<&i32> = s.iter().collect();
268        assert_eq!(elems, vec![&1, &3]);
269    }
270}