coreon-eip 0.1.0

Enterprise Integration Pattern processors for camel-rs.
Documentation
//! Multicast — run N processors, each on a clone of the exchange, in parallel.
//!
//! Mirrors Camel's parallel multicast (no aggregation — see Aggregator for that).
//! The original exchange's in-message is unchanged; any branch's `out` is
//! discarded.

use async_trait::async_trait;
use coreon_core::{Exchange, Processor, Result};
use futures::future::join_all;
use std::sync::Arc;

pub struct Multicast {
    branches: Vec<Arc<dyn Processor>>,
}

impl Multicast {
    pub fn new(branches: Vec<Arc<dyn Processor>>) -> Arc<Self> {
        Arc::new(Self { branches })
    }
}

#[async_trait]
impl Processor for Multicast {
    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
        let futures: Vec<_> = self
            .branches
            .iter()
            .map(|b| {
                let mut copy = exchange.clone();
                let b = b.clone();
                async move { b.process(&mut copy).await }
            })
            .collect();
        // Collect all results; surface the first error but let all branches
        // attempt. This matches Camel's stopOnException=false default.
        let results = join_all(futures).await;
        for r in results {
            r?;
        }
        Ok(())
    }
}