// streamson_lib/strategy/convert.rs

//! The main logic of JSON conversion
//!
//! It substitutes parts of the output with other data.
//!
//! Nested matches are not considered: once an element is matched, data are
//! converted only by the handler of the first matcher which matched it.

8use super::{Output, Strategy};
9use crate::{
10    error,
11    handler::Handler,
12    matcher::Matcher,
13    path::Path,
14    streamer::{Streamer, Token},
15};
16use std::sync::{Arc, Mutex};
17
18/// Item in matcher list
19type MatcherItem = (Box<dyn Matcher>, Arc<Mutex<dyn Handler>>);
20
21/// Processes data from input and triggers handler
22pub struct Convert {
23    /// Input idx against total idx
24    input_start: usize,
25    /// Currently matched path and matcher index
26    matched: Option<(Path, usize)>,
27    /// Path matchers and a handler
28    matchers: Vec<MatcherItem>,
29    /// Responsible for data extraction
30    streamer: Streamer,
31    /// Current json level
32    level: usize,
33}
34
35impl Default for Convert {
36    fn default() -> Self {
37        Self {
38            input_start: 0,
39            matched: None,
40            matchers: vec![],
41            streamer: Streamer::new(),
42            level: 0,
43        }
44    }
45}
46
47impl Strategy for Convert {
48    fn process(&mut self, input: &[u8]) -> Result<Vec<Output>, error::General> {
49        self.streamer.feed(input);
50        let mut inner_idx = 0;
51
52        let mut result = vec![];
53        loop {
54            match self.streamer.read()? {
55                Token::Start(idx, kind) => {
56                    if self.level == 0 {
57                        result.push(Output::Start(None));
58                    }
59                    self.level += 1;
60
61                    if self.matched.is_none() {
62                        // try to check whether it matches
63                        for (matcher_idx, (matcher, _)) in self.matchers.iter().enumerate() {
64                            if matcher.match_path(self.streamer.current_path(), kind) {
65                                // start collecting
66                                self.matched =
67                                    Some((self.streamer.current_path().clone(), matcher_idx));
68
69                                // Flush remaining data to output
70                                let to = idx - self.input_start;
71                                result.push(Output::Data(input[inner_idx..to].to_vec()));
72                                inner_idx = to;
73
74                                // Notify handler that match has started
75                                let mut handler = self.matchers[matcher_idx].1.lock().unwrap();
76                                if let Some(data) = handler.start(
77                                    self.streamer.current_path(),
78                                    matcher_idx,
79                                    Token::Start(idx, kind),
80                                )? {
81                                    result.push(Output::Data(data));
82                                }
83                                break;
84                            }
85                        }
86                    }
87                }
88                Token::End(idx, kind) => {
89                    let mut clear = false;
90                    self.level -= 1;
91                    if let Some((matched_path, matcher_idx)) = self.matched.take() {
92                        if self.streamer.current_path() == &matched_path {
93                            clear = true;
94
95                            // move the buffer
96                            let to = idx - self.input_start;
97                            let data = &input[inner_idx..to];
98                            inner_idx = to;
99
100                            let mut handler = self.matchers[matcher_idx].1.lock().unwrap();
101
102                            // consume the data
103                            if let Some(to_output) = handler.feed(data, matcher_idx)? {
104                                result.push(Output::Data(to_output));
105                            }
106
107                            // Notify handlers that match has ended
108                            if let Some(data) = handler.end(
109                                self.streamer.current_path(),
110                                matcher_idx,
111                                Token::Start(idx, kind),
112                            )? {
113                                result.push(Output::Data(data));
114                            }
115                        }
116                        if !clear {
117                            self.matched = Some((matched_path, matcher_idx));
118                        }
119                    } else if self.level == 0 {
120                        // Finish the output before Output::End
121                        let to = idx - self.input_start;
122                        let data = &input[inner_idx..to];
123                        inner_idx = to;
124                        result.push(Output::Data(data.to_vec()));
125                    }
126
127                    if self.level == 0 {
128                        result.push(Output::End);
129                    }
130                }
131                Token::Pending => {
132                    self.input_start += input.len();
133                    if let Some((_, matcher_idx)) = self.matched {
134                        let mut handler = self.matchers[matcher_idx].1.lock().unwrap();
135                        if let Some(to_output) = handler.feed(&input[inner_idx..], matcher_idx)? {
136                            result.push(Output::Data(to_output));
137                        }
138                    } else {
139                        // don't export empty vec
140                        if inner_idx < input.len() {
141                            result.push(Output::Data(input[inner_idx..].to_vec()))
142                        }
143                    }
144                    return Ok(result);
145                }
146                Token::Separator(_) => {}
147            }
148        }
149    }
150
151    fn terminate(&mut self) -> Result<Vec<Output>, error::General> {
152        if self.level == 0 {
153            Ok(vec![])
154        } else {
155            Err(error::InputTerminated::new(self.input_start).into())
156        }
157    }
158}
159
160impl Convert {
161    /// Creates a new `Convert`
162    ///
163    /// It should replace a parts of the JSON using custom bytes
164    /// data are read.
165    pub fn new() -> Self {
166        Self::default()
167    }
168
169    /// Adds a mathcher and a handler to `Convert`
170    ///
171    /// # Arguments
172    /// * `matcher` - matcher which matches the path
173    /// * `handlers` - funtions which should be run to convert the data
174    ///
175    /// # Example
176    ///
177    /// ```
178    /// use streamson_lib::{strategy, matcher, handler, path::Path};
179    /// use std::sync::{Arc, Mutex};
180    ///
181    /// let mut convert = strategy::Convert::new();
182    ///
183    /// let matcher = matcher::Simple::new(r#"{"list"}[]"#).unwrap();
184    /// convert.add_matcher(
185    ///     Box::new(matcher),
186    ///     Arc::new(Mutex::new(handler::Replace::new(vec![b'"', b'*', b'*', b'*', b'"']))),
187    /// );
188    /// ```
189    pub fn add_matcher(&mut self, matcher: Box<dyn Matcher>, handler: Arc<Mutex<dyn Handler>>) {
190        self.matchers.push((matcher, handler));
191    }
192}
193
#[cfg(test)]
mod tests {
    use super::{Convert, Output, Strategy};
    use crate::{
        handler::{Group, Replace, Shorten},
        matcher::Simple,
        strategy::OutputConverter,
        test::{Single, Splitter, Window},
    };
    use rstest::*;
    use std::sync::{Arc, Mutex};

