ron_crdt/crdt/
lww.rs

1//! Last-writer wins dictionary.
2
3use std::cmp::Ordering;
4use std::collections::HashMap;
5use std::str::FromStr;
6
7use uuid::UUID;
8
9use crate::{Atom, Frame, FrameOrd, Op, Terminator, CRDT};
10
/// An LWW is an associative map from UUIDs to Atoms.
///
/// Conflicting writes are merged by using the Op event timestamp: the last write wins.
///
/// `LWW` carries no data of its own; it is a marker type whose associated functions
/// operate on `Frame`s holding the actual state.
#[derive(Debug, Clone, Copy)]
pub struct LWW;
15
16impl LWW {
17    /// Sets `key` to `value` in LWW `state`. Returns the update Frame without modifying the state.
18    pub fn set<'a>(
19        state: &Frame<'a>, key: UUID, value: Atom,
20    ) -> Option<Frame<'a>> {
21        state.peek().map(|op| {
22            let &Op { ref object, .. } = op;
23
24            Frame::compress(vec![Op {
25                ty: UUID::from_str("lww").unwrap(),
26                object: object.clone(),
27                event: UUID::now(),
28                location: key,
29                atoms: vec![value].into(),
30                term: Terminator::Raw,
31            }])
32        })
33    }
34}
35
36impl CRDT for LWW {
37    type T = HashMap<UUID, Atom>;
38
39    fn new<'a>(obj: UUID) -> Frame<'a> {
40        Frame::compress(vec![Op {
41            ty: UUID::from_str("lww").unwrap(),
42            object: obj,
43            event: UUID::now(),
44            location: UUID::zero(),
45            atoms: Default::default(),
46            term: Terminator::Header,
47        }])
48    }
49
50    fn reduce<'a>(
51        state: Frame<'a>, updates: Vec<Frame<'a>>,
52    ) -> Option<Frame<'a>> {
53        super::merge::<LWWOrd>(state, updates)
54    }
55
56    fn map<'a>(state: Frame<'a>) -> Option<Self::T> {
57        use crate::Terminator::*;
58        use std::iter::FromIterator;
59
60        Some(HashMap::from_iter(state.filter_map(|mut op| {
61            match op {
62                Op { term: Header, .. } | Op { term: Query, .. } => None,
63                Op { ref location, ref mut atoms, .. }
64                    if !location.is_zero() && atoms.len() == 1 =>
65                {
66                    Some((location.clone(), atoms.pop().unwrap()))
67                }
68                Op { .. } => None,
69            }
70        })))
71    }
72}
73
/// Newtype around a `Frame` that supplies the op ordering used when reducing LWW frames.
///
/// It is the type parameter handed to `super::merge` by `LWW::reduce`; the ordering
/// itself lives in its `FrameOrd` implementation.
#[derive(Debug)]
struct LWWOrd<'a>(Frame<'a>);
76
77impl<'a> FrameOrd<'a> for LWWOrd<'a> {
78    fn primary_cmp(a: &Op, b: &Op) -> Ordering {
79        UUID::weak_cmp(&a.location, &b.location)
80    }
81
82    fn secondary_cmp(a: &Op, b: &Op) -> Ordering {
83        UUID::weak_cmp(&b.event, &a.event)
84    }
85
86    fn peek(&self) -> Option<&Op> {
87        self.0.peek()
88    }
89}
90
91impl<'a> Iterator for LWWOrd<'a> {
92    type Item = Op;
93
94    fn next(&mut self) -> Option<Op> {
95        self.0.next()
96    }
97}
98
99impl<'a> From<Frame<'a>> for LWWOrd<'a> {
100    fn from(frame: Frame<'a>) -> Self {
101        LWWOrd(frame)
102    }
103}
104
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `state` and `updates`, reduces them with `LWW::reduce` and asserts that the
    /// result yields exactly the ops of the parsed `expected` frame.
    ///
    /// Both frames are printed op by op to stderr before the comparison so that a failing
    /// run shows them side by side.
    fn assert_reduces_to<'a>(
        state: &'a str, updates: &[&'a str], expected: &'a str,
    ) {
        let updates: Vec<Frame<'a>> =
            updates.iter().map(|u| Frame::parse(*u)).collect();
        let exp = Frame::parse(expected);
        let r = LWW::reduce(Frame::parse(state), updates).unwrap();

        eprintln!(
            "expected: {:?}",
            exp.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
        );
        eprintln!(
            "     got: {:?}",
            r.clone().map(|x| format!("{} ", x)).collect::<Vec<_>>()
        );
        assert_eq!(exp.collect::<Vec<_>>(), r.collect::<Vec<_>>());
    }

    // A single raw op applied to an empty state.
    #[test]
    fn lww_basic_1() {
        assert_reduces_to(
            "*lww#test1@0!",
            &["*lww#test1@time:a'A';"],
            "*lww#test1@time:0!        :a      'A' ,",
        );
    }

    // An update for a new key is added next to the existing one.
    #[test]
    fn lww_basic_2() {
        assert_reduces_to(
            "*lww#test2@1:0!:a'A'",
            &["*lww#test2@2:b'B';"],
            "*lww#test2@2:0!
    @1  :a      'A' ,
    @2  :b      'B' ,",
        );
    }

    // Two writes to the same key: the one with the later event wins.
    #[test]
    fn lww_basic_3() {
        assert_reduces_to(
            "*lww#test3@1:a'A1';",
            &["*lww#test3@2:a'A2';"],
            "*lww#test3@2:1!
        :a      'A2' ,",
        );
    }

    // Merging two state frames: newer values replace older ones, untouched keys survive.
    #[test]
    fn lww_basic_4() {
        assert_reduces_to(
            "*lww#test4@2:1!
    :a  'A1'
    :b  'B1'
    :c  'C1'",
            &["*lww#test4@3:1!
    :a  'A2'
    :b  'B2'"],
            "*lww#test4@3:2!
        :a      'A2' ,
        :b      'B2' ,
    @2  :c      'C1' ,",
        );
    }

    // Keys written in compressed notation.
    #[test]
    fn lww_basic_5() {
        assert_reduces_to(
            "*lww#array@1:0!
    :0%0 =0,
    :)1%0 =-1",
            &["*lww#array@2:0!
    :0%)1 '1',
    :)1%0 =1,
    :)1%)1 =65536"],
            "*lww#array@2:0!
    @1  :0%0      =0  ,
    @2  :0%0000000001    '1' ,
        :0000000001%0    =1  ,
        :0000000001%0000000001    =65536  ,",
        );
    }

    // Unusual values: a long string with escapes, a float and the minimum integer.
    #[test]
    fn lww_basic_6() {
        assert_reduces_to(
            "*lww#weird@0:0!",
            &[
                "*lww#weird@1 :longString 'While classic databases score 0 on the ACID\\' scale, I should probably reserve the value of -1 for one data sync system based on Operational Transforms.\\n Because of the way its OT mechanics worked, even minor glitches messed up the entire database through offset corruption. That was probably the worst case I observed in the wild. Some may build on quicksand, others need solid bedrock… but that system needed a diamond plate to stay still.' ;",
                "*lww#weird@2 :pi ^3.141592653589793 ;",
                "*lww#weird@3 :minus =-9223372036854775808 ;",
            ],
            "*lww#weird@3:0!
	@1 :longString 'While classic databases score 0 on the ACID\\' scale, I should probably reserve the value of -1 for one data sync system based on Operational Transforms.\\n Because of the way its OT mechanics worked, even minor glitches messed up the entire database through offset corruption. That was probably the worst case I observed in the wild. Some may build on quicksand, others need solid bedrock… but that system needed a diamond plate to stay still.' ,
	@3 :minus =-9223372036854775808 ,
	@2 :pi ^3.141592653589793 ,",
        );
    }

    // Raw ops only, no state header in any input.
    #[test]
    fn lww_basic_7() {
        assert_reduces_to(
            "*lww#raw@1:one=1;",
            &["*lww#raw@2:two^2.0;", "*lww#raw@2:three'три';"],
            "*lww#raw@2:1!
	@1 :one =1 ,
	@2 :three 'три' ,
	:two ^2.000000e+00 ,",
        );
    }

    // Mapping the expected result of `lww_basic_7` yields one entry per key.
    #[test]
    fn map_basic_7() {
        let frm = Frame::parse(
            "*lww#raw@2:1!@1 :one =1 ,@2 :three 'три' ,:two ^2.000000e+00 ,",
        );
        let r = LWW::map(frm).unwrap();
        let mut exp = HashMap::default();

        exp.insert(UUID::from_str("one").unwrap(), Atom::Integer(1));
        exp.insert(UUID::from_str("two").unwrap(), Atom::Float(2.0));
        exp.insert(
            UUID::from_str("three").unwrap(),
            Atom::String("три".to_string()),
        );

        assert_eq!(exp, r);
    }

    // A freshly created LWW maps to an empty dictionary.
    #[test]
    fn map_empty() {
        let frm = LWW::new(UUID::now());
        let r = LWW::map(frm).unwrap();
        let exp = HashMap::default();

        assert_eq!(exp, r);
    }

    // Round trip through `set`, `reduce` and `map`.
    //
    // NOTE(review): every update is stamped with `UUID::now()`; the test presumably relies
    // on later calls producing later events — confirm the clock is monotonic.
    #[test]
    fn map_insert() {
        let key = UUID::from_str("a").unwrap();

        // empty dictionary
        let st0 = LWW::new(UUID::now());
        let exp0 = HashMap::default();
        assert_eq!(exp0, LWW::map(st0.clone()).unwrap());

        // set 'a' to =1
        let ch1 = LWW::set(&st0, key, Atom::Integer(1)).unwrap();
        let st1 = LWW::reduce(st0, vec![ch1]).unwrap();
        let exp1: HashMap<UUID, Atom> =
            vec![(key, Atom::Integer(1))].into_iter().collect();
        assert_eq!(exp1, LWW::map(st1.clone()).unwrap());

        // set 'a' to =1 again: the mapped value stays the same
        let ch2 = LWW::set(&st1, key, Atom::Integer(1)).unwrap();
        let st2 = LWW::reduce(st1, vec![ch2]).unwrap();
        let exp2: HashMap<UUID, Atom> =
            vec![(key, Atom::Integer(1))].into_iter().collect();
        assert_eq!(exp2, LWW::map(st2.clone()).unwrap());

        // set 'a' to =2: the newer write replaces the old value
        let ch3 = LWW::set(&st2, key, Atom::Integer(2)).unwrap();
        let st3 = LWW::reduce(st2, vec![ch3]).unwrap();
        let exp3: HashMap<UUID, Atom> =
            vec![(key, Atom::Integer(2))].into_iter().collect();
        assert_eq!(exp3, LWW::map(st3).unwrap());
    }
}