1use async_trait::async_trait;
10use coreon_core::{message::Body, Exchange, Processor, Result};
11use std::sync::Arc;
12
13pub type SplitFn = dyn Fn(&Exchange) -> Vec<Body> + Send + Sync;
14
15pub struct Splitter {
16 split: Arc<SplitFn>,
17 sub: Arc<dyn Processor>,
18}
19
20impl Splitter {
21 pub fn new<F>(split: F, sub: Arc<dyn Processor>) -> Arc<Self>
22 where
23 F: Fn(&Exchange) -> Vec<Body> + Send + Sync + 'static,
24 {
25 Arc::new(Self {
26 split: Arc::new(split),
27 sub,
28 })
29 }
30}
31
32#[async_trait]
33impl Processor for Splitter {
34 async fn process(&self, exchange: &mut Exchange) -> Result<()> {
35 let parts = (self.split)(exchange);
36 for body in parts {
37 let mut sub_ex = exchange.clone();
38 sub_ex.r#in.body = body;
39 sub_ex.out = None;
40 self.sub.process(&mut sub_ex).await?;
41 }
42 Ok(())
43 }
44}