rust_rabbit/
consumer.rs

1use crate::{
2    connection::ConnectionManager,
3    error::{ProcessingError, RabbitError, Result},
4    metrics::RustRabbitMetrics,
5    publisher::{CustomExchangeDeclareOptions, CustomQueueDeclareOptions, Publisher},
6    retry::{DelayedMessageExchange, RetryPolicy},
7};
8use async_trait::async_trait;
9use futures::StreamExt;
10use lapin::{
11    message::Delivery,
12    options::{
13        BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
14        BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions,
15        QueueDeclareOptions as LapinQueueDeclareOptions,
16    },
17    types::FieldTable,
18    BasicProperties, Channel, ExchangeKind,
19};
20use serde::de::DeserializeOwned;
21use std::sync::Arc;
22use tokio::sync::Semaphore;
23use tracing::{debug, error, info, warn};
24
25/// Base consumer trait for processing messages with smart retry handling
26///
27/// This trait provides a simplified interface where:
28/// - Messages are automatically ACK'd after successful processing
29/// - Retryable errors automatically publish to delay exchange
30/// - Non-retryable errors send to DLQ or discard based on configuration
31#[async_trait]
32pub trait BaseConsumer<T>: Send + Sync + 'static
33where
34    T: DeserializeOwned + Send + Sync,
35{
36    /// Process a message and return the result
37    ///
38    /// # Returns
39    /// - `Ok(())` - Message processed successfully, will be ACK'd automatically
40    /// - `Err(ProcessingError::Retryable { .. })` - Will retry with delay exchange
41    /// - `Err(ProcessingError::NonRetryable { .. })` - Will reject/send to DLQ
42    async fn handle(
43        &self,
44        message: T,
45        context: MessageContext,
46    ) -> std::result::Result<(), ProcessingError>;
47}
48
49/// Message handler trait for processing consumed messages (legacy, prefer BaseConsumer)
50#[async_trait]
51pub trait MessageHandler<T>: Send + Sync + 'static
52where
53    T: DeserializeOwned + Send + Sync,
54{
55    /// Handle a received message
56    async fn handle(&self, message: T, context: MessageContext) -> MessageResult;
57}
58
59/// Context information for a received message
60#[derive(Debug, Clone)]
61pub struct MessageContext {
62    pub message_id: Option<String>,
63    pub correlation_id: Option<String>,
64    pub reply_to: Option<String>,
65    pub delivery_tag: u64,
66    pub redelivered: bool,
67    pub exchange: String,
68    pub routing_key: String,
69    pub headers: FieldTable,
70    pub timestamp: Option<u64>,
71    pub retry_count: u32,
72}
73
/// Outcome reported by a legacy [`MessageHandler`].
///
/// `Consumer::process_message` maps each variant to a broker action.
#[derive(Debug)]
pub enum MessageResult {
    /// Processing succeeded; the delivery is acknowledged.
    Ack,
    /// Processing failed but may succeed later. Goes through the retry policy
    /// when one is configured, otherwise the delivery is nack'd with requeue.
    Retry,
    /// Processing failed permanently. Sent to the dead letter exchange when
    /// one is configured, otherwise nack'd without requeue.
    Reject,
    /// Hand the delivery back to the broker (nack with requeue).
    Requeue,
}
86
87/// Consumer options
88#[derive(Debug, Clone)]
89pub struct ConsumerOptions {
90    /// Queue name to consume from
91    pub queue_name: String,
92
93    /// Consumer tag (optional)
94    pub consumer_tag: Option<String>,
95
96    /// Number of concurrent message processors
97    pub concurrency: usize,
98
99    /// Prefetch count (QoS)
100    pub prefetch_count: Option<u16>,
101
102    /// Auto-declare queue before consuming
103    pub auto_declare_queue: bool,
104
105    /// Queue declaration options
106    pub queue_options: CustomQueueDeclareOptions,
107
108    /// Auto-declare exchange and bind to queue
109    pub auto_declare_exchange: bool,
110
111    /// Exchange name (if not provided, uses queue_name as exchange name)
112    pub exchange_name: Option<String>,
113
114    /// Exchange declaration options
115    pub exchange_options: CustomExchangeDeclareOptions,
116
117    /// Routing key for binding queue to exchange (default: queue_name)
118    pub routing_key: Option<String>,
119
120    /// Retry policy for failed messages
121    pub retry_policy: Option<RetryPolicy>,
122
123    /// Dead letter exchange for failed messages
124    pub dead_letter_exchange: Option<String>,
125
126    /// Auto-ack messages (not recommended for production)
127    pub auto_ack: bool,
128
129    /// Consumer exclusive mode
130    pub exclusive: bool,
131
132    /// Consumer arguments
133    pub arguments: FieldTable,
134}
135
136impl ConsumerOptions {
137    /// Create a new consumer options builder
138    pub fn builder<S: Into<String>>(queue_name: S) -> ConsumerOptionsBuilder {
139        ConsumerOptionsBuilder::new(queue_name.into())
140    }
141}
142
143/// Builder for ConsumerOptions
144#[derive(Debug, Clone)]
145pub struct ConsumerOptionsBuilder {
146    queue_name: String,
147    consumer_tag: Option<String>,
148    concurrency: usize,
149    prefetch_count: Option<u16>,
150    auto_declare_queue: bool,
151    queue_options: CustomQueueDeclareOptions,
152    auto_declare_exchange: bool,
153    exchange_name: Option<String>,
154    exchange_options: CustomExchangeDeclareOptions,
155    routing_key: Option<String>,
156    retry_policy: Option<RetryPolicy>,
157    dead_letter_exchange: Option<String>,
158    auto_ack: bool,
159    exclusive: bool,
160    arguments: FieldTable,
161}
162
163impl ConsumerOptionsBuilder {
164    /// Create a new builder with default values
165    pub fn new(queue_name: String) -> Self {
166        Self {
167            queue_name,
168            consumer_tag: None,
169            concurrency: 1,
170            prefetch_count: Some(10),
171            auto_declare_queue: false,
172            queue_options: CustomQueueDeclareOptions::default(),
173            auto_declare_exchange: false,
174            exchange_name: None,
175            exchange_options: CustomExchangeDeclareOptions::default(),
176            routing_key: None,
177            retry_policy: None,
178            dead_letter_exchange: None,
179            auto_ack: false,
180            exclusive: false,
181            arguments: FieldTable::default(),
182        }
183    }
184
185    /// Set consumer tag
186    pub fn consumer_tag<S: Into<String>>(mut self, tag: S) -> Self {
187        self.consumer_tag = Some(tag.into());
188        self
189    }
190
191    /// Set concurrency level
192    pub fn concurrency(mut self, concurrency: usize) -> Self {
193        self.concurrency = concurrency;
194        self
195    }
196
197    /// Set prefetch count
198    pub fn prefetch_count(mut self, count: u16) -> Self {
199        self.prefetch_count = Some(count);
200        self
201    }
202
203    /// Disable prefetch limit
204    pub fn no_prefetch_limit(mut self) -> Self {
205        self.prefetch_count = None;
206        self
207    }
208
209    /// Enable auto-declare queue
210    pub fn auto_declare_queue(mut self) -> Self {
211        self.auto_declare_queue = true;
212        self
213    }
214
215    /// Enable auto-declare exchange and bind to queue
216    pub fn auto_declare_exchange(mut self) -> Self {
217        self.auto_declare_exchange = true;
218        self
219    }
220
221    /// Set exchange name (if not set, uses queue_name)
222    pub fn exchange_name<S: Into<String>>(mut self, name: S) -> Self {
223        self.exchange_name = Some(name.into());
224        self
225    }
226
227    /// Set exchange options
228    pub fn exchange_options(mut self, options: CustomExchangeDeclareOptions) -> Self {
229        self.exchange_options = options;
230        self
231    }
232
233    /// Set routing key for binding (default: queue_name)
234    pub fn routing_key<S: Into<String>>(mut self, key: S) -> Self {
235        self.routing_key = Some(key.into());
236        self
237    }
238
239    /// Set queue options
240    pub fn queue_options(mut self, options: CustomQueueDeclareOptions) -> Self {
241        self.queue_options = options;
242        self
243    }
244
245    /// Set retry policy
246    pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
247        self.retry_policy = Some(policy);
248        self
249    }
250
251    /// Set dead letter exchange
252    pub fn dead_letter_exchange<S: Into<String>>(mut self, exchange: S) -> Self {
253        self.dead_letter_exchange = Some(exchange.into());
254        self
255    }
256
257    /// Enable auto-ack (not recommended for production)
258    pub fn auto_ack(mut self) -> Self {
259        self.auto_ack = true;
260        self
261    }
262
263    /// Enable manual ack (recommended for production)
264    pub fn manual_ack(mut self) -> Self {
265        self.auto_ack = false;
266        self
267    }
268
269    /// Enable exclusive mode
270    pub fn exclusive(mut self) -> Self {
271        self.exclusive = true;
272        self
273    }
274
275    /// Configure for high throughput
276    pub fn high_throughput(mut self) -> Self {
277        self.concurrency = 20;
278        self.prefetch_count = Some(50);
279        self.auto_ack = false;
280        self
281    }
282
283    /// Configure for reliability (lower throughput but safer)
284    pub fn reliable(mut self) -> Self {
285        self.concurrency = 1;
286        self.prefetch_count = Some(1);
287        self.auto_ack = false;
288        self
289    }
290
291    /// Configure for development (simpler settings)
292    pub fn development(mut self) -> Self {
293        self.concurrency = 1;
294        self.prefetch_count = Some(1);
295        self.auto_ack = true;
296        self.auto_declare_queue = true;
297        self.auto_declare_exchange = true; // Auto-declare exchange in development
298        self
299    }
300
301    /// Configure for minutes exponential retry (1min, 2min, 4min, 8min, 16min - max 5 retries)
302    /// This preset automatically sets up:
303    /// - Auto declare queue and exchange
304    /// - Retry policy with minutes exponential backoff
305    /// - Dead letter exchange/queue based on queue name
306    /// - Reliable processing settings
307    pub fn minutes_retry(mut self) -> Self {
308        let queue_name = self.queue_name.clone();
309
310        self.auto_declare_queue = true;
311        self.auto_declare_exchange = true;
312        self.retry_policy = Some(RetryPolicy::minutes_exponential_for_queue(&queue_name));
313        self.concurrency = 1; // Process one at a time for reliable retries
314        self.prefetch_count = Some(1); // One message at a time
315        self.auto_ack = false; // Manual ack for retry support
316        self
317    }
318
319    /// Build the final configuration
320    pub fn build(self) -> ConsumerOptions {
321        ConsumerOptions {
322            queue_name: self.queue_name,
323            consumer_tag: self.consumer_tag,
324            concurrency: self.concurrency,
325            prefetch_count: self.prefetch_count,
326            auto_declare_queue: self.auto_declare_queue,
327            queue_options: self.queue_options,
328            auto_declare_exchange: self.auto_declare_exchange,
329            exchange_name: self.exchange_name,
330            exchange_options: self.exchange_options,
331            routing_key: self.routing_key,
332            retry_policy: self.retry_policy,
333            dead_letter_exchange: self.dead_letter_exchange,
334            auto_ack: self.auto_ack,
335            exclusive: self.exclusive,
336            arguments: self.arguments,
337        }
338    }
339}
340
341impl Default for ConsumerOptions {
342    fn default() -> Self {
343        Self {
344            queue_name: String::new(),
345            consumer_tag: None,
346            concurrency: 1,
347            prefetch_count: Some(10),
348            auto_declare_queue: false,
349            queue_options: CustomQueueDeclareOptions::default(),
350            auto_declare_exchange: false,
351            exchange_name: None,
352            exchange_options: CustomExchangeDeclareOptions::default(),
353            routing_key: None,
354            retry_policy: None,
355            dead_letter_exchange: None,
356            auto_ack: false,
357            exclusive: false,
358            arguments: FieldTable::default(),
359        }
360    }
361}
362
363/// Consumer for receiving messages from RabbitMQ
364pub struct Consumer {
365    #[allow(dead_code)] // Will be used for connection health monitoring
366    connection_manager: ConnectionManager,
367    options: ConsumerOptions,
368    channel: Channel,
369    semaphore: Arc<Semaphore>,
370    metrics: Option<RustRabbitMetrics>,
371    publisher: Publisher,
372}
373
374impl Consumer {
375    /// Create a new consumer
376    pub async fn new(
377        connection_manager: ConnectionManager,
378        options: ConsumerOptions,
379    ) -> Result<Self> {
380        let connection = connection_manager.get_connection().await?;
381        let channel = connection.create_channel().await?;
382
383        // Set QoS if prefetch_count is specified
384        if let Some(prefetch_count) = options.prefetch_count {
385            debug!("Setting prefetch_count: {}", prefetch_count);
386            channel
387                .basic_qos(
388                    prefetch_count,
389                    lapin::options::BasicQosOptions { global: false },
390                )
391                .await
392                .map_err(|e| {
393                    error!("Failed to set QoS prefetch_count={}: {}", prefetch_count, e);
394                    RabbitError::Connection(e)
395                })?;
396            debug!("Successfully set prefetch_count: {}", prefetch_count);
397        }
398
399        // Declare queue if auto_declare is enabled
400        if options.auto_declare_queue {
401            Self::declare_queue_and_exchange(&channel, &options).await?;
402        }
403
404        let semaphore = Arc::new(Semaphore::new(options.concurrency));
405
406        // Setup delayed exchange infrastructure if retry policy is configured
407        if options.retry_policy.is_some() {
408            Self::setup_retry_infrastructure(&connection_manager, &options).await?;
409        }
410
411        let publisher = Publisher::new(connection_manager.clone());
412
413        Ok(Self {
414            connection_manager,
415            options,
416            channel,
417            semaphore,
418            metrics: None,
419            publisher,
420        })
421    }
422
423    /// Set metrics for this consumer
424    pub fn set_metrics(&mut self, metrics: RustRabbitMetrics) {
425        self.metrics = Some(metrics);
426    }
427
428    /// Consume messages using BaseConsumer trait with automatic retry handling
429    pub async fn consume_with_base_consumer<T, H>(&self, handler: Arc<H>) -> Result<()>
430    where
431        T: DeserializeOwned + Send + Sync + 'static,
432        H: BaseConsumer<T>,
433    {
434        let connection = self.connection_manager.get_connection().await?;
435        let channel = connection.create_channel().await?;
436        let publisher = Publisher::new(self.connection_manager.clone());
437
438        // Set up QoS if prefetch count is specified
439        if let Some(prefetch_count) = self.options.prefetch_count {
440            channel
441                .basic_qos(prefetch_count, BasicQosOptions::default())
442                .await?;
443        }
444
445        let semaphore = Arc::new(Semaphore::new(self.options.concurrency));
446
447        // Consume messages
448        let mut consumer = channel
449            .basic_consume(
450                &self.options.queue_name,
451                self.options.consumer_tag.as_deref().unwrap_or(""),
452                BasicConsumeOptions {
453                    no_local: false,
454                    no_ack: self.options.auto_ack,
455                    exclusive: self.options.exclusive,
456                    nowait: false,
457                },
458                self.options.arguments.clone(),
459            )
460            .await?;
461
462        info!(
463            "Started consuming from queue: {} with BaseConsumer",
464            self.options.queue_name
465        );
466
467        // Process messages
468        while let Some(delivery) = consumer.next().await {
469            let delivery = delivery?;
470            let permit = semaphore.clone().acquire_owned().await.map_err(|e| {
471                RabbitError::Generic(anyhow::anyhow!("Semaphore acquire error: {}", e))
472            })?;
473
474            let handler_clone = handler.clone();
475            let retry_policy = self.options.retry_policy.clone();
476            let dead_letter_exchange = self.options.dead_letter_exchange.clone();
477            let channel_clone = channel.clone();
478            let publisher_clone = publisher.clone();
479            let exchange_name = self
480                .options
481                .exchange_name
482                .clone()
483                .unwrap_or_else(|| self.options.queue_name.clone());
484
485            tokio::spawn(async move {
486                let _permit = permit;
487                if let Err(e) = Self::process_message_with_base_consumer(
488                    delivery,
489                    handler_clone,
490                    retry_policy,
491                    dead_letter_exchange,
492                    channel_clone,
493                    publisher_clone,
494                    exchange_name,
495                )
496                .await
497                {
498                    error!("Error processing message with BaseConsumer: {}", e);
499                }
500            });
501        }
502
503        Ok(())
504    }
505
506    /// Process a single message using BaseConsumer
507    async fn process_message_with_base_consumer<T, H>(
508        delivery: Delivery,
509        handler: Arc<H>,
510        retry_policy: Option<RetryPolicy>,
511        dead_letter_exchange: Option<String>,
512        channel: Channel,
513        publisher: Publisher,
514        exchange_name: String,
515    ) -> Result<()>
516    where
517        T: DeserializeOwned + Send + Sync,
518        H: BaseConsumer<T>,
519    {
520        let context = Self::build_message_context(&delivery);
521
522        // Deserialize message
523        let message: T = match serde_json::from_slice(&delivery.data) {
524            Ok(msg) => msg,
525            Err(e) => {
526                error!("Failed to deserialize message: {}", e);
527                Self::reject_message(&delivery, &channel, false).await?;
528                return Ok(());
529            }
530        };
531
532        // Handle message with BaseConsumer
533        match handler.handle(message, context.clone()).await {
534            Ok(()) => {
535                // Success - automatically ACK the message
536                Self::ack_message(&delivery, &channel).await?;
537                debug!(
538                    "Message processed successfully and ACK'd: {}",
539                    delivery.delivery_tag
540                );
541            }
542            Err(ProcessingError::Retryable {
543                message: error_msg,
544                custom_delay,
545            }) => {
546                // Retryable error - send to retry/delay exchange
547                if let Some(ref policy) = retry_policy {
548                    info!("Retryable error occurred: {}. Scheduling retry.", error_msg);
549
550                    // Use custom delay if specified, otherwise calculate from policy
551                    let delay = custom_delay
552                        .unwrap_or_else(|| policy.calculate_delay(context.retry_count + 1));
553
554                    Self::handle_retry_with_delay(
555                        &delivery,
556                        &channel,
557                        &context,
558                        policy,
559                        &publisher,
560                        &exchange_name,
561                        delay,
562                    )
563                    .await?;
564                } else {
565                    warn!(
566                        "Retryable error but no retry policy configured. Rejecting message: {}",
567                        error_msg
568                    );
569                    Self::reject_message(&delivery, &channel, false).await?;
570                }
571            }
572            Err(ProcessingError::NonRetryable {
573                message: error_msg,
574                send_to_dlq,
575            }) => {
576                // Non-retryable error
577                error!("Non-retryable error occurred: {}", error_msg);
578
579                if send_to_dlq {
580                    if let Some(ref dle) = dead_letter_exchange {
581                        Self::send_to_dead_letter(&delivery, dle, &context, &publisher).await?;
582                    } else {
583                        warn!("Error should go to DLQ but no dead letter exchange configured. Rejecting message.");
584                        Self::reject_message(&delivery, &channel, false).await?;
585                    }
586                } else {
587                    // Discard the message (reject without DLQ)
588                    info!(
589                        "Discarding message due to non-retryable error: {}",
590                        error_msg
591                    );
592                    Self::reject_message(&delivery, &channel, false).await?;
593                }
594            }
595        }
596
597        Ok(())
598    }
599
600    /// Handle retry with custom delay
601    async fn handle_retry_with_delay(
602        delivery: &Delivery,
603        channel: &Channel,
604        context: &MessageContext,
605        retry_policy: &RetryPolicy,
606        publisher: &Publisher,
607        exchange_name: &str,
608        delay: std::time::Duration,
609    ) -> Result<()> {
610        let max_retries = retry_policy.max_retries;
611        let current_retry = context.retry_count;
612
613        if current_retry >= max_retries {
614            warn!(
615                "Max retries ({}) exceeded for message, sending to dead letter",
616                max_retries
617            );
618
619            // Send to dead letter exchange if configured
620            if let Some(dlx) = &retry_policy.dead_letter_exchange {
621                Self::send_to_dead_letter(delivery, dlx, context, publisher).await?;
622            } else {
623                Self::reject_message(delivery, channel, false).await?;
624            }
625            return Ok(());
626        }
627
628        // Create delayed exchange name
629        let delayed_exchange_name = format!("{}.retry", exchange_name);
630
631        // Prepare message for retry with updated headers
632        let mut headers = delivery.properties.headers().clone().unwrap_or_default();
633        headers.insert(
634            "x-retry-count".into(),
635            lapin::types::AMQPValue::LongInt((current_retry + 1) as i32),
636        );
637        headers.insert(
638            "x-original-exchange".into(),
639            lapin::types::AMQPValue::LongString(exchange_name.into()),
640        );
641        headers.insert(
642            "x-original-routing-key".into(),
643            lapin::types::AMQPValue::LongString(delivery.routing_key.to_string().into()),
644        );
645
646        // Set delay
647        headers.insert(
648            "x-delay".into(),
649            lapin::types::AMQPValue::LongInt(delay.as_millis() as i32),
650        );
651
652        let properties = BasicProperties::default()
653            .with_content_type("application/json".into())
654            .with_delivery_mode(2)
655            .with_headers(headers);
656
657        // Publish to delay exchange
658        let connection = publisher.get_connection().await?;
659        let retry_channel = connection.create_channel().await?;
660
661        retry_channel
662            .basic_publish(
663                &delayed_exchange_name,
664                delivery.routing_key.as_str(), // Use original routing key as string
665                BasicPublishOptions::default(),
666                &delivery.data,
667                properties,
668            )
669            .await?;
670
671        // ACK the original message since we've scheduled retry
672        Self::ack_message(delivery, channel).await?;
673
674        info!(
675            "Message scheduled for retry #{} with delay {:?}ms",
676            current_retry + 1,
677            delay.as_millis()
678        );
679
680        Ok(())
681    }
682
683    /// Start consuming messages with the given handler (legacy MessageHandler trait)
684    pub async fn consume<T, H>(&self, handler: Arc<H>) -> Result<()>
685    where
686        T: DeserializeOwned + Send + Sync + 'static,
687        H: MessageHandler<T>,
688    {
689        let consumer_tag = self
690            .options
691            .consumer_tag
692            .clone()
693            .unwrap_or_else(|| format!("rust-rabbit-{}", uuid::Uuid::new_v4()));
694
695        let consume_options = BasicConsumeOptions {
696            no_local: false,
697            no_ack: self.options.auto_ack,
698            exclusive: self.options.exclusive,
699            nowait: false,
700        };
701
702        let mut consumer = self
703            .channel
704            .basic_consume(
705                &self.options.queue_name,
706                &consumer_tag,
707                consume_options,
708                self.options.arguments.clone(),
709            )
710            .await?;
711
712        info!(
713            "Started consuming from queue: {} with tag: {}",
714            self.options.queue_name, consumer_tag
715        );
716
717        while let Some(delivery) = consumer.next().await {
718            let delivery = delivery?;
719            let permit = self
720                .semaphore
721                .clone()
722                .acquire_owned()
723                .await
724                .map_err(|e| RabbitError::Generic(e.into()))?;
725
726            let handler = handler.clone();
727            let retry_policy = self.options.retry_policy.clone();
728            let dead_letter_exchange = self.options.dead_letter_exchange.clone();
729            let channel = self.channel.clone();
730            let publisher = self.publisher.clone();
731            let exchange_name = self
732                .options
733                .exchange_name
734                .clone()
735                .unwrap_or_else(|| self.options.queue_name.clone());
736
737            // Process message in a separate task
738            tokio::spawn(async move {
739                let _permit = permit; // Hold the permit for the duration of processing
740
741                if let Err(e) = Self::process_message::<T, H>(
742                    delivery,
743                    handler,
744                    retry_policy,
745                    dead_letter_exchange,
746                    channel,
747                    publisher,
748                    exchange_name,
749                )
750                .await
751                {
752                    error!("Error processing message: {}", e);
753                }
754            });
755        }
756
757        warn!(
758            "Consumer stream ended for queue: {}",
759            self.options.queue_name
760        );
761        Ok(())
762    }
763
764    /// Process a single message
765    async fn process_message<T, H>(
766        delivery: Delivery,
767        handler: Arc<H>,
768        retry_policy: Option<RetryPolicy>,
769        dead_letter_exchange: Option<String>,
770        channel: Channel,
771        publisher: Publisher,
772        exchange_name: String,
773    ) -> Result<()>
774    where
775        T: DeserializeOwned + Send + Sync,
776        H: MessageHandler<T>,
777    {
778        let context = Self::build_message_context(&delivery);
779
780        // Deserialize message
781        let message: T = match serde_json::from_slice(&delivery.data) {
782            Ok(msg) => msg,
783            Err(e) => {
784                error!("Failed to deserialize message: {}", e);
785                Self::reject_message(&delivery, &channel, false).await?;
786                return Ok(());
787            }
788        };
789
790        // Handle message
791        let result = handler.handle(message, context.clone()).await;
792
793        match result {
794            MessageResult::Ack => {
795                Self::ack_message(&delivery, &channel).await?;
796                debug!("Message acknowledged: {}", delivery.delivery_tag);
797            }
798            MessageResult::Retry => {
799                if let Some(ref policy) = retry_policy {
800                    Self::handle_retry(
801                        &delivery,
802                        &channel,
803                        &context,
804                        policy,
805                        &publisher,
806                        &exchange_name,
807                    )
808                    .await?;
809                } else {
810                    Self::reject_message(&delivery, &channel, true).await?;
811                }
812            }
813            MessageResult::Reject => {
814                if let Some(ref dle) = dead_letter_exchange {
815                    Self::send_to_dead_letter(&delivery, dle, &context, &publisher).await?;
816                } else {
817                    Self::reject_message(&delivery, &channel, false).await?;
818                }
819            }
820            MessageResult::Requeue => {
821                Self::reject_message(&delivery, &channel, true).await?;
822            }
823        }
824
825        Ok(())
826    }
827
828    /// Build message context from delivery
829    fn build_message_context(delivery: &Delivery) -> MessageContext {
830        let properties = &delivery.properties;
831
832        MessageContext {
833            message_id: properties.message_id().as_ref().map(|s| s.to_string()),
834            correlation_id: properties.correlation_id().as_ref().map(|s| s.to_string()),
835            reply_to: properties.reply_to().as_ref().map(|s| s.to_string()),
836            delivery_tag: delivery.delivery_tag,
837            redelivered: delivery.redelivered,
838            exchange: delivery.exchange.to_string(),
839            routing_key: delivery.routing_key.to_string(),
840            headers: properties.headers().clone().unwrap_or_default(),
841            timestamp: *properties.timestamp(),
842            retry_count: Self::get_retry_count_from_headers(
843                properties
844                    .headers()
845                    .as_ref()
846                    .unwrap_or(&FieldTable::default()),
847            ),
848        }
849    }
850
851    /// Get retry count from message headers
852    fn get_retry_count_from_headers(headers: &FieldTable) -> u32 {
853        headers
854            .inner()
855            .get("x-retry-count")
856            .and_then(|v| match v {
857                lapin::types::AMQPValue::LongInt(count) => Some(*count as u32),
858                lapin::types::AMQPValue::LongLongInt(count) => Some(*count as u32),
859                _ => None,
860            })
861            .unwrap_or(0)
862    }
863
864    /// Acknowledge a message
865    async fn ack_message(delivery: &Delivery, channel: &Channel) -> Result<()> {
866        channel
867            .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
868            .await?;
869        Ok(())
870    }
871
872    /// Reject a message
873    async fn reject_message(delivery: &Delivery, channel: &Channel, requeue: bool) -> Result<()> {
874        channel
875            .basic_nack(
876                delivery.delivery_tag,
877                BasicNackOptions {
878                    multiple: false,
879                    requeue,
880                },
881            )
882            .await?;
883        Ok(())
884    }
885
886    /// Handle retry logic
887    async fn handle_retry(
888        delivery: &Delivery,
889        channel: &Channel,
890        context: &MessageContext,
891        retry_policy: &RetryPolicy,
892        publisher: &Publisher,
893        exchange_name: &str,
894    ) -> Result<()> {
895        if context.retry_count >= retry_policy.max_retries {
896            warn!(
897                "Max retries exceeded for message: {}",
898                delivery.delivery_tag
899            );
900
901            // Send to dead letter exchange if configured
902            if let Some(ref dle) = retry_policy.dead_letter_exchange {
903                Self::send_to_dead_letter(delivery, dle, context, publisher).await?;
904            } else {
905                Self::reject_message(delivery, channel, false).await?;
906            }
907            return Ok(());
908        }
909
910        // Calculate delay for next retry
911        let delay = retry_policy.calculate_delay(context.retry_count);
912        let delayed_exchange_name = format!("{}.retry", exchange_name);
913
914        // Create retry message with updated headers
915        let mut headers = delivery.properties.headers().clone().unwrap_or_default();
916        headers.insert(
917            "x-retry-count".into(),
918            lapin::types::AMQPValue::LongInt((context.retry_count + 1) as i32),
919        );
920        headers.insert(
921            "x-original-queue".into(),
922            lapin::types::AMQPValue::LongString(context.routing_key.clone().into()),
923        );
924
925        // Build properties with delay header for delayed message exchange
926        let mut properties = BasicProperties::default()
927            .with_content_type("application/json".into())
928            .with_delivery_mode(2)
929            .with_headers(headers);
930
931        // Add delay header for delayed message exchange
932        let mut delay_headers = properties.headers().clone().unwrap_or_default();
933        delay_headers.insert(
934            "x-delay".into(),
935            lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
936        );
937        properties = properties.with_headers(delay_headers);
938
939        // Publish to delayed exchange using channel
940        channel
941            .basic_publish(
942                &delayed_exchange_name,
943                &context.routing_key,
944                BasicPublishOptions::default(),
945                &delivery.data,
946                properties,
947            )
948            .await?;
949
950        info!(
951            "Retrying message after {:?} (attempt {})",
952            delay,
953            context.retry_count + 1
954        );
955
956        // Acknowledge the original message since we've republished it
957        Self::ack_message(delivery, channel).await?;
958
959        Ok(())
960    }
961
962    /// Send message to dead letter exchange
963    async fn send_to_dead_letter(
964        delivery: &Delivery,
965        dead_letter_exchange: &str,
966        _context: &MessageContext,
967        publisher: &Publisher,
968    ) -> Result<()> {
969        // Create dead letter message with additional headers
970        let mut headers = delivery.properties.headers().clone().unwrap_or_default();
971        headers.insert(
972            "x-death-reason".into(),
973            lapin::types::AMQPValue::LongString("max-retries-exceeded".into()),
974        );
975        headers.insert(
976            "x-death-time".into(),
977            lapin::types::AMQPValue::LongLongInt(chrono::Utc::now().timestamp_millis()),
978        );
979
980        // Build properties for dead letter message
981        let properties = BasicProperties::default()
982            .with_content_type("application/json".into())
983            .with_delivery_mode(2)
984            .with_headers(headers);
985
986        // Get connection and publish to dead letter exchange
987        let connection = publisher.get_connection().await?;
988        let dlx_channel = connection.create_channel().await?;
989
990        dlx_channel
991            .basic_publish(
992                dead_letter_exchange,
993                "dead-letter", // routing key for dead letter
994                BasicPublishOptions::default(),
995                &delivery.data,
996                properties,
997            )
998            .await?;
999
1000        warn!(
1001            "Sent message to dead letter exchange: {}",
1002            dead_letter_exchange
1003        );
1004
1005        Ok(())
1006    }
1007
1008    /// Stop consuming (close the consumer)
1009    pub async fn stop(&self) -> Result<()> {
1010        // The consumer will stop when the channel is closed
1011        // or when the stream ends
1012        info!("Stopping consumer for queue: {}", self.options.queue_name);
1013        Ok(())
1014    }
1015
1016    /// Declare queue and optionally exchange with binding
1017    async fn declare_queue_and_exchange(
1018        channel: &Channel,
1019        options: &ConsumerOptions,
1020    ) -> Result<()> {
1021        // First declare the queue
1022        let queue_options = LapinQueueDeclareOptions {
1023            passive: options.queue_options.passive,
1024            durable: options.queue_options.durable,
1025            exclusive: options.queue_options.exclusive,
1026            auto_delete: options.queue_options.auto_delete,
1027            nowait: false,
1028        };
1029
1030        channel
1031            .queue_declare(
1032                &options.queue_name,
1033                queue_options,
1034                options.queue_options.arguments.clone(),
1035            )
1036            .await?;
1037
1038        debug!("Declared queue: {}", options.queue_name);
1039
1040        // Declare exchange and bind if auto_declare_exchange is enabled
1041        if options.auto_declare_exchange {
1042            let exchange_name = options
1043                .exchange_name
1044                .as_ref()
1045                .unwrap_or(&options.queue_name);
1046
1047            // Declare exchange
1048            let exchange_options = ExchangeDeclareOptions {
1049                passive: options.exchange_options.passive,
1050                durable: options.exchange_options.durable,
1051                auto_delete: options.exchange_options.auto_delete,
1052                internal: options.exchange_options.internal,
1053                nowait: false,
1054            };
1055
1056            // Handle delayed message exchange if needed
1057            let mut arguments = options.exchange_options.arguments.clone();
1058            if matches!(options.exchange_options.exchange_type, ExchangeKind::Custom(ref kind) if kind == "x-delayed-message")
1059            {
1060                arguments.insert(
1061                    "x-delayed-type".into(),
1062                    lapin::types::AMQPValue::LongString(
1063                        match options.exchange_options.original_type {
1064                            ExchangeKind::Direct => "direct".into(),
1065                            ExchangeKind::Fanout => "fanout".into(),
1066                            ExchangeKind::Topic => "topic".into(),
1067                            ExchangeKind::Headers => "headers".into(),
1068                            ExchangeKind::Custom(ref s) => s.clone().into(),
1069                        },
1070                    ),
1071                );
1072            }
1073
1074            channel
1075                .exchange_declare(
1076                    exchange_name,
1077                    options.exchange_options.exchange_type.clone(),
1078                    exchange_options,
1079                    arguments,
1080                )
1081                .await?;
1082
1083            debug!("Declared exchange: {}", exchange_name);
1084
1085            // Bind queue to exchange
1086            let routing_key = options.routing_key.as_ref().unwrap_or(&options.queue_name);
1087
1088            channel
1089                .queue_bind(
1090                    &options.queue_name,
1091                    exchange_name,
1092                    routing_key,
1093                    QueueBindOptions::default(),
1094                    FieldTable::default(),
1095                )
1096                .await?;
1097
1098            debug!(
1099                "Bound queue '{}' to exchange '{}' with routing key '{}'",
1100                options.queue_name, exchange_name, routing_key
1101            );
1102        }
1103
1104        Ok(())
1105    }
1106
1107    /// Setup retry infrastructure (delayed exchange) if retry policy is configured
1108    async fn setup_retry_infrastructure(
1109        connection_manager: &ConnectionManager,
1110        options: &ConsumerOptions,
1111    ) -> Result<()> {
1112        if let Some(ref retry_policy) = options.retry_policy {
1113            // Create delayed exchange name
1114            let delayed_exchange_name = format!(
1115                "{}.retry",
1116                options
1117                    .exchange_name
1118                    .as_ref()
1119                    .unwrap_or(&options.queue_name)
1120            );
1121
1122            // Create DelayedMessageExchange instance and setup infrastructure
1123            let delayed_exchange = DelayedMessageExchange::new(
1124                connection_manager.clone(),
1125                delayed_exchange_name.clone(),
1126                retry_policy.clone(),
1127            );
1128
1129            // Setup the delayed exchange and dead letter infrastructure
1130            delayed_exchange.setup().await?;
1131
1132            // Setup queue binding for retry mechanism
1133            delayed_exchange
1134                .setup_queue_retry(&options.queue_name)
1135                .await?;
1136
1137            debug!(
1138                "Setup retry infrastructure for queue: {} with delayed exchange: {}",
1139                options.queue_name, delayed_exchange_name
1140            );
1141        }
1142
1143        Ok(())
1144    }
1145}
1146
/// Example message handler that wraps a plain closure or function.
///
/// `F` is the callback type and `T` is the message type passed to it.
/// The struct itself carries no trait bounds; the `impl` blocks that
/// construct and invoke the handler state the bounds they need.
pub struct SimpleMessageHandler<F, T> {
    /// Callback invoked for each message.
    handler_fn: F,
    /// Marks the message type `T` as used without storing a value of it.
    _phantom: std::marker::PhantomData<T>,
}
1156
1157impl<F, T> SimpleMessageHandler<F, T>
1158where
1159    F: Fn(T, MessageContext) -> MessageResult + Send + Sync + 'static,
1160    T: DeserializeOwned + Send + Sync + 'static,
1161{
1162    pub fn new(handler_fn: F) -> Self {
1163        Self {
1164            handler_fn,
1165            _phantom: std::marker::PhantomData,
1166        }
1167    }
1168}
1169
1170#[async_trait]
1171impl<F, T> MessageHandler<T> for SimpleMessageHandler<F, T>
1172where
1173    F: Fn(T, MessageContext) -> MessageResult + Send + Sync + 'static,
1174    T: DeserializeOwned + Send + Sync + 'static,
1175{
1176    async fn handle(&self, message: T, context: MessageContext) -> MessageResult {
1177        (self.handler_fn)(message, context)
1178    }
1179}