1use super::Handler;
31use crate::{error, path::Path, streamer::Token};
32use std::{any::Any, collections::VecDeque, str::FromStr};
33
/// Handler which collects the bytes of matched data in memory.
///
/// Every finished match is queued together with an optional path and can be
/// taken out of the handler afterwards.
#[derive(Debug)]
pub struct Buffer {
    /// Bytes gathered while at least one match is open.
    buffer: Vec<u8>,

    /// Input index at which the outermost open match started.
    buffer_idx: usize,

    /// Stack of start offsets (relative to `buffer_idx`) of the open matches.
    buffer_parts: Vec<usize>,

    /// Queue of finished matches stored as `(path, data)`.
    results: VecDeque<(Option<String>, Vec<u8>)>,

    /// Whether the matched path is stored alongside the data.
    use_path: bool,

    /// Number of bytes currently accounted against the size limit.
    current_buffer_size: usize,

    /// Optional upper bound for `current_buffer_size`; `None` means unlimited.
    max_buffer_size: Option<usize>,
}
58
59impl Default for Buffer {
60 fn default() -> Self {
61 Self {
62 use_path: false,
63 current_buffer_size: 0,
64 max_buffer_size: None,
65 buffer: vec![],
66 buffer_idx: 0,
67 buffer_parts: vec![],
68 results: VecDeque::new(),
69 }
70 }
71}
72
73impl FromStr for Buffer {
74 type Err = error::Handler;
75 fn from_str(input: &str) -> Result<Self, Self::Err> {
76 let splitted: Vec<_> = input.split(',').collect();
77 match splitted.len() {
78 0 => Ok(Self::default()),
79 1 => Ok(Self::default()
80 .set_use_path(FromStr::from_str(splitted[0]).map_err(error::Handler::new)?)),
81 2 => Ok(Self::default()
82 .set_use_path(FromStr::from_str(splitted[0]).map_err(error::Handler::new)?)
83 .set_max_buffer_size(Some(
84 FromStr::from_str(splitted[1]).map_err(error::Handler::new)?,
85 ))),
86 _ => Err(error::Handler::new("Failed to parse")),
87 }
88 }
89}
90
91trait Buff: Handler {
92 fn _start(
93 &mut self,
94 _path: &Path,
95 _matcher_idx: usize,
96 token: Token,
97 ) -> Result<Option<Vec<u8>>, error::Handler> {
98 if let Token::Start(idx, _) = token {
99 if self.buffer_parts().is_empty() {
100 *self.buffer_idx() = idx;
101 }
102 let buffer_idx = *self.buffer_idx();
103 self.buffer_parts().push(idx - buffer_idx);
104 Ok(None)
105 } else {
106 Err(error::Handler::new("Invalid token"))
107 }
108 }
109
110 fn _feed(
111 &mut self,
112 data: &[u8],
113 _matcher_idx: usize,
114 ) -> Result<Option<Vec<u8>>, error::Handler> {
115 if !self.buffer_parts().is_empty() {
117 if let Some(limit) = *self.max_buffer_size() {
119 if *self.current_buffer_size() + data.len() > limit {
120 return Err(error::Handler::new(format!(
121 "Max buffer size {} was reached",
122 limit
123 )));
124 }
125 }
126 self.buffer().extend(data);
127 *self.current_buffer_size() += data.len();
128 }
129
130 Ok(None)
131 }
132
133 fn _end(
134 &mut self,
135 path: &Path,
136 _matcher_idx: usize,
137 _token: Token,
138 ) -> Result<Option<Vec<u8>>, error::Handler> {
139 if let Some(idx) = self.buffer_parts().pop() {
141 let data = self.buffer()[idx..].to_vec();
142 self.store_result(path, data);
143 if self.buffer_parts().is_empty() {
144 self.buffer().clear();
145 }
146 Ok(None)
147 } else {
148 Err(error::Handler::new("Invalid token"))
149 }
150 }
151
152 fn store_result(&mut self, path: &Path, data: Vec<u8>);
153 fn buffer(&mut self) -> &mut Vec<u8>;
154 fn buffer_parts(&mut self) -> &mut Vec<usize>;
155 fn buffer_idx(&mut self) -> &mut usize;
156 fn max_buffer_size(&mut self) -> &mut Option<usize>;
157 fn current_buffer_size(&mut self) -> &mut usize;
158 fn use_path(&mut self) -> &mut bool;
159}
160
161impl Handler for Buffer {
162 fn start(
163 &mut self,
164 _path: &Path,
165 _matcher_idx: usize,
166 token: Token,
167 ) -> Result<Option<Vec<u8>>, error::Handler> {
168 self._start(_path, _matcher_idx, token)
169 }
170
171 fn feed(
172 &mut self,
173 data: &[u8],
174 _matcher_idx: usize,
175 ) -> Result<Option<Vec<u8>>, error::Handler> {
176 self._feed(data, _matcher_idx)
177 }
178
179 fn end(
180 &mut self,
181 _path: &Path,
182 _matcher_idx: usize,
183 token: Token,
184 ) -> Result<Option<Vec<u8>>, error::Handler> {
185 self._end(_path, _matcher_idx, token)
186 }
187
188 fn as_any(&self) -> &dyn Any {
189 self
190 }
191}
192
193impl Buff for Buffer {
194 fn store_result(&mut self, path: &Path, data: Vec<u8>) {
195 let use_path = *self.use_path();
196 self.results.push_back((
197 if use_path {
198 Some(path.to_string())
199 } else {
200 None
201 },
202 data,
203 ));
204 }
205
206 fn buffer(&mut self) -> &mut Vec<u8> {
207 &mut self.buffer
208 }
209
210 fn buffer_parts(&mut self) -> &mut Vec<usize> {
211 &mut self.buffer_parts
212 }
213
214 fn buffer_idx(&mut self) -> &mut usize {
215 &mut self.buffer_idx
216 }
217
218 fn max_buffer_size(&mut self) -> &mut Option<usize> {
219 &mut self.max_buffer_size
220 }
221
222 fn current_buffer_size(&mut self) -> &mut usize {
223 &mut self.current_buffer_size
224 }
225
226 fn use_path(&mut self) -> &mut bool {
227 &mut self.use_path
228 }
229}
230
231impl Buffer {
232 pub fn new() -> Self {
234 Self::default()
235 }
236
237 pub fn set_use_path(mut self, use_path: bool) -> Self {
248 self.use_path = use_path;
249 self
250 }
251
252 pub fn pop(&mut self) -> Option<(Option<String>, Vec<u8>)> {
270 let popped = self.results.pop_front();
271 if popped.is_some() {
272 self.current_buffer_size =
276 self.results.iter().fold(0, |e, y| e + y.1.len()) + self.buffer.len();
277 }
278 popped
279 }
280
281 pub fn set_max_buffer_size(mut self, max_size: Option<usize>) -> Self {
286 self.max_buffer_size = max_size;
287 self
288 }
289}
290
#[cfg(test)]
mod tests {
    use super::Buffer;
    use crate::{
        matcher::{Combinator, Simple},
        strategy::{Strategy, Trigger},
    };
    use std::sync::{Arc, Mutex};

