Skip to main content

crdt_kit/
or_set.rs

1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::vec::Vec;
3
4use crate::{Crdt, DeltaCrdt, NodeId};
5
6/// An observed-remove set (OR-Set), also known as an add-wins set.
7///
8/// Unlike the 2P-Set, elements can be freely added and removed, and
9/// re-added after removal. Each add operation generates a unique tag.
10/// Remove only removes the tags that the remover has observed, so
11/// concurrent adds are preserved.
12///
13/// # Example
14///
15/// ```
16/// use crdt_kit::prelude::*;
17///
18/// let mut s1 = ORSet::new(1);
19/// s1.insert("apple");
20/// s1.insert("banana");
21/// s1.remove(&"banana");
22///
23/// let mut s2 = ORSet::new(2);
24/// s2.insert("banana"); // concurrent add
25///
26/// s1.merge(&s2);
27/// // banana is present because s2's add was concurrent with s1's remove
28/// assert!(s1.contains(&"banana"));
29/// assert!(s1.contains(&"apple"));
30/// ```
31#[derive(Debug, Clone, PartialEq, Eq)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33pub struct ORSet<T: Ord + Clone> {
34    actor: NodeId,
35    counter: u64,
36    /// element -> set of unique tags (actor, counter)
37    elements: BTreeMap<T, BTreeSet<(NodeId, u64)>>,
38    /// Tombstones: tags that have been removed
39    tombstones: BTreeSet<(NodeId, u64)>,
40}
41
42impl<T: Ord + Clone> ORSet<T> {
43    /// Create a new empty OR-Set for the given node.
44    pub fn new(actor: NodeId) -> Self {
45        Self {
46            actor,
47            counter: 0,
48            elements: BTreeMap::new(),
49            tombstones: BTreeSet::new(),
50        }
51    }
52
53    /// Insert an element into the set.
54    ///
55    /// Generates a unique tag for this insertion. Even if the element
56    /// was previously removed, this new tag allows it to be re-added.
57    pub fn insert(&mut self, value: T) {
58        self.counter += 1;
59        let tag = (self.actor, self.counter);
60        self.elements.entry(value).or_default().insert(tag);
61    }
62
63    /// Remove an element from the set.
64    ///
65    /// Only removes the tags that this replica has observed. Concurrent
66    /// adds on other replicas will survive the merge.
67    ///
68    /// Returns `true` if the element was present and removed.
69    pub fn remove(&mut self, value: &T) -> bool {
70        if let Some(tags) = self.elements.remove(value) {
71            self.tombstones.extend(tags);
72            true
73        } else {
74            false
75        }
76    }
77
78    /// Check if the set contains an element.
79    #[must_use]
80    pub fn contains(&self, value: &T) -> bool {
81        self.elements
82            .get(value)
83            .is_some_and(|tags| !tags.is_empty())
84    }
85
86    /// Get the number of distinct elements in the set.
87    #[must_use]
88    pub fn len(&self) -> usize {
89        self.elements
90            .values()
91            .filter(|tags| !tags.is_empty())
92            .count()
93    }
94
95    /// Check if the set is empty.
96    #[must_use]
97    pub fn is_empty(&self) -> bool {
98        self.len() == 0
99    }
100
101    /// Iterate over the elements in the set.
102    pub fn iter(&self) -> impl Iterator<Item = &T> {
103        self.elements
104            .iter()
105            .filter(|(_, tags)| !tags.is_empty())
106            .map(|(v, _)| v)
107    }
108
109    /// Get this replica's node ID.
110    #[must_use]
111    pub fn actor(&self) -> NodeId {
112        self.actor
113    }
114
115    /// Get the number of tombstones stored.
116    #[must_use]
117    pub fn tombstone_count(&self) -> usize {
118        self.tombstones.len()
119    }
120
121    /// Remove tombstones that are no longer needed.
122    ///
123    /// A tombstone is only needed while it might suppress a tag that hasn't
124    /// been propagated yet. Once no live element carries the same tag, the
125    /// tombstone is safe to discard.
126    ///
127    /// **Note:** this is a local-only GC. It is safe to call at any time, but
128    /// for best results call it after all peers have converged.
129    ///
130    /// Returns the number of tombstones removed.
131    pub fn compact_tombstones(&mut self) -> usize {
132        let live_tags: BTreeSet<&(NodeId, u64)> = self
133            .elements
134            .values()
135            .flat_map(|tags| tags.iter())
136            .collect();
137
138        let before = self.tombstones.len();
139        // Keep only tombstones whose tag is still live somewhere (shouldn't
140        // normally happen, but guards against partial merges). All others
141        // are safely discarded.
142        self.tombstones.retain(|tag| live_tags.contains(tag));
143        before - self.tombstones.len()
144    }
145
146    /// Aggressively remove **all** tombstones.
147    ///
148    /// This is safe only when every peer has converged to the same state.
149    ///
150    /// Returns the number of tombstones removed.
151    pub fn compact_tombstones_all(&mut self) -> usize {
152        let count = self.tombstones.len();
153        self.tombstones.clear();
154        count
155    }
156}
157
158impl<T: Ord + Clone> IntoIterator for ORSet<T> {
159    type Item = T;
160    type IntoIter = alloc::vec::IntoIter<T>;
161
162    fn into_iter(self) -> Self::IntoIter {
163        let items: Vec<T> = self
164            .elements
165            .into_iter()
166            .filter(|(_, tags)| !tags.is_empty())
167            .map(|(v, _)| v)
168            .collect();
169        items.into_iter()
170    }
171}
172
173impl<T: Ord + Clone> Crdt for ORSet<T> {
174    fn merge(&mut self, other: &Self) {
175        for (value, other_tags) in &other.elements {
176            let self_tags = self.elements.entry(value.clone()).or_default();
177            for &tag in other_tags {
178                if !self.tombstones.contains(&tag) {
179                    self_tags.insert(tag);
180                }
181            }
182        }
183
184        for &tag in &other.tombstones {
185            for tags in self.elements.values_mut() {
186                tags.remove(&tag);
187            }
188        }
189
190        self.tombstones.extend(&other.tombstones);
191
192        self.elements.retain(|_, tags| !tags.is_empty());
193
194        self.counter = self.counter.max(other.counter);
195    }
196}
197
198/// Delta for [`ORSet`]: new element tags and new tombstones since another state.
199#[derive(Debug, Clone, PartialEq, Eq)]
200#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
201pub struct ORSetDelta<T: Ord + Clone> {
202    /// New element-tag pairs that the other replica doesn't have.
203    additions: BTreeMap<T, BTreeSet<(NodeId, u64)>>,
204    /// New tombstones that the other replica doesn't have.
205    tombstones: BTreeSet<(NodeId, u64)>,
206}
207
208impl<T: Ord + Clone> DeltaCrdt for ORSet<T> {
209    type Delta = ORSetDelta<T>;
210
211    fn delta(&self, other: &Self) -> ORSetDelta<T> {
212        let mut additions = BTreeMap::new();
213        for (value, self_tags) in &self.elements {
214            let other_tags = other.elements.get(value);
215            let new_tags: BTreeSet<_> = self_tags
216                .iter()
217                .filter(|tag| {
218                    other_tags.map_or(true, |ot| !ot.contains(*tag))
219                        && !other.tombstones.contains(*tag)
220                })
221                .copied()
222                .collect();
223            if !new_tags.is_empty() {
224                additions.insert(value.clone(), new_tags);
225            }
226        }
227
228        let tombstones: BTreeSet<_> = self
229            .tombstones
230            .difference(&other.tombstones)
231            .copied()
232            .collect();
233
234        ORSetDelta {
235            additions,
236            tombstones,
237        }
238    }
239
240    fn apply_delta(&mut self, delta: &ORSetDelta<T>) {
241        for (value, tags) in &delta.additions {
242            let self_tags = self.elements.entry(value.clone()).or_default();
243            for &tag in tags {
244                if !self.tombstones.contains(&tag) {
245                    self_tags.insert(tag);
246                }
247            }
248        }
249
250        for &tag in &delta.tombstones {
251            for tags in self.elements.values_mut() {
252                tags.remove(&tag);
253            }
254        }
255        self.tombstones.extend(&delta.tombstones);
256
257        self.elements.retain(|_, tags| !tags.is_empty());
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created set reports no elements.
    #[test]
    fn new_set_is_empty() {
        let s = ORSet::<String>::new(1);
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
    }

    /// An inserted element is visible through `contains` and `len`.
    #[test]
    fn insert_and_contains() {
        let mut s = ORSet::new(1);
        s.insert("x");
        assert!(s.contains(&"x"));
        assert_eq!(s.len(), 1);
    }

    /// Removing a present element reports success and hides the element.
    #[test]
    fn remove_element() {
        let mut s = ORSet::new(1);
        s.insert("x");
        assert!(s.remove(&"x"));
        assert!(!s.contains(&"x"));
        assert_eq!(s.len(), 0);
    }

    /// A removed element can be inserted again on the same replica.
    #[test]
    fn can_readd_after_remove() {
        let mut s = ORSet::new(1);
        s.insert("x");
        s.remove(&"x");
        assert!(!s.contains(&"x"));

        s.insert("x");
        assert!(s.contains(&"x"));
    }

    /// An add on a replica that never saw the other replica's add/remove
    /// pair survives the merge.
    #[test]
    fn concurrent_add_survives_remove() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");
        s1.remove(&"x");

        let mut s2 = ORSet::new(2);
        s2.insert("x");

        s1.merge(&s2);
        assert!(s1.contains(&"x"));
    }

    /// Merging in either order yields the same elements.
    #[test]
    fn merge_is_commutative() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");
        s1.insert("y");

        let mut s2 = ORSet::new(2);
        s2.insert("y");
        s2.insert("z");

        let mut left = s1.clone();
        left.merge(&s2);

        let mut right = s2.clone();
        right.merge(&s1);

        let left_elems: BTreeSet<_> = left.iter().collect();
        let right_elems: BTreeSet<_> = right.iter().collect();
        assert_eq!(left_elems, right_elems);
    }

    /// Merging the same state a second time changes nothing.
    #[test]
    fn merge_is_idempotent() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");

        let mut s2 = ORSet::new(2);
        s2.insert("y");

        s1.merge(&s2);
        let after_first = s1.clone();
        s1.merge(&s2);

        assert_eq!(s1, after_first);
    }

    /// Both replicas have observed the same add of "x"; one removes it while
    /// the other concurrently adds it again. The add must win regardless of
    /// the direction in which the replicas are merged.
    #[test]
    fn add_wins_semantics() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");

        let mut s2 = ORSet::new(2);
        s2.merge(&s1); // s2 now holds the tag created by s1

        s1.remove(&"x"); // tombstones the observed tag
        s2.insert("x"); // concurrent re-add with a new tag

        let mut left = s1.clone();
        left.merge(&s2);
        assert!(left.contains(&"x"));

        let mut right = s2.clone();
        right.merge(&s1);
        assert!(right.contains(&"x"));

        let left_elems: BTreeSet<_> = left.iter().collect();
        let right_elems: BTreeSet<_> = right.iter().collect();
        assert_eq!(left_elems, right_elems);
    }

    /// Removing an element that was never added reports `false`.
    #[test]
    fn remove_nonexistent_returns_false() {
        let mut s = ORSet::<&str>::new(1);
        assert!(!s.remove(&"x"));
    }

    /// `iter` yields the remaining elements in ascending order.
    #[test]
    fn iterate_elements() {
        let mut s = ORSet::new(1);
        s.insert(1);
        s.insert(2);
        s.insert(3);
        s.remove(&2);

        let elems: Vec<&i32> = s.iter().collect();
        assert_eq!(elems, [&1, &3]);
    }

    /// Applying a delta gives the same elements as a full merge.
    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");
        s1.insert("y");
        s1.remove(&"x");

        let mut s2 = ORSet::new(2);
        s2.insert("y");
        s2.insert("z");

        let mut full = s2.clone();
        full.merge(&s1);

        let mut via_delta = s2.clone();
        let d = s1.delta(&s2);
        via_delta.apply_delta(&d);

        let full_elems: BTreeSet<_> = full.iter().collect();
        let delta_elems: BTreeSet<_> = via_delta.iter().collect();
        assert_eq!(full_elems, delta_elems);
    }

    /// Two identical states produce an empty delta.
    #[test]
    fn delta_is_empty_when_equal() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");

        let s2 = s1.clone();
        let d = s1.delta(&s2);
        assert!(d.additions.is_empty());
        assert!(d.tombstones.is_empty());
    }

    /// Each removal of a single-tag element adds one tombstone.
    #[test]
    fn tombstone_count_tracks_removals() {
        let mut s = ORSet::new(1);
        s.insert("x");
        s.insert("y");
        assert_eq!(s.tombstone_count(), 0);

        s.remove(&"x");
        assert_eq!(s.tombstone_count(), 1);

        s.remove(&"y");
        assert_eq!(s.tombstone_count(), 2);
    }

    /// Tombstones whose tags are not attached to any element are discarded.
    #[test]
    fn compact_tombstones_removes_dangling() {
        let mut s = ORSet::new(1);
        s.insert("x");
        s.insert("y");
        s.remove(&"x");
        s.remove(&"y");

        assert_eq!(s.tombstone_count(), 2);
        let removed = s.compact_tombstones();
        assert_eq!(removed, 2);
        assert_eq!(s.tombstone_count(), 0);
    }

    /// `compact_tombstones_all` reports and clears every tombstone.
    #[test]
    fn compact_tombstones_all_clears_everything() {
        let mut s = ORSet::new(1);
        s.insert("x");
        s.remove(&"x");
        s.insert("y");
        s.remove(&"y");

        assert_eq!(s.compact_tombstones_all(), 2);
        assert_eq!(s.tombstone_count(), 0);
    }

    /// A removal travels inside the delta and takes effect on the receiver.
    #[test]
    fn delta_carries_tombstones() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");

        let s2 = s1.clone();
        s1.remove(&"x");

        let d = s1.delta(&s2);
        assert!(!d.tombstones.is_empty());

        let mut via_delta = s2.clone();
        via_delta.apply_delta(&d);
        assert!(!via_delta.contains(&"x"));
    }
}