// coreon-eip 0.1.0
//
// Enterprise Integration Pattern processors for camel-rs.
// Documentation
//! LoadBalancer — distribute exchanges across processors.
//!
//! Two strategies for MVP:
//! - [`RoundRobin`]: cycle through branches.
//! - [`Failover`]: try branches in order; skip to next on error.

use async_trait::async_trait;
use coreon_core::{CamelError, Exchange, Processor, Result};
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Load balancer that forwards each exchange to exactly one branch, rotating
/// through the branches in the order they were supplied.
pub struct RoundRobin {
    /// Candidate processors. `RoundRobin::new` rejects an empty list.
    branches: Vec<Arc<dyn Processor>>,
    /// Ticket counter incremented once per processed exchange; it is reduced
    /// modulo the number of branches to choose the target branch.
    counter: AtomicUsize,
}

impl RoundRobin {
    /// Builds a shared round-robin balancer over `branches`.
    ///
    /// The rotation starts at the first branch.
    ///
    /// # Panics
    ///
    /// Panics if `branches` is empty.
    pub fn new(branches: Vec<Arc<dyn Processor>>) -> Arc<Self> {
        assert!(!branches.is_empty(), "RoundRobin needs at least one branch");
        let counter = AtomicUsize::new(0);
        let balancer = Self { branches, counter };
        Arc::new(balancer)
    }
}

#[async_trait]
impl Processor for RoundRobin {
    /// Sends `exchange` to the next branch in the rotation and returns that
    /// branch's result unchanged.
    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
        // Each call takes a unique ticket; the atomic add wraps on overflow,
        // and the modulo keeps the index within `branches`.
        let ticket = self.counter.fetch_add(1, Ordering::Relaxed);
        let slot = ticket % self.branches.len();
        let target = &self.branches[slot];
        target.process(exchange).await
    }
}

/// Load balancer that tries its branches in order and stops at the first one
/// that succeeds.
pub struct Failover {
    /// Processors in the order they are attempted. `Failover::new` rejects an
    /// empty list.
    branches: Vec<Arc<dyn Processor>>,
}

impl Failover {
    /// Builds a shared failover balancer that attempts `branches` in the
    /// given order.
    ///
    /// # Panics
    ///
    /// Panics if `branches` is empty.
    pub fn new(branches: Vec<Arc<dyn Processor>>) -> Arc<Self> {
        assert!(!branches.is_empty(), "Failover needs at least one branch");
        let balancer = Self { branches };
        Arc::new(balancer)
    }
}

#[async_trait]
impl Processor for Failover {
    /// Runs `exchange` through each branch in turn until one succeeds.
    ///
    /// The same `exchange` is handed to every attempted branch.
    ///
    /// # Errors
    ///
    /// If every branch fails, returns the error from the last branch tried.
    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
        let mut failure: Option<CamelError> = None;
        for branch in self.branches.iter() {
            if let Err(err) = branch.process(exchange).await {
                // Remember only the most recent failure and move on.
                failure = Some(err);
                continue;
            }
            return Ok(());
        }
        match failure {
            Some(err) => Err(err),
            // Only reachable when there were no branches to try.
            None => Err(CamelError::Processor("Failover: no branches".into())),
        }
    }
}