1use crate::lattice::{DeltaCRDT, Lattice};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9use ulid::Ulid;
10
11#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
13pub struct Tag {
14 pub replica_id: String,
16 pub unique_id: Ulid,
18}
19
20impl Tag {
21 pub fn new(replica_id: impl Into<String>) -> Self {
22 Self {
23 replica_id: replica_id.into(),
24 unique_id: Ulid::new(),
25 }
26 }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
37pub struct ORSet<T: Ord + Clone> {
38 entries: BTreeMap<T, BTreeSet<Tag>>,
40 tombstones: BTreeSet<Tag>,
43 #[serde(skip)]
45 pending_delta: Option<ORSetDelta<T>>,
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub struct ORSetDelta<T: Ord + Clone> {
51 pub additions: BTreeMap<T, BTreeSet<Tag>>,
53 pub removals: BTreeSet<Tag>,
55}
56
57impl<T: Ord + Clone> ORSet<T> {
58 pub fn new() -> Self {
60 Self {
61 entries: BTreeMap::new(),
62 tombstones: BTreeSet::new(),
63 pending_delta: None,
64 }
65 }
66
67 pub fn add(&mut self, replica_id: &str, value: T) {
69 let tag = Tag::new(replica_id);
70
71 self.entries
72 .entry(value.clone())
73 .or_default()
74 .insert(tag.clone());
75
76 let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
78 additions: BTreeMap::new(),
79 removals: BTreeSet::new(),
80 });
81 delta.additions.entry(value).or_default().insert(tag);
82 }
83
84 pub fn remove(&mut self, value: &T) {
86 if let Some(tags) = self.entries.remove(value) {
87 for tag in tags.iter() {
89 self.tombstones.insert(tag.clone());
90 }
91
92 let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
94 additions: BTreeMap::new(),
95 removals: BTreeSet::new(),
96 });
97 delta.removals.extend(tags);
98 }
99 }
100
101 pub fn contains(&self, value: &T) -> bool {
103 self.entries
104 .get(value)
105 .is_some_and(|tags| !tags.is_empty())
106 }
107
108 pub fn iter(&self) -> impl Iterator<Item = &T> {
110 self.entries.keys()
111 }
112
113 pub fn len(&self) -> usize {
115 self.entries.len()
116 }
117
118 pub fn is_empty(&self) -> bool {
120 self.entries.is_empty()
121 }
122}
123
124
125impl<T: Ord + Clone> Default for ORSet<T> {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131impl<T: Ord + Clone> Lattice for ORSet<T> {
132 fn bottom() -> Self {
133 Self::new()
134 }
135
136 fn join(&self, other: &Self) -> Self {
137 let mut result = Self::new();
138
139 result.tombstones = self.tombstones.union(&other.tombstones).cloned().collect();
141
142 let all_keys: BTreeSet<_> = self
144 .entries
145 .keys()
146 .chain(other.entries.keys())
147 .cloned()
148 .collect();
149
150 for key in all_keys {
151 let self_tags = self.entries.get(&key).cloned().unwrap_or_default();
152 let other_tags = other.entries.get(&key).cloned().unwrap_or_default();
153
154 let merged_tags: BTreeSet<Tag> = self_tags
155 .union(&other_tags)
156 .filter(|tag| !result.tombstones.contains(tag))
157 .cloned()
158 .collect();
159
160 if !merged_tags.is_empty() {
161 result.entries.insert(key, merged_tags);
162 }
163 }
164
165 result
166 }
167}
168
169impl<T: Ord + Clone> Lattice for ORSetDelta<T> {
170 fn bottom() -> Self {
171 Self {
172 additions: BTreeMap::new(),
173 removals: BTreeSet::new(),
174 }
175 }
176
177 fn join(&self, other: &Self) -> Self {
178 let mut additions = self.additions.clone();
179 for (k, v) in &other.additions {
180 additions.entry(k.clone()).or_default().extend(v.clone());
181 }
182
183 Self {
184 additions,
185 removals: self.removals.union(&other.removals).cloned().collect(),
186 }
187 }
188}
189
190impl<T: Ord + Clone> DeltaCRDT for ORSet<T> {
191 type Delta = ORSetDelta<T>;
192
193 fn split_delta(&mut self) -> Option<Self::Delta> {
194 self.pending_delta.take()
195 }
196
197 fn apply_delta(&mut self, delta: &Self::Delta) {
198 self.tombstones.extend(delta.removals.iter().cloned());
200
201 for (value, tags) in &delta.additions {
203 let entry = self.entries.entry(value.clone()).or_default();
204 for tag in tags {
205 if !self.tombstones.contains(tag) {
206 entry.insert(tag.clone());
207 }
208 }
209 }
210
211 self.entries.retain(|_, tags| !tags.is_empty());
213 }
214}