streamson_lib/handler/
buffer.rs

1//! Handler which buffers output which can be manually extracted
2//!
3//! # Example
4//! ```
5//! use streamson_lib::{handler, matcher, strategy::{self, Strategy}};
6//! use std::sync::{Arc, Mutex};
7//!
8//! let buffer_handler = Arc::new(Mutex::new(handler::Buffer::new().set_use_path(true)));
9//!
10//! let matcher = matcher::Simple::new(r#"{"users"}[]{"name"}"#).unwrap();
11//!
12//! let mut trigger = strategy::Trigger::new();
13//!
14//! // Set the matcher for trigger strategy
15//! trigger.add_matcher(Box::new(matcher), buffer_handler.clone());
16//!
17//! for input in vec![
18//!     br#"{"users": [{"id": 1, "name": "first"}, {"#.to_vec(),
//!     br#""id": 2, "name": "second"}]}"#.to_vec(),
20//! ] {
21//!     trigger.process(&input).unwrap();
22//!     let mut guard = buffer_handler.lock().unwrap();
23//!     while let Some((path, data)) = guard.pop() {
24//!         // Do something with the data
25//!         println!("{} (len {})", path.unwrap(), data.len());
26//!     }
27//! }
28//! ```
29
30use super::Handler;
31use crate::{error, path::Path, streamer::Token};
32use std::{any::Any, collections::VecDeque, str::FromStr};
33
/// Handler which keeps the matched (split) JSON parts in memory
/// until they are taken out of its internal queue
#[derive(Debug)]
pub struct Buffer {
    /// Bytes of the matches which haven't been terminated yet
    buffer: Vec<u8>,

    /// Index of the input at which `buffer` starts
    buffer_idx: usize,

    /// Start offsets (relative to the beginning of `buffer`)
    /// of the matches which are currently opened
    buffer_parts: Vec<usize>,

    /// Queue of the finished matches stored as `(path, data)`
    results: VecDeque<(Option<String>, Vec<u8>)>,

    /// Whether the path should be stored with the data
    /// (skipping it spares an allocation per match)
    use_path: bool,

    /// Number of bytes currently accounted to the buffer
    current_buffer_size: usize,

