coreon-eip 0.1.0

Enterprise Integration Pattern processors for camel-rs.
Documentation
//! Splitter — split an exchange's body into N items; run the sub-pipeline
//! once per item. Matches Camel's `split().body()` pattern.
//!
//! The split function returns a `Vec<Body>`; each becomes a fresh Exchange
//! whose `in.body` is that item. Other headers/properties are copied from
//! the original (MVP). Results are discarded — aggregation is out of scope
//! until the Aggregator EIP arrives.

use async_trait::async_trait;
use coreon_core::{message::Body, Exchange, Processor, Result};
use std::sync::Arc;

pub type SplitFn = dyn Fn(&Exchange) -> Vec<Body> + Send + Sync;

pub struct Splitter {
    split: Arc<SplitFn>,
    sub: Arc<dyn Processor>,
}

impl Splitter {
    pub fn new<F>(split: F, sub: Arc<dyn Processor>) -> Arc<Self>
    where
        F: Fn(&Exchange) -> Vec<Body> + Send + Sync + 'static,
    {
        Arc::new(Self {
            split: Arc::new(split),
            sub,
        })
    }
}

#[async_trait]
impl Processor for Splitter {
    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
        let parts = (self.split)(exchange);
        for body in parts {
            let mut sub_ex = exchange.clone();
            sub_ex.r#in.body = body;
            sub_ex.out = None;
            self.sub.process(&mut sub_ex).await?;
        }
        Ok(())
    }
}