rust_rabbit/
retry.rs

1use crate::{
2    connection::ConnectionManager,
3    error::{RabbitError, Result},
4};
5use lapin::{
6    options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
7    types::FieldTable,
8    BasicProperties, Channel, ExchangeKind,
9};
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tracing::{debug, info, warn};
13
/// Retry policy configuration.
///
/// Describes how many times a failed message is retried, how the delay
/// between attempts grows, and where a message is sent once retries are
/// exhausted. The delay fields are combined by `calculate_delay`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Maximum number of retry attempts.
    ///
    /// `DelayedMessageExchange::publish_with_retry` stops scheduling retries
    /// once the supplied retry count reaches this value.
    pub max_retries: u32,

    /// Initial delay between retries (the base delay for attempt `0`).
    pub initial_delay: Duration,

    /// Maximum delay between retries.
    ///
    /// The cap is applied before jitter is added, so a jittered delay can
    /// exceed this value by up to `jitter` times the capped delay.
    pub max_delay: Duration,

    /// Multiplier for exponential backoff: the base delay for attempt `n` is
    /// `initial_delay * backoff_multiplier^n`.
    pub backoff_multiplier: f64,

    /// Jitter factor (0.0 to 1.0) to add randomness to delays.
    ///
    /// Only `RetryPolicyBuilder::jitter` clamps to that range; a value
    /// assigned directly to this public field is used as-is, and values
    /// that are not greater than zero disable jitter.
    pub jitter: f64,

    /// Retry queue naming pattern.
    ///
    /// The placeholders `{queue_name}` and `{attempt}` are substituted by
    /// `get_retry_queue_name`.
    pub retry_queue_pattern: String,

    /// Dead letter exchange for failed messages (`None` disables dead lettering).
    pub dead_letter_exchange: Option<String>,

    /// Dead letter queue for failed messages.
    ///
    /// Only declared and bound when `dead_letter_exchange` is also set.
    pub dead_letter_queue: Option<String>,
}
41
42impl Default for RetryPolicy {
43    fn default() -> Self {
44        Self {
45            max_retries: 3,
46            initial_delay: Duration::from_millis(1000),
47            max_delay: Duration::from_secs(60),
48            backoff_multiplier: 2.0,
49            jitter: 0.1,
50            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
51            dead_letter_exchange: Some("dead-letter".to_string()),
52            dead_letter_queue: Some("dead-letter-queue".to_string()),
53        }
54    }
55}
56
57impl RetryPolicy {
58    /// Calculate delay for a specific retry attempt
59    pub fn calculate_delay(&self, attempt: u32) -> Duration {
60        let base_delay = Duration::from_millis(
61            (self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(attempt as i32))
62                as u64,
63        );
64
65        let delay = if base_delay > self.max_delay {
66            self.max_delay
67        } else {
68            base_delay
69        };
70
71        // Add jitter to prevent thundering herd
72        if self.jitter > 0.0 {
73            let jitter_amount = (delay.as_millis() as f64 * self.jitter) as u64;
74            let jitter = fastrand::u64(0..=jitter_amount);
75            Duration::from_millis(delay.as_millis() as u64 + jitter)
76        } else {
77            delay
78        }
79    }
80
81    /// Generate retry queue name for a specific attempt
82    pub fn get_retry_queue_name(&self, original_queue: &str, attempt: u32) -> String {
83        self.retry_queue_pattern
84            .replace("{queue_name}", original_queue)
85            .replace("{attempt}", &attempt.to_string())
86    }
87
88    /// Create a fast retry policy (quick retries for transient errors)
89    pub fn fast() -> Self {
90        Self {
91            max_retries: 5,
92            initial_delay: Duration::from_millis(200),
93            max_delay: Duration::from_secs(10),
94            backoff_multiplier: 1.5,
95            jitter: 0.05,
96            dead_letter_exchange: Some("fast.dlx".to_string()),
97            dead_letter_queue: Some("fast.dlq".to_string()),
98            ..Default::default()
99        }
100    }
101
102    /// Create a fast retry policy with custom dead letter names based on queue
103    pub fn fast_for_queue<S: Into<String>>(queue_name: S) -> Self {
104        let queue = queue_name.into();
105        Self {
106            max_retries: 5,
107            initial_delay: Duration::from_millis(200),
108            max_delay: Duration::from_secs(10),
109            backoff_multiplier: 1.5,
110            jitter: 0.05,
111            dead_letter_exchange: Some(format!("{}.dlx", queue)),
112            dead_letter_queue: Some(format!("{}.dlq", queue)),
113            ..Default::default()
114        }
115    }
116
117    /// Create a slow retry policy (for operations that need more time)
118    pub fn slow() -> Self {
119        Self {
120            max_retries: 3,
121            initial_delay: Duration::from_secs(5),
122            max_delay: Duration::from_secs(300), // 5 minutes
123            backoff_multiplier: 2.5,
124            jitter: 0.2,
125            dead_letter_exchange: Some("slow.dlx".to_string()),
126            dead_letter_queue: Some("slow.dlq".to_string()),
127            ..Default::default()
128        }
129    }
130
131    /// Create a slow retry policy with custom dead letter names based on queue
132    pub fn slow_for_queue<S: Into<String>>(queue_name: S) -> Self {
133        let queue = queue_name.into();
134        Self {
135            max_retries: 3,
136            initial_delay: Duration::from_secs(5),
137            max_delay: Duration::from_secs(300),
138            backoff_multiplier: 2.5,
139            jitter: 0.2,
140            dead_letter_exchange: Some(format!("{}.dlx", queue)),
141            dead_letter_queue: Some(format!("{}.dlq", queue)),
142            ..Default::default()
143        }
144    }
145
146    /// Create an aggressive retry policy (many attempts with exponential backoff)
147    pub fn aggressive() -> Self {
148        Self {
149            max_retries: 10,
150            initial_delay: Duration::from_millis(100),
151            max_delay: Duration::from_secs(120), // 2 minutes
152            backoff_multiplier: 2.0,
153            jitter: 0.15,
154            dead_letter_exchange: Some("aggressive.dlx".to_string()),
155            dead_letter_queue: Some("aggressive.dlq".to_string()),
156            ..Default::default()
157        }
158    }
159
160    /// Create a conservative retry policy (few attempts, larger delays)
161    pub fn conservative() -> Self {
162        Self {
163            max_retries: 2,
164            initial_delay: Duration::from_secs(30),
165            max_delay: Duration::from_secs(600), // 10 minutes
166            backoff_multiplier: 2.0,
167            jitter: 0.3,
168            dead_letter_exchange: Some("conservative.dlx".to_string()),
169            dead_letter_queue: Some("conservative.dlq".to_string()),
170            ..Default::default()
171        }
172    }
173
174    /// Create a linear retry policy (fixed delay between retries)
175    pub fn linear(delay: Duration, max_retries: u32) -> Self {
176        Self {
177            max_retries,
178            initial_delay: delay,
179            max_delay: delay,        // Same as initial = linear
180            backoff_multiplier: 1.0, // No exponential growth
181            jitter: 0.0,
182            dead_letter_exchange: Some("linear.dlx".to_string()),
183            dead_letter_queue: Some("linear.dlq".to_string()),
184            ..Default::default()
185        }
186    }
187
188    /// Create a no-retry policy (fail immediately, no retries)
189    pub fn no_retry() -> Self {
190        Self {
191            max_retries: 0,
192            initial_delay: Duration::from_secs(0),
193            max_delay: Duration::from_secs(0),
194            backoff_multiplier: 1.0,
195            jitter: 0.0,
196            dead_letter_exchange: Some("immediate.dlx".to_string()),
197            dead_letter_queue: Some("immediate.dlq".to_string()),
198            ..Default::default()
199        }
200    }
201
202    /// Create a minutes-based exponential retry policy (1min, 2min, 4min, 8min, 16min)
203    pub fn minutes_exponential() -> Self {
204        Self {
205            max_retries: 5,
206            initial_delay: Duration::from_secs(60), // 1 minute
207            max_delay: Duration::from_secs(1800),   // 30 minutes cap
208            backoff_multiplier: 2.0,                // Double each time
209            jitter: 0.1,                            // 10% jitter
210            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
211            dead_letter_exchange: Some("minutes.dlx".to_string()),
212            dead_letter_queue: Some("minutes.dlq".to_string()),
213        }
214    }
215
216    /// Create a minutes-based exponential retry policy with custom dead letter names
217    pub fn minutes_exponential_for_queue<S: Into<String>>(queue_name: S) -> Self {
218        let queue = queue_name.into();
219        Self {
220            max_retries: 5,
221            initial_delay: Duration::from_secs(60),
222            max_delay: Duration::from_secs(1800),
223            backoff_multiplier: 2.0,
224            jitter: 0.1,
225            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
226            dead_letter_exchange: Some(format!("{}.dlx", queue)),
227            dead_letter_queue: Some(format!("{}.dlq", queue)),
228        }
229    }
230
231    /// Create a custom retry policy with builder pattern
232    pub fn builder() -> RetryPolicyBuilder {
233        RetryPolicyBuilder::new()
234    }
235}
236
/// Builder for RetryPolicy.
///
/// Holds one pending value for every `RetryPolicy` field; `build` copies
/// them across unchanged.
#[derive(Debug, Clone)]
pub struct RetryPolicyBuilder {
    // Pending value for `RetryPolicy::max_retries`.
    max_retries: u32,
    // Pending value for `RetryPolicy::initial_delay`.
    initial_delay: Duration,
    // Pending value for `RetryPolicy::max_delay`.
    max_delay: Duration,
    // Pending value for `RetryPolicy::backoff_multiplier`.
    backoff_multiplier: f64,
    // Pending value for `RetryPolicy::jitter`; the `jitter` setter clamps it to 0.0..=1.0.
    jitter: f64,
    // Pending value for `RetryPolicy::retry_queue_pattern`.
    retry_queue_pattern: String,
    // Pending dead letter exchange name; `None` after `no_dead_letter`.
    dead_letter_exchange: Option<String>,
    // Pending dead letter queue name; `None` after `no_dead_letter`.
    dead_letter_queue: Option<String>,
}
249
250impl Default for RetryPolicyBuilder {
251    fn default() -> Self {
252        Self::new()
253    }
254}
255
256impl RetryPolicyBuilder {
257    /// Create a new builder with default values
258    pub fn new() -> Self {
259        Self {
260            max_retries: 3,
261            initial_delay: Duration::from_secs(1),
262            max_delay: Duration::from_secs(60),
263            backoff_multiplier: 2.0,
264            jitter: 0.1,
265            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
266            dead_letter_exchange: Some("dead-letter".to_string()),
267            dead_letter_queue: Some("dead-letter-queue".to_string()),
268        }
269    }
270
271    /// Set maximum number of retry attempts
272    pub fn max_retries(mut self, max_retries: u32) -> Self {
273        self.max_retries = max_retries;
274        self
275    }
276
277    /// Set initial delay between retries
278    pub fn initial_delay(mut self, delay: Duration) -> Self {
279        self.initial_delay = delay;
280        self
281    }
282
283    /// Set maximum delay between retries
284    pub fn max_delay(mut self, delay: Duration) -> Self {
285        self.max_delay = delay;
286        self
287    }
288
289    /// Set backoff multiplier for exponential backoff
290    pub fn backoff_multiplier(mut self, multiplier: f64) -> Self {
291        self.backoff_multiplier = multiplier;
292        self
293    }
294
295    /// Set jitter factor (0.0 to 1.0)
296    pub fn jitter(mut self, jitter: f64) -> Self {
297        self.jitter = jitter.clamp(0.0, 1.0);
298        self
299    }
300
301    /// Set dead letter exchange name
302    pub fn dead_letter_exchange<S: Into<String>>(mut self, exchange: S) -> Self {
303        self.dead_letter_exchange = Some(exchange.into());
304        self
305    }
306
307    /// Set dead letter queue name
308    pub fn dead_letter_queue<S: Into<String>>(mut self, queue: S) -> Self {
309        self.dead_letter_queue = Some(queue.into());
310        self
311    }
312
313    /// Disable dead letter exchange (messages will be discarded after max retries)
314    pub fn no_dead_letter(mut self) -> Self {
315        self.dead_letter_exchange = None;
316        self.dead_letter_queue = None;
317        self
318    }
319
320    /// Set retry queue naming pattern
321    pub fn retry_queue_pattern<S: Into<String>>(mut self, pattern: S) -> Self {
322        self.retry_queue_pattern = pattern.into();
323        self
324    }
325
326    /// Configure for fast retries (preset)
327    pub fn fast_preset(mut self) -> Self {
328        self.max_retries = 5;
329        self.initial_delay = Duration::from_millis(200);
330        self.max_delay = Duration::from_secs(10);
331        self.backoff_multiplier = 1.5;
332        self.jitter = 0.05;
333        self
334    }
335
336    /// Configure for slow retries (preset)
337    pub fn slow_preset(mut self) -> Self {
338        self.max_retries = 3;
339        self.initial_delay = Duration::from_secs(5);
340        self.max_delay = Duration::from_secs(300);
341        self.backoff_multiplier = 2.5;
342        self.jitter = 0.2;
343        self
344    }
345
346    /// Configure for linear retries (preset)
347    pub fn linear_preset(mut self, delay: Duration) -> Self {
348        self.initial_delay = delay;
349        self.max_delay = delay;
350        self.backoff_multiplier = 1.0;
351        self.jitter = 0.0;
352        self
353    }
354
355    /// Build the final RetryPolicy
356    pub fn build(self) -> RetryPolicy {
357        RetryPolicy {
358            max_retries: self.max_retries,
359            initial_delay: self.initial_delay,
360            max_delay: self.max_delay,
361            backoff_multiplier: self.backoff_multiplier,
362            jitter: self.jitter,
363            retry_queue_pattern: self.retry_queue_pattern,
364            dead_letter_exchange: self.dead_letter_exchange,
365            dead_letter_queue: self.dead_letter_queue,
366        }
367    }
368}
369
/// Delayed Message Exchange handler for implementing retry mechanism.
///
/// Declares a delayed exchange plus optional dead letter infrastructure and
/// republishes failed messages through that exchange with a delay taken
/// from the retry policy.
pub struct DelayedMessageExchange {
    // Source of broker connections; every operation asks it for a connection
    // and opens a fresh channel.
    connection_manager: ConnectionManager,
    // Name of the delayed exchange that is declared and published to.
    exchange_name: String,
    // Policy supplying retry limits, delays and dead letter names.
    retry_policy: RetryPolicy,
}
376
377impl DelayedMessageExchange {
378    /// Create a new DelayedMessageExchange
379    pub fn new(
380        connection_manager: ConnectionManager,
381        exchange_name: String,
382        retry_policy: RetryPolicy,
383    ) -> Self {
384        Self {
385            connection_manager,
386            exchange_name,
387            retry_policy,
388        }
389    }
390
391    /// Setup the delayed message exchange and retry infrastructure
392    pub async fn setup(&self) -> Result<()> {
393        let connection = self.connection_manager.get_connection().await?;
394        let channel = connection.create_channel().await?;
395
396        // Declare the delayed message exchange
397        self.declare_delayed_exchange(&channel).await?;
398
399        // Setup dead letter exchange and queue if configured
400        if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
401            self.setup_dead_letter_infrastructure(&channel, dle).await?;
402        }
403
404        info!(
405            "Delayed message exchange setup completed: {}",
406            self.exchange_name
407        );
408        Ok(())
409    }
410
411    /// Setup retry mechanism for a specific queue by binding it to the delay exchange
412    pub async fn setup_queue_retry(&self, original_queue: &str) -> Result<()> {
413        let connection = self.connection_manager.get_connection().await?;
414        let channel = connection.create_channel().await?;
415
416        // Ensure the original queue exists
417        let queue_options = QueueDeclareOptions {
418            passive: false,
419            durable: true,
420            exclusive: false,
421            auto_delete: false,
422            nowait: false,
423        };
424
425        channel
426            .queue_declare(original_queue, queue_options, FieldTable::default())
427            .await?;
428
429        // Bind the original queue to the delayed exchange
430        // This allows delayed messages to be routed back to the original queue
431        channel
432            .queue_bind(
433                original_queue,
434                &self.exchange_name,
435                original_queue, // routing key = queue name
436                QueueBindOptions::default(),
437                FieldTable::default(),
438            )
439            .await?;
440
441        info!(
442            "Setup retry binding for queue '{}' to delayed exchange '{}'",
443            original_queue, self.exchange_name
444        );
445        Ok(())
446    }
447
448    /// Declare the delayed message exchange
449    async fn declare_delayed_exchange(&self, channel: &Channel) -> Result<()> {
450        // Arguments for delayed message exchange
451        let mut arguments = FieldTable::default();
452        arguments.insert(
453            "x-delayed-type".into(),
454            lapin::types::AMQPValue::LongString("direct".into()),
455        );
456
457        let options = ExchangeDeclareOptions {
458            passive: false,
459            durable: true,
460            auto_delete: false,
461            internal: false,
462            nowait: false,
463        };
464
465        channel
466            .exchange_declare(
467                &self.exchange_name,
468                ExchangeKind::Custom("x-delayed-message".to_string()),
469                options,
470                arguments,
471            )
472            .await?;
473
474        debug!("Declared delayed message exchange: {}", self.exchange_name);
475        Ok(())
476    }
477
478    /// Setup dead letter exchange and queue
479    async fn setup_dead_letter_infrastructure(
480        &self,
481        channel: &Channel,
482        dle_name: &str,
483    ) -> Result<()> {
484        // Declare dead letter exchange
485        let dle_options = ExchangeDeclareOptions {
486            passive: false,
487            durable: true,
488            auto_delete: false,
489            internal: false,
490            nowait: false,
491        };
492
493        channel
494            .exchange_declare(
495                dle_name,
496                ExchangeKind::Direct,
497                dle_options,
498                FieldTable::default(),
499            )
500            .await?;
501
502        // Declare dead letter queue if configured
503        if let Some(ref dlq_name) = self.retry_policy.dead_letter_queue {
504            let dlq_options = QueueDeclareOptions {
505                passive: false,
506                durable: true,
507                exclusive: false,
508                auto_delete: false,
509                nowait: false,
510            };
511
512            channel
513                .queue_declare(dlq_name, dlq_options, FieldTable::default())
514                .await?;
515
516            // Bind dead letter queue to dead letter exchange
517            channel
518                .queue_bind(
519                    dlq_name,
520                    dle_name,
521                    "dead-letter",
522                    QueueBindOptions::default(),
523                    FieldTable::default(),
524                )
525                .await?;
526
527            debug!("Setup dead letter queue: {}", dlq_name);
528        }
529
530        debug!("Setup dead letter exchange: {}", dle_name);
531        Ok(())
532    }
533
534    /// Publish a message with retry mechanism
535    pub async fn publish_with_retry<T>(
536        &self,
537        original_queue: &str,
538        message: &T,
539        retry_count: u32,
540        original_headers: Option<FieldTable>,
541    ) -> Result<()>
542    where
543        T: Serialize,
544    {
545        if retry_count >= self.retry_policy.max_retries {
546            // Send to dead letter exchange
547            if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
548                return self
549                    .send_to_dead_letter(message, dle, original_headers)
550                    .await;
551            } else {
552                return Err(RabbitError::RetryExhausted(format!(
553                    "Max retries ({}) exceeded for queue: {}",
554                    self.retry_policy.max_retries, original_queue
555                )));
556            }
557        }
558
559        // Ensure the queue is set up for retry (binding to delayed exchange)
560        self.setup_queue_retry(original_queue).await?;
561
562        let delay = self.retry_policy.calculate_delay(retry_count);
563        let connection = self.connection_manager.get_connection().await?;
564        let channel = connection.create_channel().await?;
565
566        // Serialize message
567        let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
568
569        // Build properties with delay header
570        let mut properties = BasicProperties::default()
571            .with_content_type("application/json".into())
572            .with_delivery_mode(2); // Persistent
573
574        // Add delay header for delayed message exchange
575        let mut headers = original_headers.unwrap_or_default();
576        headers.insert(
577            "x-delay".into(),
578            lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
579        );
580        headers.insert(
581            "x-retry-count".into(),
582            lapin::types::AMQPValue::LongInt(retry_count as i32),
583        );
584        headers.insert(
585            "x-original-queue".into(),
586            lapin::types::AMQPValue::LongString(original_queue.into()),
587        );
588
589        properties = properties.with_headers(headers);
590
591        // Publish to delayed exchange with original queue as routing key
592        channel
593            .basic_publish(
594                &self.exchange_name,
595                original_queue, // Use original queue name as routing key
596                BasicPublishOptions::default(),
597                &payload,
598                properties,
599            )
600            .await?;
601
602        info!(
603            "Published retry message for queue: {} (attempt: {}, delay: {:?})",
604            original_queue,
605            retry_count + 1,
606            delay
607        );
608
609        Ok(())
610    }
611
612    /// Send message to dead letter exchange
613    async fn send_to_dead_letter<T>(
614        &self,
615        message: &T,
616        dead_letter_exchange: &str,
617        original_headers: Option<FieldTable>,
618    ) -> Result<()>
619    where
620        T: Serialize,
621    {
622        let connection = self.connection_manager.get_connection().await?;
623        let channel = connection.create_channel().await?;
624
625        // Serialize message
626        let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
627
628        // Build properties
629        let mut properties = BasicProperties::default()
630            .with_content_type("application/json".into())
631            .with_delivery_mode(2); // Persistent
632
633        // Add failure headers
634        let mut headers = original_headers.unwrap_or_default();
635        headers.insert(
636            "x-death-reason".into(),
637            lapin::types::AMQPValue::LongString("max-retries-exceeded".into()),
638        );
639        headers.insert(
640            "x-death-timestamp".into(),
641            lapin::types::AMQPValue::LongLongInt(chrono::Utc::now().timestamp()),
642        );
643
644        properties = properties.with_headers(headers);
645
646        // Publish to dead letter exchange
647        channel
648            .basic_publish(
649                dead_letter_exchange,
650                "dead-letter", // Fixed routing key for dead letters
651                BasicPublishOptions::default(),
652                &payload,
653                properties,
654            )
655            .await?;
656
657        warn!(
658            "Message sent to dead letter exchange: {}",
659            dead_letter_exchange
660        );
661        Ok(())
662    }
663
664    /// Setup retry queues for a specific original queue
665    pub async fn setup_retry_queues(&self, original_queue: &str) -> Result<()> {
666        let connection = self.connection_manager.get_connection().await?;
667        let channel = connection.create_channel().await?;
668
669        // Setup retry queues for each retry attempt
670        for attempt in 1..=self.retry_policy.max_retries {
671            let retry_queue_name = self
672                .retry_policy
673                .get_retry_queue_name(original_queue, attempt);
674
675            // Create retry queue arguments
676            let mut arguments = FieldTable::default();
677
678            // Set message TTL based on retry delay
679            let delay = self.retry_policy.calculate_delay(attempt - 1);
680            arguments.insert(
681                "x-message-ttl".into(),
682                lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
683            );
684
685            // Set dead letter exchange to route back to original queue or next retry
686            if attempt < self.retry_policy.max_retries {
687                arguments.insert(
688                    "x-dead-letter-exchange".into(),
689                    lapin::types::AMQPValue::LongString("".into()), // Default exchange
690                );
691                arguments.insert(
692                    "x-dead-letter-routing-key".into(),
693                    lapin::types::AMQPValue::LongString(original_queue.into()),
694                );
695            } else {
696                // Last retry attempt - send to dead letter exchange
697                if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
698                    arguments.insert(
699                        "x-dead-letter-exchange".into(),
700                        lapin::types::AMQPValue::LongString(dle.clone().into()),
701                    );
702                    arguments.insert(
703                        "x-dead-letter-routing-key".into(),
704                        lapin::types::AMQPValue::LongString("dead-letter".into()),
705                    );
706                }
707            }
708
709            // Declare retry queue
710            let queue_options = QueueDeclareOptions {
711                passive: false,
712                durable: true,
713                exclusive: false,
714                auto_delete: false,
715                nowait: false,
716            };
717
718            channel
719                .queue_declare(&retry_queue_name, queue_options, arguments)
720                .await?;
721
722            debug!(
723                "Setup retry queue: {} for attempt: {}",
724                retry_queue_name, attempt
725            );
726        }
727
728        info!("Retry queues setup completed for: {}", original_queue);
729        Ok(())
730    }
731}
732
/// Retry message wrapper for serialization.
///
/// Bundles a payload with the bookkeeping needed to retry it.
#[derive(Debug, Serialize, Deserialize)]
pub struct RetryMessage<T> {
    /// The wrapped payload.
    pub original_message: T,
    /// Retry count supplied when the wrapper was created.
    pub retry_count: u32,
    /// Name of the queue the message originally belonged to.
    pub original_queue: String,
    /// Headers of the original message as a JSON value, if any were supplied.
    pub original_headers: Option<serde_json::Value>,
    /// UTC time at which the wrapper was created (set by `new`).
    pub retry_timestamp: chrono::DateTime<chrono::Utc>,
}
742
743impl<T> RetryMessage<T> {
744    pub fn new(
745        original_message: T,
746        retry_count: u32,
747        original_queue: String,
748        original_headers: Option<serde_json::Value>,
749    ) -> Self {
750        Self {
751            original_message,
752            retry_count,
753            original_queue,
754            original_headers,
755            retry_timestamp: chrono::Utc::now(),
756        }
757    }
758}
759
#[cfg(test)]
mod delay_exchange_binding_tests {
    use super::*;
    use crate::{
        config::RabbitConfig,
        connection::ConnectionManager,
        retry::{DelayedMessageExchange, RetryPolicy},
    };
    use std::time::Duration;

