streamson_lib/strategy/
all.rs

1//! The main logic processing all elements from JSON
2//!
3//! This strategy doesn't require any matchers
4//! Handlers will be triggered on every element
5
6use super::{Output, Strategy};
7use crate::{
8    error,
9    handler::{Group, Handler},
10    streamer::{Streamer, Token},
11};
12use std::sync::{Arc, Mutex};
13
14/// Trigger handlers on every element
15#[derive(Default)]
16pub struct All {
17    /// Should handlers be used for converting
18    convert: bool,
19    /// Input idx against total idx
20    input_start: usize,
21    /// Responsible for data extraction
22    streamer: Streamer,
23    /// List of handlers to be triggered
24    handlers: Arc<Mutex<Group>>,
25    /// Current json level
26    level: usize,
27}
28
29impl Strategy for All {
30    fn process(&mut self, input: &[u8]) -> Result<Vec<Output>, error::General> {
31        self.streamer.feed(input);
32        let mut inner_idx = 0;
33        let mut result = vec![];
34        loop {
35            match self.streamer.read()? {
36                Token::Start(idx, kind) => {
37                    let path = self.streamer.current_path();
38
39                    if self.level == 0 {
40                        result.push(Output::Start(None));
41                    }
42
43                    let to = idx - self.input_start;
44                    let mut guard = self.handlers.lock().unwrap();
45                    if let Some(data) = guard.feed(&input[inner_idx..to], 0)? {
46                        if self.convert {
47                            result.push(Output::Data(data));
48                        }
49                    }
50                    if let Some(data) = guard.start(path, 0, Token::Start(idx, kind))? {
51                        if self.convert {
52                            result.push(Output::Data(data));
53                        }
54                    }
55                    self.level += 1;
56                    inner_idx = to;
57                }
58                Token::End(idx, kind) => {
59                    let path = self.streamer.current_path();
60
61                    let to = idx - self.input_start;
62                    let mut guard = self.handlers.lock().unwrap();
63                    if let Some(data) = guard.feed(&input[inner_idx..to], 0)? {
64                        if self.convert {
65                            result.push(Output::Data(data));
66                        }
67                    }
68                    if let Some(data) = guard.end(path, 0, Token::End(idx, kind))? {
69                        if self.convert {
70                            result.push(Output::Data(data));
71                        }
72                    }
73                    inner_idx = to;
74                    self.level -= 1;
75                    if self.level == 0 {
76                        result.push(Output::End);
77                    }
78                }
79                Token::Pending => {
80                    self.input_start += input.len();
81                    let mut guard = self.handlers.lock().unwrap();
82                    if let Some(data) = guard.feed(&input[inner_idx..], 0)? {
83                        if self.convert {
84                            result.push(Output::Data(data));
85                        }
86                    }
87                    return Ok(if self.convert { result } else { vec![] });
88                }
89                Token::Separator(_) => {}
90            }
91        }
92    }
93
94    fn terminate(&mut self) -> Result<Vec<Output>, error::General> {
95        if self.level == 0 {
96            Ok(vec![])
97        } else {
98            Err(error::InputTerminated::new(self.input_start).into())
99        }
100    }
101}
102
103impl All {
104    /// Creates a new `All`
105    ///
106    /// It triggers handlers on all found elements
107    pub fn new() -> Self {
108        Default::default()
109    }
110
111    /// Sets whether handlers should be actually used to converting data
112    pub fn set_convert(&mut self, convert: bool) {
113        self.convert = convert;
114    }
115
116    /// Adds a handler to `All`
117    ///
118    /// # Arguments
119    /// * `handler` - handler to be triggers when path matches
120    ///
121    /// # Example
122    ///
123    /// ```
124    /// use streamson_lib::{strategy, matcher, handler};
125    /// use std::sync::{Arc, Mutex};
126    ///
127    /// let mut trigger = strategy::All::new();
128    /// let handler = handler::Analyser::new();
129    /// trigger.add_handler(
130    ///     Arc::new(Mutex::new(handler))
131    /// );
132    /// ```
133    pub fn add_handler(&mut self, handler: Arc<Mutex<dyn Handler>>) {
134        self.handlers.lock().unwrap().add_handler_mut(handler);
135    }
136}
137
#[cfg(test)]
mod tests {
    use super::{All, Strategy};
    use crate::{
        handler::{Analyser, Replace},
        strategy::OutputConverter,
        test::{Single, Splitter, Window},
    };
    use rstest::*;
    use std::sync::{Arc, Mutex};

    /// JSON document used by every test case
    fn get_input() -> Vec<u8> {
        let json: &[u8] = br#"{"elements": [1, 2, 3, 4, [5, 6], {"another": null}]}"#;
        json.to_vec()
    }

    /// Without converting, the analyser still sees every element
    #[rstest(
        splitter,
        case::single(Box::new(Single::new())),
        case::window1(Box::new(Window::new(1))),
        case::window5(Box::new(Window::new(5))),
        case::window100(Box::new(Window::new(100)))
    )]
    fn no_convert(splitter: Box<dyn Splitter>) {
        // Path and count pairs in the order the analyser reports them
        let expected = [
            ("", 1),
            (r#"{"elements"}"#, 1),
            (r#"{"elements"}[]"#, 6),
            (r#"{"elements"}[][]"#, 2),
            (r#"{"elements"}[]{"another"}"#, 1),
        ];

        for chunks in splitter.split(get_input()) {
            let analyser = Arc::new(Mutex::new(Analyser::new()));
            let mut strategy = All::new();
            strategy.add_handler(analyser.clone());
            for chunk in chunks {
                strategy.process(&chunk).unwrap();
            }

            let locked = analyser.lock().unwrap();
            let results = locked.results();
            assert_eq!(results.len(), expected.len());
            for (idx, (path, count)) in expected.iter().enumerate() {
                assert_eq!(results[idx], ((*path).into(), *count));
            }
        }
    }

    /// With converting enabled, the `Replace` handler output reaches the result
    #[rstest(
        splitter,
        case::single(Box::new(Single::new())),
        case::window1(Box::new(Window::new(1))),
        case::window5(Box::new(Window::new(5))),
        case::window100(Box::new(Window::new(100)))
    )]
    fn convert(splitter: Box<dyn Splitter>) {
        for chunks in splitter.split(get_input()) {
            let mut strategy = All::new();
            strategy.set_convert(true);
            strategy.add_handler(Arc::new(Mutex::new(Replace::new(br#"."#.to_vec()))));

            let mut converter = OutputConverter::new();
            let mut collected = vec![];
            for chunk in chunks {
                let outputs = strategy.process(&chunk).unwrap();
                for item in converter.convert(&outputs) {
                    collected.extend(item.1);
                }
            }

            assert_eq!(collected, br#"..........."#);
        }
    }
}
205}