coreon-eip 0.1.0

Enterprise Integration Pattern processors for camel-rs.
Documentation
//! OnException — wrap a processor with a retry policy and a dead-letter fallback.
//!
//! Combines three Camel concepts into one MVP struct:
//! - RedeliveryPolicy: how many times, with what backoff
//! - OnException: when to engage (matched errors — MVP: always)
//! - Dead-Letter-Channel: fallback processor if all retries fail

use async_trait::async_trait;
use coreon_core::{Exchange, Processor, Result};
use std::{sync::Arc, time::Duration};
use tracing::warn;

/// Retry settings: how many times a failed exchange is redelivered and how
/// long to wait between attempts.
///
/// The type is a small `Copy` value; `PartialEq` is derived so policies can be
/// compared directly (only `PartialEq`, not `Eq`, because of the `f64` field).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct RedeliveryPolicy {
    /// Number of additional attempts made after the first failure.
    /// `0` means the target is invoked exactly once.
    pub max_redeliveries: u32,
    /// Delay before the first redelivery. `Duration::ZERO` means retries
    /// happen without waiting.
    pub initial_backoff: Duration,
    /// Backoff multiplier applied between attempts (>= 1.0). 1.0 = constant.
    pub backoff_multiplier: f64,
}

impl RedeliveryPolicy {
    pub fn once() -> Self {
        Self {
            max_redeliveries: 0,
            initial_backoff: Duration::ZERO,
            backoff_multiplier: 1.0,
        }
    }

    pub fn times(n: u32, initial_backoff: Duration) -> Self {
        Self {
            max_redeliveries: n,
            initial_backoff,
            backoff_multiplier: 2.0,
        }
    }
}

/// Processor wrapper that re-invokes a target processor on failure according
/// to a [`RedeliveryPolicy`], and optionally hands the exchange to a
/// dead-letter processor once the allowed redeliveries are used up.
pub struct OnException {
    /// Processor whose errors trigger redelivery.
    target: Arc<dyn Processor>,
    /// Redelivery count and backoff settings applied to `target`.
    policy: RedeliveryPolicy,
    /// Fallback invoked after the final failed attempt; when `None`, the last
    /// error is returned to the caller instead.
    dead_letter: Option<Arc<dyn Processor>>,
}

impl OnException {
    pub fn new(
        target: Arc<dyn Processor>,
        policy: RedeliveryPolicy,
        dead_letter: Option<Arc<dyn Processor>>,
    ) -> Arc<Self> {
        Arc::new(Self {
            target,
            policy,
            dead_letter,
        })
    }
}

/// Returns the delay that follows `current` when scaled by `multiplier`.
///
/// `Duration::mul_f64` panics if the product is negative, NaN, or too large
/// for a `Duration`. Because the multiplier comes from a public, unvalidated
/// policy field, this helper never panics instead:
/// - a NaN or negative product keeps `current` unchanged;
/// - a product beyond the representable range saturates at `Duration::MAX`.
fn scaled_backoff(current: Duration, multiplier: f64) -> Duration {
    let scaled = current.as_secs_f64() * multiplier;
    if scaled.is_nan() || scaled < 0.0 {
        current
    } else if scaled >= Duration::MAX.as_secs_f64() {
        // Also covers +infinity.
        Duration::MAX
    } else {
        Duration::from_secs_f64(scaled)
    }
}

#[async_trait]
impl Processor for OnException {
    /// Runs the target processor, redelivering on error as the policy allows.
    ///
    /// Returns `Ok(())` as soon as an attempt succeeds. After each failure
    /// with redeliveries remaining, a warning is logged, the current backoff
    /// (if non-zero) is slept and then scaled by the policy multiplier, and
    /// the target is invoked again on the same exchange.
    ///
    /// When the final attempt fails, the error text is stored in the exchange
    /// property `camel.exception`. If a dead-letter processor is configured
    /// its result is returned; otherwise the last error is returned.
    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
        let mut attempt: u32 = 0;
        let mut backoff = self.policy.initial_backoff;
        loop {
            match self.target.process(exchange).await {
                Ok(()) => return Ok(()),
                Err(e) if attempt < self.policy.max_redeliveries => {
                    warn!(
                        attempt = attempt + 1,
                        max = self.policy.max_redeliveries,
                        error = %e,
                        "retrying after error"
                    );
                    if backoff > Duration::ZERO {
                        tokio::time::sleep(backoff).await;
                        // Non-panicking replacement for `Duration::mul_f64`.
                        backoff = scaled_backoff(backoff, self.policy.backoff_multiplier);
                    }
                    attempt += 1;
                }
                Err(e) => {
                    // Redeliveries exhausted: record the failure for downstream
                    // consumers before falling back or propagating.
                    exchange
                        .properties
                        .insert("camel.exception".into(), e.to_string());
                    if let Some(dlc) = &self.dead_letter {
                        return dlc.process(exchange).await;
                    }
                    return Err(e);
                }
            }
        }
    }
}