1use alloc::collections::BTreeMap;
2use alloc::vec::Vec;
3
4use crate::{Crdt, DeltaCrdt, NodeId};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32pub struct MVRegister<T: Clone + Ord> {
33 actor: NodeId,
34 version: BTreeMap<NodeId, u64>,
36 entries: Vec<(T, BTreeMap<NodeId, u64>)>,
38}
39
40impl<T: Clone + Ord> MVRegister<T> {
41 pub fn new(actor: NodeId) -> Self {
43 Self {
44 actor,
45 version: BTreeMap::new(),
46 entries: Vec::new(),
47 }
48 }
49
50 pub fn set(&mut self, value: T) {
52 let counter = self.version.entry(self.actor).or_insert(0);
53 *counter += 1;
54
55 self.entries.clear();
56 self.entries.push((value, self.version.clone()));
57 }
58
59 #[must_use]
64 pub fn values(&self) -> Vec<&T> {
65 let mut vals: Vec<&T> = self.entries.iter().map(|(v, _)| v).collect();
66 vals.sort();
67 vals.dedup();
68 vals
69 }
70
71 #[must_use]
73 pub fn is_conflicted(&self) -> bool {
74 self.entries.len() > 1
75 }
76
77 #[must_use]
79 pub fn actor(&self) -> NodeId {
80 self.actor
81 }
82}
83
84fn dominates(a: &BTreeMap<NodeId, u64>, b: &BTreeMap<NodeId, u64>) -> bool {
86 for (&actor, &count) in b {
87 if a.get(&actor).copied().unwrap_or(0) < count {
88 return false;
89 }
90 }
91 true
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
96#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
97pub struct MVRegisterDelta<T: Clone + Ord> {
98 pub entries: Vec<(T, BTreeMap<NodeId, u64>)>,
100 pub version: BTreeMap<NodeId, u64>,
102}
103
104impl<T: Clone + Ord> DeltaCrdt for MVRegister<T> {
105 type Delta = MVRegisterDelta<T>;
106
107 fn delta(&self, other: &Self) -> MVRegisterDelta<T> {
108 let entries: Vec<_> = self
109 .entries
110 .iter()
111 .filter(|entry| !dominates(&other.version, &entry.1))
112 .cloned()
113 .collect();
114
115 MVRegisterDelta {
116 entries,
117 version: self.version.clone(),
118 }
119 }
120
121 fn apply_delta(&mut self, delta: &MVRegisterDelta<T>) {
122 let self_version = self.version.clone();
123 let mut new_entries = Vec::new();
124
125 for entry in &self.entries {
126 if !dominates(&delta.version, &entry.1)
127 || delta.entries.iter().any(|e| e.1 == entry.1)
128 {
129 new_entries.push(entry.clone());
130 }
131 }
132
133 for entry in &delta.entries {
134 if !dominates(&self_version, &entry.1)
135 && !new_entries.iter().any(|e| e.1 == entry.1)
136 {
137 new_entries.push(entry.clone());
138 }
139 }
140
141 for (&actor, &count) in &delta.version {
142 let v = self.version.entry(actor).or_insert(0);
143 *v = (*v).max(count);
144 }
145
146 self.entries = new_entries;
147 }
148}
149
150impl<T: Clone + Ord> Crdt for MVRegister<T> {
151 fn merge(&mut self, other: &Self) {
152 let self_version = self.version.clone();
153
154 let mut new_entries = Vec::new();
155
156 for entry in &self.entries {
157 if !dominates(&other.version, &entry.1) || other.entries.iter().any(|e| e.1 == entry.1)
158 {
159 new_entries.push(entry.clone());
160 }
161 }
162
163 for entry in &other.entries {
164 if !dominates(&self_version, &entry.1) && !new_entries.iter().any(|e| e.1 == entry.1) {
165 new_entries.push(entry.clone());
166 }
167 }
168
169 for (&actor, &count) in &other.version {
170 let entry = self.version.entry(actor).or_insert(0);
171 *entry = (*entry).max(count);
172 }
173
174 self.entries = new_entries;
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a register owned by `actor` that has had `value` written once.
    fn written(actor: NodeId, value: &'static str) -> MVRegister<&'static str> {
        let mut reg = MVRegister::new(actor);
        reg.set(value);
        reg
    }

    #[test]
    fn new_register_is_empty() {
        let fresh: MVRegister<String> = MVRegister::new(1);
        assert!(fresh.values().is_empty());
        assert!(!fresh.is_conflicted());
    }

    #[test]
    fn set_replaces_value() {
        let mut reg = written(1, "hello");
        assert_eq!(reg.values(), vec![&"hello"]);

        reg.set("world");
        assert_eq!(reg.values(), vec![&"world"]);
        assert!(!reg.is_conflicted());
    }

    #[test]
    fn concurrent_writes_preserved() {
        let mut merged = written(1, "alice");
        let remote = written(2, "bob");

        merged.merge(&remote);
        let seen = merged.values();
        assert_eq!(seen.len(), 2);
        assert!(seen.contains(&&"alice"));
        assert!(seen.contains(&&"bob"));
        assert!(merged.is_conflicted());
    }

    #[test]
    fn subsequent_write_resolves_conflict() {
        let mut local = written(1, "alice");
        let remote = written(2, "bob");

        local.merge(&remote);
        assert!(local.is_conflicted());

        // A fresh write on top of the merged state replaces both siblings.
        local.set("resolved");
        assert_eq!(local.values(), vec![&"resolved"]);
        assert!(!local.is_conflicted());
    }

    #[test]
    fn merge_is_commutative() {
        let first = written(1, "x");
        let second = written(2, "y");

        let mut forward = first.clone();
        forward.merge(&second);

        let mut backward = second.clone();
        backward.merge(&first);

        let mut forward_vals = forward.values();
        forward_vals.sort();
        let mut backward_vals = backward.values();
        backward_vals.sort();
        assert_eq!(forward_vals, backward_vals);
    }

    #[test]
    fn merge_is_idempotent() {
        let mut local = written(1, "x");
        let remote = written(2, "y");

        local.merge(&remote);
        let snapshot = local.clone();
        local.merge(&remote);

        assert_eq!(local, snapshot);
    }

    #[test]
    fn delta_apply_equivalent_to_merge() {
        let source = written(1, "alice");
        let target = written(2, "bob");

        let mut merged = target.clone();
        merged.merge(&source);

        let mut patched = target.clone();
        let delta = source.delta(&target);
        patched.apply_delta(&delta);

        let mut merged_vals = merged.values();
        merged_vals.sort();
        let mut patched_vals = patched.values();
        patched_vals.sort();
        assert_eq!(merged_vals, patched_vals);
    }

    #[test]
    fn delta_from_causal_successor_supersedes() {
        let older = written(1, "first");

        let mut newer = older.clone();
        newer.set("second");

        let delta = newer.delta(&older);
        let mut patched = older.clone();
        patched.apply_delta(&delta);

        assert_eq!(patched.values(), vec![&"second"]);
        assert!(!patched.is_conflicted());
    }

    #[test]
    fn causal_write_supersedes() {
        let mut older = written(1, "first");

        let mut newer = older.clone();
        newer.set("second");

        older.merge(&newer);
        assert_eq!(older.values(), vec![&"second"]);
        assert!(!older.is_conflicted());
    }
}