rust_rabbit/
retry.rs

1//! Simplified and flexible retry system for rust-rabbit
2//!
3//! This module provides a simple but powerful retry mechanism with support for:
4//! - Configurable max retry count
5//! - Multiple retry mechanisms: exponential (1s->2s->4s->8s), linear, custom delays
6//! - Delay-based message retry using RabbitMQ delayed exchanges
7//! - Two delay strategies: TTL-based (original) or RabbitMQ delayed exchange plugin
8
9use serde::{Deserialize, Serialize};
10use std::time::Duration;
11
12/// Strategy for delaying retry messages
13///
14/// # TTL (Time-To-Live) Strategy
15/// Uses RabbitMQ's TTL feature with temporary retry queues.
16/// Messages are published to temporary queues that expire and route to the original queue.
17/// **Pros**: No plugin required, simple setup
18/// **Cons**: Less precise timing, requires queue cleanup
19///
20/// # DelayedExchange Strategy
21/// Uses the RabbitMQ delayed message exchange plugin (requires x-delayed-message plugin).
22/// Messages are published to a delay exchange with x-delay header.
23/// The exchange automatically routes messages after the delay period.
24/// **Pros**: More precise, cleaner architecture, single exchange
25/// **Cons**: Requires RabbitMQ plugin installation
26///
27/// # Example
28/// ```ignore
29/// use rust_rabbit::retry::{DelayStrategy, RetryConfig, RetryMechanism};
30/// use std::time::Duration;
31///
32/// // Using TTL strategy (default, no plugin required)
33/// let config = RetryConfig {
34///     max_retries: 3,
35///     mechanism: RetryMechanism::Exponential {
36///         base_delay: Duration::from_secs(1),
37///         max_delay: Duration::from_secs(60),
38///     },
39///     delay_strategy: DelayStrategy::TTL,
40///     dead_letter_exchange: None,
41///     dead_letter_queue: None,
42/// };
43///
44/// // Using RabbitMQ delayed exchange plugin (requires plugin)
45/// let config = RetryConfig {
46///     max_retries: 3,
47///     mechanism: RetryMechanism::Linear { delay: Duration::from_secs(5) },
48///     delay_strategy: DelayStrategy::DelayedExchange,
49///     dead_letter_exchange: None,
50///     dead_letter_queue: None,
51/// };
52/// ```
53#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
54pub enum DelayStrategy {
55    /// TTL-based delays using temporary queues (no plugin required)
56    TTL,
57    /// RabbitMQ delayed message exchange plugin (requires x-delayed-message plugin)
58    ///
59    /// ⚠️ **IMPORTANT**: Using this strategy without the `rabbitmq_delayed_message_exchange` plugin
60    /// will cause your application to crash with "NOT_FOUND - operation not permitted on this exchange" error.
61    ///
62    /// Before deploying code with `DelayedExchange`, ensure:
63    /// 1. Plugin is installed: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
64    /// 2. Plugin is enabled: `rabbitmq-plugins enable rabbitmq_delayed_message_exchange`
65    /// 3. RabbitMQ is restarted after plugin installation
66    DelayedExchange,
67}
68
69impl Default for DelayStrategy {
70    fn default() -> Self {
71        Self::TTL
72    }
73}
74
75/// Retry mechanism configuration
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub enum RetryMechanism {
78    /// Exponential backoff: base_delay * 2^attempt, capped at max_delay
79    /// Example: 1s -> 2s -> 4s -> 8s -> 16s (capped at max_delay)
80    Exponential {
81        base_delay: Duration,
82        max_delay: Duration,
83    },
84
85    /// Linear delay: same delay for each retry attempt
86    /// Example: 5s -> 5s -> 5s -> 5s
87    Linear { delay: Duration },
88
89    /// Custom delays: specify exact delay for each retry attempt
90    /// Example: [1s, 5s, 30s] for 3 retries with increasing delays
91    Custom { delays: Vec<Duration> },
92}
93
94/// Simple retry configuration
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct RetryConfig {
97    /// Maximum number of retry attempts (0 means no retries)
98    pub max_retries: u32,
99
100    /// Retry mechanism to use
101    pub mechanism: RetryMechanism,
102
103    /// Strategy for delaying retry messages (TTL or DelayedExchange)
104    #[serde(default)]
105    pub delay_strategy: DelayStrategy,
106
107    /// Dead letter exchange for messages that exceed max retries
108    /// If None, uses default: "{queue_name}.dlx"
109    pub dead_letter_exchange: Option<String>,
110
111    /// Dead letter queue for permanently failed messages
112    /// If None, uses default: "{queue_name}.dlq"
113    pub dead_letter_queue: Option<String>,
114
115    /// TTL for dead letter queue (auto-cleanup failed messages after this duration)
116    /// If set, messages in DLQ will be automatically removed after TTL expires
117    /// Example: Duration::from_secs(86400) = 1 day
118    pub dlq_ttl: Option<Duration>,
119}
120
121impl RetryConfig {
122    /// Create exponential retry: 1s -> 2s -> 4s -> 8s -> 16s (max 5 retries)
123    pub fn exponential_default() -> Self {
124        Self {
125            max_retries: 5,
126            mechanism: RetryMechanism::Exponential {
127                base_delay: Duration::from_secs(1),
128                max_delay: Duration::from_secs(60),
129            },
130            delay_strategy: DelayStrategy::default(),
131            dead_letter_exchange: None,
132            dead_letter_queue: None,
133            dlq_ttl: None,
134        }
135    }
136
137    /// Create exponential retry with custom parameters
138    pub fn exponential(max_retries: u32, base_delay: Duration, max_delay: Duration) -> Self {
139        Self {
140            max_retries,
141            mechanism: RetryMechanism::Exponential {
142                base_delay,
143                max_delay,
144            },
145            delay_strategy: DelayStrategy::default(),
146            dead_letter_exchange: None,
147            dead_letter_queue: None,
148            dlq_ttl: None,
149        }
150    }
151
152    /// Create linear retry: same delay for each attempt
153    pub fn linear(max_retries: u32, delay: Duration) -> Self {
154        Self {
155            max_retries,
156            mechanism: RetryMechanism::Linear { delay },
157            delay_strategy: DelayStrategy::default(),
158            dead_letter_exchange: None,
159            dead_letter_queue: None,
160            dlq_ttl: None,
161        }
162    }
163
164    /// Create custom retry with specific delays for each attempt
165    pub fn custom(delays: Vec<Duration>) -> Self {
166        let max_retries = delays.len() as u32;
167        Self {
168            max_retries,
169            mechanism: RetryMechanism::Custom { delays },
170            delay_strategy: DelayStrategy::default(),
171            dead_letter_exchange: None,
172            dead_letter_queue: None,
173            dlq_ttl: None,
174        }
175    }
176
177    /// No retries - fail immediately
178    pub fn no_retry() -> Self {
179        Self {
180            max_retries: 0,
181            mechanism: RetryMechanism::Linear {
182                delay: Duration::from_secs(0),
183            },
184            delay_strategy: DelayStrategy::default(),
185            dead_letter_exchange: None,
186            dead_letter_queue: None,
187            dlq_ttl: None,
188        }
189    }
190
191    /// Set custom dead letter exchange and queue names
192    pub fn with_dead_letter(mut self, exchange: String, queue: String) -> Self {
193        self.dead_letter_exchange = Some(exchange);
194        self.dead_letter_queue = Some(queue);
195        self
196    }
197
198    /// Set delay strategy to use (TTL or DelayedExchange)
199    pub fn with_delay_strategy(mut self, strategy: DelayStrategy) -> Self {
200        self.delay_strategy = strategy;
201        self
202    }
203
204    /// Set TTL for dead letter queue (auto-cleanup failed messages)
205    /// Messages in DLQ will be automatically removed after this duration
206    ///
207    /// # Example
208    /// ```ignore
209    /// let config = RetryConfig::exponential_default()
210    ///     .with_dlq_ttl(Duration::from_secs(86400));  // 1 day
211    /// ```
212    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
213        self.dlq_ttl = Some(ttl);
214        self
215    }
216
217    /// Calculate delay for a specific retry attempt (0-indexed)
218    pub fn calculate_delay(&self, attempt: u32) -> Option<Duration> {
219        if attempt >= self.max_retries {
220            return None; // No more retries
221        }
222
223        let delay = match &self.mechanism {
224            RetryMechanism::Exponential {
225                base_delay,
226                max_delay,
227            } => {
228                let exponential_delay =
229                    Duration::from_millis(base_delay.as_millis() as u64 * 2_u64.pow(attempt));
230                std::cmp::min(exponential_delay, *max_delay)
231            }
232            RetryMechanism::Linear { delay } => *delay,
233            RetryMechanism::Custom { delays } => {
234                if (attempt as usize) < delays.len() {
235                    delays[attempt as usize]
236                } else {
237                    return None; // No more custom delays
238                }
239            }
240        };
241
242        Some(delay)
243    }
244
245    /// Get dead letter exchange name (with fallback to default)
246    pub fn get_dead_letter_exchange(&self, queue_name: &str) -> String {
247        self.dead_letter_exchange
248            .clone()
249            .unwrap_or_else(|| format!("{}.dlx", queue_name))
250    }
251
252    /// Get dead letter queue name (with fallback to default)
253    pub fn get_dead_letter_queue(&self, queue_name: &str) -> String {
254        self.dead_letter_queue
255            .clone()
256            .unwrap_or_else(|| format!("{}.dlq", queue_name))
257    }
258
259    /// Generate retry queue name for delayed messages
260    pub fn get_retry_queue_name(&self, queue_name: &str, attempt: u32) -> String {
261        format!("{}.retry.{}", queue_name, attempt + 1)
262    }
263
264    /// Get delay exchange name (for delayed exchange strategy)
265    pub fn get_delay_exchange(&self, queue_name: &str) -> String {
266        format!("{}.delay", queue_name)
267    }
268}
269
270impl Default for RetryConfig {
271    fn default() -> Self {
272        Self::exponential_default()
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a whole-second `Duration`.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Check `calculate_delay` for attempts `0..expected.len()` against `expected`.
    fn assert_delays(config: &RetryConfig, expected: &[Option<Duration>]) {
        for (attempt, want) in expected.iter().enumerate() {
            assert_eq!(
                config.calculate_delay(attempt as u32),
                *want,
                "unexpected delay for attempt {}",
                attempt
            );
        }
    }

    #[test]
    fn test_exponential_retry() {
        let config = RetryConfig::exponential(5, secs(1), secs(30));

        // Doubles on every attempt; attempt 5 is past max_retries.
        assert_delays(
            &config,
            &[
                Some(secs(1)),
                Some(secs(2)),
                Some(secs(4)),
                Some(secs(8)),
                Some(secs(16)),
                None,
            ],
        );
    }

    #[test]
    fn test_exponential_retry_with_cap() {
        let config = RetryConfig::exponential(10, secs(1), secs(5));

        // From attempt 3 onwards the 5s cap takes over.
        assert_delays(
            &config,
            &[
                Some(secs(1)),
                Some(secs(2)),
                Some(secs(4)),
                Some(secs(5)),
                Some(secs(5)),
            ],
        );
    }

    #[test]
    fn test_linear_retry() {
        let config = RetryConfig::linear(3, secs(5));

        // Same delay each time, then no more retries.
        assert_delays(
            &config,
            &[Some(secs(5)), Some(secs(5)), Some(secs(5)), None],
        );
    }

    #[test]
    fn test_custom_retry() {
        let config = RetryConfig::custom(vec![secs(1), secs(5), secs(30)]);

        // One retry per listed delay, nothing afterwards.
        assert_delays(
            &config,
            &[Some(secs(1)), Some(secs(5)), Some(secs(30)), None],
        );
    }

    #[test]
    fn test_no_retry() {
        // Even the very first attempt is refused.
        assert_delays(&RetryConfig::no_retry(), &[None]);
    }

    #[test]
    fn test_dead_letter_names() {
        let defaults = RetryConfig::exponential_default();

        // Without overrides the names derive from the queue name.
        assert_eq!(defaults.get_dead_letter_exchange("orders"), "orders.dlx");
        assert_eq!(defaults.get_dead_letter_queue("orders"), "orders.dlq");

        // Explicit names win over the derived defaults.
        let overridden =
            defaults.with_dead_letter("custom.dlx".to_string(), "custom.dlq".to_string());
        assert_eq!(
            overridden.get_dead_letter_exchange("orders"),
            "custom.dlx"
        );
        assert_eq!(overridden.get_dead_letter_queue("orders"), "custom.dlq");
    }

    #[test]
    fn test_retry_queue_names() {
        let config = RetryConfig::exponential_default();

        // Queue suffixes are 1-based while attempts are 0-based.
        let cases = [
            (0u32, "orders.retry.1"),
            (1, "orders.retry.2"),
            (4, "orders.retry.5"),
        ];
        for &(attempt, expected) in cases.iter() {
            assert_eq!(config.get_retry_queue_name("orders", attempt), expected);
        }
    }

    #[test]
    fn test_delay_exchange_names() {
        let name = RetryConfig::exponential_default().get_delay_exchange("orders");

        assert_eq!(name, "orders.delay");
    }

    #[test]
    fn test_delay_strategy_default() {
        match RetryConfig::exponential_default().delay_strategy {
            DelayStrategy::TTL => {}
            other => panic!("expected TTL as the default strategy, got {:?}", other),
        }
    }

    #[test]
    fn test_delay_strategy_configuration() {
        let config =
            RetryConfig::exponential_default().with_delay_strategy(DelayStrategy::DelayedExchange);

        match config.delay_strategy {
            DelayStrategy::DelayedExchange => {}
            other => panic!("expected DelayedExchange, got {:?}", other),
        }
    }
}
384}