1use crate::{
2 connection::ConnectionManager,
3 error::{RabbitError, Result},
4};
5use lapin::{
6 options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
7 types::FieldTable,
8 BasicProperties, Channel, ExchangeKind,
9};
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tracing::{debug, info, warn};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct RetryPolicy {
17 pub max_retries: u32,
19
20 pub initial_delay: Duration,
22
23 pub max_delay: Duration,
25
26 pub backoff_multiplier: f64,
28
29 pub jitter: f64,
31
32 pub retry_queue_pattern: String,
34
35 pub dead_letter_exchange: Option<String>,
37
38 pub dead_letter_queue: Option<String>,
40}
41
42impl Default for RetryPolicy {
43 fn default() -> Self {
44 Self {
45 max_retries: 3,
46 initial_delay: Duration::from_millis(1000),
47 max_delay: Duration::from_secs(60),
48 backoff_multiplier: 2.0,
49 jitter: 0.1,
50 retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
51 dead_letter_exchange: Some("dead-letter".to_string()),
52 dead_letter_queue: Some("dead-letter-queue".to_string()),
53 }
54 }
55}
56
57impl RetryPolicy {
58 pub fn calculate_delay(&self, attempt: u32) -> Duration {
60 let base_delay = Duration::from_millis(
61 (self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(attempt as i32))
62 as u64,
63 );
64
65 let delay = if base_delay > self.max_delay {
66 self.max_delay
67 } else {
68 base_delay
69 };
70
71 if self.jitter > 0.0 {
73 let jitter_amount = (delay.as_millis() as f64 * self.jitter) as u64;
74 let jitter = fastrand::u64(0..=jitter_amount);
75 Duration::from_millis(delay.as_millis() as u64 + jitter)
76 } else {
77 delay
78 }
79 }
80
81 pub fn get_retry_queue_name(&self, original_queue: &str, attempt: u32) -> String {
83 self.retry_queue_pattern
84 .replace("{queue_name}", original_queue)
85 .replace("{attempt}", &attempt.to_string())
86 }
87}
88
89pub struct DelayedMessageExchange {
91 connection_manager: ConnectionManager,
92 exchange_name: String,
93 retry_policy: RetryPolicy,
94}
95
96impl DelayedMessageExchange {
97 pub fn new(
99 connection_manager: ConnectionManager,
100 exchange_name: String,
101 retry_policy: RetryPolicy,
102 ) -> Self {
103 Self {
104 connection_manager,
105 exchange_name,
106 retry_policy,
107 }
108 }
109
110 pub async fn setup(&self) -> Result<()> {
112 let connection = self.connection_manager.get_connection().await?;
113 let channel = connection.create_channel().await?;
114
115 self.declare_delayed_exchange(&channel).await?;
117
118 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
120 self.setup_dead_letter_infrastructure(&channel, dle).await?;
121 }
122
123 info!(
124 "Delayed message exchange setup completed: {}",
125 self.exchange_name
126 );
127 Ok(())
128 }
129
130 async fn declare_delayed_exchange(&self, channel: &Channel) -> Result<()> {
132 let mut arguments = FieldTable::default();
134 arguments.insert(
135 "x-delayed-type".into(),
136 lapin::types::AMQPValue::LongString("direct".into()),
137 );
138
139 let options = ExchangeDeclareOptions {
140 passive: false,
141 durable: true,
142 auto_delete: false,
143 internal: false,
144 nowait: false,
145 };
146
147 channel
148 .exchange_declare(
149 &self.exchange_name,
150 ExchangeKind::Custom("x-delayed-message".to_string()),
151 options,
152 arguments,
153 )
154 .await?;
155
156 debug!("Declared delayed message exchange: {}", self.exchange_name);
157 Ok(())
158 }
159
160 async fn setup_dead_letter_infrastructure(
162 &self,
163 channel: &Channel,
164 dle_name: &str,
165 ) -> Result<()> {
166 let dle_options = ExchangeDeclareOptions {
168 passive: false,
169 durable: true,
170 auto_delete: false,
171 internal: false,
172 nowait: false,
173 };
174
175 channel
176 .exchange_declare(
177 dle_name,
178 ExchangeKind::Direct,
179 dle_options,
180 FieldTable::default(),
181 )
182 .await?;
183
184 if let Some(ref dlq_name) = self.retry_policy.dead_letter_queue {
186 let dlq_options = QueueDeclareOptions {
187 passive: false,
188 durable: true,
189 exclusive: false,
190 auto_delete: false,
191 nowait: false,
192 };
193
194 channel
195 .queue_declare(dlq_name, dlq_options, FieldTable::default())
196 .await?;
197
198 channel
200 .queue_bind(
201 dlq_name,
202 dle_name,
203 "dead-letter",
204 QueueBindOptions::default(),
205 FieldTable::default(),
206 )
207 .await?;
208
209 debug!("Setup dead letter queue: {}", dlq_name);
210 }
211
212 debug!("Setup dead letter exchange: {}", dle_name);
213 Ok(())
214 }
215
216 pub async fn publish_with_retry<T>(
218 &self,
219 original_queue: &str,
220 message: &T,
221 retry_count: u32,
222 original_headers: Option<FieldTable>,
223 ) -> Result<()>
224 where
225 T: Serialize,
226 {
227 if retry_count >= self.retry_policy.max_retries {
228 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
230 return self
231 .send_to_dead_letter(message, dle, original_headers)
232 .await;
233 } else {
234 return Err(RabbitError::RetryExhausted(format!(
235 "Max retries ({}) exceeded for queue: {}",
236 self.retry_policy.max_retries, original_queue
237 )));
238 }
239 }
240
241 let delay = self.retry_policy.calculate_delay(retry_count);
242 let connection = self.connection_manager.get_connection().await?;
243 let channel = connection.create_channel().await?;
244
245 let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
247
248 let mut properties = BasicProperties::default()
250 .with_content_type("application/json".into())
251 .with_delivery_mode(2); let mut headers = original_headers.unwrap_or_default();
255 headers.insert(
256 "x-delay".into(),
257 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
258 );
259 headers.insert(
260 "x-retry-count".into(),
261 lapin::types::AMQPValue::LongInt(retry_count as i32),
262 );
263 headers.insert(
264 "x-original-queue".into(),
265 lapin::types::AMQPValue::LongString(original_queue.into()),
266 );
267
268 properties = properties.with_headers(headers);
269
270 channel
272 .basic_publish(
273 &self.exchange_name,
274 original_queue, BasicPublishOptions::default(),
276 &payload,
277 properties,
278 )
279 .await?;
280
281 info!(
282 "Published retry message for queue: {} (attempt: {}, delay: {:?})",
283 original_queue,
284 retry_count + 1,
285 delay
286 );
287
288 Ok(())
289 }
290
291 async fn send_to_dead_letter<T>(
293 &self,
294 message: &T,
295 dead_letter_exchange: &str,
296 original_headers: Option<FieldTable>,
297 ) -> Result<()>
298 where
299 T: Serialize,
300 {
301 let connection = self.connection_manager.get_connection().await?;
302 let channel = connection.create_channel().await?;
303
304 let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
306
307 let mut properties = BasicProperties::default()
309 .with_content_type("application/json".into())
310 .with_delivery_mode(2); let mut headers = original_headers.unwrap_or_default();
314 headers.insert(
315 "x-death-reason".into(),
316 lapin::types::AMQPValue::LongString("max-retries-exceeded".into()),
317 );
318 headers.insert(
319 "x-death-timestamp".into(),
320 lapin::types::AMQPValue::LongLongInt(chrono::Utc::now().timestamp()),
321 );
322
323 properties = properties.with_headers(headers);
324
325 channel
327 .basic_publish(
328 dead_letter_exchange,
329 "dead-letter", BasicPublishOptions::default(),
331 &payload,
332 properties,
333 )
334 .await?;
335
336 warn!(
337 "Message sent to dead letter exchange: {}",
338 dead_letter_exchange
339 );
340 Ok(())
341 }
342
343 pub async fn setup_retry_queues(&self, original_queue: &str) -> Result<()> {
345 let connection = self.connection_manager.get_connection().await?;
346 let channel = connection.create_channel().await?;
347
348 for attempt in 1..=self.retry_policy.max_retries {
350 let retry_queue_name = self
351 .retry_policy
352 .get_retry_queue_name(original_queue, attempt);
353
354 let mut arguments = FieldTable::default();
356
357 let delay = self.retry_policy.calculate_delay(attempt - 1);
359 arguments.insert(
360 "x-message-ttl".into(),
361 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
362 );
363
364 if attempt < self.retry_policy.max_retries {
366 arguments.insert(
367 "x-dead-letter-exchange".into(),
368 lapin::types::AMQPValue::LongString("".into()), );
370 arguments.insert(
371 "x-dead-letter-routing-key".into(),
372 lapin::types::AMQPValue::LongString(original_queue.into()),
373 );
374 } else {
375 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
377 arguments.insert(
378 "x-dead-letter-exchange".into(),
379 lapin::types::AMQPValue::LongString(dle.clone().into()),
380 );
381 arguments.insert(
382 "x-dead-letter-routing-key".into(),
383 lapin::types::AMQPValue::LongString("dead-letter".into()),
384 );
385 }
386 }
387
388 let queue_options = QueueDeclareOptions {
390 passive: false,
391 durable: true,
392 exclusive: false,
393 auto_delete: false,
394 nowait: false,
395 };
396
397 channel
398 .queue_declare(&retry_queue_name, queue_options, arguments)
399 .await?;
400
401 debug!(
402 "Setup retry queue: {} for attempt: {}",
403 retry_queue_name, attempt
404 );
405 }
406
407 info!("Retry queues setup completed for: {}", original_queue);
408 Ok(())
409 }
410}
411
412#[derive(Debug, Serialize, Deserialize)]
414pub struct RetryMessage<T> {
415 pub original_message: T,
416 pub retry_count: u32,
417 pub original_queue: String,
418 pub original_headers: Option<serde_json::Value>,
419 pub retry_timestamp: chrono::DateTime<chrono::Utc>,
420}
421
422impl<T> RetryMessage<T> {
423 pub fn new(
424 original_message: T,
425 retry_count: u32,
426 original_queue: String,
427 original_headers: Option<serde_json::Value>,
428 ) -> Self {
429 Self {
430 original_message,
431 retry_count,
432 original_queue,
433 original_headers,
434 retry_timestamp: chrono::Utc::now(),
435 }
436 }
437}
438
439#[cfg(test)]
440mod tests {
441 use super::*;
442 use std::time::Duration;
443
444 #[test]
445 fn test_retry_policy_default() {
446 let policy = RetryPolicy::default();
447 assert_eq!(policy.max_retries, 3);
448 assert_eq!(policy.initial_delay, Duration::from_millis(1000));
449 assert_eq!(policy.max_delay, Duration::from_secs(60));
450 assert_eq!(policy.backoff_multiplier, 2.0);
451 assert_eq!(policy.jitter, 0.1);
452 }
453
454 #[test]
455 fn test_retry_policy_calculate_delay() {
456 let policy = RetryPolicy {
457 initial_delay: Duration::from_millis(1000),
458 max_delay: Duration::from_secs(30),
459 backoff_multiplier: 2.0,
460 jitter: 0.0, ..Default::default()
462 };
463
464 let delay1 = policy.calculate_delay(0);
465 assert_eq!(delay1, Duration::from_millis(1000));
466
467 let delay2 = policy.calculate_delay(1);
468 assert_eq!(delay2, Duration::from_millis(2000));
469
470 let delay3 = policy.calculate_delay(2);
471 assert_eq!(delay3, Duration::from_millis(4000));
472
473 let delay_large = policy.calculate_delay(10);
475 assert_eq!(delay_large, Duration::from_secs(30));
476 }
477
478 #[test]
479 fn test_retry_queue_name_generation() {
480 let policy = RetryPolicy::default();
481 let queue_name = policy.get_retry_queue_name("orders", 1);
482 assert_eq!(queue_name, "orders.retry.1");
483
484 let queue_name = policy.get_retry_queue_name("user-events", 3);
485 assert_eq!(queue_name, "user-events.retry.3");
486 }
487
488 #[test]
489 fn test_retry_message_creation() {
490 let original_message = "test message";
491 let retry_msg = RetryMessage::new(original_message, 2, "test-queue".to_string(), None);
492
493 assert_eq!(retry_msg.original_message, "test message");
494 assert_eq!(retry_msg.retry_count, 2);
495 assert_eq!(retry_msg.original_queue, "test-queue");
496 assert!(retry_msg.original_headers.is_none());
497 }
498}