Skip to main content

crdt_kit/
mv_register.rs

1use std::collections::BTreeMap;
2
3use crate::Crdt;
4
/// A multi-value register (MV-Register).
///
/// Unlike an LWW-Register, this preserves all concurrently written values.
/// When concurrent writes occur, all values are kept until a subsequent
/// write supersedes them. This is useful when you want to detect conflicts
/// rather than silently resolving them.
///
/// The type definition itself carries no trait bounds; the `Clone + Ord`
/// requirements are stated on the `impl` blocks that actually need them.
///
/// # Example
///
/// ```
/// use crdt_kit::prelude::*;
///
/// let mut r1 = MVRegister::new("node-1");
/// r1.set("alice");
///
/// let mut r2 = MVRegister::new("node-2");
/// r2.set("bob");
///
/// r1.merge(&r2);
/// // Both values are preserved as concurrent writes
/// let values = r1.values();
/// assert!(values.contains(&&"alice"));
/// assert!(values.contains(&&"bob"));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MVRegister<T> {
    /// Identifier of the replica that owns this register instance.
    actor: String,
    /// Version vector: actor -> counter
    version: BTreeMap<String, u64>,
    /// Currently held values. Each entry is `(value, version_at_write)`,
    /// where `version_at_write` is a snapshot of the version vector taken
    /// when the value was written.
    entries: Vec<(T, BTreeMap<String, u64>)>,
}
37
38impl<T: Clone + Ord> MVRegister<T> {
39    /// Create a new empty MV-Register for the given actor.
40    pub fn new(actor: impl Into<String>) -> Self {
41        Self {
42            actor: actor.into(),
43            version: BTreeMap::new(),
44            entries: Vec::new(),
45        }
46    }
47
48    /// Set a new value, superseding all current values.
49    pub fn set(&mut self, value: T) {
50        let counter = self.version.entry(self.actor.clone()).or_insert(0);
51        *counter += 1;
52
53        self.entries.clear();
54        self.entries.push((value, self.version.clone()));
55    }
56
57    /// Get all current values.
58    ///
59    /// Returns a single value during normal operation, or multiple values
60    /// when concurrent writes have been merged without a subsequent write.
61    #[must_use]
62    pub fn values(&self) -> Vec<&T> {
63        let mut vals: Vec<&T> = self.entries.iter().map(|(v, _)| v).collect();
64        vals.sort();
65        vals.dedup();
66        vals
67    }
68
69    /// Returns `true` if there are concurrent (conflicting) values.
70    #[must_use]
71    pub fn is_conflicted(&self) -> bool {
72        self.entries.len() > 1
73    }
74
75    /// Get this replica's actor ID.
76    #[must_use]
77    pub fn actor(&self) -> &str {
78        &self.actor
79    }
80}
81
/// Report whether version vector `a` dominates version vector `b`.
///
/// `a` dominates `b` when, for every actor listed in `b`, the counter in `a`
/// is greater than or equal to the counter in `b`. An actor missing from `a`
/// counts as zero. Equal vectors therefore dominate each other, and every
/// vector dominates an empty one.
fn dominates(a: &BTreeMap<String, u64>, b: &BTreeMap<String, u64>) -> bool {
    b.iter().all(|(actor, &needed)| {
        let seen = a.get(actor).copied().unwrap_or_default();
        seen >= needed
    })
}
91
92impl<T: Clone + Ord> Crdt for MVRegister<T> {
93    fn merge(&mut self, other: &Self) {
94        // Save self's version before merging for correct dominance checks.
95        let self_version = self.version.clone();
96
97        let mut new_entries = Vec::new();
98
99        // Keep entries from self that are either:
100        // - not dominated by other's version (concurrent or newer), OR
101        // - also present in other's entries (both sides still hold it)
102        for entry in &self.entries {
103            if !dominates(&other.version, &entry.1)
104                || other.entries.iter().any(|e| e.1 == entry.1)
105            {
106                new_entries.push(entry.clone());
107            }
108        }
109
110        // Keep entries from other that are not dominated by self's original
111        // version, and avoid duplicates already added from self.
112        for entry in &other.entries {
113            if !dominates(&self_version, &entry.1)
114                && !new_entries.iter().any(|e| e.1 == entry.1)
115            {
116                new_entries.push(entry.clone());
117            }
118        }
119
120        // Merge version vectors (take max for each actor)
121        for (actor, &count) in &other.version {
122            let entry = self.version.entry(actor.clone()).or_insert(0);
123            *entry = (*entry).max(count);
124        }
125
126        self.entries = new_entries;
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// Two replicas ("a" and "b") that each wrote once without seeing the
    /// other's write.
    fn concurrent_pair(
        first: &'static str,
        second: &'static str,
    ) -> (MVRegister<&'static str>, MVRegister<&'static str>) {
        let mut replica_a = MVRegister::new("a");
        replica_a.set(first);

        let mut replica_b = MVRegister::new("b");
        replica_b.set(second);

        (replica_a, replica_b)
    }

    #[test]
    fn new_register_is_empty() {
        let fresh = MVRegister::<String>::new("a");
        assert!(fresh.values().is_empty());
        assert!(!fresh.is_conflicted());
    }

    #[test]
    fn set_replaces_value() {
        let mut reg = MVRegister::new("a");

        reg.set("hello");
        assert_eq!(reg.values(), vec![&"hello"]);

        reg.set("world");
        assert_eq!(reg.values(), vec![&"world"]);
        assert!(!reg.is_conflicted());
    }

    #[test]
    fn concurrent_writes_preserved() {
        let (mut r1, r2) = concurrent_pair("alice", "bob");

        r1.merge(&r2);

        let held = r1.values();
        assert_eq!(held.len(), 2);
        assert!(held.contains(&&"alice"));
        assert!(held.contains(&&"bob"));
        assert!(r1.is_conflicted());
    }

    #[test]
    fn subsequent_write_resolves_conflict() {
        let (mut r1, r2) = concurrent_pair("alice", "bob");

        r1.merge(&r2);
        assert!(r1.is_conflicted());

        // A write made after the merge supersedes both concurrent values.
        r1.set("resolved");
        assert_eq!(r1.values(), vec![&"resolved"]);
        assert!(!r1.is_conflicted());
    }

    #[test]
    fn merge_is_commutative() {
        let (r1, r2) = concurrent_pair("x", "y");

        let mut left = r1.clone();
        left.merge(&r2);

        let mut right = r2.clone();
        right.merge(&r1);

        let mut from_left = left.values();
        from_left.sort();
        let mut from_right = right.values();
        from_right.sort();
        assert_eq!(from_left, from_right);
    }

    #[test]
    fn merge_is_idempotent() {
        let (mut r1, r2) = concurrent_pair("x", "y");

        r1.merge(&r2);
        let after_first = r1.clone();

        // Merging the same state again must change nothing.
        r1.merge(&r2);
        assert_eq!(r1, after_first);
    }

    #[test]
    fn causal_write_supersedes() {
        let mut r1 = MVRegister::new("a");
        r1.set("first");

        // The clone has already seen r1's write, so its own write causally
        // supersedes it.
        let mut r2 = r1.clone();
        r2.set("second");

        r1.merge(&r2);
        assert_eq!(r1.values(), vec![&"second"]);
        assert!(!r1.is_conflicted());
    }
}