Skip to main content

coreon_eip/
multicast.rs

1//! Multicast — run N processors, each on a clone of the exchange, in parallel.
2//!
3//! Mirrors Camel's parallel multicast (no aggregation — see Aggregator for that).
4//! The original exchange's in-message is unchanged; any branch's `out` is
5//! discarded.
6
7use async_trait::async_trait;
8use coreon_core::{Exchange, Processor, Result};
9use futures::future::join_all;
10use std::sync::Arc;
11
/// Fan-out processor: holds the set of branch processors that `process`
/// runs, each against its own clone of the incoming exchange.
pub struct Multicast {
    /// Branch processors, shared via `Arc` so each can be cloned cheaply into
    /// its own future when the multicast is processed.
    branches: Vec<Arc<dyn Processor>>,
}
15
16impl Multicast {
17    pub fn new(branches: Vec<Arc<dyn Processor>>) -> Arc<Self> {
18        Arc::new(Self { branches })
19    }
20}
21
22#[async_trait]
23impl Processor for Multicast {
24    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
25        let futures: Vec<_> = self
26            .branches
27            .iter()
28            .map(|b| {
29                let mut copy = exchange.clone();
30                let b = b.clone();
31                async move { b.process(&mut copy).await }
32            })
33            .collect();
34        // Collect all results; surface the first error but let all branches
35        // attempt. This matches Camel's stopOnException=false default.
36        let results = join_all(futures).await;
37        for r in results {
38            r?;
39        }
40        Ok(())
41    }
42}