    /// Smoke test: `setup_queue_retry` can be called on a handler built from
    /// a live connection manager. Skipped when no broker is reachable.
    #[tokio::test]
    async fn test_delay_exchange_setup_queue_retry_method_exists() {
        let config = RabbitConfig::builder()
            .connection_string("amqp://test:test@localhost:5672/")
            .build();

        // Without a reachable broker there is nothing to exercise.
        let connection_manager = match ConnectionManager::new(config).await {
            Ok(cm) => cm,
            Err(_) => return, // Skip test if no RabbitMQ available
        };

        let retry_policy = RetryPolicy::default();
        let delayed_exchange =
            DelayedMessageExchange::new(connection_manager, "test.retry".to_string(), retry_policy);

        // The outcome depends on broker state (for example whether the
        // delayed exchange already exists), so only callability is checked;
        // the result is deliberately ignored.
        let _result = delayed_exchange.setup_queue_retry("test_queue").await;
    }

    /// Checks that the public types and builders compose, and that the
    /// builder actually applies the values it is given.
    #[test]
    fn test_delay_exchange_api_structure() {
        use std::any::type_name;

        let name = type_name::<DelayedMessageExchange>();
        assert!(name.contains("DelayedMessageExchange"));

        // Compile-time check that the config builder is reachable from here.
        let _config = RabbitConfig::builder().connection_string("test").build();

        let policy = RetryPolicy::builder()
            .max_retries(3)
            .initial_delay(Duration::from_millis(100))
            .build();

        assert_eq!(policy.max_retries, 3);
        assert_eq!(policy.initial_delay, Duration::from_millis(100));
    }
}
816
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// The default policy exposes the documented baseline values.
    #[test]
    fn test_retry_policy_default() {
        let RetryPolicy {
            max_retries,
            initial_delay,
            max_delay,
            backoff_multiplier,
            jitter,
            ..
        } = RetryPolicy::default();

        assert_eq!(max_retries, 3);
        assert_eq!(initial_delay, Duration::from_millis(1000));
        assert_eq!(max_delay, Duration::from_secs(60));
        assert_eq!(backoff_multiplier, 2.0);
        assert_eq!(jitter, 0.1);
    }

    /// Without jitter the delay doubles per attempt and stops at the cap.
    #[test]
    fn test_retry_policy_calculate_delay() {
        let policy = RetryPolicy {
            initial_delay: Duration::from_millis(1000),
            max_delay: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            jitter: 0.0, // No jitter for predictable tests
            ..Default::default()
        };

        // (attempt, expected delay); the last row exercises the max delay cap.
        let cases = [
            (0u32, Duration::from_millis(1000)),
            (1, Duration::from_millis(2000)),
            (2, Duration::from_millis(4000)),
            (10, Duration::from_secs(30)),
        ];

        for (attempt, expected) in cases.iter() {
            assert_eq!(policy.calculate_delay(*attempt), *expected);
        }
    }

    /// The default pattern expands to `<queue>.retry.<attempt>`.
    #[test]
    fn test_retry_queue_name_generation() {
        let policy = RetryPolicy::default();

        let cases = [
            ("orders", 1u32, "orders.retry.1"),
            ("user-events", 3, "user-events.retry.3"),
        ];

        for (queue, attempt, expected) in cases.iter() {
            assert_eq!(policy.get_retry_queue_name(queue, *attempt), *expected);
        }
    }

    /// `RetryMessage::new` stores its arguments unchanged.
    #[test]
    fn test_retry_message_creation() {
        let payload = "test message";
        let wrapped = RetryMessage::new(payload, 2, "test-queue".to_string(), None);

        assert_eq!(wrapped.original_message, "test message");
        assert_eq!(wrapped.retry_count, 2);
        assert_eq!(wrapped.original_queue, "test-queue");
        assert!(wrapped.original_headers.is_none());
    }
}
876}