streamson_lib/strategy/
filter.rs

//! The main logic of JSON filtering
//!
//! It uses matchers to find parts of the input and removes the matched
//! parts from the output (while keeping the JSON valid)
5
6use std::{
7    collections::VecDeque,
8    mem::swap,
9    sync::{Arc, Mutex},
10};
11
12use crate::{
13    error,
14    handler::Handler,
15    matcher::Matcher,
16    path::Path,
17    streamer::{Streamer, Token},
18};
19
20use super::{Output, Strategy};
21
22type MatcherItem = (Box<dyn Matcher>, Option<Arc<Mutex<dyn Handler>>>);
23
24/// Processes data from input and remove matched parts (and keeps the json valid)
25pub struct Filter {
26    /// Input idx against total idx
27    input_start: usize,
28    /// Buffer idx against total idx
29    buffer_idx: usize,
30    /// Buffer use for input buffering
31    buffer: VecDeque<u8>,
32    /// Responsible for data extraction
33    streamer: Streamer,
34    /// Matchers which will cause filtering
35    matchers: Vec<MatcherItem>,
36    /// What is currently matched - path and indexes to matchers
37    matches: Option<(Path, Vec<usize>)>,
38    /// Path which data were written to stream for the last time
39    last_streaming_path: Option<Path>,
40    /// Current json level
41    level: usize,
42}
43
44impl Default for Filter {
45    fn default() -> Self {
46        Self {
47            input_start: 0,
48            buffer_idx: 0,
49            buffer: VecDeque::new(),
50            matchers: vec![],
51            streamer: Streamer::new(),
52            matches: None,
53            last_streaming_path: None,
54            level: 0,
55        }
56    }
57}
58
59impl Strategy for Filter {
60    fn process(&mut self, input: &[u8]) -> Result<Vec<Output>, error::General> {
61        // Feed the streamer
62        self.streamer.feed(input);
63
64        // Feed the input buffer
65        self.buffer.extend(input);
66
67        // initialize result
68        let mut result = Vec::new();
69
70        // Finish skip
71
72        loop {
73            match self.streamer.read()? {
74                Token::Start(idx, kind) => {
75                    if self.level == 0 {
76                        result.push(Output::Start(None));
77                    }
78                    self.level += 1;
79                    if let Some((path, matched_indexes)) = self.matches.take() {
80                        let data = self.move_forward(idx);
81                        self.feed_handlers(&matched_indexes, data)?;
82                        self.matches = Some((path, matched_indexes));
83                    } else {
84                        // The path is not matched yet
85                        let current_path = self.streamer.current_path().clone();
86
87                        // Try to match current path
88                        let matcher_indexes: Vec<usize> = self
89                            .matchers
90                            .iter()
91                            .enumerate()
92                            .map(|(idx, matcher)| (idx, matcher.0.match_path(&current_path, kind)))
93                            .filter(|(_, matched)| *matched)
94                            .map(|(idx, _)| idx)
95                            .collect();
96
97                        if !matcher_indexes.is_empty() {
98                            // Trigger handlers start
99                            self.start_handlers(
100                                &current_path,
101                                &matcher_indexes,
102                                Token::Start(idx, kind),
103                            )?;
104                            self.matches = Some((current_path, matcher_indexes));
105                            self.move_forward(idx); // discard e.g. '"key": '
106                        } else {
107                            // no match here -> extend output
108                            self.last_streaming_path = Some(current_path);
109                            result
110                                .push(Output::Data(self.move_forward(idx + 1).drain(..).collect()));
111                        }
112                    }
113                }
114                Token::End(idx, kind) => {
115                    self.level -= 1;
116                    if let Some((path, matched_indexes)) = self.matches.take() {
117                        // Trigger handler feed
118                        let data = self.move_forward(idx);
119                        self.feed_handlers(&matched_indexes, data)?;
120
121                        if &path == self.streamer.current_path() {
122                            // Trigger handlers end
123                            self.end_handlers(&path, &matched_indexes, Token::End(idx, kind))?;
124                        } else {
125                            self.matches = Some((path, matched_indexes));
126                        }
127                    } else {
128                        self.last_streaming_path = Some(self.streamer.current_path().clone());
129                        result.push(Output::Data(self.move_forward(idx).drain(..).collect()));
130                    }
131                    if self.level == 0 {
132                        result.push(Output::End);
133                    }
134                }
135                Token::Pending => {
136                    self.input_start += input.len();
137                    return Ok(result);
138                }
139                Token::Separator(idx) => {
140                    if let Some(path) = self.last_streaming_path.as_ref() {
141                        if self.streamer.current_path() == path {
142                            // removing ',' if the first record from array / object was deleted
143                            self.move_forward(idx + 1);
144                        }
145                    }
146                }
147            }
148        }
149    }
150
151    fn terminate(&mut self) -> Result<Vec<Output>, error::General> {
152        if self.level == 0 {
153            Ok(vec![])
154        } else {
155            Err(error::InputTerminated::new(self.input_start).into())
156        }
157    }
158}
159
160impl Filter {
161    /// Create new filter
162    ///
163    /// It removes matched parts of the input
164    pub fn new() -> Self {
165        Self::default()
166    }
167
168    /// Split working buffer and return the removed part
169    ///
170    /// # Arguments
171    /// * `idx` - total idx to split
172    fn move_forward(&mut self, idx: usize) -> VecDeque<u8> {
173        let mut splitted = self.buffer.split_off(idx - self.buffer_idx);
174
175        // Swap to return cut part
176        swap(&mut self.buffer, &mut splitted);
177
178        self.buffer_idx = idx;
179
180        splitted
181    }
182
183    /// Adds new matcher into filtering
184    ///
185    /// # Arguments
186    /// * `matcher` - matcher which matches the path
187    /// * `handler` - optinal handler to be used to process data
188    ///
189    /// # Example
190    ///
191    /// ```
192    /// use streamson_lib::{strategy, matcher};
193    /// use std::sync::{Arc, Mutex};
194    ///
195    /// let mut filter = strategy::Filter::new();
196    /// let matcher = matcher::Simple::new(r#"{"list"}[]"#).unwrap();
197    /// filter.add_matcher(
198    ///     Box::new(matcher),
199    ///     None,
200    /// );
201    /// ```
202    pub fn add_matcher(
203        &mut self,
204        matcher: Box<dyn Matcher>,
205        handler: Option<Arc<Mutex<dyn Handler>>>,
206    ) {
207        self.matchers.push((matcher, handler));
208    }
209
210    fn start_handlers(
211        &self,
212        path: &Path,
213        matched_indexes: &[usize],
214        token: Token,
215    ) -> Result<(), error::General> {
216        for (matcher_idx, handler) in matched_indexes
217            .iter()
218            .filter(|idx| self.matchers[**idx].1.is_some())
219            .map(|idx| (idx, self.matchers[*idx].1.as_ref().unwrap()))
220        {
221            let mut guard = handler.lock().unwrap();
222            guard.start(&path, *matcher_idx, token.clone())?;
223        }
224        Ok(())
225    }
226
227    fn feed_handlers(
228        &self,
229        matched_indexes: &[usize],
230        data: VecDeque<u8>,
231    ) -> Result<(), error::General> {
232        let (first, second) = data.as_slices();
233        for (matcher_idx, handler) in matched_indexes
234            .iter()
235            .filter(|idx| self.matchers[**idx].1.is_some())
236            .map(|idx| (idx, self.matchers[*idx].1.as_ref().unwrap()))
237        {
238            let mut guard = handler.lock().unwrap();
239            guard.feed(first, *matcher_idx)?;
240            guard.feed(second, *matcher_idx)?;
241        }
242        Ok(())
243    }
244
245    fn end_handlers(
246        &self,
247        path: &Path,
248        matched_indexes: &[usize],
249        token: Token,
250    ) -> Result<(), error::General> {
251        // Trigger handlers start
252        for (matcher_idx, handler) in matched_indexes
253            .iter()
254            .filter(|idx| self.matchers[**idx].1.is_some())
255            .map(|idx| (idx, self.matchers[*idx].1.as_ref().unwrap()))
256        {
257            let mut guard = handler.lock().unwrap();
258            guard.end(&path, *matcher_idx, token.clone())?;
259        }
260        Ok(())
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::{Filter, Strategy};
    use crate::{
        matcher::{Combinator, Matcher, Simple},
        strategy::OutputConverter,
        test::{Single, Splitter, Window},
    };
    use rstest::*;

    fn get_input() -> Vec<u8> {
        br#"{"users": [{"uid": 1}, {"uid": 2}, {"uid": 3}], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
            .to_vec()
    }

    /// Filters the whole `input` in a single `process` call using one matcher
    /// (without a handler) and returns the converted output as a string
    fn filter_at_once(matcher: Box<dyn Matcher>, input: &[u8]) -> String {
        let mut filter = Filter::new();
        filter.add_matcher(matcher, None);
        let output = filter.process(input).unwrap();
        let data: Vec<u8> = OutputConverter::new()
            .convert(&output)
            .into_iter()
            .flat_map(|e| e.1)
            .collect();
        String::from_utf8(data).unwrap()
    }

    #[test]
    fn single_matcher_no_match() {
        let input = get_input();
        let matcher = Simple::new(r#"{"no-existing"}[]{"uid"}"#).unwrap();

        assert_eq!(
            filter_at_once(Box::new(matcher), &input),
            String::from_utf8(input).unwrap()
        );
    }

    #[test]
    fn single_matcher_array_first() {
        let matcher = Simple::new(r#"{"users"}[0]"#).unwrap();

        assert_eq!(
            filter_at_once(Box::new(matcher), &get_input()),
            r#"{"users": [ {"uid": 2}, {"uid": 3}], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_array_last() {
        let matcher = Simple::new(r#"{"users"}[2]"#).unwrap();

        assert_eq!(
            filter_at_once(Box::new(matcher), &get_input()),
            r#"{"users": [{"uid": 1}, {"uid": 2}], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_array_middle() {
        let matcher = Simple::new(r#"{"users"}[1]"#).unwrap();

        assert_eq!(
            filter_at_once(Box::new(matcher), &get_input()),
            r#"{"users": [{"uid": 1}, {"uid": 3}], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_array_all() {
        let matcher = Simple::new(r#"{"users"}[]"#).unwrap();

        assert_eq!(
            filter_at_once(Box::new(matcher), &get_input()),
            r#"{"users": [], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_object_first() {
        let matcher = Simple::new(r#"{"users"}"#).unwrap();

        assert_eq!(
            filter_at_once(Box::new(matcher), &get_input()),
            r#"{ "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_object_last() {
        let matcher = Simple::new(r#"{"void"}"#).unwrap();

        assert_eq!(
            filter_at_once(Box::new(matcher), &get_input()),
            r#"{"users": [{"uid": 1}, {"uid": 2}, {"uid": 3}], "groups": [{"gid": 1}, {"gid": 2}]}"#
        );
    }

    #[test]
    fn single_matcher_object_middle() {
        let matcher = Simple::new(r#"{"groups"}"#).unwrap();

        assert_eq!(
            filter_at_once(Box::new(matcher), &get_input()),
            r#"{"users": [{"uid": 1}, {"uid": 2}, {"uid": 3}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_object_all() {
        let matcher = Simple::new(r#"{}"#).unwrap();

        assert_eq!(filter_at_once(Box::new(matcher), &get_input()), r#"{}"#);
    }

    #[rstest(
        splitter,
        case::single(Box::new(Single::new())),
        case::window1(Box::new(Window::new(1))),
        case::window5(Box::new(Window::new(5))),
        case::window100(Box::new(Window::new(100)))
    )]
    fn combinator_slices(splitter: Box<dyn Splitter>) {
        let input = get_input();
        for parts in splitter.split(input) {
            let matcher = Combinator::new(Simple::new(r#"{"users"}"#).unwrap())
                | Combinator::new(Simple::new(r#"{"void"}"#).unwrap());
            let mut filter = Filter::new();
            filter.add_matcher(Box::new(matcher), None);

            // The converter is shared across parts so that split input is joined correctly
            let mut converter = OutputConverter::new();
            let mut result: Vec<u8> = Vec::new();
            for part in parts {
                let output = filter.process(&part).unwrap();
                result.extend(converter.convert(&output).into_iter().flat_map(|e| e.1));
            }
            assert_eq!(
                String::from_utf8(result).unwrap(),
                r#"{ "groups": [{"gid": 1}, {"gid": 2}]}"#
            )
        }
    }
}