strev 0.6.0

Event-driven pub/sub messaging library with compile-time ack safety
Documentation
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use crate::error::HandlerError;
use crate::handler::{Handler, HandlerResult};
use crate::message::{Message, Pending};
use crate::middleware::Middleware;

pub struct DelayOnError {
    pub initial_interval: Duration,
    pub max_interval: Duration,
    pub multiplier: f64,
}

impl Middleware for DelayOnError {
    fn wrap(&self, next: Box<dyn Handler>) -> Box<dyn Handler> {
        Box::new(DelayOnErrorHandler {
            initial_interval: self.initial_interval,
            max_interval: self.max_interval,
            multiplier: self.multiplier,
            consecutive_errors: AtomicU32::new(0),
            next,
        })
    }
}

struct DelayOnErrorHandler {
    initial_interval: Duration,
    max_interval: Duration,
    multiplier: f64,
    consecutive_errors: AtomicU32,
    next: Box<dyn Handler>,
}

#[async_trait::async_trait]
impl Handler for DelayOnErrorHandler {
    async fn handle(&self, msg: Message<Pending>) -> Result<HandlerResult, HandlerError> {
        match self.next.handle(msg).await {
            Ok(result) => {
                self.consecutive_errors.store(0, Ordering::SeqCst);
                Ok(result)
            }
            Err(e) => {
                let n = self.consecutive_errors.fetch_add(1, Ordering::SeqCst);
                let delay = Duration::from_secs_f64(
                    (self.initial_interval.as_secs_f64() * self.multiplier.powi(n as i32))
                        .min(self.max_interval.as_secs_f64()),
                );
                tokio::time::sleep(delay).await;
                Err(e)
            }
        }
    }
}