rust_rabbit/
retry.rs

//! Simplified and flexible retry system for rust-rabbit.
//!
//! This module provides a simple but powerful retry mechanism with support for:
//! - A configurable maximum retry count
//! - Multiple retry mechanisms: exponential backoff (1s -> 2s -> 4s -> 8s), linear, and custom delays
//! - Delay-based message retry through RabbitMQ (TTL retry queues or a delayed exchange)
//! - Two delay strategies: TTL-based (the original and default strategy) or the RabbitMQ delayed message exchange plugin
8
9use serde::{Deserialize, Serialize};
10use std::time::Duration;
11
/// Strategy for delaying retry messages
///
/// The default variant is [`DelayStrategy::TTL`].
///
/// # TTL (Time-To-Live) Strategy
/// Uses RabbitMQ's TTL feature with temporary retry queues.
/// Messages are published to temporary queues that expire and route to the original queue.
/// **Pros**: No plugin required, simple setup
/// **Cons**: Less precise timing, requires queue cleanup
///
/// # DelayedExchange Strategy
/// Uses the RabbitMQ delayed message exchange plugin (requires x-delayed-message plugin).
/// Messages are published to a delay exchange with x-delay header.
/// The exchange automatically routes messages after the delay period.
/// **Pros**: More precise, cleaner architecture, single exchange
/// **Cons**: Requires RabbitMQ plugin installation
///
/// # Example
/// ```ignore
/// use rust_rabbit::retry::{DelayStrategy, RetryConfig, RetryMechanism};
/// use std::time::Duration;
///
/// // Using TTL strategy (default, no plugin required)
/// let config = RetryConfig {
///     max_retries: 3,
///     mechanism: RetryMechanism::Exponential {
///         base_delay: Duration::from_secs(1),
///         max_delay: Duration::from_secs(60),
///     },
///     delay_strategy: DelayStrategy::TTL,
///     dead_letter_exchange: None,
///     dead_letter_queue: None,
///     dlq_ttl: None,
/// };
///
/// // Using RabbitMQ delayed exchange plugin (requires plugin)
/// let config = RetryConfig {
///     max_retries: 3,
///     mechanism: RetryMechanism::Linear { delay: Duration::from_secs(5) },
///     delay_strategy: DelayStrategy::DelayedExchange,
///     dead_letter_exchange: None,
///     dead_letter_queue: None,
///     dlq_ttl: None,
/// };
/// ```
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DelayStrategy {
    /// TTL-based delays using temporary queues (no plugin required)
    ///
    /// This is the variant produced by `DelayStrategy::default()`.
    #[default]
    TTL,
    /// RabbitMQ delayed message exchange plugin (requires x-delayed-message plugin)
    ///
    /// ⚠️ **IMPORTANT**: Using this strategy without the `rabbitmq_delayed_message_exchange` plugin
    /// will cause your application to crash with "NOT_FOUND - operation not permitted on this exchange" error.
    ///
    /// Before deploying code with `DelayedExchange`, ensure:
    /// 1. Plugin is installed: <https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases>
    /// 2. Plugin is enabled: `rabbitmq-plugins enable rabbitmq_delayed_message_exchange`
    /// 3. RabbitMQ is restarted after plugin installation
    DelayedExchange,
}
69
/// Retry mechanism configuration
///
/// Selects how the delay before each retry attempt is derived. Attempts are
/// 0-indexed: attempt `0` is the first retry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetryMechanism {
    /// Exponential backoff: base_delay * 2^attempt, capped at max_delay
    /// Example: 1s -> 2s -> 4s -> 8s -> 16s (capped at max_delay)
    ///
    /// The product is computed from `base_delay` expressed in whole
    /// milliseconds, so any sub-millisecond part of `base_delay` is ignored.
    Exponential {
        /// Delay used for attempt `0`; doubled for every later attempt.
        base_delay: Duration,
        /// Upper bound applied to the computed delay.
        max_delay: Duration,
    },

    /// Linear delay: same delay for each retry attempt
    /// Example: 5s -> 5s -> 5s -> 5s
    Linear {
        /// Delay applied before every retry attempt.
        delay: Duration,
    },

    /// Custom delays: specify exact delay for each retry attempt
    /// Example: [1s, 5s, 30s] for 3 retries with increasing delays
    Custom {
        /// `delays[i]` is the delay for attempt `i`; an attempt with no
        /// matching entry yields no retry.
        delays: Vec<Duration>,
    },
}
88
/// Simple retry configuration
///
/// Bundles the retry budget, the delay mechanism, the delay strategy and the
/// dead-letter settings for one queue. All fields are public, so a value can be
/// built with a struct literal or through the constructor functions on this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (0 means no retries)
    pub max_retries: u32,

    /// Retry mechanism to use
    pub mechanism: RetryMechanism,

    /// Strategy for delaying retry messages (TTL or DelayedExchange)
    ///
    /// Optional when deserializing: a missing field falls back to
    /// `DelayStrategy::default()`, i.e. `DelayStrategy::TTL`.
    #[serde(default)]
    pub delay_strategy: DelayStrategy,

    /// Dead letter exchange for messages that exceed max retries
    /// If None, uses default: "{queue_name}.dlx"
    pub dead_letter_exchange: Option<String>,

    /// Dead letter queue for permanently failed messages
    /// If None, uses default: "{queue_name}.dlq"
    pub dead_letter_queue: Option<String>,

    /// TTL for dead letter queue (auto-cleanup failed messages after this duration)
    /// If set, messages in DLQ will be automatically removed after TTL expires
    /// Example: Duration::from_secs(86400) = 1 day
    pub dlq_ttl: Option<Duration>,
}
115
116impl RetryConfig {
117    /// Create exponential retry: 1s -> 2s -> 4s -> 8s -> 16s (max 5 retries)
118    pub fn exponential_default() -> Self {
119        Self {
120            max_retries: 5,
121            mechanism: RetryMechanism::Exponential {
122                base_delay: Duration::from_secs(1),
123                max_delay: Duration::from_secs(60),
124            },
125            delay_strategy: DelayStrategy::default(),
126            dead_letter_exchange: None,
127            dead_letter_queue: None,
128            dlq_ttl: None,
129        }
130    }
131
132    /// Create exponential retry with custom parameters
133    pub fn exponential(max_retries: u32, base_delay: Duration, max_delay: Duration) -> Self {
134        Self {
135            max_retries,
136            mechanism: RetryMechanism::Exponential {
137                base_delay,
138                max_delay,
139            },
140            delay_strategy: DelayStrategy::default(),
141            dead_letter_exchange: None,
142            dead_letter_queue: None,
143            dlq_ttl: None,
144        }
145    }
146
147    /// Create linear retry: same delay for each attempt
148    pub fn linear(max_retries: u32, delay: Duration) -> Self {
149        Self {
150            max_retries,
151            mechanism: RetryMechanism::Linear { delay },
152            delay_strategy: DelayStrategy::default(),
153            dead_letter_exchange: None,
154            dead_letter_queue: None,
155            dlq_ttl: None,
156        }
157    }
158
159    /// Create custom retry with specific delays for each attempt
160    pub fn custom(delays: Vec<Duration>) -> Self {
161        let max_retries = delays.len() as u32;
162        Self {
163            max_retries,
164            mechanism: RetryMechanism::Custom { delays },
165            delay_strategy: DelayStrategy::default(),
166            dead_letter_exchange: None,
167            dead_letter_queue: None,
168            dlq_ttl: None,
169        }
170    }
171
172    /// No retries - fail immediately
173    pub fn no_retry() -> Self {
174        Self {
175            max_retries: 0,
176            mechanism: RetryMechanism::Linear {
177                delay: Duration::from_secs(0),
178            },
179            delay_strategy: DelayStrategy::default(),
180            dead_letter_exchange: None,
181            dead_letter_queue: None,
182            dlq_ttl: None,
183        }
184    }
185
186    /// Set custom dead letter exchange and queue names
187    pub fn with_dead_letter(mut self, exchange: String, queue: String) -> Self {
188        self.dead_letter_exchange = Some(exchange);
189        self.dead_letter_queue = Some(queue);
190        self
191    }
192
193    /// Set delay strategy to use (TTL or DelayedExchange)
194    pub fn with_delay_strategy(mut self, strategy: DelayStrategy) -> Self {
195        self.delay_strategy = strategy;
196        self
197    }
198
199    /// Set TTL for dead letter queue (auto-cleanup failed messages)
200    /// Messages in DLQ will be automatically removed after this duration
201    ///
202    /// # Example
203    /// ```ignore
204    /// let config = RetryConfig::exponential_default()
205    ///     .with_dlq_ttl(Duration::from_secs(86400));  // 1 day
206    /// ```
207    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
208        self.dlq_ttl = Some(ttl);
209        self
210    }
211
212    /// Calculate delay for a specific retry attempt (0-indexed)
213    pub fn calculate_delay(&self, attempt: u32) -> Option<Duration> {
214        if attempt >= self.max_retries {
215            return None; // No more retries
216        }
217
218        let delay = match &self.mechanism {
219            RetryMechanism::Exponential {
220                base_delay,
221                max_delay,
222            } => {
223                let exponential_delay =
224                    Duration::from_millis(base_delay.as_millis() as u64 * 2_u64.pow(attempt));
225                std::cmp::min(exponential_delay, *max_delay)
226            }
227            RetryMechanism::Linear { delay } => *delay,
228            RetryMechanism::Custom { delays } => {
229                if (attempt as usize) < delays.len() {
230                    delays[attempt as usize]
231                } else {
232                    return None; // No more custom delays
233                }
234            }
235        };
236
237        Some(delay)
238    }
239
240    /// Get dead letter exchange name (with fallback to default)
241    pub fn get_dead_letter_exchange(&self, queue_name: &str) -> String {
242        self.dead_letter_exchange
243            .clone()
244            .unwrap_or_else(|| format!("{}.dlx", queue_name))
245    }
246
247    /// Get dead letter queue name (with fallback to default)
248    pub fn get_dead_letter_queue(&self, queue_name: &str) -> String {
249        self.dead_letter_queue
250            .clone()
251            .unwrap_or_else(|| format!("{}.dlq", queue_name))
252    }
253
254    /// Generate retry queue name for delayed messages
255    pub fn get_retry_queue_name(&self, queue_name: &str, attempt: u32) -> String {
256        format!("{}.retry.{}", queue_name, attempt + 1)
257    }
258
259    /// Get delay exchange name (for delayed exchange strategy)
260    pub fn get_delay_exchange(&self, queue_name: &str) -> String {
261        format!("{}.delay", queue_name)
262    }
263}
264
265impl Default for RetryConfig {
266    fn default() -> Self {
267        Self::exponential_default()
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a whole-second `Duration`.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Checks that `calculate_delay(i)` equals `expected[i]` for every index `i`.
    fn assert_delays(config: &RetryConfig, expected: &[Option<Duration>]) {
        for (attempt, want) in expected.iter().enumerate() {
            assert_eq!(
                config.calculate_delay(attempt as u32),
                *want,
                "unexpected delay for attempt {attempt}"
            );
        }
    }

    #[test]
    fn test_exponential_retry() {
        let config = RetryConfig::exponential(5, secs(1), secs(30));

        // Doubles each attempt, then runs out of retries at attempt 5.
        assert_delays(
            &config,
            &[
                Some(secs(1)),
                Some(secs(2)),
                Some(secs(4)),
                Some(secs(8)),
                Some(secs(16)),
                None,
            ],
        );
    }

    #[test]
    fn test_exponential_retry_with_cap() {
        let config = RetryConfig::exponential(10, secs(1), secs(5));

        // From attempt 3 onwards the 5s cap takes over.
        assert_delays(
            &config,
            &[
                Some(secs(1)),
                Some(secs(2)),
                Some(secs(4)),
                Some(secs(5)),
                Some(secs(5)),
            ],
        );
    }

    #[test]
    fn test_linear_retry() {
        let config = RetryConfig::linear(3, secs(5));

        // Constant delay for three attempts, then no more retries.
        assert_delays(
            &config,
            &[Some(secs(5)), Some(secs(5)), Some(secs(5)), None],
        );
    }

    #[test]
    fn test_custom_retry() {
        let config = RetryConfig::custom(vec![secs(1), secs(5), secs(30)]);

        // One delay per list entry, then no more retries.
        assert_delays(
            &config,
            &[Some(secs(1)), Some(secs(5)), Some(secs(30)), None],
        );
    }

    #[test]
    fn test_no_retry() {
        // Even the very first attempt is refused.
        assert_delays(&RetryConfig::no_retry(), &[None]);
    }

    #[test]
    fn test_dead_letter_names() {
        let defaults = RetryConfig::exponential_default();

        // Without overrides the names are derived from the queue name.
        assert_eq!(defaults.get_dead_letter_exchange("orders"), "orders.dlx");
        assert_eq!(defaults.get_dead_letter_queue("orders"), "orders.dlq");

        // Explicit names win over the derived ones.
        let overridden =
            defaults.with_dead_letter("custom.dlx".to_string(), "custom.dlq".to_string());
        assert_eq!(overridden.get_dead_letter_exchange("orders"), "custom.dlx");
        assert_eq!(overridden.get_dead_letter_queue("orders"), "custom.dlq");
    }

    #[test]
    fn test_retry_queue_names() {
        let config = RetryConfig::exponential_default();

        // Queue suffixes are 1-based while attempts are 0-based.
        let cases = [
            (0, "orders.retry.1"),
            (1, "orders.retry.2"),
            (4, "orders.retry.5"),
        ];
        for (attempt, expected) in cases {
            assert_eq!(config.get_retry_queue_name("orders", attempt), expected);
        }
    }

    #[test]
    fn test_delay_exchange_names() {
        let name = RetryConfig::exponential_default().get_delay_exchange("orders");

        assert_eq!(name, "orders.delay");
    }

    #[test]
    fn test_delay_strategy_default() {
        let strategy = RetryConfig::exponential_default().delay_strategy;

        assert!(matches!(strategy, DelayStrategy::TTL));
    }

    #[test]
    fn test_delay_strategy_configuration() {
        let strategy = RetryConfig::exponential_default()
            .with_delay_strategy(DelayStrategy::DelayedExchange)
            .delay_strategy;

        assert!(matches!(strategy, DelayStrategy::DelayedExchange));
    }
}