1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::io::{self, Write};
6use std::ops::Range;
7
8use uuid::UUID;
9
10use crate::{scan_for_float, scan_for_integer, scan_for_string, Frame};
11
/// A batch of frames stored as one contiguous piece of text.
///
/// Frames are split off lazily by the `Iterator` implementation; frame
/// boundaries are header ops (ops terminated by `!` or `;`, see `Batch::scan`).
#[derive(Clone, Debug)]
pub struct Batch<'a> {
    /// Text not yet consumed by iteration, either borrowed or owned. Each call
    /// to `next` removes the returned frame's text from the front.
    body: Cow<'a, str>,
    /// Byte range in `body` of the header op that opens the following frame;
    /// the current frame ends at its `start`. `None` means the current frame
    /// runs to the end of `body`.
    next: Option<Range<usize>>,
}
18
19impl<'a> Batch<'a> {
20 pub fn parse<S>(s: S) -> Batch<'a>
22 where
23 S: Into<Cow<'a, str>>,
24 {
25 let b = s.into();
26 let mut n = Batch::scan(&b[..]);
27
28 if let Some(rgn) = n.clone() {
29 if rgn.start == 0 {
30 n = Batch::scan(&b[rgn.end..])
31 .map(|x| (x.start + rgn.end)..(x.end + rgn.end));
32 }
33 }
34
35 Batch { body: b, next: n }
36 }
37
38 pub fn index(self) -> Option<HashMap<UUID, (UUID, Vec<Frame<'a>>)>> {
41 use crate::Op;
42
43 let mut index = HashMap::<UUID, (UUID, Vec<Frame<'a>>)>::default();
44
45 for frm in self {
46 match frm.peek().cloned() {
47 Some(Op { ty, object, .. }) => {
48 let ent = index
49 .entry(object)
50 .or_insert_with(|| (ty.clone(), Default::default()));
51
52 if ent.0 == ty {
53 ent.1.push(frm);
54 } else {
55 error!(
56 "miss matched type/object pair: {} vs. {} for object {}",
57 ent.0, ty, object
58 );
59 return None;
60 }
61 }
62 None => {}
63 }
64 }
65
66 Some(index)
67 }
68
69 pub fn reduce_all<W>(self, mut out: W) -> io::Result<()>
71 where
72 W: Write,
73 {
74 use crate::{Set, CRDT, LWW};
75 use std::io::{Error, ErrorKind};
76 use std::str::FromStr;
77
78 let index = self
79 .index()
80 .ok_or(Error::new(ErrorKind::Other, "indexing failed"))?;
81 let lww = UUID::from_str("lww").unwrap();
82 let set = UUID::from_str("set").unwrap();
83
84 for (_, (ty, mut frames)) in index {
85 match frames.len() {
86 0 => {}
87 1 => {
88 out.write_all(frames[0].body().as_bytes())?;
89 }
90 _ => {
91 let s = frames.pop().unwrap();
92 let state = if ty == lww {
93 LWW::reduce(s, frames)
94 } else if ty == set {
95 Set::reduce(s, frames)
96 } else {
97 warn!("unknown type {}", ty);
98
99 out.write_all(s.body().as_bytes())?;
100 for frm in frames {
101 out.write_all(frm.body().as_bytes())?;
102 }
103 continue;
104 };
105
106 match state {
107 Some(state) => {
108 out.write_all(state.body().as_bytes())?;
109 }
110 None => {}
111 }
112 }
113 }
114 }
115
116 Ok(())
117 }
118
119 fn scan(s: &str) -> Option<Range<usize>> {
120 #[derive(PartialEq, Eq, Clone, Copy, Debug)]
121 enum Scan {
122 Initial,
123 SawType,
124 SawObject,
125 SawEvent,
126 SawLoc,
127 };
128 let mut pos = 0usize;
129
130 loop {
131 let start = pos;
132 let mut state = Scan::Initial;
133
134 loop {
136 match (state, s.get(pos..pos + 1)) {
137 (Scan::Initial, Some("*")) => {
138 state = Scan::SawType;
139 pos += Self::scan_uuid(&s[pos + 1..]) + 1;
140 }
141 (Scan::Initial, Some("#")) | (Scan::SawType, Some("#")) => {
142 state = Scan::SawObject;
143 pos += Self::scan_uuid(&s[pos + 1..]) + 1;
144 }
145 (Scan::Initial, Some("@"))
146 | (Scan::SawType, Some("@"))
147 | (Scan::SawObject, Some("@")) => {
148 state = Scan::SawEvent;
149 pos += Self::scan_uuid(&s[pos + 1..]) + 1;
150 }
151 (Scan::Initial, Some(":"))
152 | (Scan::SawType, Some(":"))
153 | (Scan::SawObject, Some(":"))
154 | (Scan::SawEvent, Some(":")) => {
155 state = Scan::SawLoc;
156 pos += Self::scan_uuid(&s[pos + 1..]) + 1;
157 }
158 (_, Some(x)) => {
159 if x.chars().next().unwrap().is_whitespace() {
160 pos += 1;
161 } else {
162 break;
163 }
164 }
165 _ => {
166 break;
167 }
168 }
169 }
170
171 if state == Scan::Initial {
172 return None;
173 }
174
175 loop {
177 match s.get(pos..pos + 1) {
178 Some("=") => {
180 pos += scan_for_integer(&s[pos + 1..]).unwrap_or(0) + 1;
181 }
182 Some("^") => {
183 pos += scan_for_float(&s[pos + 1..]).unwrap_or(0) + 1;
184 }
185 Some(">") => {
186 pos += Self::scan_uuid(&s[pos + 1..]) + 1;
187 }
188 Some("\'") => {
189 pos += scan_for_string(&s[pos + 1..]).unwrap_or(0) + 2;
190 }
191
192 Some("?") | Some(",") => {
194 pos += 1;
195 break;
196 }
197
198 Some("!") | Some(";") => {
199 return Some(start..pos + 1);
200 }
201
202 Some("*") | Some("#") | Some("@") | Some(":") => break,
204
205 Some(x) => {
207 if x.chars().next().unwrap().is_whitespace() {
208 pos += 1;
209 } else {
210 return None;
211 }
212 }
213
214 _ => {
215 return None;
216 }
217 }
218 }
219 }
220 }
221
222 fn scan_uuid(s: &str) -> usize {
223 let mut ret = 0;
224
225 while let Some(ch) = s.get(ret..ret + 1) {
226 let is_uuid_ch =
227 ch.chars().next().map(|x| x.is_digit(36)).unwrap_or(false)
228 || ch == "~"
229 || ch == "_"
230 || ch == "-"
231 || ch == "+"
232 || ch == "%"
233 || ch == "("
234 || ch == "{"
235 || ch == "["
236 || ch == ")"
237 || ch == "}"
238 || ch == "]";
239
240 if !is_uuid_ch {
241 return ret;
242 }
243
244 ret += 1;
245 }
246
247 ret
248 }
249}
250
251impl<'a> Iterator for Batch<'a> {
252 type Item = Frame<'a>;
253
254 fn next(&mut self) -> Option<Frame<'a>> {
255 if self.body.is_empty() || self.body.starts_with(".") {
256 return None;
257 }
258
259 let p = self.next.take();
260 let end = p.clone().map(|x| x.start).unwrap_or(self.body.len());
261 let ret = match &mut self.body {
262 &mut Cow::Borrowed(s) => Frame::parse(&s[..end]),
263 &mut Cow::Owned(ref mut s) => Frame::parse(s[..end].to_string()),
264 };
265
266 match p {
267 Some(rgn) => {
268 let start = rgn.start;
269 let end = rgn.end;
270
271 self.next = Batch::scan(&self.body[end..]).map(|x| {
272 let l = end - start;
273 (x.start + l)..(x.end + l)
274 });
275
276 match &mut self.body {
277 b @ &mut Cow::Borrowed(_) => {
278 let s = match b {
279 &mut Cow::Borrowed(s) => &s[start..],
280 _ => unreachable!(),
281 };
282
283 *b = Cow::Borrowed(s);
284 }
285 &mut Cow::Owned(ref mut s) => s.replace_range(0..start, ""),
286 }
287 }
288 None => {
289 self.body = Cow::Owned(String::default());
290 }
291 }
292
293 Some(ret)
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use simple_logger;
    use std::str::FromStr;

    /// Multi-frame input shared by `batch_parse_multi` and `index_multiple_obj`.
    const MULTI_FRAME: &str = "*lww#test@0:0! @1:key'value' @2:number=1 *rga#text@3:0'T'! *rga#text@6:3, @4'e' @5'x' @6't' *lww#more:a=1;.";

    /// Parses a UUID literal, panicking on malformed test input.
    fn uuid(text: &str) -> UUID {
        UUID::from_str(text).unwrap()
    }

    #[test]
    fn batch_parse_none() {
        let mut batch = Batch::parse("");

        // Exhaustion must be sticky: repeated calls keep yielding nothing.
        for _ in 0..2 {
            assert!(batch.next().is_none());
        }
    }

    #[test]
    fn batch_parse_no_hdr() {
        let mut batch = Batch::parse("*a#a@a:0,");

        assert!(batch.next().is_some());
        assert!(batch.next().is_none());
    }

    #[test]
    fn batch_parse_multi() {
        let batch = Batch::parse(MULTI_FRAME);

        batch
            .clone()
            .for_each(|frame| println!("frm {}", frame.body()));
        assert_eq!(batch.count(), 3);
    }

    #[test]
    fn batch_reduce_all() {
        use std::io::Cursor;
        use std::str;

        let _ = simple_logger::init_with_level(log::Level::Trace);

        let input = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/batch-test"));
        let mut sink = Cursor::new(Vec::default());
        Batch::parse(input).reduce_all(&mut sink).unwrap();

        let written = sink.into_inner();
        println!("{}", str::from_utf8(&written).unwrap());
    }

    #[test]
    fn index_one_obj() {
        let idx = Batch::parse("*lww#test@0:0! @1:key'value' *lww#test@2:0! @3:number=1")
            .index()
            .unwrap();
        let obj = uuid("test");
        let ty = uuid("lww");

        assert_eq!(idx.len(), 1);
        let (found_ty, frames) = &idx[&obj];
        assert_eq!(*found_ty, ty);
        assert_eq!(frames.len(), 2);
    }

    #[test]
    fn index_multiple_obj() {
        let idx = Batch::parse(MULTI_FRAME).index().unwrap();
        let (obj1, obj2) = (uuid("test"), uuid("text"));
        let (ty1, ty2) = (uuid("lww"), uuid("rga"));

        assert_eq!(idx.len(), 2);

        let (found_ty1, frames1) = &idx[&obj1];
        assert_eq!(*found_ty1, ty1);
        assert_eq!(frames1.len(), 1);

        let (found_ty2, frames2) = &idx[&obj2];
        assert_eq!(*found_ty2, ty2);
        assert_eq!(frames2.len(), 1);
    }

    #[test]
    fn index_diff_type() {
        let batch = Batch::parse("*lww#test@0:0! @1:key'value' *rga#test@2:0! @3:number=1");

        assert!(batch.index().is_none());
    }

    #[test]
    fn index_empty_batch() {
        let idx = Batch::parse("").index().unwrap();

        assert!(idx.is_empty());
    }
}