1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::string::String;
3
4use crate::{Crdt, DeltaCrdt};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33pub struct ORSet<T: Ord + Clone> {
34 actor: String,
35 counter: u64,
36 elements: BTreeMap<T, BTreeSet<(String, u64)>>,
38 tombstones: BTreeSet<(String, u64)>,
40}
41
42impl<T: Ord + Clone> ORSet<T> {
43 pub fn new(actor: impl Into<String>) -> Self {
45 Self {
46 actor: actor.into(),
47 counter: 0,
48 elements: BTreeMap::new(),
49 tombstones: BTreeSet::new(),
50 }
51 }
52
53 pub fn insert(&mut self, value: T) {
58 self.counter += 1;
59 let tag = (self.actor.clone(), self.counter);
60 self.elements.entry(value).or_default().insert(tag);
61 }
62
63 pub fn remove(&mut self, value: &T) -> bool {
70 if let Some(tags) = self.elements.remove(value) {
71 self.tombstones.extend(tags);
72 true
73 } else {
74 false
75 }
76 }
77
78 #[must_use]
80 pub fn contains(&self, value: &T) -> bool {
81 self.elements
82 .get(value)
83 .is_some_and(|tags| !tags.is_empty())
84 }
85
86 #[must_use]
88 pub fn len(&self) -> usize {
89 self.elements
90 .values()
91 .filter(|tags| !tags.is_empty())
92 .count()
93 }
94
95 #[must_use]
97 pub fn is_empty(&self) -> bool {
98 self.len() == 0
99 }
100
101 pub fn iter(&self) -> impl Iterator<Item = &T> {
103 self.elements
104 .iter()
105 .filter(|(_, tags)| !tags.is_empty())
106 .map(|(v, _)| v)
107 }
108
109 #[must_use]
111 pub fn actor(&self) -> &str {
112 &self.actor
113 }
114}
115
116impl<T: Ord + Clone> Crdt for ORSet<T> {
117 fn merge(&mut self, other: &Self) {
118 for (value, other_tags) in &other.elements {
120 let self_tags = self.elements.entry(value.clone()).or_default();
121 for tag in other_tags {
122 if !self.tombstones.contains(tag) {
124 self_tags.insert(tag.clone());
125 }
126 }
127 }
128
129 for tag in &other.tombstones {
131 for tags in self.elements.values_mut() {
132 tags.remove(tag);
133 }
134 }
135
136 self.tombstones.extend(other.tombstones.iter().cloned());
138
139 self.elements.retain(|_, tags| !tags.is_empty());
141
142 self.counter = self.counter.max(other.counter);
144 }
145}
146
147#[derive(Debug, Clone, PartialEq, Eq)]
149#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
150pub struct ORSetDelta<T: Ord + Clone> {
151 additions: BTreeMap<T, BTreeSet<(String, u64)>>,
153 tombstones: BTreeSet<(String, u64)>,
155}
156
157impl<T: Ord + Clone> DeltaCrdt for ORSet<T> {
158 type Delta = ORSetDelta<T>;
159
160 fn delta(&self, other: &Self) -> ORSetDelta<T> {
161 let mut additions = BTreeMap::new();
162 for (value, self_tags) in &self.elements {
163 let other_tags = other.elements.get(value);
164 let new_tags: BTreeSet<_> = self_tags
165 .iter()
166 .filter(|tag| {
167 other_tags.map_or(true, |ot| !ot.contains(*tag))
168 && !other.tombstones.contains(*tag)
169 })
170 .cloned()
171 .collect();
172 if !new_tags.is_empty() {
173 additions.insert(value.clone(), new_tags);
174 }
175 }
176
177 let tombstones: BTreeSet<_> = self
178 .tombstones
179 .difference(&other.tombstones)
180 .cloned()
181 .collect();
182
183 ORSetDelta {
184 additions,
185 tombstones,
186 }
187 }
188
189 fn apply_delta(&mut self, delta: &ORSetDelta<T>) {
190 for (value, tags) in &delta.additions {
192 let self_tags = self.elements.entry(value.clone()).or_default();
193 for tag in tags {
194 if !self.tombstones.contains(tag) {
195 self_tags.insert(tag.clone());
196 }
197 }
198 }
199
200 for tag in &delta.tombstones {
202 for tags in self.elements.values_mut() {
203 tags.remove(tag);
204 }
205 }
206 self.tombstones.extend(delta.tombstones.iter().cloned());
207
208 self.elements.retain(|_, tags| !tags.is_empty());
210 }
211}
212
213#[cfg(test)]
214mod tests {
215 use super::*;
216
217 #[test]
218 fn new_set_is_empty() {
219 let s = ORSet::<String>::new("a");
220 assert!(s.is_empty());
221 assert_eq!(s.len(), 0);
222 }
223
224 #[test]
225 fn insert_and_contains() {
226 let mut s = ORSet::new("a");
227 s.insert("x");
228 assert!(s.contains(&"x"));
229 assert_eq!(s.len(), 1);
230 }
231
232 #[test]
233 fn remove_element() {
234 let mut s = ORSet::new("a");
235 s.insert("x");
236 assert!(s.remove(&"x"));
237 assert!(!s.contains(&"x"));
238 assert_eq!(s.len(), 0);
239 }
240
241 #[test]
242 fn can_readd_after_remove() {
243 let mut s = ORSet::new("a");
244 s.insert("x");
245 s.remove(&"x");
246 assert!(!s.contains(&"x"));
247
248 s.insert("x");
249 assert!(s.contains(&"x"));
250 }
251
252 #[test]
253 fn concurrent_add_survives_remove() {
254 let mut s1 = ORSet::new("a");
255 s1.insert("x");
256
257 s1.remove(&"x");
259
260 let mut s2 = ORSet::new("b");
262 s2.insert("x");
263
264 s1.merge(&s2);
265 assert!(s1.contains(&"x"));
267 }
268
269 #[test]
270 fn merge_is_commutative() {
271 let mut s1 = ORSet::new("a");
272 s1.insert("x");
273 s1.insert("y");
274
275 let mut s2 = ORSet::new("b");
276 s2.insert("y");
277 s2.insert("z");
278
279 let mut left = s1.clone();
280 left.merge(&s2);
281
282 let mut right = s2.clone();
283 right.merge(&s1);
284
285 let left_elems: BTreeSet<_> = left.iter().collect();
286 let right_elems: BTreeSet<_> = right.iter().collect();
287 assert_eq!(left_elems, right_elems);
288 }
289
290 #[test]
291 fn merge_is_idempotent() {
292 let mut s1 = ORSet::new("a");
293 s1.insert("x");
294
295 let mut s2 = ORSet::new("b");
296 s2.insert("y");
297
298 s1.merge(&s2);
299 let after_first = s1.clone();
300 s1.merge(&s2);
301
302 assert_eq!(s1, after_first);
303 }
304
305 #[test]
306 fn add_wins_semantics() {
307 let mut s1 = ORSet::new("a");
309 s1.insert("x");
310 s1.remove(&"x");
311
312 let mut s2 = ORSet::new("b");
314 s2.insert("x");
315
316 s1.merge(&s2);
317 assert!(s1.contains(&"x"));
319 }
320
321 #[test]
322 fn remove_nonexistent_returns_false() {
323 let mut s = ORSet::<&str>::new("a");
324 assert!(!s.remove(&"x"));
325 }
326
327 #[test]
328 fn iterate_elements() {
329 let mut s = ORSet::new("a");
330 s.insert(1);
331 s.insert(2);
332 s.insert(3);
333 s.remove(&2);
334
335 let elems: Vec<&i32> = s.iter().collect();
336 assert_eq!(elems, vec![&1, &3]);
337 }
338
339 #[test]
340 fn delta_apply_equivalent_to_merge() {
341 let mut s1 = ORSet::new("a");
342 s1.insert("x");
343 s1.insert("y");
344 s1.remove(&"x");
345
346 let mut s2 = ORSet::new("b");
347 s2.insert("y");
348 s2.insert("z");
349
350 let mut full = s2.clone();
351 full.merge(&s1);
352
353 let mut via_delta = s2.clone();
354 let d = s1.delta(&s2);
355 via_delta.apply_delta(&d);
356
357 let full_elems: BTreeSet<_> = full.iter().collect();
358 let delta_elems: BTreeSet<_> = via_delta.iter().collect();
359 assert_eq!(full_elems, delta_elems);
360 }
361
362 #[test]
363 fn delta_is_empty_when_equal() {
364 let mut s1 = ORSet::new("a");
365 s1.insert("x");
366
367 let s2 = s1.clone();
368 let d = s1.delta(&s2);
369 assert!(d.additions.is_empty());
370 assert!(d.tombstones.is_empty());
371 }
372
373 #[test]
374 fn delta_carries_tombstones() {
375 let mut s1 = ORSet::new("a");
376 s1.insert("x");
377
378 let s2 = s1.clone();
379 s1.remove(&"x");
380
381 let d = s1.delta(&s2);
382 assert!(!d.tombstones.is_empty());
383
384 let mut via_delta = s2.clone();
385 via_delta.apply_delta(&d);
386 assert!(!via_delta.contains(&"x"));
387 }
388}