rust_rabbit/
retry.rs

1//! Simplified and flexible retry system for rust-rabbit
2//!
3//! This module provides a simple but powerful retry mechanism with support for:
4//! - Configurable max retry count
5//! - Multiple retry mechanisms: exponential (1s->2s->4s->8s), linear, custom delays
6//! - Delay-based message retry using RabbitMQ delayed exchanges
7//! - Two delay strategies: TTL-based (original) or RabbitMQ delayed exchange plugin
8
9use serde::{Deserialize, Serialize};
10use std::time::Duration;
11
12/// Strategy for delaying retry messages
13///
14/// # TTL (Time-To-Live) Strategy
15/// Uses RabbitMQ's TTL feature with temporary retry queues.
16/// Messages are published to temporary queues that expire and route to the original queue.
17/// **Pros**: No plugin required, simple setup
18/// **Cons**: Less precise timing, requires queue cleanup
19///
20/// # DelayedExchange Strategy
21/// Uses the RabbitMQ delayed message exchange plugin (requires x-delayed-message plugin).
22/// Messages are published to a delay exchange with x-delay header.
23/// The exchange automatically routes messages after the delay period.
24/// **Pros**: More precise, cleaner architecture, single exchange
25/// **Cons**: Requires RabbitMQ plugin installation
26///
27/// # Example
28/// ```ignore
29/// use rust_rabbit::retry::{DelayStrategy, RetryConfig, RetryMechanism};
30/// use std::time::Duration;
31///
32/// // Using TTL strategy (default, no plugin required)
33/// let config = RetryConfig {
34///     max_retries: 3,
35///     mechanism: RetryMechanism::Exponential {
36///         base_delay: Duration::from_secs(1),
37///         max_delay: Duration::from_secs(60),
38///     },
39///     delay_strategy: DelayStrategy::TTL,
40///     dead_letter_exchange: None,
41///     dead_letter_queue: None,
42/// };
43///
44/// // Using RabbitMQ delayed exchange plugin (requires plugin)
45/// let config = RetryConfig {
46///     max_retries: 3,
47///     mechanism: RetryMechanism::Linear { delay: Duration::from_secs(5) },
48///     delay_strategy: DelayStrategy::DelayedExchange,
49///     dead_letter_exchange: None,
50///     dead_letter_queue: None,
51/// };
52/// ```
53#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
54pub enum DelayStrategy {
55    /// TTL-based delays using temporary queues (no plugin required)
56    TTL,
57    /// RabbitMQ delayed message exchange plugin (requires x-delayed-message plugin)
58    DelayedExchange,
59}
60
61impl Default for DelayStrategy {
62    fn default() -> Self {
63        Self::TTL
64    }
65}
66
67/// Retry mechanism configuration
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub enum RetryMechanism {
70    /// Exponential backoff: base_delay * 2^attempt, capped at max_delay
71    /// Example: 1s -> 2s -> 4s -> 8s -> 16s (capped at max_delay)
72    Exponential {
73        base_delay: Duration,
74        max_delay: Duration,
75    },
76
77    /// Linear delay: same delay for each retry attempt
78    /// Example: 5s -> 5s -> 5s -> 5s
79    Linear { delay: Duration },
80
81    /// Custom delays: specify exact delay for each retry attempt
82    /// Example: [1s, 5s, 30s] for 3 retries with increasing delays
83    Custom { delays: Vec<Duration> },
84}
85
86/// Simple retry configuration
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct RetryConfig {
89    /// Maximum number of retry attempts (0 means no retries)
90    pub max_retries: u32,
91
92    /// Retry mechanism to use
93    pub mechanism: RetryMechanism,
94
95    /// Strategy for delaying retry messages (TTL or DelayedExchange)
96    #[serde(default)]
97    pub delay_strategy: DelayStrategy,
98
99    /// Dead letter exchange for messages that exceed max retries
100    /// If None, uses default: "{queue_name}.dlx"
101    pub dead_letter_exchange: Option<String>,
102
103    /// Dead letter queue for permanently failed messages
104    /// If None, uses default: "{queue_name}.dlq"
105    pub dead_letter_queue: Option<String>,
106
107    /// TTL for dead letter queue (auto-cleanup failed messages after this duration)
108    /// If set, messages in DLQ will be automatically removed after TTL expires
109    /// Example: Duration::from_secs(86400) = 1 day
110    pub dlq_ttl: Option<Duration>,
111}
112
113impl RetryConfig {
114    /// Create exponential retry: 1s -> 2s -> 4s -> 8s -> 16s (max 5 retries)
115    pub fn exponential_default() -> Self {
116        Self {
117            max_retries: 5,
118            mechanism: RetryMechanism::Exponential {
119                base_delay: Duration::from_secs(1),
120                max_delay: Duration::from_secs(60),
121            },
122            delay_strategy: DelayStrategy::default(),
123            dead_letter_exchange: None,
124            dead_letter_queue: None,
125            dlq_ttl: None,
126        }
127    }
128
129    /// Create exponential retry with custom parameters
130    pub fn exponential(max_retries: u32, base_delay: Duration, max_delay: Duration) -> Self {
131        Self {
132            max_retries,
133            mechanism: RetryMechanism::Exponential {
134                base_delay,
135                max_delay,
136            },
137            delay_strategy: DelayStrategy::default(),
138            dead_letter_exchange: None,
139            dead_letter_queue: None,
140            dlq_ttl: None,
141        }
142    }
143
144    /// Create linear retry: same delay for each attempt
145    pub fn linear(max_retries: u32, delay: Duration) -> Self {
146        Self {
147            max_retries,
148            mechanism: RetryMechanism::Linear { delay },
149            delay_strategy: DelayStrategy::default(),
150            dead_letter_exchange: None,
151            dead_letter_queue: None,
152            dlq_ttl: None,
153        }
154    }
155
156    /// Create custom retry with specific delays for each attempt
157    pub fn custom(delays: Vec<Duration>) -> Self {
158        let max_retries = delays.len() as u32;
159        Self {
160            max_retries,
161            mechanism: RetryMechanism::Custom { delays },
162            delay_strategy: DelayStrategy::default(),
163            dead_letter_exchange: None,
164            dead_letter_queue: None,
165            dlq_ttl: None,
166        }
167    }
168
169    /// No retries - fail immediately
170    pub fn no_retry() -> Self {
171        Self {
172            max_retries: 0,
173            mechanism: RetryMechanism::Linear {
174                delay: Duration::from_secs(0),
175            },
176            delay_strategy: DelayStrategy::default(),
177            dead_letter_exchange: None,
178            dead_letter_queue: None,
179            dlq_ttl: None,
180        }
181    }
182
183    /// Set custom dead letter exchange and queue names
184    pub fn with_dead_letter(mut self, exchange: String, queue: String) -> Self {
185        self.dead_letter_exchange = Some(exchange);
186        self.dead_letter_queue = Some(queue);
187        self
188    }
189
190    /// Set delay strategy to use (TTL or DelayedExchange)
191    pub fn with_delay_strategy(mut self, strategy: DelayStrategy) -> Self {
192        self.delay_strategy = strategy;
193        self
194    }
195
196    /// Set TTL for dead letter queue (auto-cleanup failed messages)
197    /// Messages in DLQ will be automatically removed after this duration
198    ///
199    /// # Example
200    /// ```ignore
201    /// let config = RetryConfig::exponential_default()
202    ///     .with_dlq_ttl(Duration::from_secs(86400));  // 1 day
203    /// ```
204    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
205        self.dlq_ttl = Some(ttl);
206        self
207    }
208
209    /// Calculate delay for a specific retry attempt (0-indexed)
210    pub fn calculate_delay(&self, attempt: u32) -> Option<Duration> {
211        if attempt >= self.max_retries {
212            return None; // No more retries
213        }
214
215        let delay = match &self.mechanism {
216            RetryMechanism::Exponential {
217                base_delay,
218                max_delay,
219            } => {
220                let exponential_delay =
221                    Duration::from_millis(base_delay.as_millis() as u64 * 2_u64.pow(attempt));
222                std::cmp::min(exponential_delay, *max_delay)
223            }
224            RetryMechanism::Linear { delay } => *delay,
225            RetryMechanism::Custom { delays } => {
226                if (attempt as usize) < delays.len() {
227                    delays[attempt as usize]
228                } else {
229                    return None; // No more custom delays
230                }
231            }
232        };
233
234        Some(delay)
235    }
236
237    /// Get dead letter exchange name (with fallback to default)
238    pub fn get_dead_letter_exchange(&self, queue_name: &str) -> String {
239        self.dead_letter_exchange
240            .clone()
241            .unwrap_or_else(|| format!("{}.dlx", queue_name))
242    }
243
244    /// Get dead letter queue name (with fallback to default)
245    pub fn get_dead_letter_queue(&self, queue_name: &str) -> String {
246        self.dead_letter_queue
247            .clone()
248            .unwrap_or_else(|| format!("{}.dlq", queue_name))
249    }
250
251    /// Generate retry queue name for delayed messages
252    pub fn get_retry_queue_name(&self, queue_name: &str, attempt: u32) -> String {
253        format!("{}.retry.{}", queue_name, attempt + 1)
254    }
255
256    /// Get delay exchange name (for delayed exchange strategy)
257    pub fn get_delay_exchange(&self, queue_name: &str) -> String {
258        format!("{}.delay", queue_name)
259    }
260}
261
262impl Default for RetryConfig {
263    fn default() -> Self {
264        Self::exponential_default()
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a whole-second `Duration`.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Checks that attempts `0..expected.len()` yield the listed delays (in seconds).
    fn assert_schedule(config: &RetryConfig, expected: &[u64]) {
        for (attempt, &want) in expected.iter().enumerate() {
            assert_eq!(config.calculate_delay(attempt as u32), Some(secs(want)));
        }
    }

    #[test]
    fn test_exponential_retry() {
        let config = RetryConfig::exponential(5, secs(1), secs(30));

        // Doubles on every attempt: 1s, 2s, 4s, 8s, 16s.
        assert_schedule(&config, &[1, 2, 4, 8, 16]);
        // The sixth attempt is past the retry budget.
        assert_eq!(config.calculate_delay(5), None);
    }

    #[test]
    fn test_exponential_retry_with_cap() {
        let config = RetryConfig::exponential(10, secs(1), secs(5));

        // 8s and 16s are clamped to the 5s cap.
        assert_schedule(&config, &[1, 2, 4, 5, 5]);
    }

    #[test]
    fn test_linear_retry() {
        let config = RetryConfig::linear(3, secs(5));

        assert_schedule(&config, &[5, 5, 5]);
        assert_eq!(config.calculate_delay(3), None);
    }

    #[test]
    fn test_custom_retry() {
        let config = RetryConfig::custom(vec![secs(1), secs(5), secs(30)]);

        assert_schedule(&config, &[1, 5, 30]);
        assert_eq!(config.calculate_delay(3), None);
    }

    #[test]
    fn test_no_retry() {
        // Even the very first attempt is refused.
        assert_eq!(RetryConfig::no_retry().calculate_delay(0), None);
    }

    #[test]
    fn test_dead_letter_names() {
        let defaults = RetryConfig::exponential_default();

        assert_eq!(defaults.get_dead_letter_exchange("orders"), "orders.dlx");
        assert_eq!(defaults.get_dead_letter_queue("orders"), "orders.dlq");

        let overridden =
            defaults.with_dead_letter("custom.dlx".to_string(), "custom.dlq".to_string());

        assert_eq!(overridden.get_dead_letter_exchange("orders"), "custom.dlx");
        assert_eq!(overridden.get_dead_letter_queue("orders"), "custom.dlq");
    }

    #[test]
    fn test_retry_queue_names() {
        let config = RetryConfig::exponential_default();

        // Queue suffixes are 1-based while attempts are 0-based.
        let cases = [
            (0, "orders.retry.1"),
            (1, "orders.retry.2"),
            (4, "orders.retry.5"),
        ];
        for &(attempt, expected) in cases.iter() {
            assert_eq!(config.get_retry_queue_name("orders", attempt), expected);
        }
    }

    #[test]
    fn test_delay_exchange_names() {
        let name = RetryConfig::exponential_default().get_delay_exchange("orders");

        assert_eq!(name, "orders.delay");
    }

    #[test]
    fn test_delay_strategy_default() {
        let strategy = RetryConfig::exponential_default().delay_strategy;

        assert!(matches!(strategy, DelayStrategy::TTL));
    }

    #[test]
    fn test_delay_strategy_configuration() {
        let strategy = RetryConfig::exponential_default()
            .with_delay_strategy(DelayStrategy::DelayedExchange)
            .delay_strategy;

        assert!(matches!(strategy, DelayStrategy::DelayedExchange));
    }
}
376}