    /// Results which are never popped keep counting against the limit.
    #[test]
    fn max_buffer_size_error() {
        let handler = Arc::new(Mutex::new(Buffer::new().set_max_buffer_size(Some(22))));
        let description = Simple::new(r#"[]{"description"}"#).unwrap();

        let mut trigger = Trigger::new();
        trigger.add_matcher(Box::new(description), handler.clone());

        // The first value fits into the limit
        let first = trigger.process(br#"[{"description": "short"}, "#);
        assert!(first.is_ok());

        // The first value was not popped, so the second one exceeds the limit
        let second = trigger.process(br#"{"description": "too long description"}]"#);
        assert!(second.is_err());
    }

    /// Popping a result makes room for further data.
    #[test]
    fn max_buffer_size_consumed() {
        let handler = Arc::new(Mutex::new(Buffer::new().set_max_buffer_size(Some(22))));
        let description = Simple::new(r#"[]{"description"}"#).unwrap();

        let mut trigger = Trigger::new();
        trigger.add_matcher(Box::new(description), handler.clone());

        let first = trigger.process(br#"[{"description": "short"}, "#);
        assert!(first.is_ok());

        // Take the first value out before more input is processed
        let popped = handler.lock().unwrap().pop().unwrap();
        assert_eq!(popped, (None, br#""short""#.to_vec()));

        let second = trigger.process(br#"{"description": "too long description"}]"#);
        assert!(second.is_ok());

        let popped = handler.lock().unwrap().pop().unwrap();
        assert_eq!(popped, (None, br#""too long description""#.to_vec()));
    }

    /// Inner matches are stored before the match which encloses them.
    #[test]
    fn nested_matches() {
        let handler = Arc::new(Mutex::new(Buffer::new()));
        let nested = Combinator::new(Simple::new(r#"{"nested"}"#).unwrap())
            | Combinator::new(Simple::new(r#"{"nested"}[]"#).unwrap());

        let mut trigger = Trigger::new();
        trigger.add_matcher(Box::new(nested), handler.clone());
        assert!(trigger.process(br#"{"nested": ["1", "2", "3"]}"#).is_ok());

        let mut results = handler.lock().unwrap();
        let expected = [r#""1""#, r#""2""#, r#""3""#, r#"["1", "2", "3"]"#];
        for item in expected.iter() {
            let data = results.pop().unwrap().1;
            assert_eq!(String::from_utf8(data).unwrap(), *item);
        }
        assert_eq!(results.pop(), None);
    }
}