coreon_eip/
load_balancer.rs1use async_trait::async_trait;
8use coreon_core::{CamelError, Exchange, Processor, Result};
9use std::sync::{
10 atomic::{AtomicUsize, Ordering},
11 Arc,
12};
13
14pub struct RoundRobin {
15 branches: Vec<Arc<dyn Processor>>,
16 counter: AtomicUsize,
17}
18
19impl RoundRobin {
20 pub fn new(branches: Vec<Arc<dyn Processor>>) -> Arc<Self> {
21 assert!(!branches.is_empty(), "RoundRobin needs at least one branch");
22 Arc::new(Self {
23 branches,
24 counter: AtomicUsize::new(0),
25 })
26 }
27}
28
29#[async_trait]
30impl Processor for RoundRobin {
31 async fn process(&self, exchange: &mut Exchange) -> Result<()> {
32 let idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.branches.len();
33 self.branches[idx].process(exchange).await
34 }
35}
36
37pub struct Failover {
38 branches: Vec<Arc<dyn Processor>>,
39}
40
41impl Failover {
42 pub fn new(branches: Vec<Arc<dyn Processor>>) -> Arc<Self> {
43 assert!(!branches.is_empty(), "Failover needs at least one branch");
44 Arc::new(Self { branches })
45 }
46}
47
48#[async_trait]
49impl Processor for Failover {
50 async fn process(&self, exchange: &mut Exchange) -> Result<()> {
51 let mut last_err: Option<CamelError> = None;
52 for b in &self.branches {
53 match b.process(exchange).await {
54 Ok(()) => return Ok(()),
55 Err(e) => last_err = Some(e),
56 }
57 }
58 Err(last_err.unwrap_or_else(|| CamelError::Processor("Failover: no branches".into())))
59 }
60}