// streamson_lib/strategy.rs

1//! Collection of json processing strategies
2
3pub mod all;
4pub mod convert;
5pub mod extract;
6pub mod filter;
7pub mod trigger;
8
9pub use all::All;
10pub use convert::Convert;
11pub use extract::Extract;
12pub use filter::Filter;
13pub use trigger::Trigger;
14
15use crate::{error, path::Path};
16use std::mem;
18#[derive(Debug, PartialEq)]
19pub enum Output {
20    Start(Option<Path>),
21    Data(Vec<u8>),
22    End,
23}
25#[derive(Default)]
26pub struct OutputConverter {
27    buffer: Vec<u8>,
28    paths: Vec<Option<Path>>,
29}
31impl OutputConverter {
32    pub fn new() -> Self {
33        Default::default()
34    }
35
36    pub fn convert(&mut self, input: &[Output]) -> Vec<(Option<Path>, Vec<u8>)> {
37        let mut res = vec![];
38        for field in input {
39            match field {
40                Output::Start(path_opt) => {
41                    self.paths.push(path_opt.clone());
42                }
43                Output::Data(data) => {
44                    self.buffer.extend(data);
45                }
46                Output::End => {
47                    let mut output = vec![];
48                    mem::swap(&mut output, &mut self.buffer);
49                    res.push((self.paths.pop().unwrap_or(None), output));
50                }
51            }
52        }
53        res
54    }
55}
57pub trait Strategy {
58    /// Processes input data
59    ///
60    /// # Arguments
61    /// * `input` - input data
62    ///
63    /// # Returns
64    /// * `Ok(_) processing passed
65    /// * `Err(_)` - error occured during processing
66    ///
67    /// # Errors
68    ///
69    /// If parsing logic finds that JSON is not valid,
70    /// it returns `error::General`.
71    ///
72    /// Note that streamson assumes that its input is a valid
73    /// JSONs and if not, it still might be processed without an error.
74    /// This is caused because streamson does not validate JSON.
75    fn process(&mut self, input: &[u8]) -> Result<Vec<Output>, error::General>;
76
77    /// Should be called when input data terminates
78    ///
79    /// # Returns
80    /// * `Ok(_) processing passed
81    /// * `Err(_)` - error occured during processing
82    ///
83    /// # Errors
84    ///
85    /// Should return an error if strategy is in unterminated state
86    /// (still expects data in input)
87    fn terminate(&mut self) -> Result<Vec<Output>, error::General>;
88}
#[cfg(test)]
mod test {
    use super::{Output, OutputConverter, Path};
    use std::convert::TryFrom;

    #[test]
    fn converter() {
        let mut converter = OutputConverter::new();

        // A complete item within a single call is returned right away.
        let data = converter.convert(&[
            Output::Start(None),
            Output::Data(b"1234".to_vec()),
            Output::End,
        ]);
        assert_eq!(data, vec![(None, b"1234".to_vec())]);

        // An item without its `End` produces nothing yet.
        let data = converter.convert(&[
            Output::Start(Some(Path::try_from("").unwrap())),
            Output::Data(b"567".to_vec()),
        ]);
        assert_eq!(data, vec![]);

        // The next call finishes the item and joins the data of both calls.
        let data = converter.convert(&[Output::Data(b"89".to_vec()), Output::End]);
        assert_eq!(
            data,
            vec![(Some(Path::try_from("").unwrap()), b"56789".to_vec())]
        );
    }
}
117}