1use std::{
28 any::Any,
29 ops,
30 sync::{Arc, Mutex},
31};
32
33use crate::{error, path::Path, streamer::Token};
34
35use super::Handler;
36
37#[derive(Default, Clone)]
39pub struct Group {
40 handlers: Vec<Arc<Mutex<dyn Handler>>>,
41}
42
43impl Group {
44 pub fn new() -> Self {
45 Default::default()
46 }
47
48 pub fn add_handler(mut self, handler: Arc<Mutex<dyn Handler>>) -> Self {
56 self.handlers.push(handler);
57 self
58 }
59
60 pub fn add_handler_mut(&mut self, handler: Arc<Mutex<dyn Handler>>) {
65 self.handlers.push(handler);
66 }
67
68 pub fn subhandlers(&self) -> &[Arc<Mutex<dyn Handler>>] {
70 &self.handlers
71 }
72}
73
74impl Handler for Group {
75 fn start(
76 &mut self,
77 path: &Path,
78 matcher_idx: usize,
79 token: Token,
80 ) -> Result<Option<Vec<u8>>, error::Handler> {
81 let mut result = None;
82 for handler in self.handlers.iter() {
83 let mut guard = handler.lock().unwrap();
84 if guard.is_converter() {
85 let orig_result = result.take();
86 result = guard.start(path, matcher_idx, token.clone())?;
87 if let Some(orig_data) = orig_result {
88 let feed_output = guard.feed(&orig_data, matcher_idx)?;
89 if let Some(mut data) = result.take() {
90 if let Some(feed_data) = feed_output {
91 data.extend(feed_data);
92 result = Some(data);
93 }
94 } else {
95 result = feed_output;
96 }
97 }
98 } else {
99 guard.start(path, matcher_idx, token.clone())?;
100 if let Some(data) = result.as_ref() {
101 guard.feed(data, matcher_idx)?;
102 }
103 }
104 }
105 Ok(result)
106 }
107
108 fn feed(&mut self, data: &[u8], matcher_idx: usize) -> Result<Option<Vec<u8>>, error::Handler> {
109 let mut result = Some(data.to_vec());
110 for handler in self.handlers.iter() {
111 let mut guard = handler.lock().unwrap();
112 if let Some(data) = result.take() {
113 if guard.is_converter() {
114 result = guard.feed(&data, matcher_idx)?;
115 } else {
116 guard.feed(&data, matcher_idx)?;
117 result = Some(data)
118 }
119 } else {
120 break;
122 }
123 }
124 Ok(result)
125 }
126
127 fn end(
128 &mut self,
129 path: &Path,
130 matcher_idx: usize,
131 token: Token,
132 ) -> Result<Option<Vec<u8>>, error::Handler> {
133 let mut result: Option<Vec<u8>> = None;
134 for handler in self.handlers.iter() {
135 let mut guard = handler.lock().unwrap();
136 if guard.is_converter() {
137 if let Some(data) = result.take() {
139 result = guard.feed(&data, matcher_idx)?;
140 }
141
142 if let Some(data) = guard.end(path, matcher_idx, token.clone())? {
143 if let Some(mut result_data) = result.take() {
144 result_data.extend(data);
145 result = Some(result_data);
146 } else {
147 result = Some(data);
148 }
149 }
150 } else {
151 if let Some(data) = result.as_ref() {
152 guard.feed(data, matcher_idx)?;
153 }
154 guard.end(path, matcher_idx, token.clone())?;
155 }
156 }
157 Ok(result)
158 }
159
160 fn is_converter(&self) -> bool {
161 self.handlers
162 .iter()
163 .any(|e| e.lock().unwrap().is_converter())
164 }
165
166 fn as_any(&self) -> &dyn Any {
167 self
168 }
169}
170
171impl ops::Add for Group {
172 type Output = Self;
173 fn add(self, rhs: Self) -> Self::Output {
174 let mut joined = Self::new();
175 self.subhandlers()
176 .iter()
177 .for_each(|h| joined.add_handler_mut(h.clone()));
178 rhs.subhandlers()
179 .iter()
180 .for_each(|h| joined.add_handler_mut(h.clone()));
181
182 joined
183 }
184}
185
186#[cfg(test)]
187mod tests {
188 use super::Group;
189 use crate::{
190 handler::{Buffer, Replace, Shorten},
191 matcher::Simple,
192 strategy::{Convert, Extract, Filter, OutputConverter, Strategy, Trigger},
193 };
194 use std::sync::{Arc, Mutex};
195
196 fn prepare_handlers() -> (
197 Arc<Mutex<Buffer>>,
198 Arc<Mutex<Buffer>>,
199 Arc<Mutex<Buffer>>,
200 Arc<Mutex<Replace>>,
201 Arc<Mutex<Shorten>>,
202 ) {
203 (
204 Arc::new(Mutex::new(Buffer::new())),
205 Arc::new(Mutex::new(Buffer::new())),
206 Arc::new(Mutex::new(Buffer::new())),
207 Arc::new(Mutex::new(Replace::new(br#""ccccc""#.to_vec()))),
208 Arc::new(Mutex::new(Shorten::new(3, r#"..""#.into()))),
209 )
210 }
211
212 #[test]
213 fn test_convert() {
214 let mut convert = Convert::new();
215 let (buffer1, buffer2, buffer3, replace, shorten) = prepare_handlers();
216 let matcher = Simple::new(r#"[]{"desc"}"#).unwrap();
217 let group = Group::new()
218 .add_handler(buffer1.clone())
219 .add_handler(replace.clone())
220 .add_handler(buffer2.clone())
221 .add_handler(shorten.clone())
222 .add_handler(buffer3.clone());
223
224 convert.add_matcher(Box::new(matcher), Arc::new(Mutex::new(group)));
225
226 let output = OutputConverter::new()
227 .convert(
228 &convert
229 .process(br#"[{"desc": "aa"}, {"desc": "bbbbbb"}]"#)
230 .unwrap(),
231 )
232 .into_iter()
233 .map(|e| e.1)
234 .collect::<Vec<Vec<u8>>>();
235
236 assert_eq!(
238 String::from_utf8(output.into_iter().flatten().collect()).unwrap(),
239 r#"[{"desc": "ccc.."}, {"desc": "ccc.."}]"#
240 );
241
242 assert_eq!(
244 String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
245 r#""aa""#
246 );
247 assert_eq!(
248 String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
249 r#""bbbbbb""#
250 );
251 assert!(buffer1.lock().unwrap().pop().is_none());
252
253 assert_eq!(
255 String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
256 r#""ccccc""#
257 );
258 assert_eq!(
259 String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
260 r#""ccccc""#
261 );
262 assert!(buffer2.lock().unwrap().pop().is_none());
263
264 assert_eq!(
266 String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
267 r#""ccc..""#
268 );
269 assert_eq!(
270 String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
271 r#""ccc..""#
272 );
273 assert!(buffer3.lock().unwrap().pop().is_none());
274 }
275
276 #[test]
277 fn test_trigger() {
278 let mut trigger = Trigger::new();
279 let (buffer1, buffer2, buffer3, replace, shorten) = prepare_handlers();
280 let matcher = Simple::new(r#"[]{"desc"}"#).unwrap();
281 let group = Group::new()
282 .add_handler(buffer1.clone())
283 .add_handler(replace.clone())
284 .add_handler(buffer2.clone())
285 .add_handler(shorten.clone())
286 .add_handler(buffer3.clone());
287
288 trigger.add_matcher(Box::new(matcher), Arc::new(Mutex::new(group)));
289
290 trigger
291 .process(br#"[{"desc": "aa"}, {"desc": "bbbbbb"}]"#)
292 .unwrap();
293
294 assert_eq!(
296 String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
297 r#""aa""#
298 );
299 assert_eq!(
300 String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
301 r#""bbbbbb""#
302 );
303 assert!(buffer1.lock().unwrap().pop().is_none());
304
305 assert_eq!(
307 String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
308 r#""ccccc""#
309 );
310 assert_eq!(
311 String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
312 r#""ccccc""#
313 );
314 assert!(buffer2.lock().unwrap().pop().is_none());
315
316 assert_eq!(
318 String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
319 r#""ccc..""#
320 );
321 assert_eq!(
322 String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
323 r#""ccc..""#
324 );
325 assert!(buffer3.lock().unwrap().pop().is_none());
326 }
327
328 #[test]
329 fn test_filter() {
330 let mut filter = Filter::new();
331 let (buffer1, buffer2, buffer3, replace, shorten) = prepare_handlers();
332 let matcher = Simple::new(r#"[]{"desc"}"#).unwrap();
333 let group = Group::new()
334 .add_handler(buffer1.clone())
335 .add_handler(replace.clone())
336 .add_handler(buffer2.clone())
337 .add_handler(shorten.clone())
338 .add_handler(buffer3.clone());
339
340 filter.add_matcher(Box::new(matcher), Some(Arc::new(Mutex::new(group))));
341
342 let output: Vec<u8> = OutputConverter::new()
343 .convert(
344 &filter
345 .process(br#"[{"desc": "aa"}, {"desc": "bbbbbb"}]"#)
346 .unwrap(),
347 )
348 .into_iter()
349 .map(|e| e.1)
350 .flatten()
351 .collect();
352
353 assert_eq!(String::from_utf8(output).unwrap(), r#"[{}, {}]"#);
355
356 assert_eq!(
358 String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
359 r#""aa""#
360 );
361 assert_eq!(
362 String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
363 r#""bbbbbb""#
364 );
365 assert!(buffer1.lock().unwrap().pop().is_none());
366
367 assert_eq!(
369 String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
370 r#""ccccc""#
371 );
372 assert_eq!(
373 String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
374 r#""ccccc""#
375 );
376 assert!(buffer2.lock().unwrap().pop().is_none());
377
378 assert_eq!(
380 String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
381 r#""ccc..""#
382 );
383 assert_eq!(
384 String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
385 r#""ccc..""#
386 );
387 assert!(buffer3.lock().unwrap().pop().is_none());
388 }
389
390 #[test]
391 fn test_extract() {
392 let mut extract = Extract::new();
393 let (buffer1, buffer2, buffer3, replace, shorten) = prepare_handlers();
394 let matcher = Simple::new(r#"[]{"desc"}"#).unwrap();
395 let group = Group::new()
396 .add_handler(buffer1.clone())
397 .add_handler(replace.clone())
398 .add_handler(buffer2.clone())
399 .add_handler(shorten.clone())
400 .add_handler(buffer3.clone());
401
402 extract.add_matcher(Box::new(matcher), Some(Arc::new(Mutex::new(group))));
403
404 let output: Vec<u8> = OutputConverter::new()
405 .convert(
406 &extract
407 .process(br#"[{"desc": "aa"}, {"desc": "bbbbbb"}]"#)
408 .unwrap(),
409 )
410 .into_iter()
411 .map(|e| e.1)
412 .flatten()
413 .collect();
414
415 assert_eq!(String::from_utf8(output).unwrap(), r#""aa""bbbbbb""#);
417
418 assert_eq!(
420 String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
421 r#""aa""#
422 );
423 assert_eq!(
424 String::from_utf8(buffer1.lock().unwrap().pop().unwrap().1).unwrap(),
425 r#""bbbbbb""#
426 );
427 assert!(buffer1.lock().unwrap().pop().is_none());
428
429 assert_eq!(
431 String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
432 r#""ccccc""#
433 );
434 assert_eq!(
435 String::from_utf8(buffer2.lock().unwrap().pop().unwrap().1).unwrap(),
436 r#""ccccc""#
437 );
438 assert!(buffer2.lock().unwrap().pop().is_none());
439
440 assert_eq!(
442 String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
443 r#""ccc..""#
444 );
445 assert_eq!(
446 String::from_utf8(buffer3.lock().unwrap().pop().unwrap().1).unwrap(),
447 r#""ccc..""#
448 );
449 assert!(buffer3.lock().unwrap().pop().is_none());
450 }
451}