use async_trait::async_trait;
use std::time::Duration;
use crate::error::WorkerResult;
use crate::message::ReceivedMessage;
/// Describes how the delay between successive attempts is computed.
#[derive(Debug, Clone)]
pub enum BackoffStrategy {
    /// Always wait the same duration, regardless of the attempt number.
    Fixed(Duration),
    /// Grow the delay geometrically with the attempt number, capped at `max`.
    Exponential {
        /// Delay returned for attempt `0`.
        initial: Duration,
        /// Upper bound on the delay returned for any attempt.
        max: Duration,
        /// Factor applied once per attempt (`initial * multiplier^attempt`).
        multiplier: f64,
    },
}

impl BackoffStrategy {
    /// Returns the delay to wait for the given zero-based `attempt`.
    ///
    /// * `Fixed` always yields its configured duration.
    /// * `Exponential` yields `initial * multiplier^attempt`, never more than `max`.
    ///
    /// This method never panics. Results that cannot be represented as a
    /// [`Duration`] are clamped instead:
    ///
    /// * a product that overflows, or is NaN (e.g. a NaN `multiplier`), yields `max`;
    /// * a negative product (negative `multiplier`, odd attempt) yields a zero delay;
    /// * attempt numbers above `i32::MAX` use a saturated exponent.
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        match self {
            BackoffStrategy::Fixed(duration) => *duration,
            BackoffStrategy::Exponential {
                initial,
                max,
                multiplier,
            } => {
                let zero = Duration::from_secs(0);
                // Zero scaled by anything is zero; short-circuit so that
                // `0 * inf` (NaN) further down cannot turn it into `max`.
                if *initial == zero {
                    return zero;
                }

                // `powi` takes an `i32`. Saturate instead of letting `as` wrap very
                // large attempt counts into a negative exponent.
                let exponent = attempt.min(i32::MAX as u32) as i32;
                let factor = multiplier.powi(exponent);

                // Decide in plain `f64` seconds first: `Duration::mul_f64` panics on
                // overflowing, negative or non-finite results, which would happen
                // *before* the cap could be applied.
                let scaled_secs = initial.as_secs_f64() * factor;

                if scaled_secs.is_nan() || scaled_secs >= max.as_secs_f64() {
                    *max
                } else if scaled_secs <= 0.0 {
                    zero
                } else {
                    // Here 0 < scaled_secs < max, so the multiplication is safe.
                    initial.mul_f64(factor).min(*max)
                }
            }
        }
    }
}
/// A unit of message-processing logic.
///
/// Only [`Worker::id`] and [`Worker::process`] must be implemented; every other
/// method has a default that implementors may override.
#[async_trait]
pub trait Worker: Send + Sync {
    /// Identifier of this worker.
    fn id(&self) -> &str;

    /// Name of this worker. Defaults to an owned copy of [`Worker::id`].
    fn name(&self) -> String {
        self.id().to_owned()
    }

    /// Handles a single received message carrying a JSON payload.
    async fn process(&self, message: ReceivedMessage<serde_json::Value>) -> WorkerResult<()>;

    /// Initialization hook. The default does nothing and succeeds.
    // NOTE(review): presumably invoked before any message is processed — confirm
    // against the code that drives workers.
    async fn setup(&self) -> WorkerResult<()> {
        Ok(())
    }

    /// Cleanup hook. The default does nothing.
    async fn teardown(&self) {}

    /// Optional concurrency limit for this worker. The default is `None`.
    fn concurrency_limit(&self) -> Option<usize> {
        None
    }

    /// Backoff policy used for restarts.
    ///
    /// The default is exponential: starting at 1 second, doubling per attempt,
    /// capped at 60 seconds.
    fn restart_backoff_strategy(&self) -> BackoffStrategy {
        let initial = Duration::from_secs(1);
        let max = Duration::from_secs(60);
        BackoffStrategy::Exponential {
            initial,
            max,
            multiplier: 2.0,
        }
    }

    /// Optional timeout for processing. The default is `None`.
    fn processing_timeout(&self) -> Option<Duration> {
        None
    }
}
/// Moves `worker` onto the heap and returns it as a `Box<dyn Worker>` trait object.
pub fn box_worker<W>(worker: W) -> Box<dyn Worker>
where
    W: Worker + 'static,
{
    Box::new(worker)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fixed strategy yields the same delay for every attempt number.
    #[test]
    fn test_fixed_backoff() {
        let five_secs = Duration::from_secs(5);
        let strategy = BackoffStrategy::Fixed(five_secs);

        for &attempt in &[0u32, 1, 10] {
            assert_eq!(strategy.delay_for_attempt(attempt), five_secs);
        }
    }

    /// An exponential strategy doubles per attempt and stops growing at `max`.
    #[test]
    fn test_exponential_backoff() {
        let strategy = BackoffStrategy::Exponential {
            initial: Duration::from_secs(1),
            max: Duration::from_secs(60),
            multiplier: 2.0,
        };

        // (attempt, expected delay in whole seconds)
        let expectations: [(u32, u64); 7] = [
            (0, 1),
            (1, 2),
            (2, 4),
            (3, 8),
            (4, 16),
            (5, 32),
            (10, 60),
        ];

        for &(attempt, expected_secs) in &expectations {
            assert_eq!(
                strategy.delay_for_attempt(attempt),
                Duration::from_secs(expected_secs)
            );
        }
    }

    /// A worker implementing only the required methods gets the documented defaults.
    #[test]
    fn test_worker_default_methods() {
        struct NoopWorker;

        #[async_trait]
        impl Worker for NoopWorker {
            fn id(&self) -> &str {
                "test-worker"
            }

            async fn process(
                &self,
                _message: ReceivedMessage<serde_json::Value>,
            ) -> WorkerResult<()> {
                Ok(())
            }
        }

        let worker = NoopWorker;
        assert_eq!(worker.id(), "test-worker");
        assert_eq!(worker.name(), "test-worker");
        assert!(worker.concurrency_limit().is_none());

        if let BackoffStrategy::Exponential {
            initial,
            max,
            multiplier,
        } = worker.restart_backoff_strategy()
        {
            assert_eq!(initial, Duration::from_secs(1));
            assert_eq!(max, Duration::from_secs(60));
            assert_eq!(multiplier, 2.0);
        } else {
            panic!("Expected Exponential strategy");
        }
    }
}