// crdt_kit/or_set.rs

1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::string::String;
4use crate::{Crdt, DeltaCrdt};
/// An observed-remove set (OR-Set), also known as an add-wins set.
///
/// Unlike the 2P-Set, elements can be freely added and removed, and
/// re-added after removal. Each add operation generates a unique tag.
/// Remove only removes the tags that the remover has observed, so
/// concurrent adds are preserved.
///
/// Tags are `(actor, counter)` pairs, so they are only unique across
/// replicas if every replica uses a distinct actor ID.
///
/// # Example
///
/// ```
/// use crdt_kit::prelude::*;
///
/// let mut s1 = ORSet::new("node-1");
/// s1.insert("apple");
/// s1.insert("banana");
/// s1.remove(&"banana");
///
/// let mut s2 = ORSet::new("node-2");
/// s2.insert("banana"); // concurrent add
///
/// s1.merge(&s2);
/// // banana is present because s2's add was concurrent with s1's remove
/// assert!(s1.contains(&"banana"));
/// assert!(s1.contains(&"apple"));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ORSet<T: Ord + Clone> {
    /// Identifier of this replica; the first component of every tag it mints.
    actor: String,
    /// Sequence number used to mint tags; [`ORSet::insert`] increments it
    /// before pairing it with `actor`.
    counter: u64,
    /// element -> set of unique tags (actor, counter)
    elements: BTreeMap<T, BTreeSet<(String, u64)>>,
    /// Tombstones: tags that have been removed
    tombstones: BTreeSet<(String, u64)>,
}

impl<T: Ord + Clone> ORSet<T> {
    /// Create a new empty OR-Set for the given actor.
    ///
    /// `actor` becomes the first component of every tag this replica mints,
    /// so it should differ from the actor of every other replica it merges with.
    pub fn new(actor: impl Into<String>) -> Self {
        Self {
            actor: actor.into(),
            counter: 0,
            elements: BTreeMap::new(),
            tombstones: BTreeSet::new(),
        }
    }

    /// Insert an element into the set.
    ///
    /// Generates a unique tag for this insertion. Even if the element
    /// was previously removed, this new tag allows it to be re-added.
    pub fn insert(&mut self, value: T) {
        self.counter += 1;
        let tag = (self.actor.clone(), self.counter);
        self.elements.entry(value).or_default().insert(tag);
    }

    /// Remove an element from the set.
    ///
    /// Every tag this replica currently holds for `value` is moved into the
    /// tombstone set. Tags minted concurrently on other replicas are not known
    /// here, so they survive a later merge (add-wins).
    ///
    /// Returns `true` if the element was present (i.e. [`ORSet::contains`]
    /// would have returned `true`) and has now been removed.
    pub fn remove(&mut self, value: &T) -> bool {
        match self.elements.remove(value) {
            Some(tags) => {
                // An entry with no live tags is not "present" (see `contains`),
                // so report it as absent even though the stale entry is dropped.
                let was_present = !tags.is_empty();
                self.tombstones.extend(tags);
                was_present
            }
            None => false,
        }
    }

    /// Check if the set contains an element.
    ///
    /// An element is present while it has at least one live tag.
    #[must_use]
    pub fn contains(&self, value: &T) -> bool {
        self.elements
            .get(value)
            .is_some_and(|tags| !tags.is_empty())
    }

    /// Get the number of distinct elements in the set.
    ///
    /// Entries without any live tag are not counted.
    #[must_use]
    pub fn len(&self) -> usize {
        self.elements
            .values()
            .filter(|tags| !tags.is_empty())
            .count()
    }

