Skip to main content

coreon_eip/
on_exception.rs

1//! OnException — wrap a processor with a retry policy and a dead-letter fallback.
2//!
3//! Combines three Camel concepts into one MVP struct:
4//! - RedeliveryPolicy: how many times, with what backoff
5//! - OnException: when to engage (matched errors — MVP: always)
6//! - Dead-Letter-Channel: fallback processor if all retries fail
7
8use async_trait::async_trait;
9use coreon_core::{Exchange, Processor, Result};
10use std::{sync::Arc, time::Duration};
11use tracing::warn;
12
13#[derive(Clone, Copy, Debug)]
14pub struct RedeliveryPolicy {
15    pub max_redeliveries: u32,
16    pub initial_backoff: Duration,
17    /// Backoff multiplier applied between attempts (>= 1.0). 1.0 = constant.
18    pub backoff_multiplier: f64,
19}
20
21impl RedeliveryPolicy {
22    pub fn once() -> Self {
23        Self {
24            max_redeliveries: 0,
25            initial_backoff: Duration::ZERO,
26            backoff_multiplier: 1.0,
27        }
28    }
29
30    pub fn times(n: u32, initial_backoff: Duration) -> Self {
31        Self {
32            max_redeliveries: n,
33            initial_backoff,
34            backoff_multiplier: 2.0,
35        }
36    }
37}
38
39pub struct OnException {
40    target: Arc<dyn Processor>,
41    policy: RedeliveryPolicy,
42    dead_letter: Option<Arc<dyn Processor>>,
43}
44
45impl OnException {
46    pub fn new(
47        target: Arc<dyn Processor>,
48        policy: RedeliveryPolicy,
49        dead_letter: Option<Arc<dyn Processor>>,
50    ) -> Arc<Self> {
51        Arc::new(Self {
52            target,
53            policy,
54            dead_letter,
55        })
56    }
57}
58
59#[async_trait]
60impl Processor for OnException {
61    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
62        let mut attempt: u32 = 0;
63        let mut backoff = self.policy.initial_backoff;
64        loop {
65            match self.target.process(exchange).await {
66                Ok(()) => return Ok(()),
67                Err(e) if attempt < self.policy.max_redeliveries => {
68                    warn!(
69                        attempt = attempt + 1,
70                        max = self.policy.max_redeliveries,
71                        error = %e,
72                        "retrying after error"
73                    );
74                    if backoff > Duration::ZERO {
75                        tokio::time::sleep(backoff).await;
76                        backoff = backoff.mul_f64(self.policy.backoff_multiplier);
77                    }
78                    attempt += 1;
79                }
80                Err(e) => {
81                    exchange
82                        .properties
83                        .insert("camel.exception".into(), e.to_string());
84                    if let Some(dlc) = &self.dead_letter {
85                        return dlc.process(exchange).await;
86                    }
87                    return Err(e);
88                }
89            }
90        }
91    }
92}