    /// Upper limit of the buffer size (`None` means unlimited)
    max_buffer_size: Option<usize>,
}
58
59impl Default for Buffer {
60    fn default() -> Self {
61        Self {
62            use_path: false,
63            current_buffer_size: 0,
64            max_buffer_size: None,
65            buffer: vec![],
66            buffer_idx: 0,
67            buffer_parts: vec![],
68            results: VecDeque::new(),
69        }
70    }
71}
72
73impl FromStr for Buffer {
74    type Err = error::Handler;
75    fn from_str(input: &str) -> Result<Self, Self::Err> {
76        let splitted: Vec<_> = input.split(',').collect();
77        match splitted.len() {
78            0 => Ok(Self::default()),
79            1 => Ok(Self::default()
80                .set_use_path(FromStr::from_str(splitted[0]).map_err(error::Handler::new)?)),
81            2 => Ok(Self::default()
82                .set_use_path(FromStr::from_str(splitted[0]).map_err(error::Handler::new)?)
83                .set_max_buffer_size(Some(
84                    FromStr::from_str(splitted[1]).map_err(error::Handler::new)?,
85                ))),
86            _ => Err(error::Handler::new("Failed to parse")),
87        }
88    }
89}
90
91trait Buff: Handler {
92    fn _start(
93        &mut self,
94        _path: &Path,
95        _matcher_idx: usize,
96        token: Token,
97    ) -> Result<Option<Vec<u8>>, error::Handler> {
98        if let Token::Start(idx, _) = token {
99            if self.buffer_parts().is_empty() {
100                *self.buffer_idx() = idx;
101            }
102            let buffer_idx = *self.buffer_idx();
103            self.buffer_parts().push(idx - buffer_idx);
104            Ok(None)
105        } else {
106            Err(error::Handler::new("Invalid token"))
107        }
108    }
109
110    fn _feed(
111        &mut self,
112        data: &[u8],
113        _matcher_idx: usize,
114    ) -> Result<Option<Vec<u8>>, error::Handler> {
115        // buffer is being used
116        if !self.buffer_parts().is_empty() {
117            // check whether buffer capacity hasn't been reached
118            if let Some(limit) = *self.max_buffer_size() {
119                if *self.current_buffer_size() + data.len() > limit {
120                    return Err(error::Handler::new(format!(
121                        "Max buffer size {} was reached",
122                        limit
123                    )));
124                }
125            }
126            self.buffer().extend(data);
127            *self.current_buffer_size() += data.len();
128        }
129
130        Ok(None)
131    }
132
133    fn _end(
134        &mut self,
135        path: &Path,
136        _matcher_idx: usize,
137        _token: Token,
138    ) -> Result<Option<Vec<u8>>, error::Handler> {
139        // Try to push buffer
140        if let Some(idx) = self.buffer_parts().pop() {
141            let data = self.buffer()[idx..].to_vec();
142            self.store_result(path, data);
143            if self.buffer_parts().is_empty() {
144                self.buffer().clear();
145            }
146            Ok(None)
147        } else {
148            Err(error::Handler::new("Invalid token"))
149        }
150    }
151
152    fn store_result(&mut self, path: &Path, data: Vec<u8>);
153    fn buffer(&mut self) -> &mut Vec<u8>;
154    fn buffer_parts(&mut self) -> &mut Vec<usize>;
155    fn buffer_idx(&mut self) -> &mut usize;
156    fn max_buffer_size(&mut self) -> &mut Option<usize>;
157    fn current_buffer_size(&mut self) -> &mut usize;
158    fn use_path(&mut self) -> &mut bool;
159}
160
161impl Handler for Buffer {
162    fn start(
163        &mut self,
164        _path: &Path,
165        _matcher_idx: usize,
166        token: Token,
167    ) -> Result<Option<Vec<u8>>, error::Handler> {
168        self._start(_path, _matcher_idx, token)
169    }
170
171    fn feed(
172        &mut self,
173        data: &[u8],
174        _matcher_idx: usize,
175    ) -> Result<Option<Vec<u8>>, error::Handler> {
176        self._feed(data, _matcher_idx)
177    }
178
179    fn end(
180        &mut self,
181        _path: &Path,
182        _matcher_idx: usize,
183        token: Token,
184    ) -> Result<Option<Vec<u8>>, error::Handler> {
185        self._end(_path, _matcher_idx, token)
186    }
187
188    fn as_any(&self) -> &dyn Any {
189        self
190    }
191}
192
193impl Buff for Buffer {
194    fn store_result(&mut self, path: &Path, data: Vec<u8>) {
195        let use_path = *self.use_path();
196        self.results.push_back((
197            if use_path {
198                Some(path.to_string())
199            } else {
200                None
201            },
202            data,
203        ));
204    }
205
206    fn buffer(&mut self) -> &mut Vec<u8> {
207        &mut self.buffer
208    }
209
210    fn buffer_parts(&mut self) -> &mut Vec<usize> {
211        &mut self.buffer_parts
212    }
213
214    fn buffer_idx(&mut self) -> &mut usize {
215        &mut self.buffer_idx
216    }
217
218    fn max_buffer_size(&mut self) -> &mut Option<usize> {
219        &mut self.max_buffer_size
220    }
221
222    fn current_buffer_size(&mut self) -> &mut usize {
223        &mut self.current_buffer_size
224    }
225
226    fn use_path(&mut self) -> &mut bool {
227        &mut self.use_path
228    }
229}
230
231impl Buffer {
232    /// Creates a new handler which stores output within itself
233    pub fn new() -> Self {
234        Self::default()
235    }
236
237    /// Set whether to show path
238    ///
239    /// # Arguments
240    /// * `use_path` - should path be store with data
241    ///
242    /// # Example
243    /// ```
244    /// use streamson_lib::handler;
245    /// let file = handler::Buffer::new().set_use_path(true);
246    /// ```
247    pub fn set_use_path(mut self, use_path: bool) -> Self {
248        self.use_path = use_path;
249        self
250    }
251
252    /// Pops the oldest value in the buffer
253    ///
254    /// # Returns
255    /// * `None` - queue is empty
256    /// * `Some((path, data))` - stored data remove from the queue and returned
257    ///
258    /// # Example
259    /// ```
260    /// use streamson_lib::handler;
261    /// let mut buffer = handler::buffer::Buffer::new().set_use_path(true);
262    /// while let Some((path, data)) = buffer.pop() {
263    ///     // Do something with the data
264    ///     println!("{} (len {})", path.unwrap(), data.len());
265    /// }
266    ///
267    ///
268    /// ```
269    pub fn pop(&mut self) -> Option<(Option<String>, Vec<u8>)> {
270        let popped = self.results.pop_front();
271        if popped.is_some() {
272            // recalculate buffer size
273            // note that due to nested matches you can't simply substract
274            // length of popped data
275            self.current_buffer_size =
276                self.results.iter().fold(0, |e, y| e + y.1.len()) + self.buffer.len();
277        }
278        popped
279    }
280
281    /// Sets max buffer size
282    ///
283    /// # Arguments
284    /// * `use_path` - should path be store with data
285    pub fn set_max_buffer_size(mut self, max_size: Option<usize>) -> Self {
286        self.max_buffer_size = max_size;
287        self
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::Buffer;
    use crate::{
        matcher::{Combinator, Simple},
        strategy::{Strategy, Trigger},
    };
    use std::sync::{Arc, Mutex};

    /// Processing fails once the matched data don't fit into the limited buffer
    #[test]
    fn max_buffer_size_error() {
        let handler = Arc::new(Mutex::new(Buffer::new().set_max_buffer_size(Some(22))));
        let mut trigger = Trigger::new();
        trigger.add_matcher(
            Box::new(Simple::new(r#"[]{"description"}"#).unwrap()),
            handler.clone(),
        );

        // Fits into the buffer
        let first = trigger.process(br#"[{"description": "short"}, "#);
        assert!(first.is_ok());

        // Doesn't fit into the buffer (nothing was popped in the meantime)
        let second = trigger.process(br#"{"description": "too long description"}]"#);
        assert!(second.is_err());
    }

    /// Popping the stored data frees the space for the following matches
    #[test]
    fn max_buffer_size_consumed() {
        let handler = Arc::new(Mutex::new(Buffer::new().set_max_buffer_size(Some(22))));
        let mut trigger = Trigger::new();
        trigger.add_matcher(
            Box::new(Simple::new(r#"[]{"description"}"#).unwrap()),
            handler.clone(),
        );

        // Fits into the buffer
        let first = trigger.process(br#"[{"description": "short"}, "#);
        assert!(first.is_ok());

        // Make the buffer shorter
        let (path, data) = handler.lock().unwrap().pop().unwrap();
        assert_eq!(path, None);
        assert_eq!(data, br#""short""#.to_vec());

        let second = trigger.process(br#"{"description": "too long description"}]"#);
        assert!(second.is_ok());

        // Make the buffer shorter
        let (path, data) = handler.lock().unwrap().pop().unwrap();
        assert_eq!(path, None);
        assert_eq!(data, br#""too long description""#.to_vec());
    }

    /// Inner matches are stored before the match which encloses them
    #[test]
    fn nested_matches() {
        let handler = Arc::new(Mutex::new(Buffer::new()));
        let outer = Combinator::new(Simple::new(r#"{"nested"}"#).unwrap());
        let inner = Combinator::new(Simple::new(r#"{"nested"}[]"#).unwrap());

        let mut trigger = Trigger::new();
        trigger.add_matcher(Box::new(outer | inner), handler.clone());
        assert!(trigger.process(br#"{"nested": ["1", "2", "3"]}"#).is_ok());

        let mut guard = handler.lock().unwrap();
        let expected_order = [r#""1""#, r#""2""#, r#""3""#, r#"["1", "2", "3"]"#];
        for expected in expected_order.iter() {
            let (_, data) = guard.pop().unwrap();
            assert_eq!(String::from_utf8(data).unwrap(), *expected);
        }
        assert_eq!(guard.pop(), None);
    }
}
361}