rust_rabbit/
retry.rs

1use crate::{
2    connection::ConnectionManager,
3    error::{RabbitError, Result},
4};
5use lapin::{
6    options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
7    types::FieldTable,
8    BasicProperties, Channel, ExchangeKind,
9};
10use serde::{Deserialize, Serialize};
11use std::time::Duration;
12use tracing::{debug, info, warn};
13
14/// Retry policy configuration
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct RetryPolicy {
17    /// Maximum number of retry attempts
18    pub max_retries: u32,
19
20    /// Initial delay between retries
21    pub initial_delay: Duration,
22
23    /// Maximum delay between retries
24    pub max_delay: Duration,
25
26    /// Multiplier for exponential backoff
27    pub backoff_multiplier: f64,
28
29    /// Jitter factor (0.0 to 1.0) to add randomness to delays
30    pub jitter: f64,
31
32    /// Retry queue naming pattern
33    pub retry_queue_pattern: String,
34
35    /// Dead letter exchange for failed messages
36    pub dead_letter_exchange: Option<String>,
37
38    /// Dead letter queue for failed messages
39    pub dead_letter_queue: Option<String>,
40}
41
42impl Default for RetryPolicy {
43    fn default() -> Self {
44        Self {
45            max_retries: 3,
46            initial_delay: Duration::from_millis(1000),
47            max_delay: Duration::from_secs(60),
48            backoff_multiplier: 2.0,
49            jitter: 0.1,
50            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
51            dead_letter_exchange: Some("dead-letter".to_string()),
52            dead_letter_queue: Some("dead-letter-queue".to_string()),
53        }
54    }
55}
56
57impl RetryPolicy {
58    /// Calculate delay for a specific retry attempt
59    pub fn calculate_delay(&self, attempt: u32) -> Duration {
60        let base_delay = Duration::from_millis(
61            (self.initial_delay.as_millis() as f64 * self.backoff_multiplier.powi(attempt as i32))
62                as u64,
63        );
64
65        let delay = if base_delay > self.max_delay {
66            self.max_delay
67        } else {
68            base_delay
69        };
70
71        // Add jitter to prevent thundering herd
72        if self.jitter > 0.0 {
73            let jitter_amount = (delay.as_millis() as f64 * self.jitter) as u64;
74            let jitter = fastrand::u64(0..=jitter_amount);
75            Duration::from_millis(delay.as_millis() as u64 + jitter)
76        } else {
77            delay
78        }
79    }
80
81    /// Generate retry queue name for a specific attempt
82    pub fn get_retry_queue_name(&self, original_queue: &str, attempt: u32) -> String {
83        self.retry_queue_pattern
84            .replace("{queue_name}", original_queue)
85            .replace("{attempt}", &attempt.to_string())
86    }
87
88    /// Create a fast retry policy (quick retries for transient errors)
89    pub fn fast() -> Self {
90        Self {
91            max_retries: 5,
92            initial_delay: Duration::from_millis(200),
93            max_delay: Duration::from_secs(10),
94            backoff_multiplier: 1.5,
95            jitter: 0.05,
96            dead_letter_exchange: Some("fast.dlx".to_string()),
97            dead_letter_queue: Some("fast.dlq".to_string()),
98            ..Default::default()
99        }
100    }
101
102    /// Create a fast retry policy with custom dead letter names based on queue
103    pub fn fast_for_queue<S: Into<String>>(queue_name: S) -> Self {
104        let queue = queue_name.into();
105        Self {
106            max_retries: 5,
107            initial_delay: Duration::from_millis(200),
108            max_delay: Duration::from_secs(10),
109            backoff_multiplier: 1.5,
110            jitter: 0.05,
111            dead_letter_exchange: Some(format!("{}.dlx", queue)),
112            dead_letter_queue: Some(format!("{}.dlq", queue)),
113            ..Default::default()
114        }
115    }
116
117    /// Create a slow retry policy (for operations that need more time)
118    pub fn slow() -> Self {
119        Self {
120            max_retries: 3,
121            initial_delay: Duration::from_secs(5),
122            max_delay: Duration::from_secs(300), // 5 minutes
123            backoff_multiplier: 2.5,
124            jitter: 0.2,
125            dead_letter_exchange: Some("slow.dlx".to_string()),
126            dead_letter_queue: Some("slow.dlq".to_string()),
127            ..Default::default()
128        }
129    }
130
131    /// Create a slow retry policy with custom dead letter names based on queue
132    pub fn slow_for_queue<S: Into<String>>(queue_name: S) -> Self {
133        let queue = queue_name.into();
134        Self {
135            max_retries: 3,
136            initial_delay: Duration::from_secs(5),
137            max_delay: Duration::from_secs(300),
138            backoff_multiplier: 2.5,
139            jitter: 0.2,
140            dead_letter_exchange: Some(format!("{}.dlx", queue)),
141            dead_letter_queue: Some(format!("{}.dlq", queue)),
142            ..Default::default()
143        }
144    }
145
146    /// Create an aggressive retry policy (many attempts with exponential backoff)
147    pub fn aggressive() -> Self {
148        Self {
149            max_retries: 10,
150            initial_delay: Duration::from_millis(100),
151            max_delay: Duration::from_secs(120), // 2 minutes
152            backoff_multiplier: 2.0,
153            jitter: 0.15,
154            dead_letter_exchange: Some("aggressive.dlx".to_string()),
155            dead_letter_queue: Some("aggressive.dlq".to_string()),
156            ..Default::default()
157        }
158    }
159
160    /// Create a conservative retry policy (few attempts, larger delays)
161    pub fn conservative() -> Self {
162        Self {
163            max_retries: 2,
164            initial_delay: Duration::from_secs(30),
165            max_delay: Duration::from_secs(600), // 10 minutes
166            backoff_multiplier: 2.0,
167            jitter: 0.3,
168            dead_letter_exchange: Some("conservative.dlx".to_string()),
169            dead_letter_queue: Some("conservative.dlq".to_string()),
170            ..Default::default()
171        }
172    }
173
174    /// Create a linear retry policy (fixed delay between retries)
175    pub fn linear(delay: Duration, max_retries: u32) -> Self {
176        Self {
177            max_retries,
178            initial_delay: delay,
179            max_delay: delay,        // Same as initial = linear
180            backoff_multiplier: 1.0, // No exponential growth
181            jitter: 0.0,
182            dead_letter_exchange: Some("linear.dlx".to_string()),
183            dead_letter_queue: Some("linear.dlq".to_string()),
184            ..Default::default()
185        }
186    }
187
188    /// Create a no-retry policy (fail immediately, no retries)
189    pub fn no_retry() -> Self {
190        Self {
191            max_retries: 0,
192            initial_delay: Duration::from_secs(0),
193            max_delay: Duration::from_secs(0),
194            backoff_multiplier: 1.0,
195            jitter: 0.0,
196            dead_letter_exchange: Some("immediate.dlx".to_string()),
197            dead_letter_queue: Some("immediate.dlq".to_string()),
198            ..Default::default()
199        }
200    }
201
202    /// Create a minutes-based exponential retry policy (1min, 2min, 4min, 8min, 16min)
203    pub fn minutes_exponential() -> Self {
204        Self {
205            max_retries: 5,
206            initial_delay: Duration::from_secs(60), // 1 minute
207            max_delay: Duration::from_secs(1800),   // 30 minutes cap
208            backoff_multiplier: 2.0,                // Double each time
209            jitter: 0.1,                            // 10% jitter
210            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
211            dead_letter_exchange: Some("minutes.dlx".to_string()),
212            dead_letter_queue: Some("minutes.dlq".to_string()),
213        }
214    }
215
216    /// Create a minutes-based exponential retry policy with custom dead letter names
217    pub fn minutes_exponential_for_queue<S: Into<String>>(queue_name: S) -> Self {
218        let queue = queue_name.into();
219        Self {
220            max_retries: 5,
221            initial_delay: Duration::from_secs(60),
222            max_delay: Duration::from_secs(1800),
223            backoff_multiplier: 2.0,
224            jitter: 0.1,
225            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
226            dead_letter_exchange: Some(format!("{}.dlx", queue)),
227            dead_letter_queue: Some(format!("{}.dlq", queue)),
228        }
229    }
230
231    /// Create a custom retry policy with builder pattern
232    pub fn builder() -> RetryPolicyBuilder {
233        RetryPolicyBuilder::new()
234    }
235}
236
237/// Builder for RetryPolicy
238#[derive(Debug, Clone)]
239pub struct RetryPolicyBuilder {
240    max_retries: u32,
241    initial_delay: Duration,
242    max_delay: Duration,
243    backoff_multiplier: f64,
244    jitter: f64,
245    retry_queue_pattern: String,
246    dead_letter_exchange: Option<String>,
247    dead_letter_queue: Option<String>,
248}
249
250impl Default for RetryPolicyBuilder {
251    fn default() -> Self {
252        Self::new()
253    }
254}
255
256impl RetryPolicyBuilder {
257    /// Create a new builder with default values
258    pub fn new() -> Self {
259        Self {
260            max_retries: 3,
261            initial_delay: Duration::from_secs(1),
262            max_delay: Duration::from_secs(60),
263            backoff_multiplier: 2.0,
264            jitter: 0.1,
265            retry_queue_pattern: "{queue_name}.retry.{attempt}".to_string(),
266            dead_letter_exchange: Some("dead-letter".to_string()),
267            dead_letter_queue: Some("dead-letter-queue".to_string()),
268        }
269    }
270
271    /// Set maximum number of retry attempts
272    pub fn max_retries(mut self, max_retries: u32) -> Self {
273        self.max_retries = max_retries;
274        self
275    }
276
277    /// Set initial delay between retries
278    pub fn initial_delay(mut self, delay: Duration) -> Self {
279        self.initial_delay = delay;
280        self
281    }
282
283    /// Set maximum delay between retries
284    pub fn max_delay(mut self, delay: Duration) -> Self {
285        self.max_delay = delay;
286        self
287    }
288
289    /// Set backoff multiplier for exponential backoff
290    pub fn backoff_multiplier(mut self, multiplier: f64) -> Self {
291        self.backoff_multiplier = multiplier;
292        self
293    }
294
295    /// Set jitter factor (0.0 to 1.0)
296    pub fn jitter(mut self, jitter: f64) -> Self {
297        self.jitter = jitter.clamp(0.0, 1.0);
298        self
299    }
300
301    /// Set dead letter exchange name
302    pub fn dead_letter_exchange<S: Into<String>>(mut self, exchange: S) -> Self {
303        self.dead_letter_exchange = Some(exchange.into());
304        self
305    }
306
307    /// Set dead letter queue name
308    pub fn dead_letter_queue<S: Into<String>>(mut self, queue: S) -> Self {
309        self.dead_letter_queue = Some(queue.into());
310        self
311    }
312
313    /// Disable dead letter exchange (messages will be discarded after max retries)
314    pub fn no_dead_letter(mut self) -> Self {
315        self.dead_letter_exchange = None;
316        self.dead_letter_queue = None;
317        self
318    }
319
320    /// Set retry queue naming pattern
321    pub fn retry_queue_pattern<S: Into<String>>(mut self, pattern: S) -> Self {
322        self.retry_queue_pattern = pattern.into();
323        self
324    }
325
326    /// Configure for fast retries (preset)
327    pub fn fast_preset(mut self) -> Self {
328        self.max_retries = 5;
329        self.initial_delay = Duration::from_millis(200);
330        self.max_delay = Duration::from_secs(10);
331        self.backoff_multiplier = 1.5;
332        self.jitter = 0.05;
333        self
334    }
335
336    /// Configure for slow retries (preset)
337    pub fn slow_preset(mut self) -> Self {
338        self.max_retries = 3;
339        self.initial_delay = Duration::from_secs(5);
340        self.max_delay = Duration::from_secs(300);
341        self.backoff_multiplier = 2.5;
342        self.jitter = 0.2;
343        self
344    }
345
346    /// Configure for linear retries (preset)
347    pub fn linear_preset(mut self, delay: Duration) -> Self {
348        self.initial_delay = delay;
349        self.max_delay = delay;
350        self.backoff_multiplier = 1.0;
351        self.jitter = 0.0;
352        self
353    }
354
355    /// Build the final RetryPolicy
356    pub fn build(self) -> RetryPolicy {
357        RetryPolicy {
358            max_retries: self.max_retries,
359            initial_delay: self.initial_delay,
360            max_delay: self.max_delay,
361            backoff_multiplier: self.backoff_multiplier,
362            jitter: self.jitter,
363            retry_queue_pattern: self.retry_queue_pattern,
364            dead_letter_exchange: self.dead_letter_exchange,
365            dead_letter_queue: self.dead_letter_queue,
366        }
367    }
368}
369
370/// Delayed Message Exchange handler for implementing retry mechanism
371pub struct DelayedMessageExchange {
372    connection_manager: ConnectionManager,
373    exchange_name: String,
374    retry_policy: RetryPolicy,
375}
376
377impl DelayedMessageExchange {
378    /// Create a new DelayedMessageExchange
379    pub fn new(
380        connection_manager: ConnectionManager,
381        exchange_name: String,
382        retry_policy: RetryPolicy,
383    ) -> Self {
384        Self {
385            connection_manager,
386            exchange_name,
387            retry_policy,
388        }
389    }
390
391    /// Setup the delayed message exchange and retry infrastructure
392    pub async fn setup(&self) -> Result<()> {
393        let connection = self.connection_manager.get_connection().await?;
394        let channel = connection.create_channel().await?;
395
396        // Declare the delayed message exchange
397        self.declare_delayed_exchange(&channel).await?;
398
399        // Setup dead letter exchange and queue if configured
400        if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
401            self.setup_dead_letter_infrastructure(&channel, dle).await?;
402        }
403
404        info!(
405            "Delayed message exchange setup completed: {}",
406            self.exchange_name
407        );
408        Ok(())
409    }
410
411    /// Declare the delayed message exchange
412    async fn declare_delayed_exchange(&self, channel: &Channel) -> Result<()> {
413        // Arguments for delayed message exchange
414        let mut arguments = FieldTable::default();
415        arguments.insert(
416            "x-delayed-type".into(),
417            lapin::types::AMQPValue::LongString("direct".into()),
418        );
419
420        let options = ExchangeDeclareOptions {
421            passive: false,
422            durable: true,
423            auto_delete: false,
424            internal: false,
425            nowait: false,
426        };
427
428        channel
429            .exchange_declare(
430                &self.exchange_name,
431                ExchangeKind::Custom("x-delayed-message".to_string()),
432                options,
433                arguments,
434            )
435            .await?;
436
437        debug!("Declared delayed message exchange: {}", self.exchange_name);
438        Ok(())
439    }
440
441    /// Setup dead letter exchange and queue
442    async fn setup_dead_letter_infrastructure(
443        &self,
444        channel: &Channel,
445        dle_name: &str,
446    ) -> Result<()> {
447        // Declare dead letter exchange
448        let dle_options = ExchangeDeclareOptions {
449            passive: false,
450            durable: true,
451            auto_delete: false,
452            internal: false,
453            nowait: false,
454        };
455
456        channel
457            .exchange_declare(
458                dle_name,
459                ExchangeKind::Direct,
460                dle_options,
461                FieldTable::default(),
462            )
463            .await?;
464
465        // Declare dead letter queue if configured
466        if let Some(ref dlq_name) = self.retry_policy.dead_letter_queue {
467            let dlq_options = QueueDeclareOptions {
468                passive: false,
469                durable: true,
470                exclusive: false,
471                auto_delete: false,
472                nowait: false,
473            };
474
475            channel
476                .queue_declare(dlq_name, dlq_options, FieldTable::default())
477                .await?;
478
479            // Bind dead letter queue to dead letter exchange
480            channel
481                .queue_bind(
482                    dlq_name,
483                    dle_name,
484                    "dead-letter",
485                    QueueBindOptions::default(),
486                    FieldTable::default(),
487                )
488                .await?;
489
490            debug!("Setup dead letter queue: {}", dlq_name);
491        }
492
493        debug!("Setup dead letter exchange: {}", dle_name);
494        Ok(())
495    }
496
497    /// Publish a message with retry mechanism
498    pub async fn publish_with_retry<T>(
499        &self,
500        original_queue: &str,
501        message: &T,
502        retry_count: u32,
503        original_headers: Option<FieldTable>,
504    ) -> Result<()>
505    where
506        T: Serialize,
507    {
508        if retry_count >= self.retry_policy.max_retries {
509            // Send to dead letter exchange
510            if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
511                return self
512                    .send_to_dead_letter(message, dle, original_headers)
513                    .await;
514            } else {
515                return Err(RabbitError::RetryExhausted(format!(
516                    "Max retries ({}) exceeded for queue: {}",
517                    self.retry_policy.max_retries, original_queue
518                )));
519            }
520        }
521
522        let delay = self.retry_policy.calculate_delay(retry_count);
523        let connection = self.connection_manager.get_connection().await?;
524        let channel = connection.create_channel().await?;
525
526        // Serialize message
527        let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
528
529        // Build properties with delay header
530        let mut properties = BasicProperties::default()
531            .with_content_type("application/json".into())
532            .with_delivery_mode(2); // Persistent
533
534        // Add delay header for delayed message exchange
535        let mut headers = original_headers.unwrap_or_default();
536        headers.insert(
537            "x-delay".into(),
538            lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
539        );
540        headers.insert(
541            "x-retry-count".into(),
542            lapin::types::AMQPValue::LongInt(retry_count as i32),
543        );
544        headers.insert(
545            "x-original-queue".into(),
546            lapin::types::AMQPValue::LongString(original_queue.into()),
547        );
548
549        properties = properties.with_headers(headers);
550
551        // Publish to delayed exchange with original queue as routing key
552        channel
553            .basic_publish(
554                &self.exchange_name,
555                original_queue, // Use original queue name as routing key
556                BasicPublishOptions::default(),
557                &payload,
558                properties,
559            )
560            .await?;
561
562        info!(
563            "Published retry message for queue: {} (attempt: {}, delay: {:?})",
564            original_queue,
565            retry_count + 1,
566            delay
567        );
568
569        Ok(())
570    }
571
572    /// Send message to dead letter exchange
573    async fn send_to_dead_letter<T>(
574        &self,
575        message: &T,
576        dead_letter_exchange: &str,
577        original_headers: Option<FieldTable>,
578    ) -> Result<()>
579    where
580        T: Serialize,
581    {
582        let connection = self.connection_manager.get_connection().await?;
583        let channel = connection.create_channel().await?;
584
585        // Serialize message
586        let payload = serde_json::to_vec(message).map_err(RabbitError::Serialization)?;
587
588        // Build properties
589        let mut properties = BasicProperties::default()
590            .with_content_type("application/json".into())
591            .with_delivery_mode(2); // Persistent
592
593        // Add failure headers
594        let mut headers = original_headers.unwrap_or_default();
595        headers.insert(
596            "x-death-reason".into(),
597            lapin::types::AMQPValue::LongString("max-retries-exceeded".into()),
598        );
599        headers.insert(
600            "x-death-timestamp".into(),
601            lapin::types::AMQPValue::LongLongInt(chrono::Utc::now().timestamp()),
602        );
603
604        properties = properties.with_headers(headers);
605
606        // Publish to dead letter exchange
607        channel
608            .basic_publish(
609                dead_letter_exchange,
610                "dead-letter", // Fixed routing key for dead letters
611                BasicPublishOptions::default(),
612                &payload,
613                properties,
614            )
615            .await?;
616
617        warn!(
618            "Message sent to dead letter exchange: {}",
619            dead_letter_exchange
620        );
621        Ok(())
622    }
623
624    /// Setup retry queues for a specific original queue
625    pub async fn setup_retry_queues(&self, original_queue: &str) -> Result<()> {
626        let connection = self.connection_manager.get_connection().await?;
627        let channel = connection.create_channel().await?;
628
629        // Setup retry queues for each retry attempt
630        for attempt in 1..=self.retry_policy.max_retries {
631            let retry_queue_name = self
632                .retry_policy
633                .get_retry_queue_name(original_queue, attempt);
634
635            // Create retry queue arguments
636            let mut arguments = FieldTable::default();
637
638            // Set message TTL based on retry delay
639            let delay = self.retry_policy.calculate_delay(attempt - 1);
640            arguments.insert(
641                "x-message-ttl".into(),
642                lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
643            );
644
645            // Set dead letter exchange to route back to original queue or next retry
646            if attempt < self.retry_policy.max_retries {
647                arguments.insert(
648                    "x-dead-letter-exchange".into(),
649                    lapin::types::AMQPValue::LongString("".into()), // Default exchange
650                );
651                arguments.insert(
652                    "x-dead-letter-routing-key".into(),
653                    lapin::types::AMQPValue::LongString(original_queue.into()),
654                );
655            } else {
656                // Last retry attempt - send to dead letter exchange
657                if let Some(ref dle) = self.retry_policy.dead_letter_exchange {
658                    arguments.insert(
659                        "x-dead-letter-exchange".into(),
660                        lapin::types::AMQPValue::LongString(dle.clone().into()),
661                    );
662                    arguments.insert(
663                        "x-dead-letter-routing-key".into(),
664                        lapin::types::AMQPValue::LongString("dead-letter".into()),
665                    );
666                }
667            }
668
669            // Declare retry queue
670            let queue_options = QueueDeclareOptions {
671                passive: false,
672                durable: true,
673                exclusive: false,
674                auto_delete: false,
675                nowait: false,
676            };
677
678            channel
679                .queue_declare(&retry_queue_name, queue_options, arguments)
680                .await?;
681
682            debug!(
683                "Setup retry queue: {} for attempt: {}",
684                retry_queue_name, attempt
685            );
686        }
687
688        info!("Retry queues setup completed for: {}", original_queue);
689        Ok(())
690    }
691}
692
693/// Retry message wrapper for serialization
694#[derive(Debug, Serialize, Deserialize)]
695pub struct RetryMessage<T> {
696    pub original_message: T,
697    pub retry_count: u32,
698    pub original_queue: String,
699    pub original_headers: Option<serde_json::Value>,
700    pub retry_timestamp: chrono::DateTime<chrono::Utc>,
701}
702
703impl<T> RetryMessage<T> {
704    pub fn new(
705        original_message: T,
706        retry_count: u32,
707        original_queue: String,
708        original_headers: Option<serde_json::Value>,
709    ) -> Self {
710        Self {
711            original_message,
712            retry_count,
713            original_queue,
714            original_headers,
715            retry_timestamp: chrono::Utc::now(),
716        }
717    }
718}
719
720#[cfg(test)]
721mod tests {
722    use super::*;
723    use std::time::Duration;
724
725    #[test]
726    fn test_retry_policy_default() {
727        let policy = RetryPolicy::default();
728        assert_eq!(policy.max_retries, 3);
729        assert_eq!(policy.initial_delay, Duration::from_millis(1000));
730        assert_eq!(policy.max_delay, Duration::from_secs(60));
731        assert_eq!(policy.backoff_multiplier, 2.0);
732        assert_eq!(policy.jitter, 0.1);
733    }
734
735    #[test]
736    fn test_retry_policy_calculate_delay() {
737        let policy = RetryPolicy {
738            initial_delay: Duration::from_millis(1000),
739            max_delay: Duration::from_secs(30),
740            backoff_multiplier: 2.0,
741            jitter: 0.0, // No jitter for predictable tests
742            ..Default::default()
743        };
744
745        let delay1 = policy.calculate_delay(0);
746        assert_eq!(delay1, Duration::from_millis(1000));
747
748        let delay2 = policy.calculate_delay(1);
749        assert_eq!(delay2, Duration::from_millis(2000));
750
751        let delay3 = policy.calculate_delay(2);
752        assert_eq!(delay3, Duration::from_millis(4000));
753
754        // Test max delay cap
755        let delay_large = policy.calculate_delay(10);
756        assert_eq!(delay_large, Duration::from_secs(30));
757    }
758
759    #[test]
760    fn test_retry_queue_name_generation() {
761        let policy = RetryPolicy::default();
762        let queue_name = policy.get_retry_queue_name("orders", 1);
763        assert_eq!(queue_name, "orders.retry.1");
764
765        let queue_name = policy.get_retry_queue_name("user-events", 3);
766        assert_eq!(queue_name, "user-events.retry.3");
767    }
768
769    #[test]
770    fn test_retry_message_creation() {
771        let original_message = "test message";
772        let retry_msg = RetryMessage::new(original_message, 2, "test-queue".to_string(), None);
773
774        assert_eq!(retry_msg.original_message, "test message");
775        assert_eq!(retry_msg.retry_count, 2);
776        assert_eq!(retry_msg.original_queue, "test-queue");
777        assert!(retry_msg.original_headers.is_none());
778    }
779}