Skip to main content

mdcs_core/
mvreg.rs

1//! Multi-Value Register CRDT
2//!
3//! The Multi-Value Register (MV-Register) maintains a set of concurrent values
4//! instead of choosing a single winner. Each value is tagged with a unique
5//! identifier (dot) to distinguish different writes.
6//!
//! When concurrent writes are merged, the register holds all of them until a
//! later `write` or `resolve` replaces them with a single value (individual
//! values can also be dropped with `remove_dot`).
9
10use crate::lattice::Lattice;
11use serde::{Deserialize, Deserializer, Serialize, Serializer};
12use std::collections::BTreeMap;
13use ulid::Ulid;
14
15/// A unique identifier for a write operation
16#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
17pub struct Dot {
18    pub replica_id: String,
19    pub unique_id: Ulid,
20}
21
22impl Dot {
23    pub fn new(replica_id: impl Into<String>) -> Self {
24        Self {
25            replica_id: replica_id.into(),
26            unique_id: Ulid::new(),
27        }
28    }
29}
30
31/// A Multi-Value Register CRDT
32///
33/// Maintains a set of values, each with a unique dot. This allows
34/// concurrent writes to coexist until explicitly resolved.
35#[derive(Clone, Debug, PartialEq, Eq)]
36pub struct MVRegister<T: Ord + Clone> {
37    /// Current values, each tagged with a unique dot
38    values: BTreeMap<Dot, T>,
39}
40
41// Custom serialization: serialize as Vec<(Dot, T)> for JSON compatibility
42impl<T: Ord + Clone + Serialize> Serialize for MVRegister<T> {
43    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
44    where
45        S: Serializer,
46    {
47        let entries: Vec<(&Dot, &T)> = self.values.iter().collect();
48        entries.serialize(serializer)
49    }
50}
51
52impl<'de, T: Ord + Clone + Deserialize<'de>> Deserialize<'de> for MVRegister<T> {
53    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
54    where
55        D: Deserializer<'de>,
56    {
57        let entries: Vec<(Dot, T)> = Vec::deserialize(deserializer)?;
58        Ok(Self {
59            values: entries.into_iter().collect(),
60        })
61    }
62}
63
64impl<T: Ord + Clone> MVRegister<T> {
65    /// Create a new empty Multi-Value Register
66    pub fn new() -> Self {
67        Self {
68            values: BTreeMap::new(),
69        }
70    }
71
72    /// Write a new value, generating a unique dot
73    pub fn write(&mut self, replica_id: &str, value: T) -> Dot {
74        let dot = Dot::new(replica_id);
75        // Clear previous values and insert the new one
76        self.values.clear();
77        self.values.insert(dot.clone(), value);
78        dot
79    }
80
81    /// Write a value with a specific dot (for merging)
82    pub fn write_with_dot(&mut self, dot: Dot, value: T) {
83        self.values.insert(dot, value);
84    }
85
86    /// Get all current values
87    pub fn read(&self) -> Vec<&T> {
88        self.values.values().collect()
89    }
90
91    /// Get all current values with their dots
92    pub fn read_with_dots(&self) -> Vec<(&Dot, &T)> {
93        self.values.iter().collect()
94    }
95
96    /// Resolve concurrent values by choosing one (for write-after-read consistency)
97    pub fn resolve(&mut self, replica_id: &str, value: T) -> Dot {
98        let dot = Dot::new(replica_id);
99        self.values.clear();
100        self.values.insert(dot.clone(), value);
101        dot
102    }
103
104    /// Remove a specific dot (value)
105    pub fn remove_dot(&mut self, dot: &Dot) {
106        self.values.remove(dot);
107    }
108
109    /// Check if register is empty
110    pub fn is_empty(&self) -> bool {
111        self.values.is_empty()
112    }
113
114    /// Get the number of concurrent values
115    pub fn len(&self) -> usize {
116        self.values.len()
117    }
118}
119
120impl<T: Ord + Clone> Default for MVRegister<T> {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126impl<T: Ord + Clone> Lattice for MVRegister<T> {
127    fn bottom() -> Self {
128        Self::new()
129    }
130
131    /// Join operation: union of all values from both registers
132    /// This represents the concurrent state after a merge
133    fn join(&self, other: &Self) -> Self {
134        let mut values = self.values.clone();
135
136        // Union all values from other
137        for (dot, value) in &other.values {
138            // Only insert if we don't already have a value with this dot
139            values.entry(dot.clone()).or_insert_with(|| value.clone());
140        }
141
142        Self { values }
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// A single write on an empty register leaves exactly that value, stored
    /// under the dot returned by `write`.
    #[test]
    fn test_mvreg_basic_write() {
        let mut reg = MVRegister::new();
        assert!(reg.is_empty());

        let dot = reg.write("replica1", 42);
        assert_eq!(dot.replica_id, "replica1");
        assert!(!reg.is_empty());
        assert_eq!(reg.len(), 1);
        assert_eq!(reg.read(), vec![&42]);
        assert_eq!(reg.read_with_dots(), vec![(&dot, &42)]);
    }

    /// Successive writes on the *same* register are sequential, not
    /// concurrent: each one replaces whatever the register held before.
    #[test]
    fn test_mvreg_concurrent_writes() {
        let mut reg = MVRegister::new();

        let first = reg.write("replica1", 10);
        assert_eq!(reg.read(), vec![&10]);

        // The second write clears the first value, whichever replica id is used.
        let second = reg.write("replica2", 20);
        assert_ne!(first, second);
        assert_eq!(reg.read(), vec![&20]);
        assert_eq!(reg.read_with_dots(), vec![(&second, &20)]);
    }

    /// Writes made on independent registers both survive a join.
    #[test]
    fn test_mvreg_merge_concurrent_values() {
        let mut reg1 = MVRegister::new();
        reg1.write("replica1", 10);

        let mut reg2 = MVRegister::new();
        reg2.write("replica2", 20);

        let merged = reg1.join(&reg2);
        let values = merged.read();
        assert_eq!(values.len(), 2);
        assert!(values.contains(&&10));
        assert!(values.contains(&&20));
    }

    /// `resolve` collapses the values held after a join into a single one.
    #[test]
    fn test_mvreg_resolve_conflicts() {
        let mut reg = MVRegister::new();
        reg.write("replica1", 10);

        let mut reg2 = MVRegister::new();
        reg2.write("replica2", 20);

        let merged = reg.join(&reg2);
        assert_eq!(merged.len(), 2);

        let mut resolved = merged.clone();
        let dot = resolved.resolve("replica3", 30);
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved.read_with_dots(), vec![(&dot, &30)]);

        // Resolving the clone must not touch the register it was cloned from.
        assert_eq!(merged.len(), 2);
    }

    /// Joining a register with itself yields an equal register.
    #[test]
    fn test_mvreg_join_idempotent() {
        let mut reg = MVRegister::new();
        reg.write("replica1", 42);

        assert_eq!(reg.join(&reg), reg);
    }

    /// The order of the operands does not change the result of a join.
    #[test]
    fn test_mvreg_join_commutative() {
        let mut reg1 = MVRegister::new();
        reg1.write("replica1", 10);

        let mut reg2 = MVRegister::new();
        reg2.write("replica2", 20);

        let joined1 = reg1.join(&reg2);
        let joined2 = reg2.join(&reg1);

        // Compare whole registers (dots and values), not only lengths.
        assert_eq!(joined1, joined2);
        assert_eq!(joined1.len(), 2);
    }

    /// Grouping of joins does not change the result.
    #[test]
    fn test_mvreg_join_associative() {
        let mut reg1 = MVRegister::new();
        reg1.write("replica1", 10);

        let mut reg2 = MVRegister::new();
        reg2.write("replica2", 20);

        let mut reg3 = MVRegister::new();
        reg3.write("replica3", 30);

        let left = reg1.join(&reg2).join(&reg3);
        let right = reg1.join(&reg2.join(&reg3));

        assert_eq!(left, right);
        assert_eq!(left.len(), 3);
    }

    /// The bottom element is a left and right identity for join.
    #[test]
    fn test_mvreg_bottom_is_identity() {
        let mut reg = MVRegister::new();
        reg.write("replica1", 42);

        let bottom = MVRegister::bottom();

        assert_eq!(reg.join(&bottom), reg);
        assert_eq!(bottom.join(&reg), reg);
    }

    /// A JSON round trip preserves both the value and its dot.
    #[test]
    fn test_mvreg_serialization() {
        let mut reg = MVRegister::new();
        reg.write("replica1", 42);

        let serialized = serde_json::to_string(&reg).unwrap();
        let deserialized: MVRegister<i32> = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.read(), vec![&42]);
        assert_eq!(deserialized, reg);
    }
}