Skip to main content

coreon_eip/
splitter.rs

1//! Splitter — split an exchange's body into N items; run the sub-pipeline
2//! once per item. Matches Camel's `split().body()` pattern.
3//!
4//! The split function returns a `Vec<Body>`; each becomes a fresh Exchange
5//! whose `in.body` is that item. Other headers/properties are copied from
6//! the original (MVP). Results are discarded — aggregation is out of scope
7//! until the Aggregator EIP arrives.
8
9use async_trait::async_trait;
10use coreon_core::{message::Body, Exchange, Processor, Result};
11use std::sync::Arc;
12
13pub type SplitFn = dyn Fn(&Exchange) -> Vec<Body> + Send + Sync;
14
15pub struct Splitter {
16    split: Arc<SplitFn>,
17    sub: Arc<dyn Processor>,
18}
19
20impl Splitter {
21    pub fn new<F>(split: F, sub: Arc<dyn Processor>) -> Arc<Self>
22    where
23        F: Fn(&Exchange) -> Vec<Body> + Send + Sync + 'static,
24    {
25        Arc::new(Self {
26            split: Arc::new(split),
27            sub,
28        })
29    }
30}
31
32#[async_trait]
33impl Processor for Splitter {
34    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
35        let parts = (self.split)(exchange);
36        for body in parts {
37            let mut sub_ex = exchange.clone();
38            sub_ex.r#in.body = body;
39            sub_ex.out = None;
40            self.sub.process(&mut sub_ex).await?;
41        }
42        Ok(())
43    }
44}