1use crate::{
2 connection::ConnectionManager,
3 error::{RabbitError, Result},
4};
5use lapin::{
6 options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
7 types::FieldTable,
8 BasicProperties, Channel, ExchangeKind,
9};
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tracing::{debug, info, warn};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct RetryPolicy {
17 pub max_retries: u32,
19
20 pub initial_delay: Duration,
22
23 pub max_delay: Duration,
25
26 pub backoff_multiplier: f64,
28
29 pub jitter: f64,
31
32 pub retry_queue_pattern: String,
34
35 pub dead_letter_exchange: Option<String>,
37
38 pub dead_letter_queue: Option<String>,
40}
41
42impl Default for RetryPolicy {
43 fn default() -> Self {
44 Self {
45 max_retries: 3,
46 initial_delay: Duration::from_millis(1000),
47 max_delay: Duration::from_secs(60),
48 backoff_multiplier: 2.0,
49 jitter: 0.1,
50 retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
51 dead_letter_exchange: Some("dead-letter".to_string()),
52 dead_letter_queue: Some("dead-letter-queue".to_string()),
53 }
54 }
55}
56
57impl RetryPolicy {
58 pub fn calculate_delay(&self, attempt: u32) -> Duration {
60 let base_delay = Duration::from_millis(
61 (self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(attempt as i32))
62 as u64,
63 );
64
65 let delay = if base_delay > self.max_delay {
66 self.max_delay
67 } else {
68 base_delay
69 };
70
71 if self.jitter > 0.0 {
73 let jitter_amount = (delay.as_millis() as f64 * self.jitter) as u64;
74 let jitter = fastrand::u64(0..=jitter_amount);
75 Duration::from_millis(delay.as_millis() as u64 + jitter)
76 } else {
77 delay
78 }
79 }
80
81 pub fn get_retry_queue_name(&self, original_queue: &str, attempt: u32) -> String {
83 self.retry_queue_pattern
84 .replace("{queue_name}", original_queue)
85 .replace("{attempt}", &attempt.to_string())
86 }
87
88 pub fn fast() -> Self {
90 Self {
91 max_retries: 5,
92 initial_delay: Duration::from_millis(200),
93 max_delay: Duration::from_secs(10),
94 backoff_multiplier: 1.5,
95 jitter: 0.05,
96 dead_letter_exchange: Some("fast.dlx".to_string()),
97 dead_letter_queue: Some("fast.dlq".to_string()),
98 ..Default::default()
99 }
100 }
101
102 pub fn fast_for_queue<S: Into<String>>(queue_name: S) -> Self {
104 let queue = queue_name.into();
105 Self {
106 max_retries: 5,
107 initial_delay: Duration::from_millis(200),
108 max_delay: Duration::from_secs(10),
109 backoff_multiplier: 1.5,
110 jitter: 0.05,
111 dead_letter_exchange: Some(format!("{}.dlx", queue)),
112 dead_letter_queue: Some(format!("{}.dlq", queue)),
113 ..Default::default()
114 }
115 }
116
117 pub fn slow() -> Self {
119 Self {
120 max_retries: 3,
121 initial_delay: Duration::from_secs(5),
122 max_delay: Duration::from_secs(300), backoff_multiplier: 2.5,
124 jitter: 0.2,
125 dead_letter_exchange: Some("slow.dlx".to_string()),
126 dead_letter_queue: Some("slow.dlq".to_string()),
127 ..Default::default()
128 }
129 }
130
131 pub fn slow_for_queue<S: Into<String>>(queue_name: S) -> Self {
133 let queue = queue_name.into();
134 Self {
135 max_retries: 3,
136 initial_delay: Duration::from_secs(5),
137 max_delay: Duration::from_secs(300),
138 backoff_multiplier: 2.5,
139 jitter: 0.2,
140 dead_letter_exchange: Some(format!("{}.dlx", queue)),
141 dead_letter_queue: Some(format!("{}.dlq", queue)),
142 ..Default::default()
143 }
144 }
145
146 pub fn aggressive() -> Self {
148 Self {
149 max_retries: 10,
150 initial_delay: Duration::from_millis(100),
151 max_delay: Duration::from_secs(120), backoff_multiplier: 2.0,
153 jitter: 0.15,
154 dead_letter_exchange: Some("aggressive.dlx".to_string()),
155 dead_letter_queue: Some("aggressive.dlq".to_string()),
156 ..Default::default()
157 }
158 }
159
160 pub fn conservative() -> Self {
162 Self {
163 max_retries: 2,
164 initial_delay: Duration::from_secs(30),
165 max_delay: Duration::from_secs(600), backoff_multiplier: 2.0,
167 jitter: 0.3,
168 dead_letter_exchange: Some("conservative.dlx".to_string()),
169 dead_letter_queue: Some("conservative.dlq".to_string()),
170 ..Default::default()
171 }
172 }
173
174 pub fn linear(delay: Duration, max_retries: u32) -> Self {
176 Self {
177 max_retries,
178 initial_delay: delay,
179 max_delay: delay, backoff_multiplier: 1.0, jitter: 0.0,
182 dead_letter_exchange: Some("linear.dlx".to_string()),
183 dead_letter_queue: Some("linear.dlq".to_string()),
184 ..Default::default()
185 }
186 }
187
188 pub fn no_retry() -> Self {
190 Self {
191 max_retries: 0,
192 initial_delay: Duration::from_secs(0),
193 max_delay: Duration::from_secs(0),
194 backoff_multiplier: 1.0,
195 jitter: 0.0,
196 dead_letter_exchange: Some("immediate.dlx".to_string()),
197 dead_letter_queue: Some("immediate.dlq".to_string()),
198 ..Default::default()
199 }
200 }
201
202 pub fn minutes_exponential() -> Self {
204 Self {
205 max_retries: 5,
206 initial_delay: Duration::from_secs(60), max_delay: Duration::from_secs(1800), backoff_multiplier: 2.0, jitter: 0.1, retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
211 dead_letter_exchange: Some("minutes.dlx".to_string()),
212 dead_letter_queue: Some("minutes.dlq".to_string()),
213 }
214 }
215
216 pub fn minutes_exponential_for_queue<S: Into<String>>(queue_name: S) -> Self {
218 let queue = queue_name.into();
219 Self {
220 max_retries: 5,
221 initial_delay: Duration::from_secs(60),
222 max_delay: Duration::from_secs(1800),
223 backoff_multiplier: 2.0,
224 jitter: 0.1,
225 retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
226 dead_letter_exchange: Some(format!("{}.dlx", queue)),
227 dead_letter_queue: Some(format!("{}.dlq", queue)),
228 }
229 }
230
231 pub fn builder() -> RetryPolicyBuilder {
233 RetryPolicyBuilder::new()
234 }
235}
236
237#[derive(Debug, Clone)]
239pub struct RetryPolicyBuilder {
240 max_retries: u32,
241 initial_delay: Duration,
242 max_delay: Duration,
243 backoff_multiplier: f64,
244 jitter: f64,
245 retry_queue_pattern: String,
246 dead_letter_exchange: Option<String>,
247 dead_letter_queue: Option<String>,
248}
249
250impl Default for RetryPolicyBuilder {
251 fn default() -> Self {
252 Self::new()
253 }
254}
255
256impl RetryPolicyBuilder {
257 pub fn new() -> Self {
259 Self {
260 max_retries: 3,
261 initial_delay: Duration::from_secs(1),
262 max_delay: Duration::from_secs(60),
263 backoff_multiplier: 2.0,
264 jitter: 0.1,
265 retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
266 dead_letter_exchange: Some("dead-letter".to_string()),
267 dead_letter_queue: Some("dead-letter-queue".to_string()),
268 }
269 }
270
271 pub fn max_retries(mut self, max_retries: u32) -> Self {
273 self.max_retries = max_retries;
274 self
275 }
276
277 pub fn initial_delay(mut self, delay: Duration) -> Self {
279 self.initial_delay = delay;
280 self
281 }
282
283 pub fn max_delay(mut self, delay: Duration) -> Self {
285 self.max_delay = delay;
286 self
287 }
288
289 pub fn backoff_multiplier(mut self, multiplier: f64) -> Self {
291 self.backoff_multiplier = multiplier;
292 self
293 }
294
295 pub fn jitter(mut self, jitter: f64) -> Self {
297 self.jitter = jitter.clamp(0.0, 1.0);
298 self
299 }
300
301 pub fn dead_letter_exchange<S: Into<String>>(mut self, exchange: S) -> Self {
303 self.dead_letter_exchange = Some(exchange.into());
304 self
305 }
306
307 pub fn dead_letter_queue<S: Into<String>>(mut self, queue: S) -> Self {
309 self.dead_letter_queue = Some(queue.into());
310 self
311 }
312
313 pub fn no_dead_letter(mut self) -> Self {
315 self.dead_letter_exchange = None;
316 self.dead_letter_queue = None;
317 self
318 }
319
320 pub fn retry_queue_pattern<S: Into<String>>(mut self, pattern: S) -> Self {
322 self.retry_queue_pattern = pattern.into();
323 self
324 }
325
326 pub fn fast_preset(mut self) -> Self {
328 self.max_retries = 5;
329 self.initial_delay = Duration::from_millis(200);
330 self.max_delay = Duration::from_secs(10);
331 self.backoff_multiplier = 1.5;
332 self.jitter = 0.05;
333 self
334 }
335
336 pub fn slow_preset(mut self) -> Self {
338 self.max_retries = 3;
339 self.initial_delay = Duration::from_secs(5);
340 self.max_delay = Duration::from_secs(300);
341 self.backoff_multiplier = 2.5;
342 self.jitter = 0.2;
343 self
344 }
345
346 pub fn linear_preset(mut self, delay: Duration) -> Self {
348 self.initial_delay = delay;
349 self.max_delay = delay;
350 self.backoff_multiplier = 1.0;
351 self.jitter = 0.0;
352 self
353 }
354
355 pub fn build(self) -> RetryPolicy {
357 RetryPolicy {
358 max_retries: self.max_retries,
359 initial_delay: self.initial_delay,
360 max_delay: self.max_delay,
361 backoff_multiplier: self.backoff_multiplier,
362 jitter: self.jitter,
363 retry_queue_pattern: self.retry_queue_pattern,
364 dead_letter_exchange: self.dead_letter_exchange,
365 dead_letter_queue: self.dead_letter_queue,
366 }
367 }
368}
369
370pub struct DelayedMessageExchange {
372 connection_manager: ConnectionManager,
373 exchange_name: String,
374 retry_policy: RetryPolicy,
375}
376
377impl DelayedMessageExchange {
378 pub fn new(
380 connection_manager: ConnectionManager,
381 exchange_name: String,
382 retry_policy: RetryPolicy,
383 ) -> Self {
384 Self {
385 connection_manager,
386 exchange_name,
387 retry_policy,
388 }
389 }
390
391 pub async fn setup(&self) -> Result<()> {
393 let connection = self.connection_manager.get_connection().await?;
394 let channel = connection.create_channel().await?;
395
396 self.declare_delayed_exchange(&channel).await?;
398
399 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
401 self.setup_dead_letter_infrastructure(&channel, dle).await?;
402 }
403
404 info!(
405 "Delayed message exchange setup completed: {}",
406 self.exchange_name
407 );
408 Ok(())
409 }
410
411 pub async fn setup_queue_retry(&self, original_queue: &str) -> Result<()> {
413 let connection = self.connection_manager.get_connection().await?;
414 let channel = connection.create_channel().await?;
415
416 let queue_options = QueueDeclareOptions {
418 passive: false,
419 durable: true,
420 exclusive: false,
421 auto_delete: false,
422 nowait: false,
423 };
424
425 channel
426 .queue_declare(original_queue, queue_options, FieldTable::default())
427 .await?;
428
429 channel
432 .queue_bind(
433 original_queue,
434 &self.exchange_name,
435 original_queue, QueueBindOptions::default(),
437 FieldTable::default(),
438 )
439 .await?;
440
441 info!(
442 "Setup retry binding for queue '{}' to delayed exchange '{}'",
443 original_queue, self.exchange_name
444 );
445 Ok(())
446 }
447
448 async fn declare_delayed_exchange(&self, channel: &Channel) -> Result<()> {
450 let mut arguments = FieldTable::default();
452 arguments.insert(
453 "x-delayed-type".into(),
454 lapin::types::AMQPValue::LongString("direct".into()),
455 );
456
457 let options = ExchangeDeclareOptions {
458 passive: false,
459 durable: true,
460 auto_delete: false,
461 internal: false,
462 nowait: false,
463 };
464
465 channel
466 .exchange_declare(
467 &self.exchange_name,
468 ExchangeKind::Custom("x-delayed-message".to_string()),
469 options,
470 arguments,
471 )
472 .await?;
473
474 debug!("Declared delayed message exchange: {}", self.exchange_name);
475 Ok(())
476 }
477
478 async fn setup_dead_letter_infrastructure(
480 &self,
481 channel: &Channel,
482 dle_name: &str,
483 ) -> Result<()> {
484 let dle_options = ExchangeDeclareOptions {
486 passive: false,
487 durable: true,
488 auto_delete: false,
489 internal: false,
490 nowait: false,
491 };
492
493 channel
494 .exchange_declare(
495 dle_name,
496 ExchangeKind::Direct,
497 dle_options,
498 FieldTable::default(),
499 )
500 .await?;
501
502 if let Some(ref dlq_name) = self.retry_policy.dead_letter_queue {
504 let dlq_options = QueueDeclareOptions {
505 passive: false,
506 durable: true,
507 exclusive: false,
508 auto_delete: false,
509 nowait: false,
510 };
511
512 channel
513 .queue_declare(dlq_name, dlq_options, FieldTable::default())
514 .await?;
515
516 channel
518 .queue_bind(
519 dlq_name,
520 dle_name,
521 "dead-letter",
522 QueueBindOptions::default(),
523 FieldTable::default(),
524 )
525 .await?;
526
527 debug!("Setup dead letter queue: {}", dlq_name);
528 }
529
530 debug!("Setup dead letter exchange: {}", dle_name);
531 Ok(())
532 }
533
534 pub async fn publish_with_retry<T>(
536 &self,
537 original_queue: &str,
538 message: &T,
539 retry_count: u32,
540 original_headers: Option<FieldTable>,
541 ) -> Result<()>
542 where
543 T: Serialize,
544 {
545 if retry_count >= self.retry_policy.max_retries {
546 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
548 return self
549 .send_to_dead_letter(message, dle, original_headers)
550 .await;
551 } else {
552 return Err(RabbitError::RetryExhausted(format!(
553 "Max retries ({}) exceeded for queue: {}",
554 self.retry_policy.max_retries, original_queue
555 )));
556 }
557 }
558
559 self.setup_queue_retry(original_queue).await?;
561
562 let delay = self.retry_policy.calculate_delay(retry_count);
563 let connection = self.connection_manager.get_connection().await?;
564 let channel = connection.create_channel().await?;
565
566 let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
568
569 let mut properties = BasicProperties::default()
571 .with_content_type("application/json".into())
572 .with_delivery_mode(2); let mut headers = original_headers.unwrap_or_default();
576 headers.insert(
577 "x-delay".into(),
578 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
579 );
580 headers.insert(
581 "x-retry-count".into(),
582 lapin::types::AMQPValue::LongInt(retry_count as i32),
583 );
584 headers.insert(
585 "x-original-queue".into(),
586 lapin::types::AMQPValue::LongString(original_queue.into()),
587 );
588
589 properties = properties.with_headers(headers);
590
591 channel
593 .basic_publish(
594 &self.exchange_name,
595 original_queue, BasicPublishOptions::default(),
597 &payload,
598 properties,
599 )
600 .await?;
601
602 info!(
603 "Published retry message for queue: {} (attempt: {}, delay: {:?})",
604 original_queue,
605 retry_count + 1,
606 delay
607 );
608
609 Ok(())
610 }
611
612 async fn send_to_dead_letter<T>(
614 &self,
615 message: &T,
616 dead_letter_exchange: &str,
617 original_headers: Option<FieldTable>,
618 ) -> Result<()>
619 where
620 T: Serialize,
621 {
622 let connection = self.connection_manager.get_connection().await?;
623 let channel = connection.create_channel().await?;
624
625 let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
627
628 let mut properties = BasicProperties::default()
630 .with_content_type("application/json".into())
631 .with_delivery_mode(2); let mut headers = original_headers.unwrap_or_default();
635 headers.insert(
636 "x-death-reason".into(),
637 lapin::types::AMQPValue::LongString("max-retries-exceeded".into()),
638 );
639 headers.insert(
640 "x-death-timestamp".into(),
641 lapin::types::AMQPValue::LongLongInt(chrono::Utc::now().timestamp()),
642 );
643
644 properties = properties.with_headers(headers);
645
646 channel
648 .basic_publish(
649 dead_letter_exchange,
650 "dead-letter", BasicPublishOptions::default(),
652 &payload,
653 properties,
654 )
655 .await?;
656
657 warn!(
658 "Message sent to dead letter exchange: {}",
659 dead_letter_exchange
660 );
661 Ok(())
662 }
663
664 pub async fn setup_retry_queues(&self, original_queue: &str) -> Result<()> {
666 let connection = self.connection_manager.get_connection().await?;
667 let channel = connection.create_channel().await?;
668
669 for attempt in 1..=self.retry_policy.max_retries {
671 let retry_queue_name = self
672 .retry_policy
673 .get_retry_queue_name(original_queue, attempt);
674
675 let mut arguments = FieldTable::default();
677
678 let delay = self.retry_policy.calculate_delay(attempt - 1);
680 arguments.insert(
681 "x-message-ttl".into(),
682 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
683 );
684
685 if attempt < self.retry_policy.max_retries {
687 arguments.insert(
688 "x-dead-letter-exchange".into(),
689 lapin::types::AMQPValue::LongString("".into()), );
691 arguments.insert(
692 "x-dead-letter-routing-key".into(),
693 lapin::types::AMQPValue::LongString(original_queue.into()),
694 );
695 } else {
696 if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
698 arguments.insert(
699 "x-dead-letter-exchange".into(),
700 lapin::types::AMQPValue::LongString(dle.clone().into()),
701 );
702 arguments.insert(
703 "x-dead-letter-routing-key".into(),
704 lapin::types::AMQPValue::LongString("dead-letter".into()),
705 );
706 }
707 }
708
709 let queue_options = QueueDeclareOptions {
711 passive: false,
712 durable: true,
713 exclusive: false,
714 auto_delete: false,
715 nowait: false,
716 };
717
718 channel
719 .queue_declare(&retry_queue_name, queue_options, arguments)
720 .await?;
721
722 debug!(
723 "Setup retry queue: {} for attempt: {}",
724 retry_queue_name, attempt
725 );
726 }
727
728 info!("Retry queues setup completed for: {}", original_queue);
729 Ok(())
730 }
731}
732
733#[derive(Debug, Serialize, Deserialize)]
735pub struct RetryMessage<T> {
736 pub original_message: T,
737 pub retry_count: u32,
738 pub original_queue: String,
739 pub original_headers: Option<serde_json::Value>,
740 pub retry_timestamp: chrono::DateTime<chrono::Utc>,
741}
742
743impl<T> RetryMessage<T> {
744 pub fn new(
745 original_message: T,
746 retry_count: u32,
747 original_queue: String,
748 original_headers: Option<serde_json::Value>,
749 ) -> Self {
750 Self {
751 original_message,
752 retry_count,
753 original_queue,
754 original_headers,
755 retry_timestamp: chrono::Utc::now(),
756 }
757 }
758}
759
760#[cfg(test)]
761mod delay_exchange_binding_tests {
762 use super::*;
763 use crate::{
764 config::RabbitConfig,
765 connection::ConnectionManager,
766 retry::{DelayedMessageExchange, RetryPolicy},
767 };
768 use std::time::Duration;
769
770 #[tokio::test]
771 async fn test_delay_exchange_setup_queue_retry_method_exists() {
772 let config = RabbitConfig::builder()
774 .connection_string("amqp://test:test@localhost:5672/")
775 .build();
776
777 let connection_manager = match ConnectionManager::new(config).await {
779 Ok(cm) => cm,
780 Err(_) => return, };
782
783 let retry_policy = RetryPolicy::default();
784 let delayed_exchange =
785 DelayedMessageExchange::new(connection_manager, "test.retry".to_string(), retry_policy);
786
787 let _result = delayed_exchange.setup_queue_retry("test_queue").await;
790
791 assert!(true, "setup_queue_retry method exists and is callable");
793 }
794
795 #[test]
796 fn test_delay_exchange_api_structure() {
797 use std::any::type_name;
799
800 let type_name = type_name::<DelayedMessageExchange>();
802 assert!(type_name.contains("DelayedMessageExchange"));
803
804 let _config = RabbitConfig::builder().connection_string("test").build();
806
807 let _policy = RetryPolicy::builder()
808 .max_retries(3)
809 .initial_delay(Duration::from_millis(100))
810 .build();
811
812 assert!(true);
814 }
815}
816
817#[cfg(test)]
818mod tests {
819 use super::*;
820 use std::time::Duration;
821
822 #[test]
823 fn test_retry_policy_default() {
824 let policy = RetryPolicy::default();
825 assert_eq!(policy.max_retries, 3);
826 assert_eq!(policy.initial_delay, Duration::from_millis(1000));
827 assert_eq!(policy.max_delay, Duration::from_secs(60));
828 assert_eq!(policy.backoff_multiplier, 2.0);
829 assert_eq!(policy.jitter, 0.1);
830 }
831
832 #[test]
833 fn test_retry_policy_calculate_delay() {
834 let policy = RetryPolicy {
835 initial_delay: Duration::from_millis(1000),
836 max_delay: Duration::from_secs(30),
837 backoff_multiplier: 2.0,
838 jitter: 0.0, ..Default::default()
840 };
841
842 let delay1 = policy.calculate_delay(0);
843 assert_eq!(delay1, Duration::from_millis(1000));
844
845 let delay2 = policy.calculate_delay(1);
846 assert_eq!(delay2, Duration::from_millis(2000));
847
848 let delay3 = policy.calculate_delay(2);
849 assert_eq!(delay3, Duration::from_millis(4000));
850
851 let delay_large = policy.calculate_delay(10);
853 assert_eq!(delay_large, Duration::from_secs(30));
854 }
855
856 #[test]
857 fn test_retry_queue_name_generation() {
858 let policy = RetryPolicy::default();
859 let queue_name = policy.get_retry_queue_name("orders", 1);
860 assert_eq!(queue_name, "orders.retry.1");
861
862 let queue_name = policy.get_retry_queue_name("user-events", 3);
863 assert_eq!(queue_name, "user-events.retry.3");
864 }
865
866 #[test]
867 fn test_retry_message_creation() {
868 let original_message = "test message";
869 let retry_msg = RetryMessage::new(original_message, 2, "test-queue".to_string(), None);
870
871 assert_eq!(retry_msg.original_message, "test message");
872 assert_eq!(retry_msg.retry_count, 2);
873 assert_eq!(retry_msg.original_queue, "test-queue");
874 assert!(retry_msg.original_headers.is_none());
875 }
876}