    /// Check if the set is empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Iterate over the elements in the set, in ascending order.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.elements
            .iter()
            .filter(|(_, tags)| !tags.is_empty())
            .map(|(v, _)| v)
    }

    /// Get this replica's actor ID.
    #[must_use]
    pub fn actor(&self) -> &str {
        &self.actor
    }
}
116impl<T: Ord + Clone> Crdt for ORSet<T> {
117    fn merge(&mut self, other: &Self) {
118        // Merge all elements and their tags
119        for (value, other_tags) in &other.elements {
120            let self_tags = self.elements.entry(value.clone()).or_default();
121            for tag in other_tags {
122                // Only add tag if it's not in our tombstones
123                if !self.tombstones.contains(tag) {
124                    self_tags.insert(tag.clone());
125                }
126            }
127        }
128
129        // Apply other's tombstones to our elements
130        for tag in &other.tombstones {
131            for tags in self.elements.values_mut() {
132                tags.remove(tag);
133            }
134        }
135
136        // Merge tombstones
137        self.tombstones.extend(other.tombstones.iter().cloned());
138
139        // Clean up empty tag sets
140        self.elements.retain(|_, tags| !tags.is_empty());
141
142        // Update counter to be at least as high as the other
143        self.counter = self.counter.max(other.counter);
144    }
145}
147/// Delta for [`ORSet`]: new element tags and new tombstones since another state.
148#[derive(Debug, Clone, PartialEq, Eq)]
149#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
150pub struct ORSetDelta<T: Ord + Clone> {
151    /// New element-tag pairs that the other replica doesn't have.
152    additions: BTreeMap<T, BTreeSet<(String, u64)>>,
153    /// New tombstones that the other replica doesn't have.
154    tombstones: BTreeSet<(String, u64)>,
155}
156
157impl<T: Ord + Clone> DeltaCrdt for ORSet<T> {
158    type Delta = ORSetDelta<T>;
159
160    fn delta(&self, other: &Self) -> ORSetDelta<T> {
161        let mut additions = BTreeMap::new();
162        for (value, self_tags) in &self.elements {
163            let other_tags = other.elements.get(value);
164            let new_tags: BTreeSet<_> = self_tags
165                .iter()
166                .filter(|tag| {
167                    other_tags.map_or(true, |ot| !ot.contains(*tag))
168                        && !other.tombstones.contains(*tag)
169                })
170                .cloned()
171                .collect();
172            if !new_tags.is_empty() {
173                additions.insert(value.clone(), new_tags);
174            }
175        }
176
177        let tombstones: BTreeSet<_> = self
178            .tombstones
179            .difference(&other.tombstones)
180            .cloned()
181            .collect();
182
183        ORSetDelta {
184            additions,
185            tombstones,
186        }
187    }
188
189    fn apply_delta(&mut self, delta: &ORSetDelta<T>) {
190        // Apply additions
191        for (value, tags) in &delta.additions {
192            let self_tags = self.elements.entry(value.clone()).or_default();
193            for tag in tags {
194                if !self.tombstones.contains(tag) {
195                    self_tags.insert(tag.clone());
196                }
197            }
198        }
199
200        // Apply tombstones
201        for tag in &delta.tombstones {
202            for tags in self.elements.values_mut() {
203                tags.remove(tag);
204            }
205        }
206        self.tombstones.extend(delta.tombstones.iter().cloned());
207
208        // Clean up empty tag sets
209        self.elements.retain(|_, tags| !tags.is_empty());
210    }
211}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_set_is_empty() {
        let s = ORSet::<String>::new("a");
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
    }

    #[test]
    fn insert_and_contains() {
        let mut s = ORSet::new("a");
        s.insert("x");
        assert!(s.contains(&"x"));
        assert_eq!(s.len(), 1);
    }

    #[test]
    fn remove_element() {
        let mut s = ORSet::new("a");
        s.insert("x");
        assert!(s.remove(&"x"));
        assert!(!s.contains(&"x"));
        assert_eq!(s.len(), 0);
    }

    #[test]
    fn can_readd_after_remove() {
        let mut s = ORSet::new("a");
        s.insert("x");
        s.remove(&"x");
        assert!(!s.contains(&"x"));

        s.insert("x");
        assert!(s.contains(&"x"));
    }

    #[test]
    fn concurrent_add_survives_remove() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        // s1 removes x
        s1.remove(&"x");

        // s2 concurrently adds x (new unique tag from different replica)
        let mut s2 = ORSet::new("b");
        s2.insert("x");

        s1.merge(&s2);
        // s2's add should survive because s1 didn't observe that tag
        assert!(s1.contains(&"x"));
    }

    #[test]
    fn merge_is_commutative() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        s1.insert("y");

        let mut s2 = ORSet::new("b");
        s2.insert("y");
        s2.insert("z");

        let mut left = s1.clone();
        left.merge(&s2);

        let mut right = s2.clone();
        right.merge(&s1);

        let left_elems: BTreeSet<_> = left.iter().collect();
        let right_elems: BTreeSet<_> = right.iter().collect();
        assert_eq!(left_elems, right_elems);
    }

    #[test]
    fn merge_is_idempotent() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        let mut s2 = ORSet::new("b");
        s2.insert("y");

        s1.merge(&s2);
        let after_first = s1.clone();
        s1.merge(&s2);

        assert_eq!(s1, after_first);
    }

    #[test]
    fn merge_is_associative() {
        let mut a = ORSet::new("a");
        a.insert("x");
        a.insert("y");
        a.remove(&"x");

        let mut b: ORSet<&str> = ORSet::new("b");
        b.merge(&a);
        b.remove(&"y");
        b.insert("z");

        let mut c: ORSet<&str> = ORSet::new("c");
        c.insert("x");

        // (a ⊔ b) ⊔ c
        let mut left = a.clone();
        left.merge(&b);
        left.merge(&c);

        // a ⊔ (b ⊔ c)
        let mut bc = b.clone();
        bc.merge(&c);
        let mut right = a.clone();
        right.merge(&bc);

        assert_eq!(left, right);
    }

    #[test]
    fn remove_propagates_through_merge() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        // s2 observes s1's add before s1 removes it.
        let mut s2: ORSet<&str> = ORSet::new("b");
        s2.merge(&s1);
        assert!(s2.contains(&"x"));

        // The removed tag is the only one s2 holds for "x", so "x" disappears.
        s1.remove(&"x");
        s2.merge(&s1);
        assert!(!s2.contains(&"x"));
        assert!(s2.is_empty());
    }

    #[test]
    fn add_wins_semantics() {
        // Simulate: s1 has "x" and removes it, s2 adds "x" concurrently
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        s1.remove(&"x");

        // Different node adds "x" concurrently (new unique tag)
        let mut s2 = ORSet::new("b");
        s2.insert("x");

        s1.merge(&s2);
        // Add wins: "x" should be present because of s2's concurrent add
        assert!(s1.contains(&"x"));
    }

    #[test]
    fn remove_nonexistent_returns_false() {
        let mut s = ORSet::<&str>::new("a");
        assert!(!s.remove(&"x"));
    }

    #[test]
    fn iterate_elements() {
        let mut s = ORSet::new("a");
        s.insert(1);
        s.insert(2);
        s.insert(3);
        s.remove(&2);

        let elems: Vec<&i32> = s.iter().collect();
        assert_eq!(elems, vec![&1, &3]);
    }

    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        s1.insert("y");
        s1.remove(&"x");

        let mut s2 = ORSet::new("b");
        s2.insert("y");
        s2.insert("z");

        let mut full = s2.clone();
        full.merge(&s1);

        let mut via_delta = s2.clone();
        let d = s1.delta(&s2);
        via_delta.apply_delta(&d);

        let full_elems: BTreeSet<_> = full.iter().collect();
        let delta_elems: BTreeSet<_> = via_delta.iter().collect();
        assert_eq!(full_elems, delta_elems);
    }

    #[test]
    fn apply_delta_is_idempotent() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");
        s1.insert("y");
        s1.remove(&"x");

        let mut s2 = ORSet::new("b");
        s2.insert("z");

        let d = s1.delta(&s2);
        let mut once = s2.clone();
        once.apply_delta(&d);
        let mut twice = once.clone();
        twice.apply_delta(&d);

        assert_eq!(once, twice);
    }

    #[test]
    fn delta_is_empty_when_equal() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        let s2 = s1.clone();
        let d = s1.delta(&s2);
        assert!(d.additions.is_empty());
        assert!(d.tombstones.is_empty());
    }

    #[test]
    fn delta_carries_tombstones() {
        let mut s1 = ORSet::new("a");
        s1.insert("x");

        let s2 = s1.clone();
        s1.remove(&"x");

        let d = s1.delta(&s2);
        assert!(!d.tombstones.is_empty());

        let mut via_delta = s2.clone();
        via_delta.apply_delta(&d);
        assert!(!via_delta.contains(&"x"));
    }
}
384        let mut via_delta = s2.clone();
385        via_delta.apply_delta(&d);
386        assert!(!via_delta.contains(&"x"));
387    }
388}