atomr_distributed_data/
sets.rs1use std::collections::{HashMap, HashSet};
4use std::hash::Hash;
5
6use serde::{Deserialize, Serialize};
7
8use crate::traits::CrdtMerge;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct GSet<T>
12where
13 T: Eq + Hash + Clone,
14{
15 items: HashSet<T>,
16}
17
18impl<T: Eq + Hash + Clone> Default for GSet<T> {
19 fn default() -> Self {
20 Self { items: HashSet::new() }
21 }
22}
23
24impl<T: Eq + Hash + Clone> GSet<T> {
25 pub fn new() -> Self {
26 Self::default()
27 }
28
29 pub fn add(&mut self, item: T) {
30 self.items.insert(item);
31 }
32
33 pub fn contains(&self, item: &T) -> bool {
34 self.items.contains(item)
35 }
36
37 pub fn iter(&self) -> impl Iterator<Item = &T> {
38 self.items.iter()
39 }
40
41 pub fn len(&self) -> usize {
42 self.items.len()
43 }
44
45 pub fn is_empty(&self) -> bool {
46 self.items.is_empty()
47 }
48}
49
50impl<T: Eq + Hash + Clone> CrdtMerge for GSet<T> {
51 fn merge(&mut self, other: &Self) {
52 for item in &other.items {
53 self.items.insert(item.clone());
54 }
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct OrSet<T>
63where
64 T: Eq + Hash + Clone,
65{
66 adds: HashMap<T, HashSet<u64>>,
67 removes: HashMap<T, HashSet<u64>>,
68 counter: u64,
69}
70
71impl<T: Eq + Hash + Clone> Default for OrSet<T> {
72 fn default() -> Self {
73 Self { adds: HashMap::new(), removes: HashMap::new(), counter: 0 }
74 }
75}
76
77impl<T: Eq + Hash + Clone> OrSet<T> {
78 pub fn new() -> Self {
79 Self::default()
80 }
81
82 pub fn add(&mut self, item: T) {
83 self.counter += 1;
84 self.adds.entry(item).or_default().insert(self.counter);
85 }
86
87 pub fn remove(&mut self, item: &T) {
88 if let Some(tags) = self.adds.get(item).cloned() {
89 self.removes.entry(item.clone()).or_default().extend(tags);
90 }
91 }
92
93 pub fn contains(&self, item: &T) -> bool {
94 match (self.adds.get(item), self.removes.get(item)) {
95 (Some(a), Some(r)) => a.difference(r).next().is_some(),
96 (Some(a), None) => !a.is_empty(),
97 _ => false,
98 }
99 }
100}
101
102impl<T: Eq + Hash + Clone> CrdtMerge for OrSet<T> {
103 fn merge(&mut self, other: &Self) {
104 for (k, v) in &other.adds {
105 self.adds.entry(k.clone()).or_default().extend(v.iter().copied());
106 }
107 for (k, v) in &other.removes {
108 self.removes.entry(k.clone()).or_default().extend(v.iter().copied());
109 }
110 self.counter = self.counter.max(other.counter);
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    /// Merging two grow-only sets keeps the elements of both.
    #[test]
    fn gset_merges_union() {
        let mut left = GSet::<i32>::new();
        left.add(1);

        let mut right = GSet::<i32>::new();
        right.add(2);

        left.merge(&right);
        assert_eq!(left.len(), 2);
    }

    /// An item is present after `add` and absent after `remove`.
    #[test]
    fn orset_add_then_remove() {
        let mut set = OrSet::<&'static str>::new();

        set.add("x");
        assert!(set.contains(&"x"));

        set.remove(&"x");
        assert!(!set.contains(&"x"));
    }

    /// A re-add on one replica survives merging in a removal made on a clone
    /// that only saw the first add.
    #[test]
    fn orset_merge_preserves_re_add_after_concurrent_remove() {
        let mut local = OrSet::<&'static str>::new();
        local.add("x");

        // The clone sees only the first tag and removes it.
        let mut remote = local.clone();
        remote.remove(&"x");

        // The second add records a tag the clone never saw.
        local.add("x");
        local.merge(&remote);
        assert!(local.contains(&"x"));
    }
}