use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Selects how the delay between retries is realised.
///
/// This file only stores and returns the chosen value; the code that acts on
/// it is not visible here.
/// NOTE(review): the variant names suggest a per-queue message TTL with
/// dead-lettering versus a delayed-message exchange — confirm against the
/// consumer of this setting.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DelayStrategy {
/// The default strategy (marked `#[default]`), used whenever none is
/// configured explicitly.
#[default]
TTL,
/// Alternative strategy; see `RetryConfig::get_delay_exchange` for the
/// exchange name this file derives per queue — presumably used with this
/// variant, TODO confirm.
DelayedExchange,
}
/// Describes how the delay before each retry attempt is derived.
///
/// The variants are interpreted by `RetryConfig::calculate_delay`, which
/// receives a zero-based attempt number.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetryMechanism {
/// The delay starts at `base_delay` (taken at millisecond granularity) and
/// doubles with every attempt, never exceeding `max_delay`.
Exponential {
/// Delay used for attempt 0.
base_delay: Duration,
/// Upper bound applied to the computed delay.
max_delay: Duration,
},
/// The same fixed `delay` is used for every attempt.
Linear { delay: Duration },
/// Attempt `n` uses `delays[n]`; attempts beyond the end of the list get no
/// delay at all (no further retry).
Custom { delays: Vec<Duration> },
}
/// Retry policy plus dead-letter naming for a single queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
/// Number of retry attempts allowed; `calculate_delay` returns `None` for
/// any attempt number greater than or equal to this value.
pub max_retries: u32,
/// How the delay for each attempt is derived.
pub mechanism: RetryMechanism,
/// Delay strategy; falls back to `DelayStrategy::default()` when the field
/// is missing from deserialized input (`#[serde(default)]`).
#[serde(default)]
pub delay_strategy: DelayStrategy,
/// Explicit dead-letter exchange name. When `None`,
/// `get_dead_letter_exchange` derives `"<queue>.dlx"` instead.
pub dead_letter_exchange: Option<String>,
/// Explicit dead-letter queue name. When `None`, `get_dead_letter_queue`
/// derives `"<queue>.dlq"` instead.
pub dead_letter_queue: Option<String>,
/// Optional TTL set through `with_dlq_ttl`; nothing in this file reads it.
/// NOTE(review): presumably the message TTL applied to the dead-letter
/// queue — confirm against the code that declares the queue.
pub dlq_ttl: Option<Duration>,
}
impl RetryConfig {
    /// Returns the stock exponential policy: 5 retries, starting at 1 second
    /// and capped at 60 seconds.
    pub fn exponential_default() -> Self {
        Self::exponential(5, Duration::from_secs(1), Duration::from_secs(60))
    }

    /// Builds an exponential back-off policy.
    ///
    /// The delay for attempt `n` is `base_delay * 2^n`, capped at `max_delay`.
    /// The delay strategy is the default one and no dead-letter names or TTL
    /// are set.
    pub fn exponential(max_retries: u32, base_delay: Duration, max_delay: Duration) -> Self {
        Self {
            max_retries,
            mechanism: RetryMechanism::Exponential {
                base_delay,
                max_delay,
            },
            delay_strategy: DelayStrategy::default(),
            dead_letter_exchange: None,
            dead_letter_queue: None,
            dlq_ttl: None,
        }
    }

    /// Builds a policy that waits the same `delay` before each of
    /// `max_retries` attempts.
    pub fn linear(max_retries: u32, delay: Duration) -> Self {
        Self {
            max_retries,
            mechanism: RetryMechanism::Linear { delay },
            delay_strategy: DelayStrategy::default(),
            dead_letter_exchange: None,
            dead_letter_queue: None,
            dlq_ttl: None,
        }
    }

    /// Builds a policy from an explicit list of delays, one per attempt.
    ///
    /// `max_retries` is the length of `delays`, saturated at `u32::MAX`.
    pub fn custom(delays: Vec<Duration>) -> Self {
        // Saturate instead of truncating: a plain `as u32` would wrap a very
        // long list to a small (possibly zero) retry count.
        let max_retries = delays.len().min(u32::MAX as usize) as u32;
        Self {
            max_retries,
            mechanism: RetryMechanism::Custom { delays },
            delay_strategy: DelayStrategy::default(),
            dead_letter_exchange: None,
            dead_letter_queue: None,
            dlq_ttl: None,
        }
    }

    /// Builds a policy that never retries (`max_retries` is 0).
    pub fn no_retry() -> Self {
        Self::linear(0, Duration::from_secs(0))
    }

    /// Sets explicit dead-letter exchange and queue names, overriding the
    /// names otherwise derived from the queue name.
    pub fn with_dead_letter(mut self, exchange: String, queue: String) -> Self {
        self.dead_letter_exchange = Some(exchange);
        self.dead_letter_queue = Some(queue);
        self
    }

    /// Replaces the delay strategy.
    pub fn with_delay_strategy(mut self, strategy: DelayStrategy) -> Self {
        self.delay_strategy = strategy;
        self
    }

