1use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::str::FromStr;
6
7use uuid::UUID;
8
9use crate::{Atom, Frame, FrameOrd, Op, Terminator, CRDT};
10
11pub struct LWW;
15
16impl LWW {
17 pub fn set<'a>(
19 state: &Frame<'a>, key: UUID, value: Atom,
20 ) -> Option<Frame<'a>> {
21 state.peek().map(|op| {
22 let &Op { ref object, .. } = op;
23
24 Frame::compress(vec![Op {
25 ty: UUID::from_str("lww").unwrap(),
26 object: object.clone(),
27 event: UUID::now(),
28 location: key,
29 atoms: vec![value].into(),
30 term: Terminator::Raw,
31 }])
32 })
33 }
34}
35
36impl CRDT for LWW {
37 type T = HashMap<UUID, Atom>;
38
39 fn new<'a>(obj: UUID) -> Frame<'a> {
40 Frame::compress(vec![Op {
41 ty: UUID::from_str("lww").unwrap(),
42 object: obj,
43 event: UUID::now(),
44 location: UUID::zero(),
45 atoms: Default::default(),
46 term: Terminator::Header,
47 }])
48 }
49
50 fn reduce<'a>(
51 state: Frame<'a>, updates: Vec<Frame<'a>>,
52 ) -> Option<Frame<'a>> {
53 super::merge::<LWWOrd>(state, updates)
54 }
55
56 fn map<'a>(state: Frame<'a>) -> Option<Self::T> {
57 use crate::Terminator::*;
58 use std::iter::FromIterator;
59
60 Some(HashMap::from_iter(state.filter_map(|mut op| {
61 match op {
62 Op { term: Header, .. } | Op { term: Query, .. } => None,
63 Op { ref location, ref mut atoms, .. }
64 if !location.is_zero() && atoms.len() == 1 =>
65 {
66 Some((location.clone(), atoms.pop().unwrap()))
67 }
68 Op { .. } => None,
69 }
70 })))
71 }
72}
73
74#[derive(Debug)]
75struct LWWOrd<'a>(Frame<'a>);
76
77impl<'a> FrameOrd<'a> for LWWOrd<'a> {
78 fn primary_cmp(a: &Op, b: &Op) -> Ordering {
79 UUID::weak_cmp(&a.location, &b.location)
80 }
81
82 fn secondary_cmp(a: &Op, b: &Op) -> Ordering {
83 UUID::weak_cmp(&b.event, &a.event)
84 }
85
86 fn peek(&self) -> Option<&Op> {
87 self.0.peek()
88 }
89}
90
91impl<'a> Iterator for LWWOrd<'a> {
92 type Item = Op;
93
94 fn next(&mut self) -> Option<Op> {
95 self.0.next()
96 }
97}
98
99impl<'a> From<Frame<'a>> for LWWOrd<'a> {
100 fn from(frame: Frame<'a>) -> Self {
101 LWWOrd(frame)
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108
109 #[test]
110 fn lww_basic_1() {
111 let f1 = Frame::parse("*lww#test1@0!");
112 let f2 = Frame::parse("*lww#test1@time:a'A';");
113 let exp = Frame::parse("*lww#test1@time:0! :a 'A' ,");
114 let r = LWW::reduce(f1, vec![f2]).unwrap();
115
116 eprintln!(
117 "expected: {:?}",
118 exp.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
119 );
120 eprintln!(
121 " got: {:?}",
122 r.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
123 );
124 assert_eq!(exp.collect::<Vec<_>>(), r.collect::<Vec<_>>());
125 }
126
127 #[test]
128 fn lww_basic_2() {
129 let f1 = Frame::parse("*lww#test2@1:0!:a'A'");
130 let f2 = Frame::parse("*lww#test2@2:b'B';");
131 let exp = Frame::parse(
132 "*lww#test2@2:0!
133 @1 :a 'A' ,
134 @2 :b 'B' ,",
135 );
136 let r = LWW::reduce(f1, vec![f2]).unwrap();
137
138 eprintln!(
139 "expected: {:?}",
140 exp.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
141 );
142 eprintln!(
143 " got: {:?}",
144 r.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
145 );
146 assert_eq!(exp.collect::<Vec<_>>(), r.collect::<Vec<_>>());
147 }
148
149 #[test]
150 fn lww_basic_3() {
151 let f1 = Frame::parse("*lww#test3@1:a'A1';");
152 let f2 = Frame::parse("*lww#test3@2:a'A2';");
153 let exp = Frame::parse(
154 "*lww#test3@2:1!
155 :a 'A2' ,",
156 );
157 let r = LWW::reduce(f1, vec![f2]).unwrap();
158
159 eprintln!(
160 "expected: {:?}",
161 exp.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
162 );
163 eprintln!(
164 " got: {:?}",
165 r.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
166 );
167 assert_eq!(exp.collect::<Vec<_>>(), r.collect::<Vec<_>>());
168 }
169
170 #[test]
171 fn lww_basic_4() {
172 let f1 = Frame::parse(
173 "*lww#test4@2:1!
174 :a 'A1'
175 :b 'B1'
176 :c 'C1'",
177 );
178 let f2 = Frame::parse(
179 "*lww#test4@3:1!
180 :a 'A2'
181 :b 'B2'",
182 );
183 let exp = Frame::parse(
184 "*lww#test4@3:2!
185 :a 'A2' ,
186 :b 'B2' ,
187 @2 :c 'C1' ,",
188 );
189 let r = LWW::reduce(f1, vec![f2]).unwrap();
190
191 eprintln!(
192 "expected: {:?}",
193 exp.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
194 );
195 eprintln!(
196 " got: {:?}",
197 r.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
198 );
199 assert_eq!(exp.collect::<Vec<_>>(), r.collect::<Vec<_>>());
200 }
201
202 #[test]
203 fn lww_basic_5() {
204 let f1 = Frame::parse(
205 "*lww#array@1:0!
206 :0%0 =0,
207 :)1%0 =-1",
208 );
209 let f2 = Frame::parse(
210 "*lww#array@2:0!
211 :0%)1 '1',
212 :)1%0 =1,
213 :)1%)1 =65536",
214 );
215 let exp = Frame::parse(
216 "*lww#array@2:0!
217 @1 :0%0 =0 ,
218 @2 :0%0000000001 '1' ,
219 :0000000001%0 =1 ,
220 :0000000001%0000000001 =65536 ,",
221 );
222 let r = LWW::reduce(f1, vec![f2]).unwrap();
223
224 eprintln!(
225 "expected: {:?}",
226 exp.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
227 );
228 eprintln!(
229 " got: {:?}",
230 r.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
231 );
232 assert_eq!(exp.collect::<Vec<_>>(), r.collect::<Vec<_>>());
233 }
234
235 #[test]
236 fn lww_basic_6() {
237 let f1 = Frame::parse("*lww#weird@0:0!");
238 let f2 = Frame::parse("*lww#weird@1 :longString 'While classic databases score 0 on the ACID\\' scale, I should probably reserve the value of -1 for one data sync system based on Operational Transforms.\\n Because of the way its OT mechanics worked, even minor glitches messed up the entire database through offset corruption. That was probably the worst case I observed in the wild. Some may build on quicksand, others need solid bedrock… but that system needed a diamond plate to stay still.' ;");
239 let f3 = Frame::parse("*lww#weird@2 :pi ^3.141592653589793 ;");
240 let f4 = Frame::parse("*lww#weird@3 :minus =-9223372036854775808 ;");
241 let exp = Frame::parse("*lww#weird@3:0!
242 @1 :longString 'While classic databases score 0 on the ACID\\' scale, I should probably reserve the value of -1 for one data sync system based on Operational Transforms.\\n Because of the way its OT mechanics worked, even minor glitches messed up the entire database through offset corruption. That was probably the worst case I observed in the wild. Some may build on quicksand, others need solid bedrock… but that system needed a diamond plate to stay still.' ,
243 @3 :minus =-9223372036854775808 ,
244 @2 :pi ^3.141592653589793 ,");
245 let r = LWW::reduce(f1, vec![f2, f3, f4]).unwrap();
246
247 eprintln!(
248 "expected: {:?}",
249 exp.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
250 );
251 eprintln!(
252 " got: {:?}",
253 r.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
254 );
255 assert_eq!(exp.collect::<Vec<_>>(), r.collect::<Vec<_>>());
256 }
257
258 #[test]
259 fn lww_basic_7() {
260 let f1 = Frame::parse("*lww#raw@1:one=1;");
261 let f2 = Frame::parse("*lww#raw@2:two^2.0;");
262 let f3 = Frame::parse("*lww#raw@2:three'три';");
263 let exp = Frame::parse(
264 "*lww#raw@2:1!
265 @1 :one =1 ,
266 @2 :three 'три' ,
267 :two ^2.000000e+00 ,",
268 );
269 let r = LWW::reduce(f1, vec![f2, f3]).unwrap();
270
271 eprintln!(
272 "expected: {:?}",
273 exp.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
274 );
275 eprintln!(
276 " got: {:?}",
277 r.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
278 );
279 assert_eq!(exp.collect::<Vec<_>>(), r.collect::<Vec<_>>());
280 }
281
282 #[test]
283 fn map_basic_7() {
284 use std::str::FromStr;
285 let frm = Frame::parse(
286 "*lww#raw@2:1!@1 :one =1 ,@2 :three 'три' ,:two ^2.000000e+00 ,",
287 );
288 let r = LWW::map(frm).unwrap();
289 let mut exp = HashMap::default();
290
291 exp.insert(UUID::from_str("one").unwrap(), Atom::Integer(1));
292 exp.insert(UUID::from_str("two").unwrap(), Atom::Float(2.0));
293 exp.insert(
294 UUID::from_str("three").unwrap(),
295 Atom::String("три".to_string()),
296 );
297
298 assert_eq!(exp, r);
299 }
300
301 #[test]
302 fn map_empty() {
303 let frm = LWW::new(UUID::now());
304 let r = LWW::map(frm).unwrap();
305 let exp = HashMap::default();
306
307 assert_eq!(exp, r);
308 }
309
310 #[test]
311 fn map_insert() {
312 use std::iter::FromIterator;
313
314 let key = UUID::from_str("a").unwrap();
315
316 let st0 = LWW::new(UUID::now());
318 let exp0 = HashMap::default();
319 assert_eq!(exp0, LWW::map(st0.clone()).unwrap());
320
321 let ch1 = LWW::set(&st0, key, Atom::Integer(1)).unwrap();
323 let st1 = LWW::reduce(st0, vec![ch1]).unwrap();
324 let exp1 =
325 HashMap::from_iter(vec![(key, Atom::Integer(1))].into_iter());
326 assert_eq!(exp1, LWW::map(st1.clone()).unwrap());
327
328 let ch2 = LWW::set(&st1, key, Atom::Integer(1)).unwrap();
330 let st2 = LWW::reduce(st1, vec![ch2]).unwrap();
331 let exp2 =
332 HashMap::from_iter(vec![(key, Atom::Integer(1))].into_iter());
333 assert_eq!(exp2, LWW::map(st2.clone()).unwrap());
334
335 let ch3 = LWW::set(&st2, key, Atom::Integer(2)).unwrap();
337 let st3 = LWW::reduce(st2, vec![ch3]).unwrap();
338 let exp3 = HashMap::from_iter(
339 vec![(key, Atom::Integer(1)), (key, Atom::Integer(2))].into_iter(),
340 );
341 assert_eq!(exp3, LWW::map(st3.clone()).unwrap());
342 }
343}