streamson_lib/handler/
group.rs

1//! Handler which groups multiple handlers together
2//!
3//!
4//! ```
5//! use streamson_lib::{handler, matcher, strategy::{self, Strategy}};
6//! use std::{io, sync::{Arc, Mutex}};
7//!
8//!
9//! let group_handler = handler::Group::new()
10//!     .add_handler(Arc::new(Mutex::new(handler::Unstringify::new())))
11//!     .add_handler(Arc::new(Mutex::new(handler::Output::new(io::stdout()))));
12//!
13//! let matcher = matcher::Simple::new(r#"{"users"}[]{"name"}"#).unwrap();
14//! let mut trigger = strategy::Trigger::new();
15//! trigger.add_matcher(Box::new(matcher), Arc::new(Mutex::new(group_handler)));
16//!
17//! for input in vec![
18//!     br#"{"users": [{"id": 1, "name": "first"}, {"#.to_vec(),
19//!     br#""id": 2, "name": "second}]}"#.to_vec(),
20//! ] {
21//!     trigger.process(&input).unwrap();
22//! }
23//!
24//! ```
25//!
26
27use std::{
28    any::Any,
29    ops,
30    sync::{Arc, Mutex},
31};
32
33use crate::{error, path::Path, streamer::Token};
34
35use super::Handler;
36
37/// A structure which groups handlers and determines a way how handlers are triggered
38#[derive(Default, Clone)]
39pub struct Group {
40    handlers: Vec<Arc<Mutex<dyn Handler>>>,
41}
42
43impl Group {
44    pub fn new() -> Self {
45        Default::default()
46    }
47
48    /// Adds a handler to handler group (builder pattern)
49    ///
50    /// # Arguments
51    /// * `handler` - handler to add
52    ///
53    /// # Returns
54    /// * Group handler
55    pub fn add_handler(mut self, handler: Arc<Mutex<dyn Handler>>) -> Self {
56        self.handlers.push(handler);
57        self
58    }
59
60    /// Adds a handler to handler group (mut reference)
61    ///
62    /// # Arguments
63    /// * `handler` - handler to add
64    pub fn add_handler_mut(&mut self, handler: Arc<Mutex<dyn Handler>>) {
65        self.handlers.push(handler);
66    }
67
68    /// Iterates through handlers
69    pub fn subhandlers(&self) -> &[Arc<Mutex<dyn Handler>>] {
70        &self.handlers
71    }
72}
73
74impl Handler for Group {
75    fn start(
76        &mut self,
77        path: &Path,
78        matcher_idx: usize,
79        token: Token,
80    ) -> Result<Option<Vec<u8>>, error::Handler> {
81        let mut result = None;
82        for handler in self.handlers.iter() {
83            let mut guard = handler.lock().unwrap();
84            if guard.is_converter() {
85                let orig_result = result.take();
86                result = guard.start(path, matcher_idx, token.clone())?;
87                if let Some(orig_data) = orig_result {
88                    let feed_output = guard.feed(&orig_data, matcher_idx)?;
89                    if let Some(mut data) = result.take() {
90                        if let Some(feed_data) = feed_output {
91                            data.extend(feed_data);
92                            result = Some(data);
93                        }
94                    } else {
95                        result = feed_output;
96                    }
97                }
98            } else {
99                guard.start(path, matcher_idx, token.clone())?;
100                if let Some(data) = result.as_ref() {
101                    guard.feed(data, matcher_idx)?;
102                }
103            }
104        }
105        Ok(result)
106    }
107
108    fn feed(&mut self, data: &[u8], matcher_idx: usize) -> Result<Option<Vec<u8>>, error::Handler> {
109        let mut result = Some(data.to_vec());
110        for handler in self.handlers.iter() {
111            let mut guard = handler.lock().unwrap();
112            if let Some(data) = result.take() {
113                if guard.is_converter() {
114                    result = guard.feed(&data, matcher_idx)?;
115                } else {
116                    guard.feed(&data, matcher_idx)?;
117                    result = Some(data)
118                }
119            } else {
120                // data were consumed
121                break;
122            }
123        }
124        Ok(result)
125    }
126
127    fn end(
128        &mut self,
129        path: &Path,
130        matcher_idx: usize,
131        token: Token,
132    ) -> Result<Option<Vec<u8>>, error::Handler> {
133        let mut result: Option<Vec<u8>> = None;
134        for handler in self.handlers.iter() {
135            let mut guard = handler.lock().unwrap();
136            if guard.is_converter() {
137                // Feed with data if there are some data remaining
138                if let Some(data) = result.take() {
139                    result = guard.feed(&data, matcher_idx)?;
140                }
141
142                if let Some(data) = guard.end(path, matcher_idx, token.clone())? {
143                    if let Some(mut result_data) = result.take() {
144                        result_data.extend(data);
145                        result = Some(result_data);
146                    } else {
147                        result = Some(data);
148                    }
149                }
150            } else {
151                if let Some(data) = result.as_ref() {
152                    guard.feed(data, matcher_idx)?;
153                }
154                guard.end(path, matcher_idx, token.clone())?;
155            }
156        }
157        Ok(result)
158    }
159
160    fn is_converter(&self) -> bool {
161        self.handlers
162            .iter()
163            .any(|e| e.lock().unwrap().is_converter())
164    }
165
166    fn as_any(&self) -> &dyn Any {
167        self
168    }
169}
170
171impl ops::Add for Group {
172    type Output = Self;
173    fn add(self, rhs: Self) -> Self::Output {
174        let mut joined = Self::new();
175        self.subhandlers()
176            .iter()
177            .for_each(|h| joined.add_handler_mut(h.clone()));
178        rhs.subhandlers()
179            .iter()
180            .for_each(|h| joined.add_handler_mut(h.clone()));
181
182        joined
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use super::Group;
189    use crate::{
190        handler::{Buffer, Replace, Shorten},
191        matcher::Simple,
192        strategy::{Convert, Extract, Filter, OutputConverter, Strategy, Trigger},
193    };
194    use std::sync::{Arc, Mutex};
195
196    fn prepare_handlers() -> (
197        Arc<Mutex<Buffer>>,
198        Arc<Mutex<Buffer>>,
199        Arc<Mutex<Buffer>>,
200        Arc<Mutex<Replace>>,
201        Arc<Mutex<Shorten>>,
202    ) {
203        (
204            Arc::new(Mutex::new(Buffer::new())),
205            Arc::new(Mutex::new(Buffer::new())),
206            Arc::new(Mutex::new(Buffer::new())),
207            Arc::new(Mutex::new(Replace::new(br#""ccccc""#.to_vec()))),
208            Arc::new(Mutex::new(Shorten::new(3, r#"..""#.into()))),
209        )
210    }
211
212    #[test]
213    fn test_convert() {
214        let mut convert = Convert::new();
215        let (buffer1, buffer2, buffer3, replace, shorten) = prepare_handlers();
216        let matcher = Simple::new(r#"[]{"desc"}"#).unwrap();
217        let group = Group::new()
218            .add_handler(buffer1.clone())
219            .add_handler(replace.clone())
220            .add_handler(buffer2.clone())
221            .add_handler(shorten.clone())
222            .add_handler(buffer3.clone());
223
224        convert.add_matcher(Box::new(matcher), Arc::new(Mutex::new(group)));
225
226        let output = OutputConverter::new()
227            .convert(
228                &convert
229                    .process(br#"[{"desc": "aa"}, {"desc": "bbbbbb"}]"#)
230                    .unwrap(),
231            )
232            .into_iter()
233            .map(|e| e.1)
234            .collect::<Vec<Vec<u8>>>();
235
236        // output
237        assert_eq!(
238            String::from_utf8(output.into_iter().flatten().collect()).unwrap(),
239            r#"[{"desc": "ccc.."}, {"desc": "ccc.."}]"#
240        );
241
242        // buffer1
243        assert_eq!(
244            String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
245            r#""aa""#
246        );
247        assert_eq!(
248            String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
249            r#""bbbbbb""#
250        );
251        assert!(buffer1.lock().unwrap().pop().is_none());
252
253        // buffer2
254        assert_eq!(
255            String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
256            r#""ccccc""#
257        );
258        assert_eq!(
259            String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
260            r#""ccccc""#
261        );
262        assert!(buffer2.lock().unwrap().pop().is_none());
263
264        // buffer3
265        assert_eq!(
266            String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
267            r#""ccc..""#
268        );
269        assert_eq!(
270            String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
271            r#""ccc..""#
272        );
273        assert!(buffer3.lock().unwrap().pop().is_none());
274    }
275
276    #[test]
277    fn test_trigger() {
278        let mut trigger = Trigger::new();
279        let (buffer1, buffer2, buffer3, replace, shorten) = prepare_handlers();
280        let matcher = Simple::new(r#"[]{"desc"}"#).unwrap();
281        let group = Group::new()
282            .add_handler(buffer1.clone())
283            .add_handler(replace.clone())
284            .add_handler(buffer2.clone())
285            .add_handler(shorten.clone())
286            .add_handler(buffer3.clone());
287
288        trigger.add_matcher(Box::new(matcher), Arc::new(Mutex::new(group)));
289
290        trigger
291            .process(br#"[{"desc": "aa"}, {"desc": "bbbbbb"}]"#)
292            .unwrap();
293
294        // buffer1
295        assert_eq!(
296            String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
297            r#""aa""#
298        );
299        assert_eq!(
300            String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
301            r#""bbbbbb""#
302        );
303        assert!(buffer1.lock().unwrap().pop().is_none());
304
305        // buffer2
306        assert_eq!(
307            String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
308            r#""ccccc""#
309        );
310        assert_eq!(
311            String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
312            r#""ccccc""#
313        );
314        assert!(buffer2.lock().unwrap().pop().is_none());
315
316        // buffer3
317        assert_eq!(
318            String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
319            r#""ccc..""#
320        );
321        assert_eq!(
322            String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
323            r#""ccc..""#
324        );
325        assert!(buffer3.lock().unwrap().pop().is_none());
326    }
327
328    #[test]
329    fn test_filter() {
330        let mut filter = Filter::new();
331        let (buffer1, buffer2, buffer3, replace, shorten) = prepare_handlers();
332        let matcher = Simple::new(r#"[]{"desc"}"#).unwrap();
333        let group = Group::new()
334            .add_handler(buffer1.clone())
335            .add_handler(replace.clone())
336            .add_handler(buffer2.clone())
337            .add_handler(shorten.clone())
338            .add_handler(buffer3.clone());
339
340        filter.add_matcher(Box::new(matcher), Some(Arc::new(Mutex::new(group))));
341
342        let output: Vec<u8> = OutputConverter::new()
343            .convert(
344                &filter
345                    .process(br#"[{"desc": "aa"}, {"desc": "bbbbbb"}]"#)
346                    .unwrap(),
347            )
348            .into_iter()
349            .map(|e| e.1)
350            .flatten()
351            .collect();
352
353        // output
354        assert_eq!(String::from_utf8(output).unwrap(), r#"[{}, {}]"#);
355
356        // buffer1
357        assert_eq!(
358            String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
359            r#""aa""#
360        );
361        assert_eq!(
362            String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
363            r#""bbbbbb""#
364        );
365        assert!(buffer1.lock().unwrap().pop().is_none());
366
367        // buffer2
368        assert_eq!(
369            String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
370            r#""ccccc""#
371        );
372        assert_eq!(
373            String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
374            r#""ccccc""#
375        );
376        assert!(buffer2.lock().unwrap().pop().is_none());
377
378        // buffer3
379        assert_eq!(
380            String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
381            r#""ccc..""#
382        );
383        assert_eq!(
384            String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
385            r#""ccc..""#
386        );
387        assert!(buffer3.lock().unwrap().pop().is_none());
388    }
389
390    #[test]
391    fn test_extract() {
392        let mut extract = Extract::new();
393        let (buffer1, buffer2, buffer3, replace, shorten) = prepare_handlers();
394        let matcher = Simple::new(r#"[]{"desc"}"#).unwrap();
395        let group = Group::new()
396            .add_handler(buffer1.clone())
397            .add_handler(replace.clone())
398            .add_handler(buffer2.clone())
399            .add_handler(shorten.clone())
400            .add_handler(buffer3.clone());
401
402        extract.add_matcher(Box::new(matcher), Some(Arc::new(Mutex::new(group))));
403
404        let output: Vec<u8> = OutputConverter::new()
405            .convert(
406                &extract
407                    .process(br#"[{"desc": "aa"}, {"desc": "bbbbbb"}]"#)
408                    .unwrap(),
409            )
410            .into_iter()
411            .map(|e| e.1)
412            .flatten()
413            .collect();
414
415        // output
416        assert_eq!(String::from_utf8(output).unwrap(), r#""aa""bbbbbb""#);
417
418        // buffer1
419        assert_eq!(
420            String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
421            r#""aa""#
422        );
423        assert_eq!(
424            String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
425            r#""bbbbbb""#
426        );
427        assert!(buffer1.lock().unwrap().pop().is_none());
428
429        // buffer2
430        assert_eq!(
431            String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
432            r#""ccccc""#
433        );
434        assert_eq!(
435            String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
436            r#""ccccc""#
437        );
438        assert!(buffer2.lock().unwrap().pop().is_none());
439
440        // buffer3
441        assert_eq!(
442            String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
443            r#""ccc..""#
444        );
445        assert_eq!(
446            String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
447            r#""ccc..""#
448        );
449        assert!(buffer3.lock().unwrap().pop().is_none());
450    }
451}