Skip to main content

logicaffeine_data/crdt/
mvregister.rs

//! MV-Register (Multi-Value Register) CRDT.
//!
//! A register that preserves all concurrent writes.
//! When conflicts occur, all conflicting values are retained until resolved.
5
6use super::causal::VClock;
7use super::delta::DeltaCrdt;
8use super::replica::{generate_replica_id, ReplicaId};
9use super::Merge;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12
13/// Delta for MVRegister synchronization.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct MVRegisterDelta<T> {
16    pub values: Vec<(T, VClock)>,
17}
18
19/// A register that keeps all concurrent values.
20///
21/// Unlike LWW-Register which silently picks a winner, MVRegister
22/// preserves all concurrent writes so conflicts can be detected and resolved.
23#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
24pub struct MVRegister<T> {
25    /// Each value paired with its vector clock
26    values: Vec<(T, VClock)>,
27    /// This replica's ID
28    replica_id: ReplicaId,
29}
30
31impl<T> MVRegister<T> {
32    /// Create a new empty register with a specific replica ID.
33    pub fn new(replica_id: ReplicaId) -> Self {
34        Self {
35            values: Vec::new(),
36            replica_id,
37        }
38    }
39
40    /// Create a new register with a random replica ID.
41    pub fn new_random() -> Self {
42        Self::new(generate_replica_id())
43    }
44
45    /// Get the replica ID for this register.
46    pub fn replica_id(&self) -> ReplicaId {
47        self.replica_id
48    }
49}
50
51impl<T: Clone + PartialEq> MVRegister<T> {
52    /// Set a new value, creating a new version that dominates all current values.
53    pub fn set(&mut self, value: T) {
54        // Create a clock that dominates all current clocks
55        let mut new_clock = VClock::new();
56        for (_, clock) in &self.values {
57            new_clock.merge_vclock(clock);
58        }
59        new_clock.increment(self.replica_id);
60
61        // Replace all values with the new one
62        self.values = vec![(value, new_clock)];
63    }
64
65    /// Get all current values.
66    ///
67    /// If there's only one value, there's no conflict.
68    /// Multiple values indicate concurrent writes that need resolution.
69    pub fn values(&self) -> Vec<&T> {
70        self.values.iter().map(|(v, _)| v).collect()
71    }
72
73    /// Resolve a conflict by setting a new value.
74    ///
75    /// This is the same as `set`, but semantically indicates conflict resolution.
76    pub fn resolve(&mut self, value: T) {
77        self.set(value);
78    }
79
80    /// Check if there's a conflict (more than one value).
81    pub fn has_conflict(&self) -> bool {
82        self.values.len() > 1
83    }
84}
85
86impl<T: Clone + PartialEq> Merge for MVRegister<T> {
87    /// Merge another register into this one.
88    ///
89    /// Keeps values that are not dominated by any other value.
90    fn merge(&mut self, other: &Self) {
91        // Collect all values from both registers
92        let mut all_values: Vec<(T, VClock)> = self.values.clone();
93        all_values.extend(other.values.clone());
94
95        // Keep only values that are not dominated by any other
96        let mut result: Vec<(T, VClock)> = Vec::new();
97
98        for (value, clock) in &all_values {
99            let is_dominated = all_values.iter().any(|(_, other_clock)| {
100                other_clock.dominates(clock) && other_clock != clock
101            });
102
103            if !is_dominated {
104                // Check if we already have this exact value+clock
105                let already_exists = result
106                    .iter()
107                    .any(|(v, c)| v == value && c == clock);
108
109                if !already_exists {
110                    result.push((value.clone(), clock.clone()));
111                }
112            }
113        }
114
115        self.values = result;
116    }
117}
118
119impl<T: Clone + PartialEq + Serialize + DeserializeOwned + Send + 'static> DeltaCrdt
120    for MVRegister<T>
121{
122    type Delta = MVRegisterDelta<T>;
123
124    fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
125        let current = self.version();
126        if since.dominates(&current) {
127            return None;
128        }
129
130        // Return all values as delta
131        Some(MVRegisterDelta {
132            values: self.values.clone(),
133        })
134    }
135
136    fn apply_delta(&mut self, delta: &Self::Delta) {
137        // Merge the delta values using MVRegister merge semantics
138        let mut all_values: Vec<(T, VClock)> = self.values.clone();
139        all_values.extend(delta.values.clone());
140
141        let mut result: Vec<(T, VClock)> = Vec::new();
142
143        for (value, clock) in &all_values {
144            let is_dominated = all_values.iter().any(|(_, other_clock)| {
145                other_clock.dominates(clock) && other_clock != clock
146            });
147
148            if !is_dominated {
149                let already_exists = result.iter().any(|(v, c)| v == value && c == clock);
150
151                if !already_exists {
152                    result.push((value.clone(), clock.clone()));
153                }
154            }
155        }
156
157        self.values = result;
158    }
159
160    fn version(&self) -> VClock {
161        // Version is the merge of all value clocks
162        let mut combined = VClock::new();
163        for (_, clock) in &self.values {
164            combined.merge_vclock(clock);
165        }
166        combined
167    }
168}
169
170impl<T: Clone + PartialEq + Default> Default for MVRegister<T> {
171    fn default() -> Self {
172        Self::new_random()
173    }
174}
175
176
177// NOTE: Showable impl is in logicaffeine_system (io module)
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Replicas 1 and 2 each write independently, then replica 1 merges
    /// replica 2; the merged register of replica 1 is returned.
    fn merged_after_concurrent_writes() -> MVRegister<String> {
        let mut left = MVRegister::<String>::new(1);
        let mut right = MVRegister::<String>::new(2);

        left.set(String::from("from-a"));
        right.set(String::from("from-b"));
        left.merge(&right);

        left
    }

    #[test]
    fn test_mvregister_new() {
        let register = MVRegister::<String>::new(1);
        assert_eq!(register.values().len(), 0);
    }

    #[test]
    fn test_mvregister_set_get() {
        let mut register = MVRegister::<String>::new(1);
        register.set(String::from("hello"));

        let stored = register.values();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].as_str(), "hello");
    }

    #[test]
    fn test_mvregister_concurrent() {
        let merged = merged_after_concurrent_writes();

        // Neither write supersedes the other, so both must survive the merge.
        assert_eq!(merged.values().len(), 2);
    }

    #[test]
    fn test_mvregister_resolve() {
        let mut merged = merged_after_concurrent_writes();
        merged.resolve(String::from("resolved"));

        let stored = merged.values();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].as_str(), "resolved");
    }
}
222}