1use alloc::collections::{BTreeMap, BTreeSet};
2use alloc::string::String;
3
4use crate::{Crdt, DeltaCrdt};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33pub struct ORSet<T: Ord + Clone> {
34 actor: String,
35 counter: u64,
36 elements: BTreeMap<T, BTreeSet<(String, u64)>>,
38 tombstones: BTreeSet<(String, u64)>,
40}
41
42impl<T: Ord + Clone> ORSet<T> {
43 pub fn new(actor: impl Into<String>) -> Self {
45 Self {
46 actor: actor.into(),
47 counter: 0,
48 elements: BTreeMap::new(),
49 tombstones: BTreeSet::new(),
50 }
51 }
52
53 pub fn insert(&mut self, value: T) {
58 self.counter += 1;
59 let tag = (self.actor.clone(), self.counter);
60 self.elements.entry(value).or_default().insert(tag);
61 }
62
63 pub fn remove(&mut self, value: &T) -> bool {
70 if let Some(tags) = self.elements.remove(value) {
71 self.tombstones.extend(tags);
72 true
73 } else {
74 false
75 }
76 }
77
78 #[must_use]
80 pub fn contains(&self, value: &T) -> bool {
81 self.elements
82 .get(value)
83 .is_some_and(|tags| !tags.is_empty())
84 }
85
86 #[must_use]
88 pub fn len(&self) -> usize {
89 self.elements
90 .values()
91 .filter(|tags| !tags.is_empty())
92 .count()
93 }
94
95 #[must_use]
97 pub fn is_empty(&self) -> bool {
98 self.len() == 0
99 }
100
101 pub fn iter(&self) -> impl Iterator<Item = &T> {
103 self.elements
104 .iter()
105 .filter(|(_, tags)| !tags.is_empty())
106 .map(|(v, _)| v)
107 }
108
109 #[must_use]
111 pub fn actor(&self) -> &str {
112 &self.actor
113 }
114}
115
116impl<T: Ord + Clone> IntoIterator for ORSet<T> {
117 type Item = T;
118 type IntoIter = alloc::vec::IntoIter<T>;
119
120 fn into_iter(self) -> Self::IntoIter {
121 let items: alloc::vec::Vec<T> = self
122 .elements
123 .into_iter()
124 .filter(|(_, tags)| !tags.is_empty())
125 .map(|(v, _)| v)
126 .collect();
127 items.into_iter()
128 }
129}
130
131impl<T: Ord + Clone> Crdt for ORSet<T> {
132 fn merge(&mut self, other: &Self) {
133 for (value, other_tags) in &other.elements {
135 let self_tags = self.elements.entry(value.clone()).or_default();
136 for tag in other_tags {
137 if !self.tombstones.contains(tag) {
139 self_tags.insert(tag.clone());
140 }
141 }
142 }
143
144 for tag in &other.tombstones {
146 for tags in self.elements.values_mut() {
147 tags.remove(tag);
148 }
149 }
150
151 self.tombstones.extend(other.tombstones.iter().cloned());
153
154 self.elements.retain(|_, tags| !tags.is_empty());
156
157 self.counter = self.counter.max(other.counter);
159 }
160}
161
162#[derive(Debug, Clone, PartialEq, Eq)]
164#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
165pub struct ORSetDelta<T: Ord + Clone> {
166 additions: BTreeMap<T, BTreeSet<(String, u64)>>,
168 tombstones: BTreeSet<(String, u64)>,
170}
171
172impl<T: Ord + Clone> DeltaCrdt for ORSet<T> {
173 type Delta = ORSetDelta<T>;
174
175 fn delta(&self, other: &Self) -> ORSetDelta<T> {
176 let mut additions = BTreeMap::new();
177 for (value, self_tags) in &self.elements {
178 let other_tags = other.elements.get(value);
179 let new_tags: BTreeSet<_> = self_tags
180 .iter()
181 .filter(|tag| {
182 other_tags.map_or(true, |ot| !ot.contains(*tag))
183 && !other.tombstones.contains(*tag)
184 })
185 .cloned()
186 .collect();
187 if !new_tags.is_empty() {
188 additions.insert(value.clone(), new_tags);
189 }
190 }
191
192 let tombstones: BTreeSet<_> = self
193 .tombstones
194 .difference(&other.tombstones)
195 .cloned()
196 .collect();
197
198 ORSetDelta {
199 additions,
200 tombstones,
201 }
202 }
203
204 fn apply_delta(&mut self, delta: &ORSetDelta<T>) {
205 for (value, tags) in &delta.additions {
207 let self_tags = self.elements.entry(value.clone()).or_default();
208 for tag in tags {
209 if !self.tombstones.contains(tag) {
210 self_tags.insert(tag.clone());
211 }
212 }
213 }
214
215 for tag in &delta.tombstones {
217 for tags in self.elements.values_mut() {
218 tags.remove(tag);
219 }
220 }
221 self.tombstones.extend(delta.tombstones.iter().cloned());
222
223 self.elements.retain(|_, tags| !tags.is_empty());
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231
232 #[test]
233 fn new_set_is_empty() {
234 let s = ORSet::<String>::new("a");
235 assert!(s.is_empty());
236 assert_eq!(s.len(), 0);
237 }
238
239 #[test]
240 fn insert_and_contains() {
241 let mut s = ORSet::new("a");
242 s.insert("x");
243 assert!(s.contains(&"x"));
244 assert_eq!(s.len(), 1);
245 }
246
247 #[test]
248 fn remove_element() {
249 let mut s = ORSet::new("a");
250 s.insert("x");
251 assert!(s.remove(&"x"));
252 assert!(!s.contains(&"x"));
253 assert_eq!(s.len(), 0);
254 }
255
256 #[test]
257 fn can_readd_after_remove() {
258 let mut s = ORSet::new("a");
259 s.insert("x");
260 s.remove(&"x");
261 assert!(!s.contains(&"x"));
262
263 s.insert("x");
264 assert!(s.contains(&"x"));
265 }
266
267 #[test]
268 fn concurrent_add_survives_remove() {
269 let mut s1 = ORSet::new("a");
270 s1.insert("x");
271
272 s1.remove(&"x");
274
275 let mut s2 = ORSet::new("b");
277 s2.insert("x");
278
279 s1.merge(&s2);
280 assert!(s1.contains(&"x"));
282 }
283
284 #[test]
285 fn merge_is_commutative() {
286 let mut s1 = ORSet::new("a");
287 s1.insert("x");
288 s1.insert("y");
289
290 let mut s2 = ORSet::new("b");
291 s2.insert("y");
292 s2.insert("z");
293
294 let mut left = s1.clone();
295 left.merge(&s2);
296
297 let mut right = s2.clone();
298 right.merge(&s1);
299
300 let left_elems: BTreeSet<_> = left.iter().collect();
301 let right_elems: BTreeSet<_> = right.iter().collect();
302 assert_eq!(left_elems, right_elems);
303 }
304
305 #[test]
306 fn merge_is_idempotent() {
307 let mut s1 = ORSet::new("a");
308 s1.insert("x");
309
310 let mut s2 = ORSet::new("b");
311 s2.insert("y");
312
313 s1.merge(&s2);
314 let after_first = s1.clone();
315 s1.merge(&s2);
316
317 assert_eq!(s1, after_first);
318 }
319
320 #[test]
321 fn add_wins_semantics() {
322 let mut s1 = ORSet::new("a");
324 s1.insert("x");
325 s1.remove(&"x");
326
327 let mut s2 = ORSet::new("b");
329 s2.insert("x");
330
331 s1.merge(&s2);
332 assert!(s1.contains(&"x"));
334 }
335
336 #[test]
337 fn remove_nonexistent_returns_false() {
338 let mut s = ORSet::<&str>::new("a");
339 assert!(!s.remove(&"x"));
340 }
341
342 #[test]
343 fn iterate_elements() {
344 let mut s = ORSet::new("a");
345 s.insert(1);
346 s.insert(2);
347 s.insert(3);
348 s.remove(&2);
349
350 let elems: Vec<&i32> = s.iter().collect();
351 assert_eq!(elems, vec![&1, &3]);
352 }
353
354 #[test]
355 fn delta_apply_equivalent_to_merge() {
356 let mut s1 = ORSet::new("a");
357 s1.insert("x");
358 s1.insert("y");
359 s1.remove(&"x");
360
361 let mut s2 = ORSet::new("b");
362 s2.insert("y");
363 s2.insert("z");
364
365 let mut full = s2.clone();
366 full.merge(&s1);
367
368 let mut via_delta = s2.clone();
369 let d = s1.delta(&s2);
370 via_delta.apply_delta(&d);
371
372 let full_elems: BTreeSet<_> = full.iter().collect();
373 let delta_elems: BTreeSet<_> = via_delta.iter().collect();
374 assert_eq!(full_elems, delta_elems);
375 }
376
377 #[test]
378 fn delta_is_empty_when_equal() {
379 let mut s1 = ORSet::new("a");
380 s1.insert("x");
381
382 let s2 = s1.clone();
383 let d = s1.delta(&s2);
384 assert!(d.additions.is_empty());
385 assert!(d.tombstones.is_empty());
386 }
387
388 #[test]
389 fn delta_carries_tombstones() {
390 let mut s1 = ORSet::new("a");
391 s1.insert("x");
392
393 let s2 = s1.clone();
394 s1.remove(&"x");
395
396 let d = s1.delta(&s2);
397 assert!(!d.tombstones.is_empty());
398
399 let mut via_delta = s2.clone();
400 via_delta.apply_delta(&d);
401 assert!(!via_delta.contains(&"x"));
402 }
403}