rust_rabbit/
retry.rs

1use crate::{
2    connection::ConnectionManager,
3    error::{RabbitError, Result},
4};
5use lapin::{
6    options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
7    types::FieldTable,
8    BasicProperties, Channel, ExchangeKind,
9};
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tracing::{debug, info, warn};
13
14/// Retry policy configuration
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct RetryPolicy {
17    /// Maximum number of retry attempts
18    pub max_retries: u32,
19
20    /// Initial delay between retries
21    pub initial_delay: Duration,
22
23    /// Maximum delay between retries
24    pub max_delay: Duration,
25
26    /// Multiplier for exponential backoff
27    pub backoff_multiplier: f64,
28
29    /// Jitter factor (0.0 to 1.0) to add randomness to delays
30    pub jitter: f64,
31
32    /// Retry queue naming pattern
33    pub retry_queue_pattern: String,
34
35    /// Dead letter exchange for failed messages
36    pub dead_letter_exchange: Option<String>,
37
38    /// Dead letter queue for failed messages
39    pub dead_letter_queue: Option<String>,
40}
41
42impl Default for RetryPolicy {
43    fn default() -> Self {
44        Self {
45            max_retries: 3,
46            initial_delay: Duration::from_millis(1000),
47            max_delay: Duration::from_secs(60),
48            backoff_multiplier: 2.0,
49            jitter: 0.1,
50            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
51            dead_letter_exchange: Some("dead-letter".to_string()),
52            dead_letter_queue: Some("dead-letter-queue".to_string()),
53        }
54    }
55}
56
57impl RetryPolicy {
58    /// Calculate delay for a specific retry attempt
59    pub fn calculate_delay(&self, attempt: u32) -> Duration {
60        let base_delay = Duration::from_millis(
61            (self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(attempt as i32))
62                as u64,
63        );
64
65        let delay = if base_delay > self.max_delay {
66            self.max_delay
67        } else {
68            base_delay
69        };
70
71        // Add jitter to prevent thundering herd
72        if self.jitter > 0.0 {
73            let jitter_amount = (delay.as_millis() as f64 * self.jitter) as u64;
74            let jitter = fastrand::u64(0..=jitter_amount);
75            Duration::from_millis(delay.as_millis() as u64 + jitter)
76        } else {
77            delay
78        }
79    }
80
81    /// Generate retry queue name for a specific attempt
82    pub fn get_retry_queue_name(&self, original_queue: &str, attempt: u32) -> String {
83        self.retry_queue_pattern
84            .replace("{queue_name}", original_queue)
85            .replace("{attempt}", &attempt.to_string())
86    }
87}
88
89/// Delayed Message Exchange handler for implementing retry mechanism
90pub struct DelayedMessageExchange {
91    connection_manager: ConnectionManager,
92    exchange_name: String,
93    retry_policy: RetryPolicy,
94}
95
96impl DelayedMessageExchange {
97    /// Create a new DelayedMessageExchange
98    pub fn new(
99        connection_manager: ConnectionManager,
100        exchange_name: String,
101        retry_policy: RetryPolicy,
102    ) -> Self {
103        Self {
104            connection_manager,
105            exchange_name,
106            retry_policy,
107        }
108    }
109
110    /// Setup the delayed message exchange and retry infrastructure
111    pub async fn setup(&self) -> Result<()> {
112        let connection = self.connection_manager.get_connection().await?;
113        let channel = connection.create_channel().await?;
114
115        // Declare the delayed message exchange
116        self.declare_delayed_exchange(&channel).await?;
117
118        // Setup dead letter exchange and queue if configured
119        if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
120            self.setup_dead_letter_infrastructure(&channel, dle).await?;
121        }
122
123        info!(
124            "Delayed message exchange setup completed: {}",
125            self.exchange_name
126        );
127        Ok(())
128    }
129
130    /// Declare the delayed message exchange
131    async fn declare_delayed_exchange(&self, channel: &Channel) -> Result<()> {
132        // Arguments for delayed message exchange
133        let mut arguments = FieldTable::default();
134        arguments.insert(
135            "x-delayed-type".into(),
136            lapin::types::AMQPValue::LongString("direct".into()),
137        );
138
139        let options = ExchangeDeclareOptions {
140            passive: false,
141            durable: true,
142            auto_delete: false,
143            internal: false,
144            nowait: false,
145        };
146
147        channel
148            .exchange_declare(
149                &self.exchange_name,
150                ExchangeKind::Custom("x-delayed-message".to_string()),
151                options,
152                arguments,
153            )
154            .await?;
155
156        debug!("Declared delayed message exchange: {}", self.exchange_name);
157        Ok(())
158    }
159
160    /// Setup dead letter exchange and queue
161    async fn setup_dead_letter_infrastructure(
162        &self,
163        channel: &Channel,
164        dle_name: &str,
165    ) -> Result<()> {
166        // Declare dead letter exchange
167        let dle_options = ExchangeDeclareOptions {
168            passive: false,
169            durable: true,
170            auto_delete: false,
171            internal: false,
172            nowait: false,
173        };
174
175        channel
176            .exchange_declare(
177                dle_name,
178                ExchangeKind::Direct,
179                dle_options,
180                FieldTable::default(),
181            )
182            .await?;
183
184        // Declare dead letter queue if configured
185        if let Some(ref dlq_name) = self.retry_policy.dead_letter_queue {
186            let dlq_options = QueueDeclareOptions {
187                passive: false,
188                durable: true,
189                exclusive: false,
190                auto_delete: false,
191                nowait: false,
192            };
193
194            channel
195                .queue_declare(dlq_name, dlq_options, FieldTable::default())
196                .await?;
197
198            // Bind dead letter queue to dead letter exchange
199            channel
200                .queue_bind(
201                    dlq_name,
202                    dle_name,
203                    "dead-letter",
204                    QueueBindOptions::default(),
205                    FieldTable::default(),
206                )
207                .await?;
208
209            debug!("Setup dead letter queue: {}", dlq_name);
210        }
211
212        debug!("Setup dead letter exchange: {}", dle_name);
213        Ok(())
214    }
215
216    /// Publish a message with retry mechanism
217    pub async fn publish_with_retry<T>(
218        &self,
219        original_queue: &str,
220        message: &T,
221        retry_count: u32,
222        original_headers: Option<FieldTable>,
223    ) -> Result<()>
224    where
225        T: Serialize,
226    {
227        if retry_count >= self.retry_policy.max_retries {
228            // Send to dead letter exchange
229            if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
230                return self
231                    .send_to_dead_letter(message, dle, original_headers)
232                    .await;
233            } else {
234                return Err(RabbitError::RetryExhausted(format!(
235                    "Max retries ({}) exceeded for queue: {}",
236                    self.retry_policy.max_retries, original_queue
237                )));
238            }
239        }
240
241        let delay = self.retry_policy.calculate_delay(retry_count);
242        let connection = self.connection_manager.get_connection().await?;
243        let channel = connection.create_channel().await?;
244
245        // Serialize message
246        let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
247
248        // Build properties with delay header
249        let mut properties = BasicProperties::default()
250            .with_content_type("application/json".into())
251            .with_delivery_mode(2); // Persistent
252
253        // Add delay header for delayed message exchange
254        let mut headers = original_headers.unwrap_or_default();
255        headers.insert(
256            "x-delay".into(),
257            lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
258        );
259        headers.insert(
260            "x-retry-count".into(),
261            lapin::types::AMQPValue::LongInt(retry_count as i32),
262        );
263        headers.insert(
264            "x-original-queue".into(),
265            lapin::types::AMQPValue::LongString(original_queue.into()),
266        );
267
268        properties = properties.with_headers(headers);
269
270        // Publish to delayed exchange with original queue as routing key
271        channel
272            .basic_publish(
273                &self.exchange_name,
274                original_queue, // Use original queue name as routing key
275                BasicPublishOptions::default(),
276                &payload,
277                properties,
278            )
279            .await?;
280
281        info!(
282            "Published retry message for queue: {} (attempt: {}, delay: {:?})",
283            original_queue,
284            retry_count + 1,
285            delay
286        );
287
288        Ok(())
289    }
290
291    /// Send message to dead letter exchange
292    async fn send_to_dead_letter<T>(
293        &self,
294        message: &T,
295        dead_letter_exchange: &str,
296        original_headers: Option<FieldTable>,
297    ) -> Result<()>
298    where
299        T: Serialize,
300    {
301        let connection = self.connection_manager.get_connection().await?;
302        let channel = connection.create_channel().await?;
303
304        // Serialize message
305        let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
306
307        // Build properties
308        let mut properties = BasicProperties::default()
309            .with_content_type("application/json".into())
310            .with_delivery_mode(2); // Persistent
311
312        // Add failure headers
313        let mut headers = original_headers.unwrap_or_default();
314        headers.insert(
315            "x-death-reason".into(),
316            lapin::types::AMQPValue::LongString("max-retries-exceeded".into()),
317        );
318        headers.insert(
319            "x-death-timestamp".into(),
320            lapin::types::AMQPValue::LongLongInt(chrono::Utc::now().timestamp()),
321        );
322
323        properties = properties.with_headers(headers);
324
325        // Publish to dead letter exchange
326        channel
327            .basic_publish(
328                dead_letter_exchange,
329                "dead-letter", // Fixed routing key for dead letters
330                BasicPublishOptions::default(),
331                &payload,
332                properties,
333            )
334            .await?;
335
336        warn!(
337            "Message sent to dead letter exchange: {}",
338            dead_letter_exchange
339        );
340        Ok(())
341    }
342
343    /// Setup retry queues for a specific original queue
344    pub async fn setup_retry_queues(&self, original_queue: &str) -> Result<()> {
345        let connection = self.connection_manager.get_connection().await?;
346        let channel = connection.create_channel().await?;
347
348        // Setup retry queues for each retry attempt
349        for attempt in 1..=self.retry_policy.max_retries {
350            let retry_queue_name = self
351                .retry_policy
352                .get_retry_queue_name(original_queue, attempt);
353
354            // Create retry queue arguments
355            let mut arguments = FieldTable::default();
356
357            // Set message TTL based on retry delay
358            let delay = self.retry_policy.calculate_delay(attempt - 1);
359            arguments.insert(
360                "x-message-ttl".into(),
361                lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
362            );
363
364            // Set dead letter exchange to route back to original queue or next retry
365            if attempt < self.retry_policy.max_retries {
366                arguments.insert(
367                    "x-dead-letter-exchange".into(),
368                    lapin::types::AMQPValue::LongString("".into()), // Default exchange
369                );
370                arguments.insert(
371                    "x-dead-letter-routing-key".into(),
372                    lapin::types::AMQPValue::LongString(original_queue.into()),
373                );
374            } else {
375                // Last retry attempt - send to dead letter exchange
376                if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
377                    arguments.insert(
378                        "x-dead-letter-exchange".into(),
379                        lapin::types::AMQPValue::LongString(dle.clone().into()),
380                    );
381                    arguments.insert(
382                        "x-dead-letter-routing-key".into(),
383                        lapin::types::AMQPValue::LongString("dead-letter".into()),
384                    );
385                }
386            }
387
388            // Declare retry queue
389            let queue_options = QueueDeclareOptions {
390                passive: false,
391                durable: true,
392                exclusive: false,
393                auto_delete: false,
394                nowait: false,
395            };
396
397            channel
398                .queue_declare(&retry_queue_name, queue_options, arguments)
399                .await?;
400
401            debug!(
402                "Setup retry queue: {} for attempt: {}",
403                retry_queue_name, attempt
404            );
405        }
406
407        info!("Retry queues setup completed for: {}", original_queue);
408        Ok(())
409    }
410}
411
412/// Retry message wrapper for serialization
413#[derive(Debug, Serialize, Deserialize)]
414pub struct RetryMessage<T> {
415    pub original_message: T,
416    pub retry_count: u32,
417    pub original_queue: String,
418    pub original_headers: Option<serde_json::Value>,
419    pub retry_timestamp: chrono::DateTime<chrono::Utc>,
420}
421
422impl<T> RetryMessage<T> {
423    pub fn new(
424        original_message: T,
425        retry_count: u32,
426        original_queue: String,
427        original_headers: Option<serde_json::Value>,
428    ) -> Self {
429        Self {
430            original_message,
431            retry_count,
432            original_queue,
433            original_headers,
434            retry_timestamp: chrono::Utc::now(),
435        }
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The default policy exposes the documented retry count, delays, multiplier and jitter.
    #[test]
    fn test_retry_policy_default() {
        let RetryPolicy {
            max_retries,
            initial_delay,
            max_delay,
            backoff_multiplier,
            jitter,
            ..
        } = RetryPolicy::default();

        assert_eq!(max_retries, 3);
        assert_eq!(initial_delay, Duration::from_millis(1000));
        assert_eq!(max_delay, Duration::from_secs(60));
        assert_eq!(backoff_multiplier, 2.0);
        assert_eq!(jitter, 0.1);
    }

    /// Without jitter the delay doubles per attempt and is capped at `max_delay`.
    #[test]
    fn test_retry_policy_calculate_delay() {
        let policy = RetryPolicy {
            initial_delay: Duration::from_millis(1000),
            max_delay: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            jitter: 0.0, // No jitter for predictable tests
            ..Default::default()
        };

        // (attempt, expected delay); the final row exercises the max delay cap.
        let expectations = [
            (0, Duration::from_millis(1000)),
            (1, Duration::from_millis(2000)),
            (2, Duration::from_millis(4000)),
            (10, Duration::from_secs(30)),
        ];

        for &(attempt, expected) in expectations.iter() {
            assert_eq!(policy.calculate_delay(attempt), expected);
        }
    }

    /// The default pattern expands to `<queue>.retry.<attempt>`.
    #[test]
    fn test_retry_queue_name_generation() {
        let policy = RetryPolicy::default();

        assert_eq!(policy.get_retry_queue_name("orders", 1), "orders.retry.1");
        assert_eq!(
            policy.get_retry_queue_name("user-events", 3),
            "user-events.retry.3"
        );
    }

    /// `RetryMessage::new` stores its arguments unchanged.
    #[test]
    fn test_retry_message_creation() {
        let retry_msg = RetryMessage::new("test message", 2, String::from("test-queue"), None);

        assert_eq!(retry_msg.original_message, "test message");
        assert_eq!(retry_msg.retry_count, 2);
        assert_eq!(retry_msg.original_queue, "test-queue");
        assert!(retry_msg.original_headers.is_none());
    }
}