// crdt_kit/or_set.rs

1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::string::String;
3
4use crate::{Crdt, DeltaCrdt};
5
/// An observed-remove set (OR-Set), also known as an add-wins set.
///
/// Unlike a 2P-Set, an element can be added, removed, and added again any
/// number of times. Each insertion is stamped with its own `(actor, counter)`
/// tag, and a removal tombstones only the tags the removing replica has
/// actually seen. An insertion made concurrently elsewhere carries a tag the
/// remover never saw, so it survives the merge: a concurrent add beats a
/// remove.
///
/// Removed tags are kept as tombstones for the lifetime of the set (nothing
/// in this type discards them), so memory use grows with the number of
/// removed tags.
///
/// # Example
///
/// ```
/// use crdt_kit::prelude::*;
///
/// let mut s1 = ORSet::new("node-1");
/// s1.insert("apple");
/// s1.insert("banana");
/// s1.remove(&"banana");
///
/// let mut s2 = ORSet::new("node-2");
/// s2.insert("banana"); // concurrent add, never observed by s1
///
/// s1.merge(&s2);
/// // s2's add was concurrent with s1's remove, so banana survives.
/// assert!(s1.contains(&"banana"));
/// assert!(s1.contains(&"apple"));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ORSet<T: Ord + Clone> {
    /// ID of the replica owning this copy; stamped into every tag it mints.
    actor: String,
    /// Sequence number of this replica's tags, bumped before each insert.
    counter: u64,
    /// Live `(actor, counter)` tags per element; an element is present while
    /// it has at least one.
    elements: BTreeMap<T, BTreeSet<(String, u64)>>,
    /// Tags that have been removed and must not be resurrected by a merge.
    tombstones: BTreeSet<(String, u64)>,
}
41
42impl<T: Ord + Clone> ORSet<T> {
43    /// Create a new empty OR-Set for the given actor.
44    pub fn new(actor: impl Into<String>) -> Self {
45        Self {
46            actor: actor.into(),
47            counter: 0,
48            elements: BTreeMap::new(),
49            tombstones: BTreeSet::new(),
50        }
51    }
52
53    /// Insert an element into the set.
54    ///
55    /// Generates a unique tag for this insertion. Even if the element
56    /// was previously removed, this new tag allows it to be re-added.
57    pub fn insert(&mut self, value: T) {
58        self.counter += 1;
59        let tag = (self.actor.clone(), self.counter);
60        self.elements.entry(value).or_default().insert(tag);
61    }
62
63    /// Remove an element from the set.
64    ///
65    /// Only removes the tags that this replica has observed. Concurrent
66    /// adds on other replicas will survive the merge.
67    ///
68    /// Returns `true` if the element was present and removed.
69    pub fn remove(&mut self, value: &T) -> bool {
70        if let Some(tags) = self.elements.remove(value) {
71            self.tombstones.extend(tags);
72            true
73        } else {
74            false
75        }
76    }
77
78    /// Check if the set contains an element.
79    #[must_use]
80    pub fn contains(&self, value: &T) -> bool {
81        self.elements
82            .get(value)
83            .is_some_and(|tags| !tags.is_empty())
84    }
85
86    /// Get the number of distinct elements in the set.
87    #[must_use]
88    pub fn len(&self) -> usize {
89        self.elements
90            .values()
91            .filter(|tags| !tags.is_empty())
92            .count()
93    }
94
95    /// Check if the set is empty.
96    #[must_use]
97    pub fn is_empty(&self) -> bool {
98        self.len() == 0
99    }
100
101    /// Iterate over the elements in the set.
102    pub fn iter(&self) -> impl Iterator<Item = &T> {
103        self.elements
104            .iter()
105            .filter(|(_, tags)| !tags.is_empty())
106            .map(|(v, _)| v)
107    }
108
109    /// Get this replica's actor ID.
110    #[must_use]
111    pub fn actor(&self) -> &str {
112        &self.actor
113    }
114}
115
116impl<T: Ord + Clone> IntoIterator for ORSet<T> {
117    type Item = T;
118    type IntoIter = alloc::vec::IntoIter<T>;
119
120    fn into_iter(self) -> Self::IntoIter {
121        let items: alloc::vec::Vec<T> = self
122            .elements
123            .into_iter()
124            .filter(|(_, tags)| !tags.is_empty())
125            .map(|(v, _)| v)
126            .collect();
127        items.into_iter()
128    }
129}
130
131impl<T: Ord + Clone> Crdt for ORSet<T> {
132    fn merge(&mut self, other: &Self) {
133        // Merge all elements and their tags
134        for (value, other_tags) in &other.elements {
135            let self_tags = self.elements.entry(value.clone()).or_default();
136            for tag in other_tags {
137                // Only add tag if it's not in our tombstones
138                if !self.tombstones.contains(tag) {
139                    self_tags.insert(tag.clone());
140                }
141            }
142        }
143
144        // Apply other's tombstones to our elements
145        for tag in &other.tombstones {
146            for tags in self.elements.values_mut() {
147                tags.remove(tag);
148            }
149        }
150
151        // Merge tombstones
152        self.tombstones.extend(other.tombstones.iter().cloned());
153
154        // Clean up empty tag sets
155        self.elements.retain(|_, tags| !tags.is_empty());
156
157        // Update counter to be at least as high as the other
158        self.counter = self.counter.max(other.counter);
159    }
160}
161
/// Delta for [`ORSet`]: new element tags and new tombstones since another state.
///
/// Produced by [`DeltaCrdt::delta`] and consumed by
/// [`DeltaCrdt::apply_delta`]. The replica's counter is not part of a delta.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ORSetDelta<T: Ord + Clone> {
    /// New element-tag pairs that the other replica doesn't have.
    additions: BTreeMap<T, BTreeSet<(String, u64)>>,
    /// New tombstones that the other replica doesn't have.
    tombstones: BTreeSet<(String, u64)>,
}

