streamson_lib/strategy/
extract.rs

//! The main logic of JSON extraction
//!
//! It uses matchers to extract parts of JSON.
//! Nested matches have no meaning here.
5
6use crate::{
7    error,
8    handler::Handler,
9    matcher::Matcher,
10    path::Path,
11    streamer::{Streamer, Token},
12};
13use std::sync::{Arc, Mutex};
14
15use super::{Output, Strategy};
16
17type MatcherItem = (Box<dyn Matcher>, Option<Arc<Mutex<dyn Handler>>>);
18
19pub struct Extract {
20    /// Export path as well
21    export_path: bool,
22    /// Input idx against total idx
23    input_start: usize,
24    /// What is currently matched - path and indexes to matchers
25    matches: Option<(Path, Vec<usize>)>,
26    /// Path matchers
27    matchers: Vec<MatcherItem>,
28    /// Creates to token stream
29    streamer: Streamer,
30    /// Current json level
31    level: usize,
32}
33
34impl Default for Extract {
35    fn default() -> Self {
36        Self {
37            export_path: false,
38            input_start: 0,
39            matches: None,
40            matchers: vec![],
41            streamer: Streamer::new(),
42            level: 0,
43        }
44    }
45}
46
47impl Strategy for Extract {
48    fn process(&mut self, input: &[u8]) -> Result<Vec<Output>, error::General> {
49        self.streamer.feed(input);
50
51        let mut input_idx = 0;
52
53        let mut result = vec![];
54        loop {
55            match self.streamer.read()? {
56                Token::Start(idx, kind) => {
57                    self.level += 1;
58                    if self.matches.is_none() {
59                        let path = self.streamer.current_path();
60
61                        // try to check whether it matches
62                        let mut matched_indexes = vec![];
63                        for (matcher_idx, (matcher, _handler)) in self.matchers.iter().enumerate() {
64                            if matcher.match_path(path, kind) {
65                                matched_indexes.push(matcher_idx);
66                            }
67                        }
68                        if !matched_indexes.is_empty() {
69                            // New match appears here
70                            input_idx = idx - self.input_start;
71                            for matcher_idx in &matched_indexes {
72                                if let Some(handler) = self.matchers[*matcher_idx].1.as_ref() {
73                                    let mut guard = handler.lock().unwrap();
74                                    // triger handlers start
75                                    guard.start(path, *matcher_idx, Token::Start(idx, kind))?;
76                                }
77                            }
78                            self.matches = Some((path.clone(), matched_indexes));
79
80                            // Set output
81                            result.push(Output::Start(if self.export_path {
82                                Some(path.clone())
83                            } else {
84                                None
85                            }));
86                        }
87                    }
88                }
89                Token::Pending => {
90                    if let Some((_, matched_indexes)) = self.matches.as_ref() {
91                        for matcher_idx in matched_indexes {
92                            if let Some(handler) = self.matchers[*matcher_idx].1.as_ref() {
93                                let mut guard = handler.lock().unwrap();
94                                // feed handlers
95                                guard.feed(&input[input_idx..], *matcher_idx)?;
96                            }
97                        }
98                        result.push(Output::Data(input[input_idx..].to_vec()));
99                    }
100                    self.input_start += input.len();
101                    return Ok(result);
102                }
103                Token::End(idx, kind) => {
104                    self.level -= 1;
105                    if let Some((path, matched_indexes)) = self.matches.as_ref() {
106                        // Put the data to results
107                        if path == self.streamer.current_path() {
108                            let old_idx = input_idx;
109                            input_idx = idx - self.input_start;
110                            result.push(Output::Data(input[old_idx..input_idx].to_vec()));
111                            result.push(Output::End);
112                            // Feed and end handlers
113                            for matcher_idx in matched_indexes {
114                                if let Some(handler) = self.matchers[*matcher_idx].1.as_ref() {
115                                    let mut guard = handler.lock().unwrap();
116                                    // feed handlers
117                                    guard.feed(&input[old_idx..input_idx], *matcher_idx)?;
118                                    guard.end(&path, *matcher_idx, Token::End(idx, kind))?;
119                                }
120                            }
121                            self.matches = None;
122                        }
123                    }
124                }
125                _ => {}
126            }
127        }
128    }
129
130    fn terminate(&mut self) -> Result<Vec<Output>, error::General> {
131        if self.level == 0 {
132            Ok(vec![])
133        } else {
134            Err(error::InputTerminated::new(self.input_start).into())
135        }
136    }
137}
138
139impl Extract {
140    /// Creates a new `Extract`
141    ///
142    /// It exracts matched data parts (not nested)
143    pub fn new() -> Self {
144        Self::default()
145    }
146
147    /// Sets whether matched path should be exported with data
148    /// Output data will be enriched with the path from were the data
149    /// were extracted
150    ///
151    /// if path is not exported extraction can be a bit faster
152    pub fn set_export_path(mut self, export: bool) -> Self {
153        self.export_path = export;
154        self
155    }
156
157    /// Adds new matcher for data extraction
158    ///
159    /// # Arguments
160    /// * `matcher` - matcher which matches the path
161    /// * `handler` - optinal handler to be used to process data
162    ///
163    /// # Example
164    ///
165    /// ```
166    /// use streamson_lib::{strategy, matcher};
167    /// use std::sync::{Arc, Mutex};
168    ///
169    /// let mut extract = strategy::Extract::new();
170    /// let matcher = matcher::Simple::new(r#"{"list"}[]"#).unwrap();
171    /// let mut extract = strategy::Extract::new();
172    /// extract.add_matcher(
173    ///     Box::new(matcher),
174    ///     None,
175    /// );
176    /// ```
177    pub fn add_matcher(
178        &mut self,
179        matcher: Box<dyn Matcher>,
180        handler: Option<Arc<Mutex<dyn Handler>>>,
181    ) {
182        self.matchers.push((matcher, handler));
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::{Extract, Output, Strategy};
    use crate::{
        handler::Buffer,
        matcher::Simple,
        path::Path,
        test::{Single, Splitter, Window},
    };
    use rstest::*;
    use std::{
        convert::TryFrom,
        sync::{Arc, Mutex},
    };

    /// JSON document shared by all the tests
    fn get_input() -> Vec<u8> {
        let raw: &[u8] =
            br#"{"users": [{"name": "fred"}, {"name": "bob"}], "groups": [{"name": "admins"}]}"#;
        raw.to_vec()
    }

    /// Every `name` is extracted - first without and then with exported paths
    #[test]
    fn flat() {
        let name_matcher = Simple::new(r#"{}[]{"name"}"#).unwrap();

        // path export turned off
        let mut extract = Extract::new();
        extract.add_matcher(Box::new(name_matcher.clone()), None);

        let expected = vec![
            Output::Start(None),
            Output::Data(br#""fred""#.to_vec()),
            Output::End,
            Output::Start(None),
            Output::Data(br#""bob""#.to_vec()),
            Output::End,
            Output::Start(None),
            Output::Data(br#""admins""#.to_vec()),
            Output::End,
        ];
        assert_eq!(extract.process(&get_input()).unwrap(), expected);

        // path export turned on
        let mut extract = Extract::new().set_export_path(true);
        extract.add_matcher(Box::new(name_matcher), None);

        let expected = vec![
            Output::Start(Some(Path::try_from(r#"{"users"}[0]{"name"}"#).unwrap())),
            Output::Data(br#""fred""#.to_vec()),
            Output::End,
            Output::Start(Some(Path::try_from(r#"{"users"}[1]{"name"}"#).unwrap())),
            Output::Data(br#""bob""#.to_vec()),
            Output::End,
            Output::Start(Some(
                Path::try_from(r#"{"groups"}[0]{"name"}"#).unwrap(),
            )),
            Output::Data(br#""admins""#.to_vec()),
            Output::End,
        ];
        assert_eq!(extract.process(&get_input()).unwrap(), expected);
    }

    /// A matched object is extracted as a single piece of data
    #[test]
    fn nested() {
        let mut extract = Extract::new();
        extract.add_matcher(Box::new(Simple::new(r#"{}[1]"#).unwrap()), None);

        let expected = vec![
            Output::Start(None),
            Output::Data(br#"{"name": "bob"}"#.to_vec()),
            Output::End,
        ];
        assert_eq!(extract.process(&get_input()).unwrap(), expected);
    }

    /// The input is split in the middle of the matched data
    #[test]
    fn pending() {
        let input = get_input();
        let (head, tail) = input.split_at(37);

        let mut extract = Extract::new();
        extract.add_matcher(Box::new(Simple::new(r#"{}[1]"#).unwrap()), None);

        assert_eq!(
            extract.process(head).unwrap(),
            vec![Output::Start(None), Output::Data(br#"{"name":"#.to_vec())]
        );
        assert_eq!(
            extract.process(tail).unwrap(),
            vec![Output::Data(br#" "bob"}"#.to_vec()), Output::End]
        );
    }

    /// Same as `pending`, but a buffer handler has to receive the entire matched data
    #[test]
    fn pending_handlers() {
        let input = get_input();
        let (head, tail) = input.split_at(37);

        let buffer_handler = Arc::new(Mutex::new(Buffer::new().set_max_buffer_size(Some(22))));

        let mut extract = Extract::new();
        extract.add_matcher(
            Box::new(Simple::new(r#"{}[1]"#).unwrap()),
            Some(buffer_handler.clone()),
        );

        assert_eq!(
            extract.process(head).unwrap(),
            vec![Output::Start(None), Output::Data(br#"{"name":"#.to_vec())]
        );
        assert_eq!(
            extract.process(tail).unwrap(),
            vec![Output::Data(br#" "bob"}"#.to_vec()), Output::End]
        );

        let buffered = buffer_handler.lock().unwrap().pop().unwrap();
        assert_eq!(buffered, (None, br#"{"name": "bob"}"#.to_vec()));
    }

    /// The extracted data must not depend on the way the input is split
    #[rstest(
        splitter,
        case::single(Box::new(Single::new())),
        case::window1(Box::new(Window::new(1))),
        case::window5(Box::new(Window::new(5))),
        case::window100(Box::new(Window::new(100)))
    )]
    fn splitters(splitter: Box<dyn Splitter>) {
        for parts in splitter.split(get_input()) {
            let mut extract = Extract::new();
            extract.add_matcher(Box::new(Simple::new(r#"{}[]{"name"}"#).unwrap()), None);

            let mut collected = Vec::new();
            for part in parts {
                for item in extract.process(&part).unwrap() {
                    if let Output::Data(bytes) = item {
                        collected.extend(bytes);
                    }
                }
            }
            assert_eq!(
                String::from_utf8(collected).unwrap(),
                r#""fred""bob""admins""#
            )
        }
    }
}