rust_rabbit/
retry.rs

1//! Simplified and flexible retry system for rust-rabbit
2//!
3//! This module provides a simple but powerful retry mechanism with support for:
4//! - Configurable max retry count
5//! - Multiple retry mechanisms: exponential (1s->2s->4s->8s), linear, custom delays
//! - Delayed redelivery of failed messages instead of immediate requeue
//! - Two delay strategies: TTL-based retry queues (the original, default strategy)
//!   or the RabbitMQ delayed message exchange plugin
8
9use serde::{Deserialize, Serialize};
10use std::time::Duration;
11
12/// Strategy for delaying retry messages
13///
14/// # TTL (Time-To-Live) Strategy
15/// Uses RabbitMQ's TTL feature with temporary retry queues.
16/// Messages are published to temporary queues that expire and route to the original queue.
17/// **Pros**: No plugin required, simple setup
18/// **Cons**: Less precise timing, requires queue cleanup
19///
20/// # DelayedExchange Strategy
21/// Uses the RabbitMQ delayed message exchange plugin (requires x-delayed-message plugin).
22/// Messages are published to a delay exchange with x-delay header.
23/// The exchange automatically routes messages after the delay period.
24/// **Pros**: More precise, cleaner architecture, single exchange
25/// **Cons**: Requires RabbitMQ plugin installation
26///
27/// # Example
28/// ```rust
29/// use rust_rabbit::{DelayStrategy, RetryConfig};
30/// use std::time::Duration;
31///
32/// // Using TTL strategy (default, no plugin required)
33/// let config = RetryConfig::exponential_default()
34///     .with_delay_strategy(DelayStrategy::TTL);
35///
36/// // Using RabbitMQ delayed exchange plugin (requires plugin)
37/// let config = RetryConfig::linear(3, Duration::from_secs(5))
38///     .with_delay_strategy(DelayStrategy::DelayedExchange);
39/// ```
40#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
41pub enum DelayStrategy {
42    /// TTL-based delays using temporary queues (no plugin required)
43    #[default]
44    TTL,
45    /// RabbitMQ delayed message exchange plugin (requires x-delayed-message plugin)
46    ///
47    /// **IMPORTANT**: Using this strategy without the `rabbitmq_delayed_message_exchange` plugin
48    /// will cause your application to crash with "NOT_FOUND - operation not permitted on this exchange" error.
49    ///
50    /// Before deploying code with `DelayedExchange`, ensure:
51    /// 1. Plugin is installed: <https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases>
52    /// 2. Plugin is enabled: `rabbitmq-plugins enable rabbitmq_delayed_message_exchange`
53    /// 3. RabbitMQ is restarted after plugin installation
54    DelayedExchange,
55}
56
57/// Retry mechanism configuration
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub enum RetryMechanism {
60    /// Exponential backoff: base_delay * 2^attempt, capped at max_delay
61    /// Example: 1s -> 2s -> 4s -> 8s -> 16s (capped at max_delay)
62    Exponential {
63        base_delay: Duration,
64        max_delay: Duration,
65    },
66
67    /// Linear delay: same delay for each retry attempt
68    /// Example: 5s -> 5s -> 5s -> 5s
69    Linear { delay: Duration },
70
71    /// Custom delays: specify exact delay for each retry attempt
72    /// Example: [1s, 5s, 30s] for 3 retries with increasing delays
73    Custom { delays: Vec<Duration> },
74}
75
76/// Simple retry configuration
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct RetryConfig {
79    /// Maximum number of retry attempts (0 means no retries)
80    pub max_retries: u32,
81
82    /// Retry mechanism to use
83    pub mechanism: RetryMechanism,
84
85    /// Strategy for delaying retry messages (TTL or DelayedExchange)
86    #[serde(default)]
87    pub delay_strategy: DelayStrategy,
88
89    /// Dead letter exchange for messages that exceed max retries
90    /// If None, uses default: "{queue_name}.dlx"
91    pub dead_letter_exchange: Option<String>,
92
93    /// Dead letter queue for permanently failed messages
94    /// If None, uses default: "{queue_name}.dlq"
95    pub dead_letter_queue: Option<String>,
96
97    /// TTL for dead letter queue (auto-cleanup failed messages after this duration)
98    /// If set, messages in DLQ will be automatically removed after TTL expires
99    /// Example: Duration::from_secs(86400) = 1 day
100    pub dlq_ttl: Option<Duration>,
101}
102
103impl RetryConfig {
104    /// Create exponential retry: 1s -> 2s -> 4s -> 8s -> 16s (max 5 retries)
105    pub fn exponential_default() -> Self {
106        Self {
107            max_retries: 5,
108            mechanism: RetryMechanism::Exponential {
109                base_delay: Duration::from_secs(1),
110                max_delay: Duration::from_secs(60),
111            },
112            delay_strategy: DelayStrategy::default(),
113            dead_letter_exchange: None,
114            dead_letter_queue: None,
115            dlq_ttl: None,
116        }
117    }
118
119    /// Create exponential retry with custom parameters
120    pub fn exponential(max_retries: u32, base_delay: Duration, max_delay: Duration) -> Self {
121        Self {
122            max_retries,
123            mechanism: RetryMechanism::Exponential {
124                base_delay,
125                max_delay,
126            },
127            delay_strategy: DelayStrategy::default(),
128            dead_letter_exchange: None,
129            dead_letter_queue: None,
130            dlq_ttl: None,
131        }
132    }
133
134    /// Create linear retry: same delay for each attempt
135    pub fn linear(max_retries: u32, delay: Duration) -> Self {
136        Self {
137            max_retries,
138            mechanism: RetryMechanism::Linear { delay },
139            delay_strategy: DelayStrategy::default(),
140            dead_letter_exchange: None,
141            dead_letter_queue: None,
142            dlq_ttl: None,
143        }
144    }
145
146    /// Create custom retry with specific delays for each attempt
147    pub fn custom(delays: Vec<Duration>) -> Self {
148        let max_retries = delays.len() as u32;
149        Self {
150            max_retries,
151            mechanism: RetryMechanism::Custom { delays },
152            delay_strategy: DelayStrategy::default(),
153            dead_letter_exchange: None,
154            dead_letter_queue: None,
155            dlq_ttl: None,
156        }
157    }
158
159    /// No retries - fail immediately
160    pub fn no_retry() -> Self {
161        Self {
162            max_retries: 0,
163            mechanism: RetryMechanism::Linear {
164                delay: Duration::from_secs(0),
165            },
166            delay_strategy: DelayStrategy::default(),
167            dead_letter_exchange: None,
168            dead_letter_queue: None,
169            dlq_ttl: None,
170        }
171    }
172
173    /// Set custom dead letter exchange and queue names
174    pub fn with_dead_letter(mut self, exchange: String, queue: String) -> Self {
175        self.dead_letter_exchange = Some(exchange);
176        self.dead_letter_queue = Some(queue);
177        self
178    }
179
180    /// Set delay strategy to use (TTL or DelayedExchange)
181    pub fn with_delay_strategy(mut self, strategy: DelayStrategy) -> Self {
182        self.delay_strategy = strategy;
183        self
184    }
185
186    /// Set TTL for dead letter queue (auto-cleanup failed messages)
187    /// Messages in DLQ will be automatically removed after this duration
188    ///
189    /// # Example
190    /// ```rust
191    /// # use rust_rabbit::RetryConfig;
192    /// # use std::time::Duration;
193    /// let config = RetryConfig::exponential_default()
194    ///     .with_dlq_ttl(Duration::from_secs(86400));  // 1 day
195    /// ```
196    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
197        self.dlq_ttl = Some(ttl);
198        self
199    }
200
201    /// Calculate delay for a specific retry attempt (0-indexed)
202    pub fn calculate_delay(&self, attempt: u32) -> Option<Duration> {
203        if attempt >= self.max_retries {
204            return None; // No more retries
205        }
206
207        let delay = match &self.mechanism {
208            RetryMechanism::Exponential {
209                base_delay,
210                max_delay,
211            } => {
212                let exponential_delay =
213                    Duration::from_millis(base_delay.as_millis() as u64 * 2_u64.pow(attempt));
214                std::cmp::min(exponential_delay, *max_delay)
215            }
216            RetryMechanism::Linear { delay } => *delay,
217            RetryMechanism::Custom { delays } => {
218                if (attempt as usize) < delays.len() {
219                    delays[attempt as usize]
220                } else {
221                    return None; // No more custom delays
222                }
223            }
224        };
225
226        Some(delay)
227    }
228
229    /// Get dead letter exchange name (with fallback to default)
230    pub fn get_dead_letter_exchange(&self, queue_name: &str) -> String {
231        self.dead_letter_exchange
232            .clone()
233            .unwrap_or_else(|| format!("{}.dlx", queue_name))
234    }
235
236    /// Get dead letter queue name (with fallback to default)
237    pub fn get_dead_letter_queue(&self, queue_name: &str) -> String {
238        self.dead_letter_queue
239            .clone()
240            .unwrap_or_else(|| format!("{}.dlq", queue_name))
241    }
242
243    /// Generate retry queue name for delayed messages
244    pub fn get_retry_queue_name(&self, queue_name: &str, attempt: u32) -> String {
245        format!("{}.retry.{}", queue_name, attempt + 1)
246    }
247
248    /// Get delay exchange name (for delayed exchange strategy)
249    pub fn get_delay_exchange(&self, queue_name: &str) -> String {
250        format!("{}.delay", queue_name)
251    }
252}
253
254impl Default for RetryConfig {
255    fn default() -> Self {
256        Self::exponential_default()
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a whole-second `Duration`.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Checks `calculate_delay` for attempts `0..expected.len()` against `expected`, in order.
    fn assert_delays(config: &RetryConfig, expected: &[Option<Duration>]) {
        for (attempt, want) in (0_u32..).zip(expected) {
            assert_eq!(config.calculate_delay(attempt), *want, "attempt {}", attempt);
        }
    }

    #[test]
    fn test_exponential_retry() {
        let config = RetryConfig::exponential(5, secs(1), secs(30));

        // Doubles each attempt; the sixth lookup is past max_retries.
        assert_delays(
            &config,
            &[
                Some(secs(1)),
                Some(secs(2)),
                Some(secs(4)),
                Some(secs(8)),
                Some(secs(16)),
                None,
            ],
        );
    }

    #[test]
    fn test_exponential_retry_with_cap() {
        let config = RetryConfig::exponential(10, secs(1), secs(5));

        // From attempt 3 onwards the 5s cap applies.
        assert_delays(
            &config,
            &[
                Some(secs(1)),
                Some(secs(2)),
                Some(secs(4)),
                Some(secs(5)),
                Some(secs(5)),
            ],
        );
    }

    #[test]
    fn test_linear_retry() {
        let config = RetryConfig::linear(3, secs(5));

        assert_delays(
            &config,
            &[Some(secs(5)), Some(secs(5)), Some(secs(5)), None],
        );
    }

    #[test]
    fn test_custom_retry() {
        let config = RetryConfig::custom(vec![secs(1), secs(5), secs(30)]);

        assert_delays(
            &config,
            &[Some(secs(1)), Some(secs(5)), Some(secs(30)), None],
        );
    }

    #[test]
    fn test_no_retry() {
        // Even the very first attempt is refused.
        assert_eq!(RetryConfig::no_retry().calculate_delay(0), None);
    }

    #[test]
    fn test_dead_letter_names() {
        let defaults = RetryConfig::exponential_default();
        assert_eq!(defaults.get_dead_letter_exchange("orders"), "orders.dlx");
        assert_eq!(defaults.get_dead_letter_queue("orders"), "orders.dlq");

        let overridden =
            defaults.with_dead_letter("custom.dlx".to_string(), "custom.dlq".to_string());
        assert_eq!(overridden.get_dead_letter_exchange("orders"), "custom.dlx");
        assert_eq!(overridden.get_dead_letter_queue("orders"), "custom.dlq");
    }

    #[test]
    fn test_retry_queue_names() {
        let config = RetryConfig::exponential_default();

        // Queue suffixes are 1-based while attempts are 0-based.
        for (attempt, expected) in [
            (0, "orders.retry.1"),
            (1, "orders.retry.2"),
            (4, "orders.retry.5"),
        ] {
            assert_eq!(config.get_retry_queue_name("orders", attempt), expected);
        }
    }

    #[test]
    fn test_delay_exchange_names() {
        let exchange = RetryConfig::exponential_default().get_delay_exchange("orders");

        assert_eq!(exchange, "orders.delay");
    }

    #[test]
    fn test_delay_strategy_default() {
        let strategy = RetryConfig::exponential_default().delay_strategy;

        assert!(matches!(strategy, DelayStrategy::TTL));
    }

    #[test]
    fn test_delay_strategy_configuration() {
        let strategy = RetryConfig::exponential_default()
            .with_delay_strategy(DelayStrategy::DelayedExchange)
            .delay_strategy;

        assert!(matches!(strategy, DelayStrategy::DelayedExchange));
    }
}
368}