coreon-eip 0.1.0

Enterprise Integration Pattern processors for camel-rs.
Documentation
//! RecipientList — compute recipient URIs from the exchange at runtime and
//! send to each, sequentially.

use async_trait::async_trait;
use coreon_core::{CamelContext, Exchange, Processor, Result};
use std::sync::{Arc, Weak};

/// Signature of the callback that selects recipients for an exchange.
///
/// The callback receives a shared reference to the [`Exchange`] and returns
/// the recipient URIs as owned strings. The `Send + Sync` bounds allow the
/// callback to be shared between threads.
pub type RecipientFn = dyn Fn(&Exchange) -> Vec<String> + Send + Sync;

/// Processor that sends a copy of the exchange to every URI produced by a
/// user-supplied callback.
pub struct RecipientList {
    /// Callback evaluated against each exchange to obtain the recipient URIs.
    recipients: Arc<RecipientFn>,
    /// Non-owning handle to the context used for sending.
    // NOTE(review): presumably `Weak` so the processor does not keep the
    // context alive (or form a reference cycle with it) — confirm.
    ctx: Weak<CamelContext>,
}

impl RecipientList {
    /// Builds a shared `RecipientList`.
    ///
    /// * `f` — callback invoked for each exchange; the strings it returns are
    ///   the URIs the exchange copy is sent to.
    /// * `ctx` — context used for sending. Only a weak reference is stored,
    ///   so this processor does not extend the context's lifetime.
    ///
    /// Returns the processor already wrapped in an [`Arc`].
    pub fn new<F>(f: F, ctx: &Arc<CamelContext>) -> Arc<Self>
    where
        F: Fn(&Exchange) -> Vec<String> + Send + Sync + 'static,
    {
        let recipients = Arc::new(f);
        let weak_ctx = Arc::downgrade(ctx);
        Arc::new(Self {
            recipients,
            ctx: weak_ctx,
        })
    }
}

#[async_trait]
impl Processor for RecipientList {
    /// Sends a clone of `exchange` to every URI returned by the recipient
    /// callback, one after another.
    ///
    /// The incoming exchange itself is never handed to `send`; each recipient
    /// gets a fresh clone, and that clone is dropped after the send completes.
    ///
    /// # Errors
    ///
    /// * `CamelError::NotRunning` if the context has already been dropped
    ///   (the weak reference can no longer be upgraded). The recipient
    ///   callback is not invoked in that case.
    /// * The first error returned by `send`. Delivery stops there, so any
    ///   recipients later in the list are skipped.
    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
        // The error is a plain unit variant, so `ok_or` is enough; no closure
        // is needed to defer its construction.
        let ctx = self
            .ctx
            .upgrade()
            .ok_or(coreon_core::CamelError::NotRunning)?;

        // Sequential on purpose: each send is awaited before the next starts.
        for uri in (self.recipients)(exchange) {
            let mut outbound = exchange.clone();
            ctx.send(&uri, &mut outbound).await?;
        }
        Ok(())
    }
}