streamson_lib/strategy/
trigger.rs

1//! The main logic of trigger JSON processing
2//!
3//! It uses matchers, handlers and path extraction,
4//! to call a handler on the matched part of data
5//!
6//! Note that it doesn't change the json while processing,
7//! which makes it the fastest strategy in streamson-lib.
8
9use crate::{
10    error,
11    handler::Handler,
12    matcher::Matcher,
13    streamer::{Streamer, Token},
14};
15use std::{
16    collections::HashSet,
17    sync::{Arc, Mutex},
18};
19
20use super::{Output, Strategy};
21
/// Record of a single matcher which fired when a JSON element was opened
#[derive(Debug)]
struct StackItem {
    /// Absolute index taken from the `Token::Start` which opened the element
    idx: usize,
    /// Position of the fired matcher (and its handler) in the list of matchers
    match_idx: usize,
}
29
30/// Item in matcher list
31type MatcherItem = (Box<dyn Matcher>, Arc<Mutex<dyn Handler>>);
32
33/// Processes data from input and triggers handlers
34pub struct Trigger {
35    /// Input idx against total idx
36    input_start: usize,
37    /// Path matchers and handlers
38    matchers: Vec<MatcherItem>,
39    /// Responsible for data extraction
40    streamer: Streamer,
41    /// Matched stack
42    matched_stack: Vec<Vec<StackItem>>,
43    /// Current json level
44    level: usize,
45}
46
47impl Default for Trigger {
48    fn default() -> Self {
49        Self {
50            input_start: 0,
51            matchers: vec![],
52            streamer: Streamer::new(),
53            matched_stack: vec![],
54            level: 0,
55        }
56    }
57}
58
59impl Strategy for Trigger {
60    fn process(&mut self, input: &[u8]) -> Result<Vec<Output>, error::General> {
61        self.streamer.feed(input);
62        let mut inner_idx = 0;
63        loop {
64            match self.streamer.read()? {
65                Token::Start(idx, kind) => {
66                    self.level += 1;
67                    // trigger handler for matched
68                    let to = idx - self.input_start;
69                    self.feed(&input[inner_idx..to])?;
70                    inner_idx = to;
71
72                    let mut matched = vec![];
73                    let path = self.streamer.current_path();
74
75                    // try to check whether it matches
76                    for (match_idx, (matcher, _)) in self.matchers.iter().enumerate() {
77                        if matcher.match_path(path, kind) {
78                            // handler starts
79                            let mut guard = self.matchers[match_idx].1.lock().unwrap();
80                            guard.start(path, match_idx, Token::Start(idx, kind))?;
81                            matched.push(StackItem { idx, match_idx });
82                        }
83                    }
84
85                    self.matched_stack.push(matched);
86                }
87                Token::End(idx, kind) => {
88                    self.level -= 1;
89                    let to = idx - self.input_start;
90                    self.feed(&input[inner_idx..to])?;
91                    inner_idx = to;
92
93                    let current_path = self.streamer.current_path();
94                    let items = self.matched_stack.pop().unwrap();
95                    for item in items {
96                        // run handlers for the matches
97                        let mut guard = self.matchers[item.match_idx].1.lock().unwrap();
98                        guard.end(current_path, item.match_idx, Token::End(idx, kind))?;
99                    }
100                }
101                Token::Pending => {
102                    self.input_start += input.len();
103                    self.feed(&input[inner_idx..])?;
104                    return Ok(vec![]);
105                }
106                Token::Separator(_) => {}
107            }
108        }
109    }
110
111    fn terminate(&mut self) -> Result<Vec<Output>, error::General> {
112        if self.level == 0 {
113            Ok(vec![])
114        } else {
115            Err(error::InputTerminated::new(self.input_start).into())
116        }
117    }
118}
119
120impl Trigger {
121    /// Creates a new `Trigger`
122    ///
123    /// It collects matched data and triggers handlers when entire
124    /// data are read.
125    pub fn new() -> Self {
126        Self::default()
127    }
128
129    /// Adds a mathcher and a handler to `Trigger`
130    ///
131    /// # Arguments
132    /// * `matcher` - matcher which matches the path
133    /// * `handler` - handler to be triggers when path matches
134    ///
135    /// # Example
136    ///
137    /// ```
138    /// use streamson_lib::{strategy, matcher, handler};
139    /// use std::{io, sync::{Arc, Mutex}};
140    ///
141    /// let mut trigger = strategy::Trigger::new();
142    /// let handler = handler::Output::new(io::stdout());
143    /// let matcher = matcher::Simple::new(r#"{"list"}[]"#).unwrap();
144    /// trigger.add_matcher(
145    ///     Box::new(matcher),
146    ///     Arc::new(Mutex::new(handler))
147    /// );
148    /// ```
149    pub fn add_matcher(&mut self, matcher: Box<dyn Matcher>, handler: Arc<Mutex<dyn Handler>>) {
150        self.matchers.push((matcher, handler));
151    }
152
153    fn feed(&mut self, data: &[u8]) -> Result<(), error::Handler> {
154        // feed only once in case that there is some nested matcher
155        let mut seen_match_idx = HashSet::<usize>::new();
156        for matched_items in &self.matched_stack {
157            for matched_item in matched_items {
158                if seen_match_idx.insert(matched_item.match_idx) {
159                    let mut guard = self.matchers[matched_item.match_idx].1.lock().unwrap();
160                    guard.feed(data, matched_item.match_idx)?;
161                }
162            }
163        }
164        Ok(())
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::{Strategy, Trigger};
    use crate::{
        error,
        handler::Handler,
        matcher::Simple,
        path::Path,
        streamer::Token,
        test::{Single, Splitter, Window},
    };
    use rstest::*;
    use std::{
        any::Any,
        sync::{Arc, Mutex},
    };

    /// JSON document used by all the tests
    const INPUT: &[u8] = br#"{"elements": [1, 2, 3, 4]}"#;

    /// Handler which records the matched paths and the data of each match
    #[derive(Default)]
    struct TestHandler {
        /// Paths passed to `start`, in call order
        paths: Vec<String>,
        /// Data collected between each `start` and `end`, in call order
        data: Vec<Vec<u8>>,
        /// Data of the match which is currently being fed
        current: Vec<u8>,
    }

    impl Handler for TestHandler {
        fn start(
            &mut self,
            path: &Path,
            _matcher_idx: usize,
            _kind: Token,
        ) -> Result<Option<Vec<u8>>, error::Handler> {
            self.paths.push(path.to_string());
            Ok(None)
        }
        fn feed(
            &mut self,
            data: &[u8],
            _matcher_idx: usize,
        ) -> Result<Option<Vec<u8>>, error::Handler> {
            // copy straight from the slice, no temporary vector is needed
            self.current.extend_from_slice(data);
            Ok(None)
        }
        fn end(
            &mut self,
            _path: &Path,
            _matcher_idx: usize,
            _kind: Token,
        ) -> Result<Option<Vec<u8>>, error::Handler> {
            // move the finished buffer out and leave an empty one behind
            self.data.push(std::mem::take(&mut self.current));
            Ok(None)
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Creates a trigger with a single matcher on the items of `elements`
    /// and returns it together with the handler attached to that matcher
    fn make_trigger() -> (Trigger, Arc<Mutex<TestHandler>>) {
        let mut trigger = Trigger::new();
        let handler = Arc::new(Mutex::new(TestHandler::default()));
        let matcher = Simple::new(r#"{"elements"}[]"#).unwrap();
        trigger.add_matcher(Box::new(matcher), handler.clone());
        (trigger, handler)
    }

    /// Checks that exactly the four items of `elements` were reported,
    /// each one with its own path and its own data
    fn assert_elements(handler: &TestHandler) {
        assert_eq!(handler.paths.len(), 4);
        assert_eq!(handler.data.len(), 4);

        for idx in 0..4 {
            let expected_path = format!(r#"{{"elements"}}[{}]"#, idx);
            let expected_data = (idx + 1).to_string().into_bytes();
            assert_eq!(handler.paths[idx], expected_path);
            assert_eq!(handler.data[idx], expected_data);
        }
    }

    #[test]
    fn basic() {
        let (mut trigger, handler) = make_trigger();
        trigger.process(INPUT).unwrap();

        let guard = handler.lock().unwrap();
        assert_elements(&guard);
    }

    #[rstest(
        splitter,
        case::single(Box::new(Single::new())),
        case::window1(Box::new(Window::new(1))),
        case::window5(Box::new(Window::new(5))),
        case::window100(Box::new(Window::new(100)))
    )]
    fn splitted(splitter: Box<dyn Splitter>) {
        for parts in splitter.split(INPUT.to_vec()) {
            let (mut trigger, handler) = make_trigger();

            for part in parts {
                trigger.process(&part).unwrap();
            }

            let guard = handler.lock().unwrap();
            assert_elements(&guard);
        }
    }
}