1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
//! Handler which buffers output which can be manually extracted //! //! # Example //! ``` //! use streamson_lib::{handler, matcher, strategy}; //! use std::sync::{Arc, Mutex}; //! //! let buffer_handler = Arc::new(Mutex::new(handler::Buffer::new().set_use_path(true))); //! //! let matcher = matcher::Simple::new(r#"{"users"}[]{"name"}"#).unwrap(); //! //! let mut trigger = strategy::Trigger::new(); //! //! // Set the matcher for trigger strategy //! trigger.add_matcher(Box::new(matcher), &[buffer_handler.clone()]); //! //! for input in vec![ //! br#"{"users": [{"id": 1, "name": "first"}, {"#.to_vec(), //! br#""id": 2, "name": "second}]}"#.to_vec(), //! ] { //! trigger.process(&input).unwrap(); //! let mut guard = buffer_handler.lock().unwrap(); //! while let Some((path, data)) = guard.pop() { //! // Do something with the data //! println!("{} (len {})", path.unwrap(), data.len()); //! } //! } //! ``` use super::Handler; use crate::{error, path::Path}; use std::collections::VecDeque; /// Buffer handler responsible for storing slitted JSONs into memory #[derive(Debug)] pub struct Buffer { /// Queue with stored jsons in (path, data) format stored: VecDeque<(Option<String>, Vec<u8>)>, /// Not to show path will spare some allocation use_path: bool, } impl Default for Buffer { fn default() -> Self { Self { stored: VecDeque::new(), use_path: false, } } } impl Handler for Buffer { fn handle( &mut self, path: &Path, _matcher_idx: usize, data: Option<&[u8]>, ) -> Result<(), error::Handler> { // TODO we may limit the max VecDeque size and raise // an error when reached // let path_opt = if self.use_path { Some(path.to_string()) } else { None }; self.stored.push_back((path_opt, data.unwrap().to_vec())); Ok(()) } fn use_path(&self) -> bool { self.use_path } } impl Buffer { /// Creates a new handler which stores output within itself pub fn new() -> Self { Self::default() } /// /// Set whether to show path /// /// # Arguments /// * `use_path` - should path be store with data /// /// # Example /// ``` /// use streamson_lib::handler; /// let file = handler::Buffer::new().set_use_path(true); /// ``` pub fn set_use_path(mut self, use_path: bool) -> Self { self.use_path = use_path; self } /// Pops the oldest value in the buffer /// /// # Returns /// * `None` - queue is empty /// * `Some((path, data))` - stored data remove from the queue and returned /// /// # Example /// ``` /// use streamson_lib::handler; /// let mut buffer = handler::buffer::Buffer::new().set_use_path(true); /// while let Some((path, data)) = buffer.pop() { /// // Do something with the data /// println!("{} (len {})", path.unwrap(), data.len()); /// } /// /// /// ``` pub fn pop(&mut self) -> Option<(Option<String>, Vec<u8>)> { self.stored.pop_front() } }