rust_rabbit/
consumer.rs

1use crate::{
2    connection::Connection,
3    error::RustRabbitError,
4    message::{ErrorType, MessageEnvelope, WireMessage},
5    retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9    options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10    types::{AMQPValue, FieldTable},
11    BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use std::future::Future;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::Semaphore;
18use tracing::{debug, error, warn};
19
20/// Message wrapper with retry tracking
21#[derive(Debug)]
22pub struct Message<T>
23where
24    T: Clone,
25{
26    pub data: T,
27    pub retry_attempt: u32,
28    tag: u64,
29    channel: Arc<Channel>,
30}
31
32impl<T> Clone for Message<T>
33where
34    T: Clone,
35{
36    fn clone(&self) -> Self {
37        Self {
38            data: self.data.clone(),
39            retry_attempt: self.retry_attempt,
40            tag: self.tag,
41            channel: Arc::clone(&self.channel),
42        }
43    }
44}
45
46impl<T> Message<T>
47where
48    T: Clone,
49{
50    /// Acknowledge the message
51    pub async fn ack(&self) -> Result<(), RustRabbitError> {
52        self.channel
53            .basic_ack(self.tag, BasicAckOptions::default())
54            .await
55            .map_err(RustRabbitError::from)
56    }
57
58    /// Reject and requeue the message
59    pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
60        self.channel
61            .basic_nack(
62                self.tag,
63                lapin::options::BasicNackOptions {
64                    multiple: false,
65                    requeue,
66                },
67            )
68            .await
69            .map_err(RustRabbitError::from)
70    }
71}
72
73/// Consumer configuration builder
74pub struct ConsumerBuilder {
75    connection: Arc<Connection>,
76    queue_name: String,
77    exchange_name: Option<String>,
78    routing_key: Option<String>,
79    retry_config: Option<RetryConfig>,
80    prefetch_count: Option<u16>,
81    auto_ack: bool,
82}
83
84impl ConsumerBuilder {
85    pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
86        Self {
87            connection,
88            queue_name: queue_name.into(),
89            exchange_name: None,
90            routing_key: None,
91            retry_config: None,
92            prefetch_count: Some(10),
93            auto_ack: true,
94        }
95    }
96
97    /// Bind to an exchange with routing key
98    pub fn bind_to_exchange(
99        mut self,
100        exchange: impl Into<String>,
101        routing_key: impl Into<String>,
102    ) -> Self {
103        self.exchange_name = Some(exchange.into());
104        self.routing_key = Some(routing_key.into());
105        self
106    }
107
108    /// Set routing key (for use with bind_to_exchange)
109    pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
110        self.routing_key = Some(routing_key.into());
111        self
112    }
113
114    /// Configure retry behavior
115    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
116        self.retry_config = Some(retry_config);
117        self
118    }
119
120    /// Set TTL for dead letter queue (auto-cleanup failed messages)
121    /// This is a convenience method that modifies the retry_config if it exists
122    ///
123    /// # Example
124    /// ```ignore
125    /// let consumer = Consumer::builder(connection, "orders")
126    ///     .with_retry(RetryConfig::exponential_default())
127    ///     .with_dlq_ttl(Duration::from_secs(86400))  // 1 day
128    ///     .build();
129    /// ```
130    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
131        if let Some(retry_config) = self.retry_config.as_mut() {
132            retry_config.dlq_ttl = Some(ttl);
133        }
134        self
135    }
136
137    /// Set prefetch count
138    pub fn with_prefetch(mut self, count: u16) -> Self {
139        self.prefetch_count = Some(count);
140        self
141    }
142
143    /// Disable auto-acknowledge (manual ack required)
144    pub fn manual_ack(mut self) -> Self {
145        self.auto_ack = false;
146        self
147    }
148
149    /// Build the consumer
150    pub fn build(self) -> Consumer {
151        Consumer {
152            connection: self.connection,
153            queue_name: self.queue_name,
154            exchange_name: self.exchange_name,
155            routing_key: self.routing_key,
156            retry_config: self.retry_config,
157            prefetch_count: self.prefetch_count.unwrap_or(10),
158            auto_ack: self.auto_ack,
159        }
160    }
161}
162
163/// Simplified Consumer for message consumption
164pub struct Consumer {
165    connection: Arc<Connection>,
166    queue_name: String,
167    exchange_name: Option<String>,
168    routing_key: Option<String>,
169    retry_config: Option<RetryConfig>,
170    prefetch_count: u16,
171    auto_ack: bool,
172}
173
174impl Consumer {
175    /// Create a new consumer builder
176    pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
177        ConsumerBuilder::new(connection, queue_name)
178    }
179
180    /// Create retry queue with TTL
181    async fn create_retry_queue(
182        &self,
183        channel: &Channel,
184        retry_attempt: u32,
185        delay: std::time::Duration,
186    ) -> Result<String, RustRabbitError> {
187        let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
188        let delay_ms = delay.as_millis() as i64;
189
190        // Create retry queue with TTL that routes back to original queue
191        let mut args = FieldTable::default();
192        args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
193        args.insert(
194            "x-dead-letter-exchange".into(),
195            AMQPValue::LongString("".into()),
196        ); // Default exchange
197        args.insert(
198            "x-dead-letter-routing-key".into(),
199            AMQPValue::LongString(self.queue_name.clone().into()),
200        );
201
202        channel
203            .queue_declare(
204                &retry_queue_name,
205                QueueDeclareOptions {
206                    durable: true,
207                    ..Default::default()
208                },
209                args,
210            )
211            .await?;
212
213        debug!(
214            "Created retry queue: {} with TTL: {}ms",
215            retry_queue_name, delay_ms
216        );
217        Ok(retry_queue_name)
218    }
219
220    /// Create DLQ (Dead Letter Queue) with optional TTL
221    async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
222        let dlq_name = format!("{}.dlq", self.queue_name);
223
224        // Build queue arguments with optional TTL
225        let mut args = FieldTable::default();
226        if let Some(retry_config) = &self.retry_config {
227            if let Some(ttl) = &retry_config.dlq_ttl {
228                let ttl_ms = ttl.as_millis() as i64;
229                args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(ttl_ms));
230                debug!("DLQ TTL: {}ms", ttl_ms);
231            }
232        }
233
234        channel
235            .queue_declare(
236                &dlq_name,
237                QueueDeclareOptions {
238                    durable: true,
239                    ..Default::default()
240                },
241                args,
242            )
243            .await?;
244
245        debug!("Created DLQ: {}", dlq_name);
246        Ok(dlq_name)
247    }
248
249    /// Send message to retry queue with delay
250    async fn send_to_retry_queue(
251        &self,
252        channel: &Channel,
253        message_data: &[u8],
254        retry_attempt: u32,
255        delay: std::time::Duration,
256    ) -> Result<(), RustRabbitError> {
257        let retry_queue_name = self
258            .create_retry_queue(channel, retry_attempt, delay)
259            .await?;
260
261        // Publish to retry queue
262        channel
263            .basic_publish(
264                "", // Default exchange
265                &retry_queue_name,
266                BasicPublishOptions::default(),
267                message_data,
268                BasicProperties::default()
269                    .with_content_type("application/json".into())
270                    .with_delivery_mode(2), // Persistent
271            )
272            .await?
273            .await?;
274
275        debug!("Sent message to retry queue: {}", retry_queue_name);
276        Ok(())
277    }
278
279    /// Send message to DLQ
280    async fn send_to_dlq_simple(
281        &self,
282        channel: &Channel,
283        message_data: &[u8],
284    ) -> Result<(), RustRabbitError> {
285        let dlq_name = self.create_dlq(channel).await?;
286
287        // Publish to DLQ
288        channel
289            .basic_publish(
290                "", // Default exchange
291                &dlq_name,
292                BasicPublishOptions::default(),
293                message_data,
294                BasicProperties::default()
295                    .with_content_type("application/json".into())
296                    .with_delivery_mode(2), // Persistent
297            )
298            .await?
299            .await?;
300
301        debug!("Sent message to DLQ: {}", dlq_name);
302        Ok(())
303    }
304
305    /// Create delay exchange using RabbitMQ delayed message exchange plugin
306    /// Requires rabbitmq_delayed_message_exchange plugin to be installed on RabbitMQ
307    async fn create_delay_exchange(&self, channel: &Channel) -> Result<String, RustRabbitError> {
308        if let Some(retry_config) = &self.retry_config {
309            let delay_exchange = retry_config.get_delay_exchange(&self.queue_name);
310
311            // Declare delay exchange with x-delayed-type argument
312            let mut args = FieldTable::default();
313            args.insert(
314                "x-delayed-type".into(),
315                AMQPValue::LongString("direct".into()),
316            );
317
318            channel
319                .exchange_declare(
320                    &delay_exchange,
321                    lapin::ExchangeKind::Custom("x-delayed-message".to_string()),
322                    lapin::options::ExchangeDeclareOptions {
323                        durable: true,
324                        ..Default::default()
325                    },
326                    args,
327                )
328                .await?;
329
330            debug!(
331                "Created delay exchange: {} (x-delayed-message type)",
332                delay_exchange
333            );
334            Ok(delay_exchange)
335        } else {
336            Err(RustRabbitError::Retry(
337                "Retry config not configured".to_string(),
338            ))
339        }
340    }
341
342    /// Send message to delay exchange with x-delay header for retry
343    /// Message will be automatically routed back to the original queue after delay
344    async fn send_to_delay_exchange(
345        &self,
346        channel: &Channel,
347        message_data: &[u8],
348        delay: std::time::Duration,
349    ) -> Result<(), RustRabbitError> {
350        let delay_exchange = self.create_delay_exchange(channel).await?;
351        let delay_ms = delay.as_millis() as i64;
352
353        // Publish to delay exchange with x-delay header
354        // The message will be re-delivered to original queue after delay
355        channel
356            .basic_publish(
357                &delay_exchange,
358                &self.queue_name, // Routing key: original queue name
359                BasicPublishOptions::default(),
360                message_data,
361                BasicProperties::default()
362                    .with_content_type("application/json".into())
363                    .with_delivery_mode(2) // Persistent
364                    .with_headers({
365                        let mut headers = FieldTable::default();
366                        headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
367                        headers
368                    }),
369            )
370            .await?
371            .await?;
372
373        debug!(
374            "Sent message to delay exchange: {} with delay: {}ms",
375            delay_exchange, delay_ms
376        );
377        Ok(())
378    }
379
380    /// Start consuming messages
381    pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
382    where
383        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
384        H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
385        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
386    {
387        let channel = self.connection.create_channel().await?;
388
389        // Set prefetch count
390        channel
391            .basic_qos(
392                self.prefetch_count,
393                lapin::options::BasicQosOptions::default(),
394            )
395            .await?;
396
397        // Setup infrastructure (queues, exchanges)
398        self.setup_infrastructure(&channel).await?;
399
400        // Start consuming
401        let mut consumer = channel
402            .basic_consume(
403                &self.queue_name,
404                "",
405                BasicConsumeOptions::default(),
406                FieldTable::default(),
407            )
408            .await?;
409
410        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
411
412        debug!("Started consuming from queue: {}", self.queue_name);
413
414        // Process messages
415        while let Some(delivery_result) = consumer.next().await {
416            let delivery = delivery_result?;
417            let permit = semaphore.clone().acquire_owned().await.unwrap();
418            let handler_clone = handler.clone();
419            let auto_ack = self.auto_ack;
420            let channel_clone = Arc::new(channel.clone());
421            let retry_config = self.retry_config.clone();
422            let consumer_self = Consumer {
423                connection: self.connection.clone(),
424                queue_name: self.queue_name.clone(),
425                exchange_name: self.exchange_name.clone(),
426                routing_key: self.routing_key.clone(),
427                retry_config: self.retry_config.clone(),
428                prefetch_count: self.prefetch_count,
429                auto_ack: self.auto_ack,
430            };
431
432            tokio::spawn(async move {
433                let _permit = permit;
434
435                // Deserialize as WireMessage format
436                match serde_json::from_slice::<crate::message::WireMessage<T>>(&delivery.data) {
437                    Ok(wire_msg) => {
438                        let message = Message {
439                            data: wire_msg.data,
440                            retry_attempt: wire_msg.retry_attempt,
441                            tag: delivery.delivery_tag,
442                            channel: channel_clone.clone(),
443                        };
444
445                        // Process message
446                        match handler_clone(message.clone()).await {
447                            Ok(()) => {
448                                if auto_ack {
449                                    if let Err(e) = message.ack().await {
450                                        error!("Failed to ack message: {}", e);
451                                    }
452                                }
453                                debug!("Message processed successfully");
454                            }
455                            Err(e) => {
456                                error!("Handler error: {}", e);
457                                if auto_ack {
458                                    // Check if retry is configured
459                                    if let Some(retry_cfg) = &retry_config {
460                                        if message.retry_attempt < retry_cfg.max_retries {
461                                            // Calculate delay for next retry
462                                            if let Some(delay) =
463                                                retry_cfg.calculate_delay(message.retry_attempt)
464                                            {
465                                                warn!(
466                                                    "Scheduling retry {} with delay {:?} for message",
467                                                    message.retry_attempt + 1,
468                                                    delay
469                                                );
470
471                                                // Update retry attempt in wire message
472                                                let wire_msg = WireMessage {
473                                                    data: message.data.clone(),
474                                                    retry_attempt: message.retry_attempt + 1,
475                                                };
476
477                                                let retry_payload =
478                                                    match serde_json::to_vec(&wire_msg) {
479                                                        Ok(payload) => payload,
480                                                        Err(e) => {
481                                                            error!(
482                                                            "Failed to serialize retry message: {}",
483                                                            e
484                                                        );
485                                                            if let Err(e) =
486                                                                message.nack(false).await
487                                                            {
488                                                                error!(
489                                                                    "Failed to nack message: {}",
490                                                                    e
491                                                                );
492                                                            }
493                                                            return;
494                                                        }
495                                                    };
496
497                                                // Send via appropriate strategy (TTL or DelayedExchange)
498                                                let send_result = if matches!(
499                                                    retry_cfg.delay_strategy,
500                                                    crate::retry::DelayStrategy::DelayedExchange
501                                                ) {
502                                                    consumer_self
503                                                        .send_to_delay_exchange(
504                                                            &channel_clone,
505                                                            &retry_payload,
506                                                            delay,
507                                                        )
508                                                        .await
509                                                } else {
510                                                    consumer_self
511                                                        .send_to_retry_queue(
512                                                            &channel_clone,
513                                                            &retry_payload,
514                                                            message.retry_attempt + 1,
515                                                            delay,
516                                                        )
517                                                        .await
518                                                };
519
520                                                if let Err(e) = send_result {
521                                                    error!("Failed to send retry message: {}", e);
522                                                    if let Err(e) = message.nack(false).await {
523                                                        error!("Failed to nack message: {}", e);
524                                                    }
525                                                    return;
526                                                }
527
528                                                // ACK original message (it's now queued for retry)
529                                                if let Err(e) = message.ack().await {
530                                                    error!(
531                                                        "Failed to ack message after retry: {}",
532                                                        e
533                                                    );
534                                                }
535                                            } else {
536                                                // No more retries, send to DLQ
537                                                warn!("Retry exhausted, sending to DLQ");
538                                                if let Err(e) = consumer_self
539                                                    .send_to_dlq_simple(
540                                                        &channel_clone,
541                                                        &delivery.data,
542                                                    )
543                                                    .await
544                                                {
545                                                    error!("Failed to send to DLQ: {}", e);
546                                                }
547                                                if let Err(e) = message.ack().await {
548                                                    error!(
549                                                        "Failed to ack message after DLQ: {}",
550                                                        e
551                                                    );
552                                                }
553                                            }
554                                        } else {
555                                            // Retry exhausted, send to DLQ
556                                            warn!("Max retries reached, sending to DLQ");
557                                            if let Err(e) = consumer_self
558                                                .send_to_dlq_simple(&channel_clone, &delivery.data)
559                                                .await
560                                            {
561                                                error!("Failed to send to DLQ: {}", e);
562                                            }
563                                            if let Err(e) = message.ack().await {
564                                                error!("Failed to ack message after DLQ: {}", e);
565                                            }
566                                        }
567                                    } else {
568                                        // No retry config, just nack
569                                        if let Err(e) = message.nack(false).await {
570                                            error!("Failed to nack message: {}", e);
571                                        }
572                                    }
573                                }
574                            }
575                        }
576                    }
577                    Err(e) => {
578                        error!("Failed to deserialize message: {}", e);
579                        if auto_ack {
580                            // Reject malformed messages
581                            if let Err(e) = channel_clone
582                                .basic_nack(
583                                    delivery.delivery_tag,
584                                    lapin::options::BasicNackOptions {
585                                        multiple: false,
586                                        requeue: false,
587                                    },
588                                )
589                                .await
590                            {
591                                error!("Failed to nack malformed message: {}", e);
592                            }
593                        }
594                    }
595                }
596            });
597        }
598
599        Ok(())
600    }
601
602    /// Setup queue and exchange infrastructure
603    async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
604        // Declare queue
605        channel
606            .queue_declare(
607                &self.queue_name,
608                QueueDeclareOptions {
609                    durable: true,
610                    ..Default::default()
611                },
612                FieldTable::default(),
613            )
614            .await?;
615
616        // Bind to exchange if specified
617        if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
618            channel
619                .queue_bind(
620                    &self.queue_name,
621                    exchange,
622                    routing_key,
623                    lapin::options::QueueBindOptions::default(),
624                    FieldTable::default(),
625                )
626                .await?;
627        }
628
629        // Setup delay exchange if using DelayedExchange strategy
630        if let Some(retry_config) = &self.retry_config {
631            if matches!(
632                retry_config.delay_strategy,
633                crate::retry::DelayStrategy::DelayedExchange
634            ) {
635                let delay_exchange = self.create_delay_exchange(channel).await?;
636
637                // Bind delay exchange to original queue
638                channel
639                    .queue_bind(
640                        &self.queue_name,
641                        &delay_exchange,
642                        &self.queue_name, // Routing key: original queue name
643                        lapin::options::QueueBindOptions::default(),
644                        FieldTable::default(),
645                    )
646                    .await?;
647
648                debug!(
649                    "Bound queue {} to delay exchange {}",
650                    self.queue_name, delay_exchange
651                );
652            }
653        }
654
655        Ok(())
656    }
657
658    /// Start consuming message envelopes with full retry support
659    pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
660    where
661        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
662        H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
663        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
664    {
665        let channel = self.connection.create_channel().await?;
666        let retry_config = self.retry_config.clone();
667
668        // Set prefetch count
669        channel
670            .basic_qos(
671                self.prefetch_count,
672                lapin::options::BasicQosOptions::default(),
673            )
674            .await?;
675
676        // Setup queue and exchange
677        self.setup_infrastructure(&channel).await?;
678
679        // Create consumer
680        let mut consumer = channel
681            .basic_consume(
682                &self.queue_name,
683                "rust-rabbit-envelope-consumer",
684                BasicConsumeOptions::default(),
685                FieldTable::default(),
686            )
687            .await?;
688
689        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
690
691        debug!(
692            "Started consuming envelopes from queue: {}",
693            self.queue_name
694        );
695
696        // Process message envelopes with retry support
697        while let Some(delivery_result) = consumer.next().await {
698            let delivery = delivery_result?;
699            let permit = semaphore.clone().acquire_owned().await.unwrap();
700            let handler_clone = handler.clone();
701            let auto_ack = self.auto_ack;
702            let channel_clone = Arc::new(channel.clone());
703            let retry_config_clone = retry_config.clone();
704            let queue_name = self.queue_name.clone();
705            let connection = self.connection.clone();
706
707            tokio::spawn(async move {
708                let _permit = permit;
709
710                // Try to deserialize as MessageEnvelope
711                match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
712                    Ok(mut envelope) => {
713                        debug!(
714                            "Processing envelope {} (attempt {}/{})",
715                            envelope.metadata.message_id,
716                            envelope.metadata.retry_attempt + 1,
717                            envelope.metadata.max_retries + 1
718                        );
719
720                        // Process message
721                        match handler_clone(envelope.clone()).await {
722                            Ok(()) => {
723                                if auto_ack {
724                                    if let Err(e) = channel_clone
725                                        .basic_ack(
726                                            delivery.delivery_tag,
727                                            BasicAckOptions::default(),
728                                        )
729                                        .await
730                                    {
731                                        error!("Failed to ack message: {}", e);
732                                    }
733                                }
734                                debug!(
735                                    "Envelope {} processed successfully",
736                                    envelope.metadata.message_id
737                                );
738                            }
739                            Err(e) => {
740                                error!(
741                                    "Handler error for envelope {}: {}",
742                                    envelope.metadata.message_id, e
743                                );
744
745                                // Determine error type (simplified classification)
746                                let error_type = classify_error(e.as_ref());
747
748                                // Add error to envelope
749                                envelope = envelope.with_error(
750                                    &e.to_string(),
751                                    error_type,
752                                    Some(&format!("Queue: {}", queue_name)),
753                                );
754
755                                if auto_ack {
756                                    // Check if we should retry
757                                    if let Some(retry_cfg) = &retry_config_clone {
758                                        if !envelope.is_retry_exhausted() {
759                                            // Calculate delay and schedule retry
760                                            if let Some(delay) = retry_cfg
761                                                .calculate_delay(envelope.metadata.retry_attempt)
762                                            {
763                                                warn!(
764                                                    "Scheduling retry {} for envelope {} with delay {:?}",
765                                                    envelope.metadata.retry_attempt + 1,
766                                                    envelope.metadata.message_id,
767                                                    delay
768                                                );
769
770                                                // Increment retry attempt in envelope
771                                                envelope.metadata.retry_attempt += 1;
772
773                                                // Serialize updated envelope
774                                                match serde_json::to_vec(&envelope) {
775                                                    Ok(retry_payload) => {
776                                                        // Create consumer instance for access to methods
777                                                        let consumer_self = Consumer {
778                                                            connection: connection.clone(),
779                                                            queue_name: queue_name.clone(),
780                                                            exchange_name: None,
781                                                            routing_key: None,
782                                                            retry_config: retry_config_clone
783                                                                .clone(),
784                                                            prefetch_count: 10,
785                                                            auto_ack: true,
786                                                        };
787
788                                                        // Send via appropriate strategy (TTL or DelayedExchange)
789                                                        let send_result = if matches!(
790                                                            retry_config_clone
791                                                                .as_ref()
792                                                                .map(|c| c.delay_strategy),
793                                                            Some(crate::retry::DelayStrategy::DelayedExchange)
794                                                        ) {
795                                                            consumer_self
796                                                                .send_to_delay_exchange(
797                                                                    &channel_clone,
798                                                                    &retry_payload,
799                                                                    delay,
800                                                                )
801                                                                .await
802                                                        } else {
803                                                            consumer_self
804                                                                .send_to_retry_queue(
805                                                                    &channel_clone,
806                                                                    &retry_payload,
807                                                                    envelope.metadata.retry_attempt,
808                                                                    delay,
809                                                                )
810                                                                .await
811                                                        };
812
813                                                        if let Err(e) = send_result {
814                                                            error!("Failed to send envelope for retry: {}", e);
815                                                            // Fallback to simple nack
816                                                            if let Err(e) = channel_clone
817                                                                .basic_nack(
818                                                                    delivery.delivery_tag,
819                                                                    lapin::options::BasicNackOptions {
820                                                                        multiple: false,
821                                                                        requeue: false,
822                                                                    },
823                                                                )
824                                                                .await
825                                                            {
826                                                                error!("Failed to nack message: {}", e);
827                                                            }
828                                                            return;
829                                                        }
830
831                                                        // ACK original message (it's now queued for retry)
832                                                        if let Err(e) = channel_clone
833                                                            .basic_ack(
834                                                                delivery.delivery_tag,
835                                                                BasicAckOptions::default(),
836                                                            )
837                                                            .await
838                                                        {
839                                                            error!("Failed to ack message after retry: {}", e);
840                                                        }
841                                                    }
842                                                    Err(e) => {
843                                                        error!("Failed to serialize envelope for retry: {}", e);
844                                                        // Fallback to simple nack
845                                                        if let Err(e) = channel_clone
846                                                            .basic_nack(
847                                                                delivery.delivery_tag,
848                                                                lapin::options::BasicNackOptions {
849                                                                    multiple: false,
850                                                                    requeue: false,
851                                                                },
852                                                            )
853                                                            .await
854                                                        {
855                                                            error!("Failed to nack message: {}", e);
856                                                        }
857                                                    }
858                                                }
859                                            } else {
860                                                // No more retries, send to DLQ
861                                                Self::send_to_dlq(
862                                                    &envelope,
863                                                    retry_cfg,
864                                                    &connection,
865                                                    &queue_name,
866                                                )
867                                                .await;
868
869                                                // ACK original message
870                                                if let Err(e) = channel_clone
871                                                    .basic_ack(
872                                                        delivery.delivery_tag,
873                                                        BasicAckOptions::default(),
874                                                    )
875                                                    .await
876                                                {
877                                                    error!(
878                                                        "Failed to ack message after DLQ: {}",
879                                                        e
880                                                    );
881                                                }
882                                            }
883                                        } else {
884                                            // Retry exhausted, send to DLQ
885                                            warn!(
886                                                "Retry exhausted for envelope {}",
887                                                envelope.metadata.message_id
888                                            );
889                                            Self::send_to_dlq(
890                                                &envelope,
891                                                retry_cfg,
892                                                &connection,
893                                                &queue_name,
894                                            )
895                                            .await;
896
897                                            // ACK original message
898                                            if let Err(e) = channel_clone
899                                                .basic_ack(
900                                                    delivery.delivery_tag,
901                                                    BasicAckOptions::default(),
902                                                )
903                                                .await
904                                            {
905                                                error!("Failed to ack message after DLQ: {}", e);
906                                            }
907                                        }
908                                    } else {
909                                        // No retry config, just nack
910                                        if let Err(e) = channel_clone
911                                            .basic_nack(
912                                                delivery.delivery_tag,
913                                                lapin::options::BasicNackOptions {
914                                                    multiple: false,
915                                                    requeue: false,
916                                                },
917                                            )
918                                            .await
919                                        {
920                                            error!("Failed to nack message: {}", e);
921                                        }
922                                    }
923                                }
924                            }
925                        }
926                    }
927                    Err(e) => {
928                        error!("Failed to deserialize message envelope: {}", e);
929                        if auto_ack {
930                            // Reject malformed messages
931                            if let Err(e) = channel_clone
932                                .basic_nack(
933                                    delivery.delivery_tag,
934                                    lapin::options::BasicNackOptions {
935                                        multiple: false,
936                                        requeue: false,
937                                    },
938                                )
939                                .await
940                            {
941                                error!("Failed to nack malformed envelope: {}", e);
942                            }
943                        }
944                    }
945                }
946            });
947        }
948
949        Ok(())
950    }
951
952    /// Send failed message to Dead Letter Queue
953    async fn send_to_dlq<T>(
954        envelope: &MessageEnvelope<T>,
955        retry_config: &RetryConfig,
956        connection: &Arc<Connection>,
957        queue_name: &str,
958    ) where
959        T: serde::Serialize,
960    {
961        match connection.create_channel().await {
962            Ok(dlq_channel) => {
963                let dlq_name = retry_config.get_dead_letter_queue(queue_name);
964
965                // Declare DLQ
966                if let Err(e) = dlq_channel
967                    .queue_declare(
968                        &dlq_name,
969                        QueueDeclareOptions {
970                            durable: true,
971                            ..Default::default()
972                        },
973                        FieldTable::default(),
974                    )
975                    .await
976                {
977                    error!("Failed to declare DLQ {}: {}", dlq_name, e);
978                    return;
979                }
980
981                // Publish to DLQ with failure summary
982                let failure_summary = envelope.get_failure_summary();
983                let dlq_payload = serde_json::json!({
984                    "envelope": envelope,
985                    "failure_summary": failure_summary,
986                    "sent_to_dlq_at": chrono::Utc::now(),
987                });
988
989                if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
990                    if let Err(e) = dlq_channel
991                        .basic_publish(
992                            "",
993                            &dlq_name,
994                            lapin::options::BasicPublishOptions::default(),
995                            &payload_bytes,
996                            lapin::BasicProperties::default(),
997                        )
998                        .await
999                    {
1000                        error!("Failed to publish to DLQ {}: {}", dlq_name, e);
1001                    } else {
1002                        warn!(
1003                            "Sent envelope {} to DLQ: {}",
1004                            envelope.metadata.message_id, failure_summary
1005                        );
1006                    }
1007                }
1008            }
1009            Err(e) => {
1010                error!("Failed to create DLQ channel: {}", e);
1011            }
1012        }
1013    }
1014}
1015
1016/// Classify error type based on error message (simplified heuristics)
1017fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
1018    let error_msg = error.to_string().to_lowercase();
1019
1020    if error_msg.contains("timeout")
1021        || error_msg.contains("connection")
1022        || error_msg.contains("network")
1023        || error_msg.contains("temporary")
1024    {
1025        ErrorType::Transient
1026    } else if error_msg.contains("rate limit")
1027        || error_msg.contains("quota")
1028        || error_msg.contains("resource")
1029    {
1030        ErrorType::Resource
1031    } else if error_msg.contains("validation")
1032        || error_msg.contains("authentication")
1033        || error_msg.contains("authorization")
1034        || error_msg.contains("invalid")
1035        || error_msg.contains("bad request")
1036    {
1037        ErrorType::Permanent
1038    } else {
1039        ErrorType::Unknown
1040    }
1041}