coreon_eip/
on_exception.rs1use async_trait::async_trait;
9use coreon_core::{Exchange, Processor, Result};
10use std::{sync::Arc, time::Duration};
11use tracing::warn;
12
13#[derive(Clone, Copy, Debug)]
14pub struct RedeliveryPolicy {
15 pub max_redeliveries: u32,
16 pub initial_backoff: Duration,
17 pub backoff_multiplier: f64,
19}
20
21impl RedeliveryPolicy {
22 pub fn once() -> Self {
23 Self {
24 max_redeliveries: 0,
25 initial_backoff: Duration::ZERO,
26 backoff_multiplier: 1.0,
27 }
28 }
29
30 pub fn times(n: u32, initial_backoff: Duration) -> Self {
31 Self {
32 max_redeliveries: n,
33 initial_backoff,
34 backoff_multiplier: 2.0,
35 }
36 }
37}
38
39pub struct OnException {
40 target: Arc<dyn Processor>,
41 policy: RedeliveryPolicy,
42 dead_letter: Option<Arc<dyn Processor>>,
43}
44
45impl OnException {
46 pub fn new(
47 target: Arc<dyn Processor>,
48 policy: RedeliveryPolicy,
49 dead_letter: Option<Arc<dyn Processor>>,
50 ) -> Arc<Self> {
51 Arc::new(Self {
52 target,
53 policy,
54 dead_letter,
55 })
56 }
57}
58
59#[async_trait]
60impl Processor for OnException {
61 async fn process(&self, exchange: &mut Exchange) -> Result<()> {
62 let mut attempt: u32 = 0;
63 let mut backoff = self.policy.initial_backoff;
64 loop {
65 match self.target.process(exchange).await {
66 Ok(()) => return Ok(()),
67 Err(e) if attempt < self.policy.max_redeliveries => {
68 warn!(
69 attempt = attempt + 1,
70 max = self.policy.max_redeliveries,
71 error = %e,
72 "retrying after error"
73 );
74 if backoff > Duration::ZERO {
75 tokio::time::sleep(backoff).await;
76 backoff = backoff.mul_f64(self.policy.backoff_multiplier);
77 }
78 attempt += 1;
79 }
80 Err(e) => {
81 exchange
82 .properties
83 .insert("camel.exception".into(), e.to_string());
84 if let Some(dlc) = &self.dead_letter {
85 return dlc.process(exchange).await;
86 }
87 return Err(e);
88 }
89 }
90 }
91 }
92}