Skip to main content

crdt_kit/
mv_register.rs

1use alloc::collections::BTreeMap;
2use alloc::vec::Vec;
3
4use crate::{Crdt, DeltaCrdt, NodeId};
5
6/// A multi-value register (MV-Register).
7///
8/// Unlike LWW-Register, this preserves all concurrently written values.
9/// When concurrent writes occur, all values are kept until a subsequent
10/// write supersedes them. This is useful when you want to detect conflicts
11/// rather than silently resolving them.
12///
13/// # Example
14///
15/// ```
16/// use crdt_kit::prelude::*;
17///
18/// let mut r1 = MVRegister::new(1);
19/// r1.set("alice");
20///
21/// let mut r2 = MVRegister::new(2);
22/// r2.set("bob");
23///
24/// r1.merge(&r2);
25/// // Both values are preserved as concurrent writes
26/// let values = r1.values();
27/// assert!(values.contains(&&"alice"));
28/// assert!(values.contains(&&"bob"));
29/// ```
30#[derive(Debug, Clone, PartialEq, Eq)]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32pub struct MVRegister<T: Clone + Ord> {
33    actor: NodeId,
34    /// Version vector: actor -> counter
35    version: BTreeMap<NodeId, u64>,
36    /// Each entry: (value, version_at_write)
37    entries: Vec<(T, BTreeMap<NodeId, u64>)>,
38}
39
40impl<T: Clone + Ord> MVRegister<T> {
41    /// Create a new empty MV-Register for the given node.
42    pub fn new(actor: NodeId) -> Self {
43        Self {
44            actor,
45            version: BTreeMap::new(),
46            entries: Vec::new(),
47        }
48    }
49
50    /// Set a new value, superseding all current values.
51    pub fn set(&mut self, value: T) {
52        let counter = self.version.entry(self.actor).or_insert(0);
53        *counter += 1;
54
55        self.entries.clear();
56        self.entries.push((value, self.version.clone()));
57    }
58
59    /// Get all current values.
60    ///
61    /// Returns a single value during normal operation, or multiple values
62    /// when concurrent writes have been merged without a subsequent write.
63    #[must_use]
64    pub fn values(&self) -> Vec<&T> {
65        let mut vals: Vec<&T> = self.entries.iter().map(|(v, _)| v).collect();
66        vals.sort();
67        vals.dedup();
68        vals
69    }
70
71    /// Returns `true` if there are concurrent (conflicting) values.
72    #[must_use]
73    pub fn is_conflicted(&self) -> bool {
74        self.entries.len() > 1
75    }
76
77    /// Get this replica's node ID.
78    #[must_use]
79    pub fn actor(&self) -> NodeId {
80        self.actor
81    }
82}
83
84/// Check if version `a` dominates (is strictly greater than or equal to) version `b`.
85fn dominates(a: &BTreeMap<NodeId, u64>, b: &BTreeMap<NodeId, u64>) -> bool {
86    for (&actor, &count) in b {
87        if a.get(&actor).copied().unwrap_or(0) < count {
88            return false;
89        }
90    }
91    true
92}
93
94/// Delta for [`MVRegister`]: the full state needed to bring a peer up to date.
95#[derive(Debug, Clone, PartialEq, Eq)]
96#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
97pub struct MVRegisterDelta<T: Clone + Ord> {
98    /// Entries that the other replica doesn't have.
99    pub entries: Vec<(T, BTreeMap<NodeId, u64>)>,
100    /// Version vector of the source.
101    pub version: BTreeMap<NodeId, u64>,
102}
103
104impl<T: Clone + Ord> DeltaCrdt for MVRegister<T> {
105    type Delta = MVRegisterDelta<T>;
106
107    fn delta(&self, other: &Self) -> MVRegisterDelta<T> {
108        let entries: Vec<_> = self
109            .entries
110            .iter()
111            .filter(|entry| !dominates(&other.version, &entry.1))
112            .cloned()
113            .collect();
114
115        MVRegisterDelta {
116            entries,
117            version: self.version.clone(),
118        }
119    }
120
121    fn apply_delta(&mut self, delta: &MVRegisterDelta<T>) {
122        let self_version = self.version.clone();
123        let mut new_entries = Vec::new();
124
125        for entry in &self.entries {
126            if !dominates(&delta.version, &entry.1)
127                || delta.entries.iter().any(|e| e.1 == entry.1)
128            {
129                new_entries.push(entry.clone());
130            }
131        }
132
133        for entry in &delta.entries {
134            if !dominates(&self_version, &entry.1)
135                && !new_entries.iter().any(|e| e.1 == entry.1)
136            {
137                new_entries.push(entry.clone());
138            }
139        }
140
141        for (&actor, &count) in &delta.version {
142            let v = self.version.entry(actor).or_insert(0);
143            *v = (*v).max(count);
144        }
145
146        self.entries = new_entries;
147    }
148}
149
150impl<T: Clone + Ord> Crdt for MVRegister<T> {
151    fn merge(&mut self, other: &Self) {
152        let self_version = self.version.clone();
153
154        let mut new_entries = Vec::new();
155
156        for entry in &self.entries {
157            if !dominates(&other.version, &entry.1) || other.entries.iter().any(|e| e.1 == entry.1)
158            {
159                new_entries.push(entry.clone());
160            }
161        }
162
163        for entry in &other.entries {
164            if !dominates(&self_version, &entry.1) && !new_entries.iter().any(|e| e.1 == entry.1) {
165                new_entries.push(entry.clone());
166            }
167        }
168
169        for (&actor, &count) in &other.version {
170            let entry = self.version.entry(actor).or_insert(0);
171            *entry = (*entry).max(count);
172        }
173
174        self.entries = new_entries;
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a register for `actor` that has performed a single write.
    fn replica_with(actor: NodeId, value: &'static str) -> MVRegister<&'static str> {
        let mut reg = MVRegister::new(actor);
        reg.set(value);
        reg
    }

    /// A register that was never written holds nothing and has no conflict.
    #[test]
    fn new_register_is_empty() {
        let reg = MVRegister::<String>::new(1);

        assert!(reg.values().is_empty());
        assert!(!reg.is_conflicted());
    }

    /// A second local write replaces the first one.
    #[test]
    fn set_replaces_value() {
        let mut reg = replica_with(1, "hello");
        assert_eq!(reg.values(), vec![&"hello"]);

        reg.set("world");
        assert_eq!(reg.values(), vec![&"world"]);
        assert!(!reg.is_conflicted());
    }

    /// Writes on two replicas that never saw each other both survive a merge.
    #[test]
    fn concurrent_writes_preserved() {
        let mut left = replica_with(1, "alice");
        let right = replica_with(2, "bob");

        left.merge(&right);

        let seen = left.values();
        assert_eq!(seen.len(), 2);
        assert!(seen.contains(&&"alice"));
        assert!(seen.contains(&&"bob"));
        assert!(left.is_conflicted());
    }

    /// Writing after a conflicting merge collapses the register to one value.
    #[test]
    fn subsequent_write_resolves_conflict() {
        let mut left = replica_with(1, "alice");
        let right = replica_with(2, "bob");

        left.merge(&right);
        assert!(left.is_conflicted());

        left.set("resolved");
        assert_eq!(left.values(), vec![&"resolved"]);
        assert!(!left.is_conflicted());
    }

    /// Merging in either direction yields the same set of values.
    #[test]
    fn merge_is_commutative() {
        let a = replica_with(1, "x");
        let b = replica_with(2, "y");

        let mut a_then_b = a.clone();
        a_then_b.merge(&b);

        let mut b_then_a = b.clone();
        b_then_a.merge(&a);

        let mut from_a = a_then_b.values();
        from_a.sort();
        let mut from_b = b_then_a.values();
        from_b.sort();
        assert_eq!(from_a, from_b);
    }

    /// Merging the same state twice changes nothing the second time.
    #[test]
    fn merge_is_idempotent() {
        let mut target = replica_with(1, "x");
        let source = replica_with(2, "y");

        target.merge(&source);
        let snapshot = target.clone();
        target.merge(&source);

        assert_eq!(target, snapshot);
    }

    /// Applying a delta produces the same values as a full-state merge.
    #[test]
    fn delta_apply_equivalent_to_merge() {
        let sender = replica_with(1, "alice");
        let receiver = replica_with(2, "bob");

        let mut merged = receiver.clone();
        merged.merge(&sender);

        let mut patched = receiver.clone();
        let delta = sender.delta(&receiver);
        patched.apply_delta(&delta);

        let mut merged_values = merged.values();
        merged_values.sort();
        let mut patched_values = patched.values();
        patched_values.sort();
        assert_eq!(merged_values, patched_values);
    }

    /// A delta carrying a causally later write replaces the older value.
    #[test]
    fn delta_from_causal_successor_supersedes() {
        let older = replica_with(1, "first");

        let mut newer = older.clone();
        newer.set("second");

        let delta = newer.delta(&older);
        let mut patched = older.clone();
        patched.apply_delta(&delta);

        assert_eq!(patched.values(), vec![&"second"]);
        assert!(!patched.is_conflicted());
    }

    /// A full merge of a causally later write replaces the older value.
    #[test]
    fn causal_write_supersedes() {
        let mut older = replica_with(1, "first");

        let mut newer = older.clone();
        newer.set("second");

        older.merge(&newer);
        assert_eq!(older.values(), vec![&"second"]);
        assert!(!older.is_conflicted());
    }
}