1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::vec::Vec;
3
4use crate::{Crdt, DeltaCrdt, NodeId};
5
/// An observed-remove set (OR-Set).
///
/// Every insertion is recorded under a unique `(NodeId, u64)` tag. Removing a
/// value moves the tags this replica currently holds for it into
/// `tombstones`, so a tag that was created elsewhere and has not been seen
/// here yet is unaffected by the removal.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ORSet<T: Ord + Clone> {
    /// Node identifier stamped into every tag created by `insert` on this
    /// replica.
    actor: NodeId,
    /// Sequence number of the most recently created tag. `insert` bumps it
    /// and `merge` raises it to the larger of the two replicas' counters.
    counter: u64,
    /// Values mapped to the set of tags currently supporting them. The read
    /// accessors treat a value with an empty tag set as absent.
    elements: BTreeMap<T, BTreeSet<(NodeId, u64)>>,
    /// Tags that have been removed, either locally or learned through
    /// `merge` / `apply_delta`.
    tombstones: BTreeSet<(NodeId, u64)>,
}
41
42impl<T: Ord + Clone> ORSet<T> {
43 pub fn new(actor: NodeId) -> Self {
45 Self {
46 actor,
47 counter: 0,
48 elements: BTreeMap::new(),
49 tombstones: BTreeSet::new(),
50 }
51 }
52
53 pub fn insert(&mut self, value: T) {
58 self.counter += 1;
59 let tag = (self.actor, self.counter);
60 self.elements.entry(value).or_default().insert(tag);
61 }
62
63 pub fn remove(&mut self, value: &T) -> bool {
70 if let Some(tags) = self.elements.remove(value) {
71 self.tombstones.extend(tags);
72 true
73 } else {
74 false
75 }
76 }
77
78 #[must_use]
80 pub fn contains(&self, value: &T) -> bool {
81 self.elements
82 .get(value)
83 .is_some_and(|tags| !tags.is_empty())
84 }
85
86 #[must_use]
88 pub fn len(&self) -> usize {
89 self.elements
90 .values()
91 .filter(|tags| !tags.is_empty())
92 .count()
93 }
94
95 #[must_use]
97 pub fn is_empty(&self) -> bool {
98 self.len() == 0
99 }
100
101 pub fn iter(&self) -> impl Iterator<Item = &T> {
103 self.elements
104 .iter()
105 .filter(|(_, tags)| !tags.is_empty())
106 .map(|(v, _)| v)
107 }
108
109 #[must_use]
111 pub fn actor(&self) -> NodeId {
112 self.actor
113 }
114
115 #[must_use]
117 pub fn tombstone_count(&self) -> usize {
118 self.tombstones.len()
119 }
120
121 pub fn compact_tombstones(&mut self) -> usize {
132 let live_tags: BTreeSet<&(NodeId, u64)> = self
133 .elements
134 .values()
135 .flat_map(|tags| tags.iter())
136 .collect();
137
138 let before = self.tombstones.len();
139 self.tombstones.retain(|tag| live_tags.contains(tag));
143 before - self.tombstones.len()
144 }
145
146 pub fn compact_tombstones_all(&mut self) -> usize {
152 let count = self.tombstones.len();
153 self.tombstones.clear();
154 count
155 }
156}
157
158impl<T: Ord + Clone> IntoIterator for ORSet<T> {
159 type Item = T;
160 type IntoIter = alloc::vec::IntoIter<T>;
161
162 fn into_iter(self) -> Self::IntoIter {
163 let items: Vec<T> = self
164 .elements
165 .into_iter()
166 .filter(|(_, tags)| !tags.is_empty())
167 .map(|(v, _)| v)
168 .collect();
169 items.into_iter()
170 }
171}
172
173impl<T: Ord + Clone> Crdt for ORSet<T> {
174 fn merge(&mut self, other: &Self) {
175 for (value, other_tags) in &other.elements {
176 let self_tags = self.elements.entry(value.clone()).or_default();
177 for &tag in other_tags {
178 if !self.tombstones.contains(&tag) {
179 self_tags.insert(tag);
180 }
181 }
182 }
183
184 for &tag in &other.tombstones {
185 for tags in self.elements.values_mut() {
186 tags.remove(&tag);
187 }
188 }
189
190 self.tombstones.extend(&other.tombstones);
191
192 self.elements.retain(|_, tags| !tags.is_empty());
193
194 self.counter = self.counter.max(other.counter);
195 }
196}
197
/// The difference between two [`ORSet`] states, as produced by
/// `DeltaCrdt::delta` and consumed by `DeltaCrdt::apply_delta`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ORSetDelta<T: Ord + Clone> {
    /// Per value, the tags the source replica holds that the target replica
    /// neither holds for that value nor has tombstoned.
    additions: BTreeMap<T, BTreeSet<(NodeId, u64)>>,
    /// Tombstoned tags known to the source replica but not to the target.
    tombstones: BTreeSet<(NodeId, u64)>,
}
207
208impl<T: Ord + Clone> DeltaCrdt for ORSet<T> {
209 type Delta = ORSetDelta<T>;
210
211 fn delta(&self, other: &Self) -> ORSetDelta<T> {
212 let mut additions = BTreeMap::new();
213 for (value, self_tags) in &self.elements {
214 let other_tags = other.elements.get(value);
215 let new_tags: BTreeSet<_> = self_tags
216 .iter()
217 .filter(|tag| {
218 other_tags.map_or(true, |ot| !ot.contains(*tag))
219 && !other.tombstones.contains(*tag)
220 })
221 .copied()
222 .collect();
223 if !new_tags.is_empty() {
224 additions.insert(value.clone(), new_tags);
225 }
226 }
227
228 let tombstones: BTreeSet<_> = self
229 .tombstones
230 .difference(&other.tombstones)
231 .copied()
232 .collect();
233
234 ORSetDelta {
235 additions,
236 tombstones,
237 }
238 }
239
240 fn apply_delta(&mut self, delta: &ORSetDelta<T>) {
241 for (value, tags) in &delta.additions {
242 let self_tags = self.elements.entry(value.clone()).or_default();
243 for &tag in tags {
244 if !self.tombstones.contains(&tag) {
245 self_tags.insert(tag);
246 }
247 }
248 }
249
250 for &tag in &delta.tombstones {
251 for tags in self.elements.values_mut() {
252 tags.remove(&tag);
253 }
254 }
255 self.tombstones.extend(&delta.tombstones);
256
257 self.elements.retain(|_, tags| !tags.is_empty());
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created set holds no values.
    #[test]
    fn new_set_is_empty() {
        let s = ORSet::<String>::new(1);
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
    }

    /// An inserted value is reported by `contains` and counted by `len`.
    #[test]
    fn insert_and_contains() {
        let mut s = ORSet::new(1);
        s.insert("x");
        assert!(s.contains(&"x"));
        assert_eq!(s.len(), 1);
    }

    /// Removing a present value reports `true` and makes it disappear.
    #[test]
    fn remove_element() {
        let mut s = ORSet::new(1);
        s.insert("x");
        assert!(s.remove(&"x"));
        assert!(!s.contains(&"x"));
        assert_eq!(s.len(), 0);
    }

    /// A removed value can be inserted again, because the re-insert gets a
    /// new tag that is not tombstoned.
    #[test]
    fn can_readd_after_remove() {
        let mut s = ORSet::new(1);
        s.insert("x");
        s.remove(&"x");
        assert!(!s.contains(&"x"));

        s.insert("x");
        assert!(s.contains(&"x"));
    }

    /// An add on another replica that the remover never observed survives
    /// the merge.
    #[test]
    fn concurrent_add_survives_remove() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");
        s1.remove(&"x");

        let mut s2 = ORSet::new(2);
        s2.insert("x");

        s1.merge(&s2);
        assert!(s1.contains(&"x"));
    }

    /// Merging in either order yields the same visible elements.
    #[test]
    fn merge_is_commutative() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");
        s1.insert("y");

        let mut s2 = ORSet::new(2);
        s2.insert("y");
        s2.insert("z");

        let mut left = s1.clone();
        left.merge(&s2);

        let mut right = s2.clone();
        right.merge(&s1);

        let left_elems: BTreeSet<_> = left.iter().collect();
        let right_elems: BTreeSet<_> = right.iter().collect();
        assert_eq!(left_elems, right_elems);
    }

    /// Merging the same state twice leaves the replica unchanged.
    #[test]
    fn merge_is_idempotent() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");

        let mut s2 = ORSet::new(2);
        s2.insert("y");

        s1.merge(&s2);
        let after_first = s1.clone();
        s1.merge(&s2);

        assert_eq!(s1, after_first);
    }

    /// Both replicas first share the same tag for "x"; one then removes it
    /// while the other concurrently re-adds it. The add must win regardless
    /// of the merge direction, and only the re-added tag may remain.
    #[test]
    fn add_wins_semantics() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");

        let mut s2 = ORSet::new(2);
        s2.merge(&s1);
        assert!(s2.contains(&"x"));

        // Concurrent operations on the shared state.
        s1.remove(&"x");
        s2.insert("x");

        let mut left = s1.clone();
        left.merge(&s2);
        assert!(left.contains(&"x"));

        let mut right = s2.clone();
        right.merge(&s1);
        assert!(right.contains(&"x"));

        assert_eq!(left.tombstone_count(), 1);
        assert_eq!(right.tombstone_count(), 1);
    }

    /// Removing a value that was never inserted reports `false`.
    #[test]
    fn remove_nonexistent_returns_false() {
        let mut s = ORSet::<&str>::new(1);
        assert!(!s.remove(&"x"));
    }

    /// `iter` yields the remaining values in ascending order.
    #[test]
    fn iterate_elements() {
        let mut s = ORSet::new(1);
        s.insert(1);
        s.insert(2);
        s.insert(3);
        s.remove(&2);

        let elems: Vec<&i32> = s.iter().collect();
        assert_eq!(elems, vec![&1, &3]);
    }

    /// Applying a delta yields the same visible elements as a full merge.
    #[test]
    fn delta_apply_equivalent_to_merge() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");
        s1.insert("y");
        s1.remove(&"x");

        let mut s2 = ORSet::new(2);
        s2.insert("y");
        s2.insert("z");

        let mut full = s2.clone();
        full.merge(&s1);

        let mut via_delta = s2.clone();
        let d = s1.delta(&s2);
        via_delta.apply_delta(&d);

        let full_elems: BTreeSet<_> = full.iter().collect();
        let delta_elems: BTreeSet<_> = via_delta.iter().collect();
        assert_eq!(full_elems, delta_elems);
    }

    /// Two identical replicas produce an empty delta.
    #[test]
    fn delta_is_empty_when_equal() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");

        let s2 = s1.clone();
        let d = s1.delta(&s2);
        assert!(d.additions.is_empty());
        assert!(d.tombstones.is_empty());
    }

    /// Each removal of a singly tagged value adds one tombstone.
    #[test]
    fn tombstone_count_tracks_removals() {
        let mut s = ORSet::new(1);
        s.insert("x");
        s.insert("y");
        assert_eq!(s.tombstone_count(), 0);

        s.remove(&"x");
        assert_eq!(s.tombstone_count(), 1);

        s.remove(&"y");
        assert_eq!(s.tombstone_count(), 2);
    }

    /// Tombstones whose tags are no longer attached to any element are
    /// dropped by `compact_tombstones`.
    #[test]
    fn compact_tombstones_removes_dangling() {
        let mut s = ORSet::new(1);
        s.insert("x");
        s.insert("y");
        s.remove(&"x");
        s.remove(&"y");

        assert_eq!(s.tombstone_count(), 2);
        let removed = s.compact_tombstones();
        assert_eq!(removed, 2);
        assert_eq!(s.tombstone_count(), 0);
    }

    /// `compact_tombstones_all` discards every tombstone and reports the
    /// number discarded.
    #[test]
    fn compact_tombstones_all_clears_everything() {
        let mut s = ORSet::new(1);
        s.insert("x");
        s.remove(&"x");
        s.insert("y");
        s.remove(&"y");

        assert_eq!(s.compact_tombstones_all(), 2);
        assert_eq!(s.tombstone_count(), 0);
    }

    /// A delta taken after a removal carries the tombstone, so applying it
    /// removes the value on the receiving replica.
    #[test]
    fn delta_carries_tombstones() {
        let mut s1 = ORSet::new(1);
        s1.insert("x");

        let s2 = s1.clone();
        s1.remove(&"x");

        let d = s1.delta(&s2);
        assert!(!d.tombstones.is_empty());

        let mut via_delta = s2.clone();
        via_delta.apply_delta(&d);
        assert!(!via_delta.contains(&"x"));
    }
}