    /// Input shared by all the tests
    const INPUT: &[u8] =
        br#"[{"id": 1, "password": "secret1"}, {"id": 2, "password": "secret2"}]"#;

    /// Handler which replaces the matched data with `"***"`
    fn make_replace_handler() -> Arc<Mutex<Replace>> {
        Arc::new(Mutex::new(Replace::new(br#""***""#.to_vec())))
    }

    /// Converter which masks every `password` field of the objects in a list
    fn make_convert() -> Convert {
        let mut convert = Convert::new();
        let matcher = Simple::new(r#"[]{"password"}"#).unwrap();
        convert.add_matcher(Box::new(matcher), make_replace_handler());
        convert
    }

    #[test]
    fn basic() {
        let mut convert = make_convert();
        let output = convert.process(INPUT).unwrap();

        assert_eq!(
            output,
            vec![
                Output::Start(None),
                Output::Data(br#"[{"id": 1, "password": "#.to_vec()),
                Output::Data(br#""***""#.to_vec()),
                Output::Data(br#"}, {"id": 2, "password": "#.to_vec()),
                Output::Data(br#""***""#.to_vec()),
                Output::Data(br#"}]"#.to_vec()),
                Output::End,
            ]
        );
    }

    #[rstest(
        splitter,
        case::single(Box::new(Single::new())),
        case::window1(Box::new(Window::new(1))),
        case::window5(Box::new(Window::new(5))),
        case::window100(Box::new(Window::new(100)))
    )]
    fn basic_pending(splitter: Box<dyn Splitter>) {
        for parts in splitter.split(INPUT.to_vec()) {
            let mut convert = make_convert();
            let mut converter = OutputConverter::new();

            let mut result = vec![];
            for part in parts {
                let converted = convert.process(&part).unwrap();
                result.extend(converter.convert(&converted).into_iter().flat_map(|e| e.1));
            }

            assert_eq!(
                String::from_utf8(result).unwrap(),
                r#"[{"id": 1, "password": "***"}, {"id": 2, "password": "***"}]"#
            );
        }
    }

    #[test]
    fn chaining_handlers() {
        let mut convert = Convert::new();
        let matcher = Simple::new(r#"[]{"password"}"#).unwrap();
        let replace = Arc::new(Mutex::new(Replace::new(br#""*****************""#.to_vec())));
        let shorten = Arc::new(Mutex::new(Shorten::new(4, "...\"".into())));
        convert.add_matcher(
            Box::new(matcher),
            Arc::new(Mutex::new(
                Group::new().add_handler(replace).add_handler(shorten),
            )),
        );

        let converted = convert.process(INPUT).unwrap();
        let output: Vec<u8> = OutputConverter::new()
            .convert(&converted)
            .into_iter()
            .flat_map(|e| e.1)
            .collect();

        assert_eq!(
            String::from_utf8(output).unwrap(),
            r#"[{"id": 1, "password": "****..."}, {"id": 2, "password": "****..."}]"#
        );
    }
}