1use crate::{
2 connection::ConnectionManager,
3 error::{RabbitError, Result},
4};
5use lapin::{
6 options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
7 types::FieldTable,
8 BasicProperties, Channel, ExchangeKind,
9};
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tracing::{debug, info, warn};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct RetryPolicy {
17 pub max_retries: u32,
19
20 pub initial_delay: Duration,
22
23 pub max_delay: Duration,
25
26 pub backoff_multiplier: f64,
28
29 pub jitter: f64,
31
32 pub retry_queue_pattern: String,
34
35 pub dead_letter_exchange: Option<String>,
37
38 pub dead_letter_queue: Option<String>,
40}
41
42impl Default for RetryPolicy {
43 fn default() -> Self {
44 Self {
45 max_retries: 3,
46 initial_delay: Duration::from_millis(1000),
47 max_delay: Duration::from_secs(60),
48 backoff_multiplier: 2.0,
49 jitter: 0.1,
50 retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
51 dead_letter_exchange: Some("dead-letter".to_string()),
52 dead_letter_queue: Some("dead-letter-queue".to_string()),
53 }
54 }
55}
56
57impl RetryPolicy {
58 pub fn calculate_delay(&self, attempt: u32) -> Duration {
60 let base_delay = Duration::from_millis(
61 (self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(attempt as i32))
62 as u64,
63 );
64
65 let delay = if base_delay > self.max_delay {
66 self.max_delay
67 } else {
68 base_delay
69 };
70
71 if self.jitter > 0.0 {
73 let jitter_amount = (delay.as_millis() as f64 * self.jitter) as u64;
74 let jitter = fastrand::u64(0..=jitter_amount);
75 Duration::from_millis(delay.as_millis() as u64 + jitter)
76 } else {
77 delay
78 }
79 }
80
81 pub fn get_retry_queue_name(&self, original_queue: &str, attempt: u32) -> String {
83 self.retry_queue_pattern
84 .replace("{queue_name}", original_queue)
85 .replace("{attempt}", &attempt.to_string())
86 }
87
88 pub fn fast() -> Self {
90 Self {
91 max_retries: 5,
92 initial_delay: Duration::from_millis(200),
93 max_delay: Duration::from_secs(10),
94 backoff_multiplier: 1.5,
95 jitter: 0.05,
96 dead_letter_exchange: Some("fast.dlx".to_string()),
97 dead_letter_queue: Some("fast.dlq".to_string()),
98 ..Default::default()
99 }
100 }
101
102 pub fn fast_for_queue<S: Into<String>>(queue_name: S) -> Self {
104 let queue = queue_name.into();
105 Self {
106 max_retries: 5,
107 initial_delay: Duration::from_millis(200),
108 max_delay: Duration::from_secs(10),
109 backoff_multiplier: 1.5,
110 jitter: 0.05,
111 dead_letter_exchange: Some(format!("{}.dlx", queue)),
112 dead_letter_queue: Some(format!("{}.dlq", queue)),
113 ..Default::default()
114 }
115 }
116
117 pub fn slow() -> Self {
119 Self {
120 max_retries: 3,
121 initial_delay: Duration::from_secs(5),
122 max_delay: Duration::from_secs(300), backoff_multiplier: 2.5,
124 jitter: 0.2,
125 dead_letter_exchange: Some("slow.dlx".to_string()),
126 dead_letter_queue: Some("slow.dlq".to_string()),
127 ..Default::default()
128 }
129 }
130
131 pub fn slow_for_queue<S: Into<String>>(queue_name: S) -> Self {
133 let queue = queue_name.into();
134 Self {
135 max_retries: 3,
136 initial_delay: Duration::from_secs(5),
137 max_delay: Duration::from_secs(300),
138 backoff_multiplier: 2.5,
139 jitter: 0.2,
140 dead_letter_exchange: Some(format!("{}.dlx", queue)),
141 dead_letter_queue: Some(format!("{}.dlq", queue)),
142 ..Default::default()
143 }
144 }
145
146 pub fn aggressive() -> Self {
148 Self {
149 max_retries: 10,
150 initial_delay: Duration::from_millis(100),
151 max_delay: Duration::from_secs(120), backoff_multiplier: 2.0,
153 jitter: 0.15,
154 dead_letter_exchange: Some("aggressive.dlx".to_string()),
155 dead_letter_queue: Some("aggressive.dlq".to_string()),
156 ..Default::default()
157 }
158 }
159
160 pub fn conservative() -> Self {
162 Self {
163 max_retries: 2,
164 initial_delay: Duration::from_secs(30),
165 max_delay: Duration::from_secs(600), backoff_multiplier: 2.0,
167 jitter: 0.3,
168 dead_letter_exchange: Some("conservative.dlx".to_string()),
169 dead_letter_queue: Some("conservative.dlq".to_string()),
170 ..Default::default()
171 }
172 }
173
174 pub fn linear(delay: Duration, max_retries: u32) -> Self {
176 Self {
177 max_retries,
178 initial_delay: delay,
179 max_delay: delay, backoff_multiplier: 1.0, jitter: 0.0,
182 dead_letter_exchange: Some("linear.dlx".to_string()),
183 dead_letter_queue: Some("linear.dlq".to_string()),
184 ..Default::default()
185 }
186 }
187
188 pub fn no_retry() -> Self {
190 Self {
191 max_retries: 0,
192 initial_delay: Duration::from_secs(0),
193 max_delay: Duration::from_secs(0),
194 backoff_multiplier: 1.0,
195 jitter: 0.0,
196 dead_letter_exchange: Some("immediate.dlx".to_string()),
197 dead_letter_queue: Some("immediate.dlq".to_string()),
198 ..Default::default()
199 }
200 }
201
202 pub fn minutes_exponential() -> Self {
204 Self {
205 max_retries: 5,
206 initial_delay: Duration::from_secs(60), max_delay: Duration::from_secs(1800), backoff_multiplier: 2.0, jitter: 0.1, retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
211 dead_letter_exchange: Some("minutes.dlx".to_string()),
212 dead_letter_queue: Some("minutes.dlq".to_string()),
213 }
214 }
215
216 pub fn minutes_exponential_for_queue<S: Into<String>>(queue_name: S) -> Self {
218 let queue = queue_name.into();
219 Self {
220 max_retries: 5,
221 initial_delay: Duration::from_secs(60),
222 max_delay: Duration::from_secs(1800),
223 backoff_multiplier: 2.0,
224 jitter: 0.1,
225 retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
226 dead_letter_exchange: Some(format!("{}.dlx", queue)),
227 dead_letter_queue: Some(format!("{}.dlq", queue)),
228 }
229 }
230
231 pub fn builder() -> RetryPolicyBuilder {
233 RetryPolicyBuilder::new()
234 }
235}
236
/// Fluent builder for [`RetryPolicy`]; each field mirrors the policy field of
/// the same name and is copied over verbatim by `build`.
#[derive(Clone, Debug)]
pub struct RetryPolicyBuilder {
    /// Retry budget.
    max_retries: u32,
    /// Delay for the first attempt.
    initial_delay: Duration,
    /// Cap on the exponential delay.
    max_delay: Duration,
    /// Growth factor per attempt.
    backoff_multiplier: f64,
    /// Jitter fraction.
    jitter: f64,
    /// Retry queue name template.
    retry_queue_pattern: String,
    /// Dead-letter exchange name, if any.
    dead_letter_exchange: Option<String>,
    /// Dead-letter queue name, if any.
    dead_letter_queue: Option<String>,
}
249
250impl Default for RetryPolicyBuilder {
251 fn default() -> Self {
252 Self::new()
253 }
254}
255
256impl RetryPolicyBuilder {
257 pub fn new() -> Self {
259 Self {
260 max_retries: 3,
261 initial_delay: Duration::from_secs(1),
262 max_delay: Duration::from_secs(60),
263 backoff_multiplier: 2.0,
264 jitter: 0.1,
265 retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
266 dead_letter_exchange: Some("dead-letter".to_string()),
267 dead_letter_queue: Some("dead-letter-queue".to_string()),
268 }
269 }
270
271 pub fn max_retries(mut self, max_retries: u32) -> Self {
273 self.max_retries = max_retries;
274 self
275 }
276
277 pub fn initial_delay(mut self, delay: Duration) -> Self {
279 self.initial_delay = delay;
280 self
281 }
282
283 pub fn max_delay(mut self, delay: Duration) -> Self {
285 self.max_delay = delay;
286 self
287 }
288
289 pub fn backoff_multiplier(mut self, multiplier: f64) -> Self {
291 self.backoff_multiplier = multiplier;
292 self
293 }
294
295 pub fn jitter(mut self, jitter: f64) -> Self {
297 self.jitter = jitter.clamp(0.0, 1.0);
298 self
299 }
300
301 pub fn dead_letter_exchange<S: Into<String>>(mut self, exchange: S) -> Self {
303 self.dead_letter_exchange = Some(exchange.into());
304 self
305 }
306
307 pub fn dead_letter_queue<S: Into<String>>(mut self, queue: S) -> Self {
309 self.dead_letter_queue = Some(queue.into());
310 self
311 }
312
313 pub fn no_dead_letter(mut self) -> Self {
315 self.dead_letter_exchange = None;
316 self.dead_letter_queue = None;
317 self
318 }
319
320 pub fn retry_queue_pattern<S: Into<String>>(mut self, pattern: S) -> Self {
322 self.retry_queue_pattern = pattern.into();
323 self
324 }
325
326 pub fn fast_preset(mut self) -> Self {
328 self.max_retries = 5;
329 self.initial_delay = Duration::from_millis(200);
330 self.max_delay = Duration::from_secs(10);
331 self.backoff_multiplier = 1.5;
332 self.jitter = 0.05;
333 self
334 }
335
336 pub fn slow_preset(mut self) -> Self {
338 self.max_retries = 3;
339 self.initial_delay = Duration::from_secs(5);
340 self.max_delay = Duration::from_secs(300);
341 self.backoff_multiplier = 2.5;
342 self.jitter = 0.2;
343 self
344 }
345
346 pub fn linear_preset(mut self, delay: Duration) -> Self {
348 self.initial_delay = delay;
349 self.max_delay = delay;
350 self.backoff_multiplier = 1.0;
351 self.jitter = 0.0;
352 self
353 }
354
355 pub fn build(self) -> RetryPolicy {
357 RetryPolicy {
358 max_retries: self.max_retries,
359 initial_delay: self.initial_delay,
360 max_delay: self.max_delay,
361 backoff_multiplier: self.backoff_multiplier,
362 jitter: self.jitter,
363 retry_queue_pattern: self.retry_queue_pattern,
364 dead_letter_exchange: self.dead_letter_exchange,
365 dead_letter_queue: self.dead_letter_queue,
366 }
367 }
368}
369
370pub struct DelayedMessageExchange {
372 connection_manager: ConnectionManager,
373 exchange_name: String,
374 retry_policy: RetryPolicy,
375}
376
377impl DelayedMessageExchange {
378 pub fn new(
380 connection_manager: ConnectionManager,
381 exchange_name: String,
382 retry_policy: RetryPolicy,
383 ) -> Self {
384 Self {
385 connection_manager,
386 exchange_name,
387 retry_policy,
388 }
389 }
390
391 pub async fn setup(&self) -> Result<()> {
393 let connection = self.connection_manager.get_connection().await?;
394 let channel = connection.create_channel().await?;
395
396 self.declare_delayed_exchange(&channel).await?;
398
399 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
401 self.setup_dead_letter_infrastructure(&channel, dle).await?;
402 }
403
404 info!(
405 "Delayed message exchange setup completed: {}",
406 self.exchange_name
407 );
408 Ok(())
409 }
410
411 async fn declare_delayed_exchange(&self, channel: &Channel) -> Result<()> {
413 let mut arguments = FieldTable::default();
415 arguments.insert(
416 "x-delayed-type".into(),
417 lapin::types::AMQPValue::LongString("direct".into()),
418 );
419
420 let options = ExchangeDeclareOptions {
421 passive: false,
422 durable: true,
423 auto_delete: false,
424 internal: false,
425 nowait: false,
426 };
427
428 channel
429 .exchange_declare(
430 &self.exchange_name,
431 ExchangeKind::Custom("x-delayed-message".to_string()),
432 options,
433 arguments,
434 )
435 .await?;
436
437 debug!("Declared delayed message exchange: {}", self.exchange_name);
438 Ok(())
439 }
440
441 async fn setup_dead_letter_infrastructure(
443 &self,
444 channel: &Channel,
445 dle_name: &str,
446 ) -> Result<()> {
447 let dle_options = ExchangeDeclareOptions {
449 passive: false,
450 durable: true,
451 auto_delete: false,
452 internal: false,
453 nowait: false,
454 };
455
456 channel
457 .exchange_declare(
458 dle_name,
459 ExchangeKind::Direct,
460 dle_options,
461 FieldTable::default(),
462 )
463 .await?;
464
465 if let Some(ref dlq_name) = self.retry_policy.dead_letter_queue {
467 let dlq_options = QueueDeclareOptions {
468 passive: false,
469 durable: true,
470 exclusive: false,
471 auto_delete: false,
472 nowait: false,
473 };
474
475 channel
476 .queue_declare(dlq_name, dlq_options, FieldTable::default())
477 .await?;
478
479 channel
481 .queue_bind(
482 dlq_name,
483 dle_name,
484 "dead-letter",
485 QueueBindOptions::default(),
486 FieldTable::default(),
487 )
488 .await?;
489
490 debug!("Setup dead letter queue: {}", dlq_name);
491 }
492
493 debug!("Setup dead letter exchange: {}", dle_name);
494 Ok(())
495 }
496
497 pub async fn publish_with_retry<T>(
499 &self,
500 original_queue: &str,
501 message: &T,
502 retry_count: u32,
503 original_headers: Option<FieldTable>,
504 ) -> Result<()>
505 where
506 T: Serialize,
507 {
508 if retry_count >= self.retry_policy.max_retries {
509 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
511 return self
512 .send_to_dead_letter(message, dle, original_headers)
513 .await;
514 } else {
515 return Err(RabbitError::RetryExhausted(format!(
516 "Max retries ({}) exceeded for queue: {}",
517 self.retry_policy.max_retries, original_queue
518 )));
519 }
520 }
521
522 let delay = self.retry_policy.calculate_delay(retry_count);
523 let connection = self.connection_manager.get_connection().await?;
524 let channel = connection.create_channel().await?;
525
526 let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
528
529 let mut properties = BasicProperties::default()
531 .with_content_type("application/json".into())
532 .with_delivery_mode(2); let mut headers = original_headers.unwrap_or_default();
536 headers.insert(
537 "x-delay".into(),
538 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
539 );
540 headers.insert(
541 "x-retry-count".into(),
542 lapin::types::AMQPValue::LongInt(retry_count as i32),
543 );
544 headers.insert(
545 "x-original-queue".into(),
546 lapin::types::AMQPValue::LongString(original_queue.into()),
547 );
548
549 properties = properties.with_headers(headers);
550
551 channel
553 .basic_publish(
554 &self.exchange_name,
555 original_queue, BasicPublishOptions::default(),
557 &payload,
558 properties,
559 )
560 .await?;
561
562 info!(
563 "Published retry message for queue: {} (attempt: {}, delay: {:?})",
564 original_queue,
565 retry_count + 1,
566 delay
567 );
568
569 Ok(())
570 }
571
572 async fn send_to_dead_letter<T>(
574 &self,
575 message: &T,
576 dead_letter_exchange: &str,
577 original_headers: Option<FieldTable>,
578 ) -> Result<()>
579 where
580 T: Serialize,
581 {
582 let connection = self.connection_manager.get_connection().await?;
583 let channel = connection.create_channel().await?;
584
585 let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
587
588 let mut properties = BasicProperties::default()
590 .with_content_type("application/json".into())
591 .with_delivery_mode(2); let mut headers = original_headers.unwrap_or_default();
595 headers.insert(
596 "x-death-reason".into(),
597 lapin::types::AMQPValue::LongString("max-retries-exceeded".into()),
598 );
599 headers.insert(
600 "x-death-timestamp".into(),
601 lapin::types::AMQPValue::LongLongInt(chrono::Utc::now().timestamp()),
602 );
603
604 properties = properties.with_headers(headers);
605
606 channel
608 .basic_publish(
609 dead_letter_exchange,
610 "dead-letter", BasicPublishOptions::default(),
612 &payload,
613 properties,
614 )
615 .await?;
616
617 warn!(
618 "Message sent to dead letter exchange: {}",
619 dead_letter_exchange
620 );
621 Ok(())
622 }
623
624 pub async fn setup_retry_queues(&self, original_queue: &str) -> Result<()> {
626 let connection = self.connection_manager.get_connection().await?;
627 let channel = connection.create_channel().await?;
628
629 for attempt in 1..=self.retry_policy.max_retries {
631 let retry_queue_name = self
632 .retry_policy
633 .get_retry_queue_name(original_queue, attempt);
634
635 let mut arguments = FieldTable::default();
637
638 let delay = self.retry_policy.calculate_delay(attempt - 1);
640 arguments.insert(
641 "x-message-ttl".into(),
642 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
643 );
644
645 if attempt < self.retry_policy.max_retries {
647 arguments.insert(
648 "x-dead-letter-exchange".into(),
649 lapin::types::AMQPValue::LongString("".into()), );
651 arguments.insert(
652 "x-dead-letter-routing-key".into(),
653 lapin::types::AMQPValue::LongString(original_queue.into()),
654 );
655 } else {
656 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
658 arguments.insert(
659 "x-dead-letter-exchange".into(),
660 lapin::types::AMQPValue::LongString(dle.clone().into()),
661 );
662 arguments.insert(
663 "x-dead-letter-routing-key".into(),
664 lapin::types::AMQPValue::LongString("dead-letter".into()),
665 );
666 }
667 }
668
669 let queue_options = QueueDeclareOptions {
671 passive: false,
672 durable: true,
673 exclusive: false,
674 auto_delete: false,
675 nowait: false,
676 };
677
678 channel
679 .queue_declare(&retry_queue_name, queue_options, arguments)
680 .await?;
681
682 debug!(
683 "Setup retry queue: {} for attempt: {}",
684 retry_queue_name, attempt
685 );
686 }
687
688 info!("Retry queues setup completed for: {}", original_queue);
689 Ok(())
690 }
691}
692
693#[derive(Debug, Serialize, Deserialize)]
695pub struct RetryMessage<T> {
696 pub original_message: T,
697 pub retry_count: u32,
698 pub original_queue: String,
699 pub original_headers: Option<serde_json::Value>,
700 pub retry_timestamp: chrono::DateTime<chrono::Utc>,
701}
702
703impl<T> RetryMessage<T> {
704 pub fn new(
705 original_message: T,
706 retry_count: u32,
707 original_queue: String,
708 original_headers: Option<serde_json::Value>,
709 ) -> Self {
710 Self {
711 original_message,
712 retry_count,
713 original_queue,
714 original_headers,
715 retry_timestamp: chrono::Utc::now(),
716 }
717 }
718}
719
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The default policy exposes the documented timing values.
    #[test]
    fn test_retry_policy_default() {
        let RetryPolicy {
            max_retries,
            initial_delay,
            max_delay,
            backoff_multiplier,
            jitter,
            ..
        } = RetryPolicy::default();

        assert_eq!(max_retries, 3);
        assert_eq!(initial_delay, Duration::from_millis(1000));
        assert_eq!(max_delay, Duration::from_secs(60));
        assert_eq!(backoff_multiplier, 2.0);
        assert_eq!(jitter, 0.1);
    }

    /// Without jitter the delay doubles per attempt and stops at the cap.
    #[test]
    fn test_retry_policy_calculate_delay() {
        let policy = RetryPolicy {
            initial_delay: Duration::from_millis(1000),
            max_delay: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            jitter: 0.0,
            ..Default::default()
        };

        // (attempt, expected delay in milliseconds); attempt 10 hits the 30 s cap.
        let expectations: [(u32, u64); 4] = [(0, 1000), (1, 2000), (2, 4000), (10, 30_000)];
        for &(attempt, expected_ms) in &expectations {
            assert_eq!(
                policy.calculate_delay(attempt),
                Duration::from_millis(expected_ms)
            );
        }
    }

    /// The default pattern expands to `<queue>.retry.<attempt>`.
    #[test]
    fn test_retry_queue_name_generation() {
        let policy = RetryPolicy::default();

        assert_eq!(policy.get_retry_queue_name("orders", 1), "orders.retry.1");
        assert_eq!(
            policy.get_retry_queue_name("user-events", 3),
            "user-events.retry.3"
        );
    }

    /// `RetryMessage::new` stores its arguments unchanged.
    #[test]
    fn test_retry_message_creation() {
        let retry_msg = RetryMessage::new("test message", 2, String::from("test-queue"), None);

        assert_eq!(retry_msg.original_message, "test message");
        assert_eq!(retry_msg.retry_count, 2);
        assert_eq!(retry_msg.original_queue, "test-queue");
        assert!(retry_msg.original_headers.is_none());
    }
}