Skip to main content

mdcs_core/
orset.rs

1//!  Observed-Remove Set (OR-Set / Add-Wins Set)
2//!
3//! Each add generates a unique tag.  Remove only removes currently observed tags.
4//!  Concurrent add and remove of the same element:  add wins.
5
6use crate::lattice::{DeltaCRDT, Lattice};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9use ulid::Ulid;
10
11/// A unique tag for each add operation
12#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
13pub struct Tag {
14    /// The replica that created this tag
15    pub replica_id: String,
16    /// Unique identifier for this specific add
17    pub unique_id: Ulid,
18}
19
20impl Tag {
21    pub fn new(replica_id: impl Into<String>) -> Self {
22        Self {
23            replica_id: replica_id.into(),
24            unique_id: Ulid::new(),
25        }
26    }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
30pub struct ORSet<T: Ord + Clone> {
31    /// Maps elements to their active tags
32    entries: BTreeMap<T, BTreeSet<Tag>>,
33    /// Tombstones:  tags that have been removed
34    /// (Required for distributed consistency)
35    tombstones: BTreeSet<Tag>,
36    /// Pending delta for delta-state replication
37    #[serde(skip)]
38    pending_delta: Option<ORSetDelta<T>>,
39}
40
41#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
42pub struct ORSetDelta<T: Ord + Clone> {
43    pub additions: BTreeMap<T, BTreeSet<Tag>>,
44    pub removals: BTreeSet<Tag>,
45}
46
47impl<T: Ord + Clone> ORSet<T> {
48    pub fn new() -> Self {
49        Self {
50            entries: BTreeMap::new(),
51            tombstones: BTreeSet::new(),
52            pending_delta: None,
53        }
54    }
55
56    /// Add an element with a new unique tag
57    pub fn add(&mut self, replica_id: &str, value: T) {
58        let tag = Tag::new(replica_id);
59
60        self.entries
61            .entry(value.clone())
62            .or_default()
63            .insert(tag.clone());
64
65        // Record in delta
66        let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
67            additions: BTreeMap::new(),
68            removals: BTreeSet::new(),
69        });
70        delta.additions.entry(value).or_default().insert(tag);
71    }
72
73    /// Remove all observed instances of an element
74    pub fn remove(&mut self, value: &T) {
75        if let Some(tags) = self.entries.remove(value) {
76            // Move tags to tombstones
77            for tag in tags.iter() {
78                self.tombstones.insert(tag.clone());
79            }
80
81            // Record in delta
82            let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
83                additions: BTreeMap::new(),
84                removals: BTreeSet::new(),
85            });
86            delta.removals.extend(tags);
87        }
88    }
89
90    pub fn contains(&self, value: &T) -> bool {
91        self.entries
92            .get(value)
93            .is_some_and(|tags| !tags.is_empty())
94    }
95
96    pub fn iter(&self) -> impl Iterator<Item = &T> {
97        self.entries.keys()
98    }
99
100    pub fn len(&self) -> usize {
101        self.entries.len()
102    }
103
104    pub fn is_empty(&self) -> bool {
105        self.entries.is_empty()
106    }
107}
108
109impl<T: Ord + Clone> Default for ORSet<T> {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115impl<T: Ord + Clone> Lattice for ORSet<T> {
116    fn bottom() -> Self {
117        Self::new()
118    }
119
120    fn join(&self, other: &Self) -> Self {
121        let mut result = Self::new();
122
123        // Merge tombstones first
124        result.tombstones = self.tombstones.union(&other.tombstones).cloned().collect();
125
126        // Merge entries, filtering out tombstoned tags
127        let all_keys: BTreeSet<_> = self
128            .entries
129            .keys()
130            .chain(other.entries.keys())
131            .cloned()
132            .collect();
133
134        for key in all_keys {
135            let self_tags = self.entries.get(&key).cloned().unwrap_or_default();
136            let other_tags = other.entries.get(&key).cloned().unwrap_or_default();
137
138            let merged_tags: BTreeSet<Tag> = self_tags
139                .union(&other_tags)
140                .filter(|tag| !result.tombstones.contains(tag))
141                .cloned()
142                .collect();
143
144            if !merged_tags.is_empty() {
145                result.entries.insert(key, merged_tags);
146            }
147        }
148
149        result
150    }
151}
152
153impl<T: Ord + Clone> Lattice for ORSetDelta<T> {
154    fn bottom() -> Self {
155        Self {
156            additions: BTreeMap::new(),
157            removals: BTreeSet::new(),
158        }
159    }
160
161    fn join(&self, other: &Self) -> Self {
162        let mut additions = self.additions.clone();
163        for (k, v) in &other.additions {
164            additions.entry(k.clone()).or_default().extend(v.clone());
165        }
166
167        Self {
168            additions,
169            removals: self.removals.union(&other.removals).cloned().collect(),
170        }
171    }
172}
173
174impl<T: Ord + Clone> DeltaCRDT for ORSet<T> {
175    type Delta = ORSetDelta<T>;
176
177    fn split_delta(&mut self) -> Option<Self::Delta> {
178        self.pending_delta.take()
179    }
180
181    fn apply_delta(&mut self, delta: &Self::Delta) {
182        // Apply removals to tombstones
183        self.tombstones.extend(delta.removals.iter().cloned());
184
185        // Apply additions, filtering tombstones
186        for (value, tags) in &delta.additions {
187            let entry = self.entries.entry(value.clone()).or_default();
188            for tag in tags {
189                if !self.tombstones.contains(tag) {
190                    entry.insert(tag.clone());
191                }
192            }
193        }
194
195        // Clean up empty entries
196        self.entries.retain(|_, tags| !tags.is_empty());
197    }
198}