logicaffeine_data/crdt/
mvregister.rs1use super::causal::VClock;
7use super::delta::DeltaCrdt;
8use super::replica::{generate_replica_id, ReplicaId};
9use super::Merge;
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct MVRegisterDelta<T> {
16 pub values: Vec<(T, VClock)>,
17}
18
19#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
24pub struct MVRegister<T> {
25 values: Vec<(T, VClock)>,
27 replica_id: ReplicaId,
29}
30
31impl<T> MVRegister<T> {
32 pub fn new(replica_id: ReplicaId) -> Self {
34 Self {
35 values: Vec::new(),
36 replica_id,
37 }
38 }
39
40 pub fn new_random() -> Self {
42 Self::new(generate_replica_id())
43 }
44
45 pub fn replica_id(&self) -> ReplicaId {
47 self.replica_id
48 }
49}
50
51impl<T: Clone + PartialEq> MVRegister<T> {
52 pub fn set(&mut self, value: T) {
54 let mut new_clock = VClock::new();
56 for (_, clock) in &self.values {
57 new_clock.merge_vclock(clock);
58 }
59 new_clock.increment(self.replica_id);
60
61 self.values = vec![(value, new_clock)];
63 }
64
65 pub fn values(&self) -> Vec<&T> {
70 self.values.iter().map(|(v, _)| v).collect()
71 }
72
73 pub fn resolve(&mut self, value: T) {
77 self.set(value);
78 }
79
80 pub fn has_conflict(&self) -> bool {
82 self.values.len() > 1
83 }
84}
85
86impl<T: Clone + PartialEq> Merge for MVRegister<T> {
87 fn merge(&mut self, other: &Self) {
91 let mut all_values: Vec<(T, VClock)> = self.values.clone();
93 all_values.extend(other.values.clone());
94
95 let mut result: Vec<(T, VClock)> = Vec::new();
97
98 for (value, clock) in &all_values {
99 let is_dominated = all_values.iter().any(|(_, other_clock)| {
100 other_clock.dominates(clock) && other_clock != clock
101 });
102
103 if !is_dominated {
104 let already_exists = result
106 .iter()
107 .any(|(v, c)| v == value && c == clock);
108
109 if !already_exists {
110 result.push((value.clone(), clock.clone()));
111 }
112 }
113 }
114
115 self.values = result;
116 }
117}
118
119impl<T: Clone + PartialEq + Serialize + DeserializeOwned + Send + 'static> DeltaCrdt
120 for MVRegister<T>
121{
122 type Delta = MVRegisterDelta<T>;
123
124 fn delta_since(&self, since: &VClock) -> Option<Self::Delta> {
125 let current = self.version();
126 if since.dominates(¤t) {
127 return None;
128 }
129
130 Some(MVRegisterDelta {
132 values: self.values.clone(),
133 })
134 }
135
136 fn apply_delta(&mut self, delta: &Self::Delta) {
137 let mut all_values: Vec<(T, VClock)> = self.values.clone();
139 all_values.extend(delta.values.clone());
140
141 let mut result: Vec<(T, VClock)> = Vec::new();
142
143 for (value, clock) in &all_values {
144 let is_dominated = all_values.iter().any(|(_, other_clock)| {
145 other_clock.dominates(clock) && other_clock != clock
146 });
147
148 if !is_dominated {
149 let already_exists = result.iter().any(|(v, c)| v == value && c == clock);
150
151 if !already_exists {
152 result.push((value.clone(), clock.clone()));
153 }
154 }
155 }
156
157 self.values = result;
158 }
159
160 fn version(&self) -> VClock {
161 let mut combined = VClock::new();
163 for (_, clock) in &self.values {
164 combined.merge_vclock(clock);
165 }
166 combined
167 }
168}
169
170impl<T: Clone + PartialEq + Default> Default for MVRegister<T> {
171 fn default() -> Self {
172 Self::new_random()
173 }
174}
175
176
177#[cfg(test)]
180mod tests {
181 use super::*;
182
183 #[test]
184 fn test_mvregister_new() {
185 let reg: MVRegister<String> = MVRegister::new(1);
186 assert!(reg.values().is_empty());
187 }
188
189 #[test]
190 fn test_mvregister_set_get() {
191 let mut reg: MVRegister<String> = MVRegister::new(1);
192 reg.set("hello".to_string());
193 assert_eq!(reg.values().len(), 1);
194 assert_eq!(reg.values()[0], &"hello".to_string());
195 }
196
197 #[test]
198 fn test_mvregister_concurrent() {
199 let mut a: MVRegister<String> = MVRegister::new(1);
200 let mut b: MVRegister<String> = MVRegister::new(2);
201
202 a.set("from-a".to_string());
203 b.set("from-b".to_string());
204 a.merge(&b);
205
206 assert_eq!(a.values().len(), 2);
207 }
208
209 #[test]
210 fn test_mvregister_resolve() {
211 let mut a: MVRegister<String> = MVRegister::new(1);
212 let mut b: MVRegister<String> = MVRegister::new(2);
213
214 a.set("from-a".to_string());
215 b.set("from-b".to_string());
216 a.merge(&b);
217 a.resolve("resolved".to_string());
218
219 assert_eq!(a.values().len(), 1);
220 assert_eq!(a.values()[0], &"resolved".to_string());
221 }
222}