ron_crdt/
batch.rs

1//! Batch of Frames
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::io::{self, Write};
6use std::ops::Range;
7
8use uuid::UUID;
9
10use crate::{scan_for_float, scan_for_integer, scan_for_string, Frame};
11
/// A lazily-parsed sequence of frames, consumed through its `Iterator` impl.
#[derive(Debug, Clone)]
pub struct Batch<'a> {
    /// Text not yet yielded as a frame; may be borrowed from the caller or owned.
    body: Cow<'a, str>,
    /// Byte range, within `body`, of the op (terminated by `!` or `;`) that starts
    /// the frame after the one at the front of `body`; `None` if none was found.
    next: Option<Range<usize>>,
}
18
19impl<'a> Batch<'a> {
20    /// Crates a new batch from text encoded frames `s`.
21    pub fn parse<S>(s: S) -> Batch<'a>
22    where
23        S: Into<Cow<'a, str>>,
24    {
25        let b = s.into();
26        let mut n = Batch::scan(&b[..]);
27
28        if let Some(rgn) = n.clone() {
29            if rgn.start == 0 {
30                n = Batch::scan(&b[rgn.end..])
31                    .map(|x| (x.start + rgn.end)..(x.end + rgn.end));
32            }
33        }
34
35        Batch { body: b, next: n }
36    }
37
38    /// Indexes all frames. Returns map from object UUID to a pair of type and frames refering to
39    /// the object.
40    pub fn index(self) -> Option<HashMap<UUID, (UUID, Vec<Frame<'a>>)>> {
41        use crate::Op;
42
43        let mut index = HashMap::<UUID, (UUID, Vec<Frame<'a>>)>::default();
44
45        for frm in self {
46            match frm.peek().cloned() {
47                Some(Op { ty, object, .. }) => {
48                    let ent = index
49                        .entry(object)
50                        .or_insert_with(|| (ty.clone(), Default::default()));
51
52                    if ent.0 == ty {
53                        ent.1.push(frm);
54                    } else {
55                        error!(
56                            "miss matched type/object pair: {} vs. {} for object {}",
57                            ent.0, ty, object
58                        );
59                        return None;
60                    }
61                }
62                None => {}
63            }
64        }
65
66        Some(index)
67    }
68
69    /// Reduces all frames found in `self` and outputs the final status frames.
70    pub fn reduce_all<W>(self, mut out: W) -> io::Result<()>
71    where
72        W: Write,
73    {
74        use crate::{Set, CRDT, LWW};
75        use std::io::{Error, ErrorKind};
76        use std::str::FromStr;
77
78        let index = self
79            .index()
80            .ok_or(Error::new(ErrorKind::Other, "indexing failed"))?;
81        let lww = UUID::from_str("lww").unwrap();
82        let set = UUID::from_str("set").unwrap();
83
84        for (_, (ty, mut frames)) in index {
85            match frames.len() {
86                0 => {}
87                1 => {
88                    out.write_all(frames[0].body().as_bytes())?;
89                }
90                _ => {
91                    let s = frames.pop().unwrap();
92                    let state = if ty == lww {
93                        LWW::reduce(s, frames)
94                    } else if ty == set {
95                        Set::reduce(s, frames)
96                    } else {
97                        warn!("unknown type {}", ty);
98
99                        out.write_all(s.body().as_bytes())?;
100                        for frm in frames {
101                            out.write_all(frm.body().as_bytes())?;
102                        }
103                        continue;
104                    };
105
106                    match state {
107                        Some(state) => {
108                            out.write_all(state.body().as_bytes())?;
109                        }
110                        None => {}
111                    }
112                }
113            }
114        }
115
116        Ok(())
117    }
118
119    fn scan(s: &str) -> Option<Range<usize>> {
120        #[derive(PartialEq, Eq, Clone, Copy, Debug)]
121        enum Scan {
122            Initial,
123            SawType,
124            SawObject,
125            SawEvent,
126            SawLoc,
127        };
128        let mut pos = 0usize;
129
130        loop {
131            let start = pos;
132            let mut state = Scan::Initial;
133
134            // spec
135            loop {
136                match (state, s.get(pos..pos + 1)) {
137                    (Scan::Initial, Some("*")) => {
138                        state = Scan::SawType;
139                        pos += Self::scan_uuid(&s[pos + 1..]) + 1;
140                    }
141                    (Scan::Initial, Some("#")) | (Scan::SawType, Some("#")) => {
142                        state = Scan::SawObject;
143                        pos += Self::scan_uuid(&s[pos + 1..]) + 1;
144                    }
145                    (Scan::Initial, Some("@"))
146                    | (Scan::SawType, Some("@"))
147                    | (Scan::SawObject, Some("@")) => {
148                        state = Scan::SawEvent;
149                        pos += Self::scan_uuid(&s[pos + 1..]) + 1;
150                    }
151                    (Scan::Initial, Some(":"))
152                    | (Scan::SawType, Some(":"))
153                    | (Scan::SawObject, Some(":"))
154                    | (Scan::SawEvent, Some(":")) => {
155                        state = Scan::SawLoc;
156                        pos += Self::scan_uuid(&s[pos + 1..]) + 1;
157                    }
158                    (_, Some(x)) => {
159                        if x.chars().next().unwrap().is_whitespace() {
160                            pos += 1;
161                        } else {
162                            break;
163                        }
164                    }
165                    _ => {
166                        break;
167                    }
168                }
169            }
170
171            if state == Scan::Initial {
172                return None;
173            }
174
175            // atoms
176            loop {
177                match s.get(pos..pos + 1) {
178                    // atoms
179                    Some("=") => {
180                        pos += scan_for_integer(&s[pos + 1..]).unwrap_or(0) + 1;
181                    }
182                    Some("^") => {
183                        pos += scan_for_float(&s[pos + 1..]).unwrap_or(0) + 1;
184                    }
185                    Some(">") => {
186                        pos += Self::scan_uuid(&s[pos + 1..]) + 1;
187                    }
188                    Some("\'") => {
189                        pos += scan_for_string(&s[pos + 1..]).unwrap_or(0) + 2;
190                    }
191
192                    // terminator
193                    Some("?") | Some(",") => {
194                        pos += 1;
195                        break;
196                    }
197
198                    Some("!") | Some(";") => {
199                        return Some(start..pos + 1);
200                    }
201
202                    // next op
203                    Some("*") | Some("#") | Some("@") | Some(":") => break,
204
205                    // skip whitespace
206                    Some(x) => {
207                        if x.chars().next().unwrap().is_whitespace() {
208                            pos += 1;
209                        } else {
210                            return None;
211                        }
212                    }
213
214                    _ => {
215                        return None;
216                    }
217                }
218            }
219        }
220    }
221
222    fn scan_uuid(s: &str) -> usize {
223        let mut ret = 0;
224
225        while let Some(ch) = s.get(ret..ret + 1) {
226            let is_uuid_ch =
227                ch.chars().next().map(|x| x.is_digit(36)).unwrap_or(false)
228                    || ch == "~"
229                    || ch == "_"
230                    || ch == "-"
231                    || ch == "+"
232                    || ch == "%"
233                    || ch == "("
234                    || ch == "{"
235                    || ch == "["
236                    || ch == ")"
237                    || ch == "}"
238                    || ch == "]";
239
240            if !is_uuid_ch {
241                return ret;
242            }
243
244            ret += 1;
245        }
246
247        ret
248    }
249}
250
251impl<'a> Iterator for Batch<'a> {
252    type Item = Frame<'a>;
253
254    fn next(&mut self) -> Option<Frame<'a>> {
255        if self.body.is_empty() || self.body.starts_with(".") {
256            return None;
257        }
258
259        let p = self.next.take();
260        let end = p.clone().map(|x| x.start).unwrap_or(self.body.len());
261        let ret = match &mut self.body {
262            &mut Cow::Borrowed(s) => Frame::parse(&s[..end]),
263            &mut Cow::Owned(ref mut s) => Frame::parse(s[..end].to_string()),
264        };
265
266        match p {
267            Some(rgn) => {
268                let start = rgn.start;
269                let end = rgn.end;
270
271                self.next = Batch::scan(&self.body[end..]).map(|x| {
272                    let l = end - start;
273                    (x.start + l)..(x.end + l)
274                });
275
276                match &mut self.body {
277                    b @ &mut Cow::Borrowed(_) => {
278                        let s = match b {
279                            &mut Cow::Borrowed(s) => &s[start..],
280                            _ => unreachable!(),
281                        };
282
283                        *b = Cow::Borrowed(s);
284                    }
285                    &mut Cow::Owned(ref mut s) => s.replace_range(0..start, ""),
286                }
287            }
288            None => {
289                self.body = Cow::Owned(String::default());
290            }
291        }
292
293        Some(ret)
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use simple_logger;
    use std::str::FromStr;

    /// Parses a short textual name into a UUID.
    fn uuid(name: &str) -> UUID {
        UUID::from_str(name).unwrap()
    }

    #[test]
    fn batch_parse_none() {
        let mut batch = Batch::parse("");

        // Repeated calls keep returning `None`.
        for _ in 0..2 {
            assert!(batch.next().is_none());
        }
    }

    #[test]
    fn batch_parse_no_hdr() {
        let mut batch = Batch::parse("*a#a@a:0,");

        assert!(batch.next().is_some());
        assert!(batch.next().is_none());
    }

    #[test]
    fn batch_parse_multi() {
        let batch = Batch::parse("*lww#test@0:0! @1:key'value' @2:number=1 *rga#text@3:0'T'! *rga#text@6:3, @4'e' @5'x' @6't' *lww#more:a=1;.");

        batch
            .clone()
            .for_each(|frm| println!("frm {}", frm.body()));
        assert_eq!(batch.count(), 3);
    }

    #[test]
    fn batch_reduce_all() {
        use std::str;

        let _ = simple_logger::init_with_level(log::Level::Trace);

        let batch = Batch::parse(include_str!(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/tests/batch-test"
        )));
        let mut out: Vec<u8> = Vec::new();
        batch.reduce_all(&mut out).unwrap();

        println!("{}", str::from_utf8(&out).unwrap());
    }

    #[test]
    fn index_one_obj() {
        let idx = Batch::parse(
            "*lww#test@0:0! @1:key'value' *lww#test@2:0! @3:number=1",
        )
        .index()
        .unwrap();
        let object = uuid("test");
        let ty = uuid("lww");

        assert_eq!(idx.len(), 1);
        assert_eq!(idx[&object].0, ty);
        assert_eq!(idx[&object].1.len(), 2);
    }

    #[test]
    fn index_multiple_obj() {
        let idx = Batch::parse("*lww#test@0:0! @1:key'value' @2:number=1 *rga#text@3:0'T'! *rga#text@6:3, @4'e' @5'x' @6't' *lww#more:a=1;.")
            .index()
            .unwrap();
        let (test_obj, text_obj) = (uuid("test"), uuid("text"));
        let (lww, rga) = (uuid("lww"), uuid("rga"));

        assert_eq!(idx.len(), 2);
        assert_eq!(idx[&test_obj].0, lww);
        assert_eq!(idx[&test_obj].1.len(), 1);
        assert_eq!(idx[&text_obj].0, rga);
        assert_eq!(idx[&text_obj].1.len(), 1);
    }

    #[test]
    fn index_diff_type() {
        let batch = Batch::parse(
            "*lww#test@0:0! @1:key'value' *rga#test@2:0! @3:number=1",
        );

        // Same object under two types must make indexing fail.
        assert!(batch.index().is_none());
    }

    #[test]
    fn index_empty_batch() {
        let idx = Batch::parse("").index().unwrap();

        assert!(idx.is_empty());
    }
}