use async_trait::async_trait;
use coreon_core::{Exchange, Processor, Result};
use std::{sync::Arc, time::Duration};
use tracing::warn;
#[derive(Clone, Copy, Debug)]
pub struct RedeliveryPolicy {
pub max_redeliveries: u32,
pub initial_backoff: Duration,
pub backoff_multiplier: f64,
}
impl RedeliveryPolicy {
pub fn once() -> Self {
Self {
max_redeliveries: 0,
initial_backoff: Duration::ZERO,
backoff_multiplier: 1.0,
}
}
pub fn times(n: u32, initial_backoff: Duration) -> Self {
Self {
max_redeliveries: n,
initial_backoff,
backoff_multiplier: 2.0,
}
}
}
pub struct OnException {
target: Arc<dyn Processor>,
policy: RedeliveryPolicy,
dead_letter: Option<Arc<dyn Processor>>,
}
impl OnException {
pub fn new(
target: Arc<dyn Processor>,
policy: RedeliveryPolicy,
dead_letter: Option<Arc<dyn Processor>>,
) -> Arc<Self> {
Arc::new(Self {
target,
policy,
dead_letter,
})
}
}
#[async_trait]
impl Processor for OnException {
async fn process(&self, exchange: &mut Exchange) -> Result<()> {
let mut attempt: u32 = 0;
let mut backoff = self.policy.initial_backoff;
loop {
match self.target.process(exchange).await {
Ok(()) => return Ok(()),
Err(e) if attempt < self.policy.max_redeliveries => {
warn!(
attempt = attempt + 1,
max = self.policy.max_redeliveries,
error = %e,
"retrying after error"
);
if backoff > Duration::ZERO {
tokio::time::sleep(backoff).await;
backoff = backoff.mul_f64(self.policy.backoff_multiplier);
}
attempt += 1;
}
Err(e) => {
exchange
.properties
.insert("camel.exception".into(), e.to_string());
if let Some(dlc) = &self.dead_letter {
return dlc.process(exchange).await;
}
return Err(e);
}
}
}
}
}