Skip to main content

coreon_eip/
load_balancer.rs

1//! LoadBalancer — distribute exchanges across processors.
2//!
3//! Two strategies for MVP:
4//! - [`RoundRobin`]: cycle through branches.
5//! - [`Failover`]: try branches in order; skip to next on error.
6
7use async_trait::async_trait;
8use coreon_core::{CamelError, Exchange, Processor, Result};
9use std::sync::{
10    atomic::{AtomicUsize, Ordering},
11    Arc,
12};
13
14pub struct RoundRobin {
15    branches: Vec<Arc<dyn Processor>>,
16    counter: AtomicUsize,
17}
18
19impl RoundRobin {
20    pub fn new(branches: Vec<Arc<dyn Processor>>) -> Arc<Self> {
21        assert!(!branches.is_empty(), "RoundRobin needs at least one branch");
22        Arc::new(Self {
23            branches,
24            counter: AtomicUsize::new(0),
25        })
26    }
27}
28
29#[async_trait]
30impl Processor for RoundRobin {
31    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
32        let idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.branches.len();
33        self.branches[idx].process(exchange).await
34    }
35}
36
37pub struct Failover {
38    branches: Vec<Arc<dyn Processor>>,
39}
40
41impl Failover {
42    pub fn new(branches: Vec<Arc<dyn Processor>>) -> Arc<Self> {
43        assert!(!branches.is_empty(), "Failover needs at least one branch");
44        Arc::new(Self { branches })
45    }
46}
47
48#[async_trait]
49impl Processor for Failover {
50    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
51        let mut last_err: Option<CamelError> = None;
52        for b in &self.branches {
53            match b.process(exchange).await {
54                Ok(()) => return Ok(()),
55                Err(e) => last_err = Some(e),
56            }
57        }
58        Err(last_err.unwrap_or_else(|| CamelError::Processor("Failover: no branches".into())))
59    }
60}