oxigdal_sync/crdt/
or_set.rs1use crate::crdt::{Crdt, DeviceAware};
7use crate::{DeviceId, SyncResult};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet};
10use std::hash::Hash;
11use uuid::Uuid;
12
13pub type ElementId = Uuid;
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
18struct ElementMetadata {
19 id: ElementId,
21 device_id: DeviceId,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(bound(serialize = "T: Serialize"))]
49#[serde(bound(deserialize = "T: serde::de::DeserializeOwned"))]
50pub struct OrSet<T>
51where
52 T: Clone + Eq + Hash,
53{
54 elements: HashMap<T, HashSet<ElementMetadata>>,
56 tombstones: HashSet<ElementId>,
58 device_id: DeviceId,
60}
61
62impl<T> OrSet<T>
63where
64 T: Clone + Eq + Hash,
65{
66 pub fn new(device_id: DeviceId) -> Self {
72 Self {
73 elements: HashMap::new(),
74 tombstones: HashSet::new(),
75 device_id,
76 }
77 }
78
79 pub fn insert(&mut self, element: T) -> bool {
89 let metadata = ElementMetadata {
90 id: Uuid::new_v4(),
91 device_id: self.device_id.clone(),
92 };
93
94 let entry = self.elements.entry(element).or_default();
95 entry.insert(metadata)
96 }
97
98 pub fn remove(&mut self, element: &T) -> bool {
108 if let Some(metadata_set) = self.elements.get(element) {
109 for metadata in metadata_set {
111 self.tombstones.insert(metadata.id);
112 }
113
114 self.elements.remove(element);
116 true
117 } else {
118 false
119 }
120 }
121
122 pub fn contains(&self, element: &T) -> bool {
132 if let Some(metadata_set) = self.elements.get(element) {
133 metadata_set
135 .iter()
136 .any(|m| !self.tombstones.contains(&m.id))
137 } else {
138 false
139 }
140 }
141
142 pub fn len(&self) -> usize {
144 self.elements
145 .iter()
146 .filter(|(_, metadata_set)| {
147 metadata_set
148 .iter()
149 .any(|m| !self.tombstones.contains(&m.id))
150 })
151 .count()
152 }
153
154 pub fn is_empty(&self) -> bool {
156 self.len() == 0
157 }
158
159 pub fn iter(&self) -> impl Iterator<Item = &T> {
161 self.elements
162 .iter()
163 .filter(|(_, metadata_set)| {
164 metadata_set
165 .iter()
166 .any(|m| !self.tombstones.contains(&m.id))
167 })
168 .map(|(element, _)| element)
169 }
170
171 pub fn to_hashset(&self) -> HashSet<T> {
173 self.iter().cloned().collect()
174 }
175
176 pub fn clear(&mut self) {
178 for metadata_set in self.elements.values() {
179 for metadata in metadata_set {
180 self.tombstones.insert(metadata.id);
181 }
182 }
183 self.elements.clear();
184 }
185}
186
187impl<T> Crdt for OrSet<T>
188where
189 T: Clone + Eq + Hash + Serialize + for<'de> serde::Deserialize<'de>,
190{
191 fn merge(&mut self, other: &Self) -> SyncResult<()> {
192 for (element, metadata_set) in &other.elements {
194 let entry = self.elements.entry(element.clone()).or_default();
195 for metadata in metadata_set {
196 entry.insert(metadata.clone());
197 }
198 }
199
200 for tombstone in &other.tombstones {
202 self.tombstones.insert(*tombstone);
203 }
204
205 self.elements.retain(|_, metadata_set| {
207 metadata_set
208 .iter()
209 .any(|m| !self.tombstones.contains(&m.id))
210 });
211
212 Ok(())
213 }
214
215 fn dominated_by(&self, other: &Self) -> bool {
216 for (element, metadata_set) in &self.elements {
218 if let Some(other_metadata_set) = other.elements.get(element) {
219 for metadata in metadata_set {
220 if !self.tombstones.contains(&metadata.id) {
221 if !other_metadata_set.contains(metadata)
223 || other.tombstones.contains(&metadata.id)
224 {
225 return false;
227 }
228 }
229 }
230 } else {
231 if metadata_set
233 .iter()
234 .any(|m| !self.tombstones.contains(&m.id))
235 {
236 return false;
237 }
238 }
239 }
240
241 for tombstone in &self.tombstones {
243 if !other.tombstones.contains(tombstone) {
244 return false;
245 }
246 }
247
248 true
249 }
250}
251
252impl<T> DeviceAware for OrSet<T>
253where
254 T: Clone + Eq + Hash + Serialize + for<'de> serde::Deserialize<'de>,
255{
256 fn device_id(&self) -> &DeviceId {
257 &self.device_id
258 }
259
260 fn set_device_id(&mut self, device_id: DeviceId) {
261 self.device_id = device_id;
262 }
263}
264
265impl<T> PartialEq for OrSet<T>
266where
267 T: Clone + Eq + Hash,
268{
269 fn eq(&self, other: &Self) -> bool {
270 self.to_hashset() == other.to_hashset()
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use super::*;
277
278 #[test]
279 fn test_or_set_creation() {
280 let set: OrSet<String> = OrSet::new("device-1".to_string());
281 assert_eq!(set.len(), 0);
282 assert!(set.is_empty());
283 }
284
285 #[test]
286 fn test_or_set_insert() {
287 let mut set = OrSet::new("device-1".to_string());
288 assert!(set.insert("apple".to_string()));
289 assert!(set.contains(&"apple".to_string()));
290 assert_eq!(set.len(), 1);
291 }
292
293 #[test]
294 fn test_or_set_remove() {
295 let mut set = OrSet::new("device-1".to_string());
296 set.insert("apple".to_string());
297 assert!(set.remove(&"apple".to_string()));
298 assert!(!set.contains(&"apple".to_string()));
299 assert_eq!(set.len(), 0);
300 }
301
302 #[test]
303 fn test_or_set_multiple_elements() {
304 let mut set = OrSet::new("device-1".to_string());
305 set.insert("apple".to_string());
306 set.insert("banana".to_string());
307 set.insert("cherry".to_string());
308
309 assert_eq!(set.len(), 3);
310 assert!(set.contains(&"apple".to_string()));
311 assert!(set.contains(&"banana".to_string()));
312 assert!(set.contains(&"cherry".to_string()));
313 }
314
315 #[test]
316 fn test_or_set_merge() {
317 let mut set1 = OrSet::new("device-1".to_string());
318 let mut set2 = OrSet::new("device-2".to_string());
319
320 set1.insert("apple".to_string());
321 set1.insert("banana".to_string());
322
323 set2.insert("cherry".to_string());
324 set2.insert("date".to_string());
325
326 set1.merge(&set2).ok();
327
328 assert_eq!(set1.len(), 4);
329 assert!(set1.contains(&"apple".to_string()));
330 assert!(set1.contains(&"banana".to_string()));
331 assert!(set1.contains(&"cherry".to_string()));
332 assert!(set1.contains(&"date".to_string()));
333 }
334
335 #[test]
336 fn test_or_set_concurrent_add_remove() {
337 let mut set1 = OrSet::new("device-1".to_string());
338 let mut set2 = OrSet::new("device-2".to_string());
339
340 set1.insert("apple".to_string());
342 set2.insert("apple".to_string());
343
344 set1.remove(&"apple".to_string());
346
347 set1.merge(&set2).ok();
349
350 assert!(set1.contains(&"apple".to_string()));
352 }
353
354 #[test]
355 fn test_or_set_clear() {
356 let mut set = OrSet::new("device-1".to_string());
357 set.insert("apple".to_string());
358 set.insert("banana".to_string());
359
360 set.clear();
361
362 assert_eq!(set.len(), 0);
363 assert!(set.is_empty());
364 }
365
366 #[test]
367 fn test_or_set_iter() {
368 let mut set = OrSet::new("device-1".to_string());
369 set.insert("apple".to_string());
370 set.insert("banana".to_string());
371 set.insert("cherry".to_string());
372
373 let elements: HashSet<_> = set.iter().cloned().collect();
374 assert_eq!(elements.len(), 3);
375 assert!(elements.contains("apple"));
376 assert!(elements.contains("banana"));
377 assert!(elements.contains("cherry"));
378 }
379}