Skip to main content

crdt_kit/
mv_register.rs

1use alloc::collections::BTreeMap;
2use alloc::string::String;
3use alloc::vec::Vec;
4
5use crate::Crdt;
6
7/// A multi-value register (MV-Register).
8///
9/// Unlike LWW-Register, this preserves all concurrently written values.
10/// When concurrent writes occur, all values are kept until a subsequent
11/// write supersedes them. This is useful when you want to detect conflicts
12/// rather than silently resolving them.
13///
14/// # Example
15///
16/// ```
17/// use crdt_kit::prelude::*;
18///
19/// let mut r1 = MVRegister::new("node-1");
20/// r1.set("alice");
21///
22/// let mut r2 = MVRegister::new("node-2");
23/// r2.set("bob");
24///
25/// r1.merge(&r2);
26/// // Both values are preserved as concurrent writes
27/// let values = r1.values();
28/// assert!(values.contains(&&"alice"));
29/// assert!(values.contains(&&"bob"));
30/// ```
31#[derive(Debug, Clone, PartialEq, Eq)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33pub struct MVRegister<T: Clone + Ord> {
34    actor: String,
35    /// Version vector: actor -> counter
36    version: BTreeMap<String, u64>,
37    /// Each entry: (value, version_at_write)
38    entries: Vec<(T, BTreeMap<String, u64>)>,
39}
40
41impl<T: Clone + Ord> MVRegister<T> {
42    /// Create a new empty MV-Register for the given actor.
43    pub fn new(actor: impl Into<String>) -> Self {
44        Self {
45            actor: actor.into(),
46            version: BTreeMap::new(),
47            entries: Vec::new(),
48        }
49    }
50
51    /// Set a new value, superseding all current values.
52    pub fn set(&mut self, value: T) {
53        let counter = self.version.entry(self.actor.clone()).or_insert(0);
54        *counter += 1;
55
56        self.entries.clear();
57        self.entries.push((value, self.version.clone()));
58    }
59
60    /// Get all current values.
61    ///
62    /// Returns a single value during normal operation, or multiple values
63    /// when concurrent writes have been merged without a subsequent write.
64    #[must_use]
65    pub fn values(&self) -> Vec<&T> {
66        let mut vals: Vec<&T> = self.entries.iter().map(|(v, _)| v).collect();
67        vals.sort();
68        vals.dedup();
69        vals
70    }
71
72    /// Returns `true` if there are concurrent (conflicting) values.
73    #[must_use]
74    pub fn is_conflicted(&self) -> bool {
75        self.entries.len() > 1
76    }
77
78    /// Get this replica's actor ID.
79    #[must_use]
80    pub fn actor(&self) -> &str {
81        &self.actor
82    }
83}
84
85/// Check if version `a` dominates (is strictly greater than or equal to) version `b`.
86fn dominates(a: &BTreeMap<String, u64>, b: &BTreeMap<String, u64>) -> bool {
87    for (actor, &count) in b {
88        if a.get(actor).copied().unwrap_or(0) < count {
89            return false;
90        }
91    }
92    true
93}
94
95impl<T: Clone + Ord> Crdt for MVRegister<T> {
96    fn merge(&mut self, other: &Self) {
97        // Save self's version before merging for correct dominance checks.
98        let self_version = self.version.clone();
99
100        let mut new_entries = Vec::new();
101
102        // Keep entries from self that are either:
103        // - not dominated by other's version (concurrent or newer), OR
104        // - also present in other's entries (both sides still hold it)
105        for entry in &self.entries {
106            if !dominates(&other.version, &entry.1) || other.entries.iter().any(|e| e.1 == entry.1)
107            {
108                new_entries.push(entry.clone());
109            }
110        }
111
112        // Keep entries from other that are not dominated by self's original
113        // version, and avoid duplicates already added from self.
114        for entry in &other.entries {
115            if !dominates(&self_version, &entry.1) && !new_entries.iter().any(|e| e.1 == entry.1) {
116                new_entries.push(entry.clone());
117            }
118        }
119
120        // Merge version vectors (take max for each actor)
121        for (actor, &count) in &other.version {
122            let entry = self.version.entry(actor.clone()).or_insert(0);
123            *entry = (*entry).max(count);
124        }
125
126        self.entries = new_entries;
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use super::*;
133
134    #[test]
135    fn new_register_is_empty() {
136        let r = MVRegister::<String>::new("a");
137        assert!(r.values().is_empty());
138        assert!(!r.is_conflicted());
139    }
140
141    #[test]
142    fn set_replaces_value() {
143        let mut r = MVRegister::new("a");
144        r.set("hello");
145        assert_eq!(r.values(), vec![&"hello"]);
146
147        r.set("world");
148        assert_eq!(r.values(), vec![&"world"]);
149        assert!(!r.is_conflicted());
150    }
151
152    #[test]
153    fn concurrent_writes_preserved() {
154        let mut r1 = MVRegister::new("a");
155        r1.set("alice");
156
157        let mut r2 = MVRegister::new("b");
158        r2.set("bob");
159
160        r1.merge(&r2);
161        let vals = r1.values();
162        assert_eq!(vals.len(), 2);
163        assert!(vals.contains(&&"alice"));
164        assert!(vals.contains(&&"bob"));
165        assert!(r1.is_conflicted());
166    }
167
168    #[test]
169    fn subsequent_write_resolves_conflict() {
170        let mut r1 = MVRegister::new("a");
171        r1.set("alice");
172
173        let mut r2 = MVRegister::new("b");
174        r2.set("bob");
175
176        r1.merge(&r2);
177        assert!(r1.is_conflicted());
178
179        // New write after merge supersedes both
180        r1.set("resolved");
181        assert_eq!(r1.values(), vec![&"resolved"]);
182        assert!(!r1.is_conflicted());
183    }
184
185    #[test]
186    fn merge_is_commutative() {
187        let mut r1 = MVRegister::new("a");
188        r1.set("x");
189
190        let mut r2 = MVRegister::new("b");
191        r2.set("y");
192
193        let mut left = r1.clone();
194        left.merge(&r2);
195
196        let mut right = r2.clone();
197        right.merge(&r1);
198
199        let mut lv = left.values();
200        lv.sort();
201        let mut rv = right.values();
202        rv.sort();
203        assert_eq!(lv, rv);
204    }
205
206    #[test]
207    fn merge_is_idempotent() {
208        let mut r1 = MVRegister::new("a");
209        r1.set("x");
210
211        let mut r2 = MVRegister::new("b");
212        r2.set("y");
213
214        r1.merge(&r2);
215        let after_first = r1.clone();
216        r1.merge(&r2);
217
218        assert_eq!(r1, after_first);
219    }
220
221    #[test]
222    fn causal_write_supersedes() {
223        let mut r1 = MVRegister::new("a");
224        r1.set("first");
225
226        let mut r2 = r1.clone();
227        // r2 saw r1's write, so its write causally supersedes
228        r2.set("second");
229
230        r1.merge(&r2);
231        assert_eq!(r1.values(), vec![&"second"]);
232        assert!(!r1.is_conflicted());
233    }
234}