impl<T: Ord + Clone> ORSetDelta<T> {
    /// Returns `true` if the delta carries neither new tags nor new
    /// tombstones.
    ///
    /// The fields are private, so this is the only way for code outside this
    /// module to detect (and, for example, skip sending) an empty delta.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.additions.is_empty() && self.tombstones.is_empty()
    }
}
171
172impl<T: Ord + Clone> DeltaCrdt for ORSet<T> {
173    type Delta = ORSetDelta<T>;
174
175    fn delta(&self, other: &Self) -> ORSetDelta<T> {
176        let mut additions = BTreeMap::new();
177        for (value, self_tags) in &self.elements {
178            let other_tags = other.elements.get(value);
179            let new_tags: BTreeSet<_> = self_tags
180                .iter()
181                .filter(|tag| {
182                    other_tags.map_or(true, |ot| !ot.contains(*tag))
183                        && !other.tombstones.contains(*tag)
184                })
185                .cloned()
186                .collect();
187            if !new_tags.is_empty() {
188                additions.insert(value.clone(), new_tags);
189            }
190        }
191
192        let tombstones: BTreeSet<_> = self
193            .tombstones
194            .difference(&other.tombstones)
195            .cloned()
196            .collect();
197
198        ORSetDelta {
199            additions,
200            tombstones,
201        }
202    }
203
204    fn apply_delta(&mut self, delta: &ORSetDelta<T>) {
205        // Apply additions
206        for (value, tags) in &delta.additions {
207            let self_tags = self.elements.entry(value.clone()).or_default();
208            for tag in tags {
209                if !self.tombstones.contains(tag) {
210                    self_tags.insert(tag.clone());
211                }
212            }
213        }
214
215        // Apply tombstones
216        for tag in &delta.tombstones {
217            for tags in self.elements.values_mut() {
218                tags.remove(tag);
219            }
220        }
221        self.tombstones.extend(delta.tombstones.iter().cloned());
222
223        // Clean up empty tag sets
224        self.elements.retain(|_, tags| !tags.is_empty());
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_set_is_empty() {
        let s = ORSet::<String>::new("a");
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
    }

    #[test]
    fn insert_and_contains() {
        let mut s = ORSet::new("a");
        s.insert("x");
        assert!(s.contains(&"x"));
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn remove_element() {
        let mut s = ORSet::new("a");
        s.insert("x");
        assert!(s.remove(&"x"));
        assert!(!s.contains(&"x"));
        assert_eq!(s.len(), 0);
    }

    #[test]
    fn can_readd_after_remove() {
        let mut s = ORSet::new("a");
        s.insert("x");
        s.remove(&"x");
        assert!(!s.contains(&"x"));

        s.insert("x");
        assert!(s.contains(&"x"));
    }

    #[test]
    fn concurrent_add_survives_remove() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        // s1 removes x
        s1.remove(&"x");

        // s2 concurrently adds x (new unique tag from different replica)
        let mut s2 = ORSet::new("b");
        s2.insert("x");

        s1.merge(&s2);
        // s2's add should survive because s1 didn't observe that tag
        assert!(s1.contains(&"x"));
    }

    #[test]
    fn merge_is_commutative() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        s1.insert("y");

        let mut s2 = ORSet::new("b");
        s2.insert("y");
        s2.insert("z");

        let mut left = s1.clone();
        left.merge(&s2);

        let mut right = s2.clone();
        right.merge(&s1);

        let left_elems: BTreeSet<_> = left.iter().collect();
        let right_elems: BTreeSet<_> = right.iter().collect();
        assert_eq!(left_elems, right_elems);
    }

    #[test]
    fn merge_is_idempotent() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        let mut s2 = ORSet::new("b");
        s2.insert("y");

        s1.merge(&s2);
        let after_first = s1.clone();
        s1.merge(&s2);

        assert_eq!(s1, after_first);
    }

    #[test]
    fn add_wins_semantics() {
        // s2 first observes s1's "x"; then s1 removes "x" while s2
        // concurrently re-adds it with a fresh tag.
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        let mut s2 = ORSet::new("b");
        s2.merge(&s1);

        s1.remove(&"x");
        s2.insert("x");

        s1.merge(&s2);
        s2.merge(&s1);

        // Add wins: s1's remove only tombstoned the tag it had observed, so
        // s2's re-add survives, and both replicas converge.
        assert!(s1.contains(&"x"));
        assert!(s2.contains(&"x"));
        assert_eq!(s1.iter().collect::<Vec<_>>(), s2.iter().collect::<Vec<_>>());
    }

    #[test]
    fn observed_remove_propagates() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        let mut s2 = ORSet::new("b");
        s2.merge(&s1);

        // s2 removes the add it observed and nobody re-adds "x".
        assert!(s2.remove(&"x"));
        s1.merge(&s2);

        assert!(!s1.contains(&"x"));
        assert!(s1.is_empty());
    }

    #[test]
    fn merge_drops_fully_tombstoned_entries() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        let mut s2 = ORSet::new("b");
        s2.merge(&s1);
        s1.remove(&"x");

        // s2 still holds the tag s1 has tombstoned; merging it back must not
        // leave an empty entry for "x" behind.
        s1.merge(&s2);
        assert!(!s1.elements.contains_key(&"x"));
        assert_eq!(s1.len(), 0);
    }

    #[test]
    fn remove_nonexistent_returns_false() {
        let mut s = ORSet::<&str>::new("a");
        assert!(!s.remove(&"x"));
    }

    #[test]
    fn iterate_elements() {
        let mut s = ORSet::new("a");
        s.insert(1);
        s.insert(2);
        s.insert(3);
        s.remove(&2);

        let elems: Vec<&i32> = s.iter().collect();
        assert_eq!(elems, vec![&1, &3]);
    }

    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        s1.insert("y");
        s1.remove(&"x");

        let mut s2 = ORSet::new("b");
        s2.insert("y");
        s2.insert("z");

        let mut full = s2.clone();
        full.merge(&s1);

        let mut via_delta = s2.clone();
        let d = s1.delta(&s2);
        via_delta.apply_delta(&d);

        let full_elems: BTreeSet<_> = full.iter().collect();
        let delta_elems: BTreeSet<_> = via_delta.iter().collect();
        assert_eq!(full_elems, delta_elems);
    }

    #[test]
    fn delta_exchange_converges() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        s1.insert("y");
        let mut s2 = ORSet::new("b");
        s2.insert("z");
        s2.merge(&s1);
        s2.remove(&"y");
        s1.insert("w");

        let to_s1 = s2.delta(&s1);
        let to_s2 = s1.delta(&s2);
        s1.apply_delta(&to_s1);
        s2.apply_delta(&to_s2);

        let left: Vec<_> = s1.iter().collect();
        let right: Vec<_> = s2.iter().collect();
        assert_eq!(left, right);
        assert_eq!(left, vec![&"w", &"x", &"z"]);

        // Once converged, neither side has anything new for the other.
        let d = s1.delta(&s2);
        assert!(d.additions.is_empty());
        assert!(d.tombstones.is_empty());
    }

    #[test]
    fn delta_is_empty_when_equal() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        let s2 = s1.clone();
        let d = s1.delta(&s2);
        assert!(d.additions.is_empty());
        assert!(d.tombstones.is_empty());
    }

    #[test]
    fn delta_carries_tombstones() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        let s2 = s1.clone();
        s1.remove(&"x");

        let d = s1.delta(&s2);
        assert!(!d.tombstones.is_empty());

        let mut via_delta = s2.clone();
        via_delta.apply_delta(&d);
        assert!(!via_delta.contains(&"x"));
    }
}