Skip to main content

oxigdal_sync/crdt/
or_set.rs

1//! Observed-Remove Set CRDT
2//!
3//! A set that supports add and remove operations with proper
4//! causal consistency.
5
6use crate::crdt::{Crdt, DeviceAware};
7use crate::{DeviceId, SyncResult};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet};
10use std::hash::Hash;
11use uuid::Uuid;
12
13/// Unique identifier for set elements
14pub type ElementId = Uuid;
15
16/// Element metadata
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
18struct ElementMetadata {
19    /// Unique ID for this add operation
20    id: ElementId,
21    /// Device that added the element
22    device_id: DeviceId,
23}
24
25/// Observed-Remove Set
26///
27/// A CRDT set that supports add and remove operations.
28/// Each add operation gets a unique ID, and removes only affect
29/// observed add operations. This ensures that concurrent add/remove
30/// operations are handled correctly.
31///
32/// # Example
33///
34/// ```rust
35/// use oxigdal_sync::crdt::{OrSet, Crdt};
36///
37/// let mut set1 = OrSet::new("device-1".to_string());
38/// set1.insert("apple".to_string());
39/// set1.insert("banana".to_string());
40///
41/// let mut set2 = OrSet::new("device-2".to_string());
42/// set2.insert("cherry".to_string());
43///
44/// set1.merge(&set2).ok();
45/// assert_eq!(set1.len(), 3);
46/// ```
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(bound(serialize = "T: Serialize"))]
49#[serde(bound(deserialize = "T: serde::de::DeserializeOwned"))]
50pub struct OrSet<T>
51where
52    T: Clone + Eq + Hash,
53{
54    /// Map from element to its add operation IDs
55    elements: HashMap<T, HashSet<ElementMetadata>>,
56    /// Tombstones (removed element IDs)
57    tombstones: HashSet<ElementId>,
58    /// Device ID
59    device_id: DeviceId,
60}
61
62impl<T> OrSet<T>
63where
64    T: Clone + Eq + Hash,
65{
66    /// Creates a new OR-Set
67    ///
68    /// # Arguments
69    ///
70    /// * `device_id` - The device ID
71    pub fn new(device_id: DeviceId) -> Self {
72        Self {
73            elements: HashMap::new(),
74            tombstones: HashSet::new(),
75            device_id,
76        }
77    }
78
79    /// Inserts an element into the set
80    ///
81    /// # Arguments
82    ///
83    /// * `element` - The element to insert
84    ///
85    /// # Returns
86    ///
87    /// True if the element was newly inserted, false if already present
88    pub fn insert(&mut self, element: T) -> bool {
89        let metadata = ElementMetadata {
90            id: Uuid::new_v4(),
91            device_id: self.device_id.clone(),
92        };
93
94        let entry = self.elements.entry(element).or_default();
95        entry.insert(metadata)
96    }
97
98    /// Removes an element from the set
99    ///
100    /// # Arguments
101    ///
102    /// * `element` - The element to remove
103    ///
104    /// # Returns
105    ///
106    /// True if the element was present and removed
107    pub fn remove(&mut self, element: &T) -> bool {
108        if let Some(metadata_set) = self.elements.get(element) {
109            // Add all observed IDs to tombstones
110            for metadata in metadata_set {
111                self.tombstones.insert(metadata.id);
112            }
113
114            // Remove the element
115            self.elements.remove(element);
116            true
117        } else {
118            false
119        }
120    }
121
122    /// Checks if the set contains an element
123    ///
124    /// # Arguments
125    ///
126    /// * `element` - The element to check
127    ///
128    /// # Returns
129    ///
130    /// True if the element is in the set
131    pub fn contains(&self, element: &T) -> bool {
132        if let Some(metadata_set) = self.elements.get(element) {
133            // Element is present if it has at least one non-tombstoned ID
134            metadata_set
135                .iter()
136                .any(|m| !self.tombstones.contains(&m.id))
137        } else {
138            false
139        }
140    }
141
142    /// Gets the number of elements in the set
143    pub fn len(&self) -> usize {
144        self.elements
145            .iter()
146            .filter(|(_, metadata_set)| {
147                metadata_set
148                    .iter()
149                    .any(|m| !self.tombstones.contains(&m.id))
150            })
151            .count()
152    }
153
154    /// Checks if the set is empty
155    pub fn is_empty(&self) -> bool {
156        self.len() == 0
157    }
158
159    /// Gets an iterator over the elements
160    pub fn iter(&self) -> impl Iterator<Item = &T> {
161        self.elements
162            .iter()
163            .filter(|(_, metadata_set)| {
164                metadata_set
165                    .iter()
166                    .any(|m| !self.tombstones.contains(&m.id))
167            })
168            .map(|(element, _)| element)
169    }
170
171    /// Converts the set to a HashSet
172    pub fn to_hashset(&self) -> HashSet<T> {
173        self.iter().cloned().collect()
174    }
175
176    /// Clears all elements from the set
177    pub fn clear(&mut self) {
178        for metadata_set in self.elements.values() {
179            for metadata in metadata_set {
180                self.tombstones.insert(metadata.id);
181            }
182        }
183        self.elements.clear();
184    }
185}
186
187impl<T> Crdt for OrSet<T>
188where
189    T: Clone + Eq + Hash + Serialize + for<'de> serde::Deserialize<'de>,
190{
191    fn merge(&mut self, other: &Self) -> SyncResult<()> {
192        // Merge elements
193        for (element, metadata_set) in &other.elements {
194            let entry = self.elements.entry(element.clone()).or_default();
195            for metadata in metadata_set {
196                entry.insert(metadata.clone());
197            }
198        }
199
200        // Merge tombstones
201        for tombstone in &other.tombstones {
202            self.tombstones.insert(*tombstone);
203        }
204
205        // Clean up elements that are fully tombstoned
206        self.elements.retain(|_, metadata_set| {
207            metadata_set
208                .iter()
209                .any(|m| !self.tombstones.contains(&m.id))
210        });
211
212        Ok(())
213    }
214
215    fn dominated_by(&self, other: &Self) -> bool {
216        // Check if all our elements are in the other set
217        for (element, metadata_set) in &self.elements {
218            if let Some(other_metadata_set) = other.elements.get(element) {
219                for metadata in metadata_set {
220                    if !self.tombstones.contains(&metadata.id) {
221                        // This element is live in our set
222                        if !other_metadata_set.contains(metadata)
223                            || other.tombstones.contains(&metadata.id)
224                        {
225                            // Not in other set or tombstoned in other
226                            return false;
227                        }
228                    }
229                }
230            } else {
231                // Element not in other set at all
232                if metadata_set
233                    .iter()
234                    .any(|m| !self.tombstones.contains(&m.id))
235                {
236                    return false;
237                }
238            }
239        }
240
241        // Check if all our tombstones are in the other set
242        for tombstone in &self.tombstones {
243            if !other.tombstones.contains(tombstone) {
244                return false;
245            }
246        }
247
248        true
249    }
250}
251
252impl<T> DeviceAware for OrSet<T>
253where
254    T: Clone + Eq + Hash + Serialize + for<'de> serde::Deserialize<'de>,
255{
256    fn device_id(&self) -> &DeviceId {
257        &self.device_id
258    }
259
260    fn set_device_id(&mut self, device_id: DeviceId) {
261        self.device_id = device_id;
262    }
263}
264
265impl<T> PartialEq for OrSet<T>
266where
267    T: Clone + Eq + Hash,
268{
269    fn eq(&self, other: &Self) -> bool {
270        self.to_hashset() == other.to_hashset()
271    }
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created set holds no elements.
    #[test]
    fn test_or_set_creation() {
        let set: OrSet<String> = OrSet::new("device-1".to_string());
        assert_eq!(set.len(), 0);
        assert!(set.is_empty());
    }

    /// Inserting a new element makes it visible.
    #[test]
    fn test_or_set_insert() {
        let mut set = OrSet::new("device-1".to_string());
        assert!(set.insert("apple".to_string()));
        assert!(set.contains(&"apple".to_string()));
        assert_eq!(set.len(), 1);
    }

    /// Removing a present element hides it again.
    #[test]
    fn test_or_set_remove() {
        let mut set = OrSet::new("device-1".to_string());
        set.insert("apple".to_string());
        assert!(set.remove(&"apple".to_string()));
        assert!(!set.contains(&"apple".to_string()));
        assert_eq!(set.len(), 0);
    }

    /// Removing an element that was never added reports false.
    #[test]
    fn test_or_set_remove_missing() {
        let mut set: OrSet<String> = OrSet::new("device-1".to_string());
        assert!(!set.remove(&"apple".to_string()));
        assert!(set.is_empty());
    }

    /// Distinct elements are tracked independently.
    #[test]
    fn test_or_set_multiple_elements() {
        let mut set = OrSet::new("device-1".to_string());
        set.insert("apple".to_string());
        set.insert("banana".to_string());
        set.insert("cherry".to_string());

        assert_eq!(set.len(), 3);
        assert!(set.contains(&"apple".to_string()));
        assert!(set.contains(&"banana".to_string()));
        assert!(set.contains(&"cherry".to_string()));
    }

    /// Merging two replicas yields the union of their elements.
    #[test]
    fn test_or_set_merge() {
        let mut set1 = OrSet::new("device-1".to_string());
        let mut set2 = OrSet::new("device-2".to_string());

        set1.insert("apple".to_string());
        set1.insert("banana".to_string());

        set2.insert("cherry".to_string());
        set2.insert("date".to_string());

        // Assert instead of discarding the result so a failed merge is
        // reported at its source.
        assert!(set1.merge(&set2).is_ok());

        assert_eq!(set1.len(), 4);
        assert!(set1.contains(&"apple".to_string()));
        assert!(set1.contains(&"banana".to_string()));
        assert!(set1.contains(&"cherry".to_string()));
        assert!(set1.contains(&"date".to_string()));
    }

    /// An add that the removing replica never observed survives the merge.
    #[test]
    fn test_or_set_concurrent_add_remove() {
        let mut set1 = OrSet::new("device-1".to_string());
        let mut set2 = OrSet::new("device-2".to_string());

        // Both add the same element
        set1.insert("apple".to_string());
        set2.insert("apple".to_string());

        // set1 removes it
        set1.remove(&"apple".to_string());

        // Merge - concurrent add should win
        assert!(set1.merge(&set2).is_ok());

        // The element from device-2 should still be present
        assert!(set1.contains(&"apple".to_string()));
    }

    /// A remove of an observed add is applied on the other replica by merge.
    #[test]
    fn test_or_set_merge_propagates_remove() {
        let mut set1 = OrSet::new("device-1".to_string());
        set1.insert("apple".to_string());

        // set2 starts from set1's state, so it has observed the add.
        let mut set2 = set1.clone();
        assert!(set2.remove(&"apple".to_string()));

        assert!(set1.merge(&set2).is_ok());

        assert!(!set1.contains(&"apple".to_string()));
        assert!(set1.is_empty());
    }

    /// Clearing removes every element.
    #[test]
    fn test_or_set_clear() {
        let mut set = OrSet::new("device-1".to_string());
        set.insert("apple".to_string());
        set.insert("banana".to_string());

        set.clear();

        assert_eq!(set.len(), 0);
        assert!(set.is_empty());
    }

    /// Iteration visits every element exactly once.
    #[test]
    fn test_or_set_iter() {
        let mut set = OrSet::new("device-1".to_string());
        set.insert("apple".to_string());
        set.insert("banana".to_string());
        set.insert("cherry".to_string());

        let elements: HashSet<_> = set.iter().cloned().collect();
        assert_eq!(elements.len(), 3);
        assert!(elements.contains("apple"));
        assert!(elements.contains("banana"));
        assert!(elements.contains("cherry"));
    }
}