    /// Sets the `dlq_ttl` field.
    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
        self.dlq_ttl = Some(ttl);
        self
    }

    /// Returns the delay to wait before retry number `attempt` (zero-based),
    /// or `None` when no further retry should happen.
    ///
    /// `None` is returned when `attempt >= max_retries`, and also for a
    /// `Custom` mechanism whose delay list is shorter than `attempt + 1`.
    ///
    /// Exponential delays are computed in whole milliseconds with saturating
    /// arithmetic, so very large attempt numbers yield `max_delay` instead of
    /// overflowing (which would panic in debug builds and wrap to a tiny or
    /// zero delay in release builds).
    pub fn calculate_delay(&self, attempt: u32) -> Option<Duration> {
        if attempt >= self.max_retries {
            return None;
        }

        match &self.mechanism {
            RetryMechanism::Exponential {
                base_delay,
                max_delay,
            } => {
                // `as_millis` is u128; clamp before narrowing so the cast
                // cannot truncate.
                let base_ms = base_delay.as_millis().min(u128::from(u64::MAX)) as u64;
                let factor = 2_u64.saturating_pow(attempt);
                let uncapped = Duration::from_millis(base_ms.saturating_mul(factor));
                Some(uncapped.min(*max_delay))
            }
            RetryMechanism::Linear { delay } => Some(*delay),
            // Checked lookup: `max_retries` is a public field and may exceed
            // the number of configured delays.
            RetryMechanism::Custom { delays } => delays.get(attempt as usize).copied(),
        }
    }

    /// Returns the configured dead-letter exchange name, or
    /// `"<queue_name>.dlx"` when none was configured.
    pub fn get_dead_letter_exchange(&self, queue_name: &str) -> String {
        match &self.dead_letter_exchange {
            Some(exchange) => exchange.clone(),
            None => format!("{}.dlx", queue_name),
        }
    }

    /// Returns the configured dead-letter queue name, or
    /// `"<queue_name>.dlq"` when none was configured.
    pub fn get_dead_letter_queue(&self, queue_name: &str) -> String {
        match &self.dead_letter_queue {
            Some(queue) => queue.clone(),
            None => format!("{}.dlq", queue_name),
        }
    }

    /// Returns the retry queue name for a zero-based `attempt`:
    /// `"<queue_name>.retry.<attempt + 1>"`.
    pub fn get_retry_queue_name(&self, queue_name: &str, attempt: u32) -> String {
        // Widen before adding one so `u32::MAX` cannot overflow.
        format!("{}.retry.{}", queue_name, u64::from(attempt) + 1)
    }

    /// Returns the delay exchange name for a queue: `"<queue_name>.delay"`.
    pub fn get_delay_exchange(&self, queue_name: &str) -> String {
        format!("{}.delay", queue_name)
    }
}
impl Default for RetryConfig {
fn default() -> Self {
Self::exponential_default()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a whole-second `Duration`.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Asserts that `calculate_delay(i)` equals `expected[i]` for every index.
    fn assert_delays(config: &RetryConfig, expected: &[Option<Duration>]) {
        for (attempt, want) in expected.iter().enumerate() {
            assert_eq!(
                config.calculate_delay(attempt as u32),
                *want,
                "attempt {}",
                attempt
            );
        }
    }

    #[test]
    fn test_exponential_retry() {
        let config = RetryConfig::exponential(5, secs(1), secs(30));
        assert_delays(
            &config,
            &[
                Some(secs(1)),
                Some(secs(2)),
                Some(secs(4)),
                Some(secs(8)),
                Some(secs(16)),
                None,
            ],
        );
    }

    #[test]
    fn test_exponential_retry_with_cap() {
        let config = RetryConfig::exponential(10, secs(1), secs(5));
        assert_delays(
            &config,
            &[
                Some(secs(1)),
                Some(secs(2)),
                Some(secs(4)),
                Some(secs(5)),
                Some(secs(5)),
            ],
        );
    }

    #[test]
    fn test_linear_retry() {
        let config = RetryConfig::linear(3, secs(5));
        assert_delays(
            &config,
            &[Some(secs(5)), Some(secs(5)), Some(secs(5)), None],
        );
    }

    #[test]
    fn test_custom_retry() {
        let config = RetryConfig::custom(vec![secs(1), secs(5), secs(30)]);
        assert_delays(
            &config,
            &[Some(secs(1)), Some(secs(5)), Some(secs(30)), None],
        );
    }

    #[test]
    fn test_no_retry() {
        assert_delays(&RetryConfig::no_retry(), &[None]);
    }

    #[test]
    fn test_dead_letter_names() {
        // Without explicit names, both are derived from the queue name.
        let derived = RetryConfig::exponential_default();
        assert_eq!(derived.get_dead_letter_exchange("orders"), "orders.dlx");
        assert_eq!(derived.get_dead_letter_queue("orders"), "orders.dlq");

        // Explicit names take precedence over the derived ones.
        let explicit =
            derived.with_dead_letter("custom.dlx".to_string(), "custom.dlq".to_string());
        assert_eq!(explicit.get_dead_letter_exchange("orders"), "custom.dlx");
        assert_eq!(explicit.get_dead_letter_queue("orders"), "custom.dlq");
    }

    #[test]
    fn test_retry_queue_names() {
        let config = RetryConfig::exponential_default();
        let cases = [
            (0, "orders.retry.1"),
            (1, "orders.retry.2"),
            (4, "orders.retry.5"),
        ];
        for (attempt, expected) in cases.iter() {
            assert_eq!(config.get_retry_queue_name("orders", *attempt), *expected);
        }
    }

    #[test]
    fn test_delay_exchange_names() {
        let exchange = RetryConfig::exponential_default().get_delay_exchange("orders");
        assert_eq!(exchange, "orders.delay");
    }

    #[test]
    fn test_delay_strategy_default() {
        let strategy = RetryConfig::exponential_default().delay_strategy;
        assert!(matches!(strategy, DelayStrategy::TTL));
    }

    #[test]
    fn test_delay_strategy_configuration() {
        let strategy = RetryConfig::exponential_default()
            .with_delay_strategy(DelayStrategy::DelayedExchange)
            .delay_strategy;
        assert!(matches!(strategy, DelayStrategy::DelayedExchange));
    }
}