1use crate::lattice::{DeltaCRDT, Lattice};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9use ulid::Ulid;
10
11#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
13pub struct Tag {
14 pub replica_id: String,
16 pub unique_id: Ulid,
18}
19
20impl Tag {
21 pub fn new(replica_id: impl Into<String>) -> Self {
22 Self {
23 replica_id: replica_id.into(),
24 unique_id: Ulid::new(),
25 }
26 }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
30pub struct ORSet<T: Ord + Clone> {
31 entries: BTreeMap<T, BTreeSet<Tag>>,
33 tombstones: BTreeSet<Tag>,
36 #[serde(skip)]
38 pending_delta: Option<ORSetDelta<T>>,
39}
40
41#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
42pub struct ORSetDelta<T: Ord + Clone> {
43 pub additions: BTreeMap<T, BTreeSet<Tag>>,
44 pub removals: BTreeSet<Tag>,
45}
46
47impl<T: Ord + Clone> ORSet<T> {
48 pub fn new() -> Self {
49 Self {
50 entries: BTreeMap::new(),
51 tombstones: BTreeSet::new(),
52 pending_delta: None,
53 }
54 }
55
56 pub fn add(&mut self, replica_id: &str, value: T) {
58 let tag = Tag::new(replica_id);
59
60 self.entries
61 .entry(value.clone())
62 .or_default()
63 .insert(tag.clone());
64
65 let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
67 additions: BTreeMap::new(),
68 removals: BTreeSet::new(),
69 });
70 delta.additions.entry(value).or_default().insert(tag);
71 }
72
73 pub fn remove(&mut self, value: &T) {
75 if let Some(tags) = self.entries.remove(value) {
76 for tag in tags.iter() {
78 self.tombstones.insert(tag.clone());
79 }
80
81 let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
83 additions: BTreeMap::new(),
84 removals: BTreeSet::new(),
85 });
86 delta.removals.extend(tags);
87 }
88 }
89
90 pub fn contains(&self, value: &T) -> bool {
91 self.entries
92 .get(value)
93 .is_some_and(|tags| !tags.is_empty())
94 }
95
96 pub fn iter(&self) -> impl Iterator<Item = &T> {
97 self.entries.keys()
98 }
99
100 pub fn len(&self) -> usize {
101 self.entries.len()
102 }
103
104 pub fn is_empty(&self) -> bool {
105 self.entries.is_empty()
106 }
107}
108
109impl<T: Ord + Clone> Default for ORSet<T> {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
115impl<T: Ord + Clone> Lattice for ORSet<T> {
116 fn bottom() -> Self {
117 Self::new()
118 }
119
120 fn join(&self, other: &Self) -> Self {
121 let mut result = Self::new();
122
123 result.tombstones = self.tombstones.union(&other.tombstones).cloned().collect();
125
126 let all_keys: BTreeSet<_> = self
128 .entries
129 .keys()
130 .chain(other.entries.keys())
131 .cloned()
132 .collect();
133
134 for key in all_keys {
135 let self_tags = self.entries.get(&key).cloned().unwrap_or_default();
136 let other_tags = other.entries.get(&key).cloned().unwrap_or_default();
137
138 let merged_tags: BTreeSet<Tag> = self_tags
139 .union(&other_tags)
140 .filter(|tag| !result.tombstones.contains(tag))
141 .cloned()
142 .collect();
143
144 if !merged_tags.is_empty() {
145 result.entries.insert(key, merged_tags);
146 }
147 }
148
149 result
150 }
151}
152
153impl<T: Ord + Clone> Lattice for ORSetDelta<T> {
154 fn bottom() -> Self {
155 Self {
156 additions: BTreeMap::new(),
157 removals: BTreeSet::new(),
158 }
159 }
160
161 fn join(&self, other: &Self) -> Self {
162 let mut additions = self.additions.clone();
163 for (k, v) in &other.additions {
164 additions.entry(k.clone()).or_default().extend(v.clone());
165 }
166
167 Self {
168 additions,
169 removals: self.removals.union(&other.removals).cloned().collect(),
170 }
171 }
172}
173
174impl<T: Ord + Clone> DeltaCRDT for ORSet<T> {
175 type Delta = ORSetDelta<T>;
176
177 fn split_delta(&mut self) -> Option<Self::Delta> {
178 self.pending_delta.take()
179 }
180
181 fn apply_delta(&mut self, delta: &Self::Delta) {
182 self.tombstones.extend(delta.removals.iter().cloned());
184
185 for (value, tags) in &delta.additions {
187 let entry = self.entries.entry(value.clone()).or_default();
188 for tag in tags {
189 if !self.tombstones.contains(tag) {
190 entry.insert(tag.clone());
191 }
192 }
193 }
194
195 self.entries.retain(|_, tags| !tags.is_empty());
197 }
198}