rust_rabbit/
consumer.rs

1use crate::{
2    connection::Connection,
3    error::RustRabbitError,
4    message::{ErrorType, MessageEnvelope, WireMessage},
5    retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9    options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10    types::{AMQPValue, FieldTable},
11    BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use std::future::Future;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::Semaphore;
18use tracing::{debug, error, warn};
19
20/// Message wrapper with retry tracking
21#[derive(Debug)]
22pub struct Message<T>
23where
24    T: Clone,
25{
26    pub data: T,
27    pub retry_attempt: u32,
28    tag: u64,
29    channel: Arc<Channel>,
30}
31
32impl<T> Clone for Message<T>
33where
34    T: Clone,
35{
36    fn clone(&self) -> Self {
37        Self {
38            data: self.data.clone(),
39            retry_attempt: self.retry_attempt,
40            tag: self.tag,
41            channel: Arc::clone(&self.channel),
42        }
43    }
44}
45
46impl<T> Message<T>
47where
48    T: Clone,
49{
50    /// Acknowledge the message
51    pub async fn ack(&self) -> Result<(), RustRabbitError> {
52        self.channel
53            .basic_ack(self.tag, BasicAckOptions::default())
54            .await
55            .map_err(RustRabbitError::from)
56    }
57
58    /// Reject and requeue the message
59    pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
60        self.channel
61            .basic_nack(
62                self.tag,
63                lapin::options::BasicNackOptions {
64                    multiple: false,
65                    requeue,
66                },
67            )
68            .await
69            .map_err(RustRabbitError::from)
70    }
71}
72
73/// Consumer configuration builder
74pub struct ConsumerBuilder {
75    connection: Arc<Connection>,
76    queue_name: String,
77    exchange_name: Option<String>,
78    routing_key: Option<String>,
79    retry_config: Option<RetryConfig>,
80    prefetch_count: Option<u16>,
81    auto_ack: bool,
82}
83
84impl ConsumerBuilder {
85    pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
86        Self {
87            connection,
88            queue_name: queue_name.into(),
89            exchange_name: None,
90            routing_key: None,
91            retry_config: None,
92            prefetch_count: Some(10),
93            auto_ack: true,
94        }
95    }
96
97    /// Bind to an exchange with routing key
98    pub fn bind_to_exchange(
99        mut self,
100        exchange: impl Into<String>,
101        routing_key: impl Into<String>,
102    ) -> Self {
103        self.exchange_name = Some(exchange.into());
104        self.routing_key = Some(routing_key.into());
105        self
106    }
107
108    /// Set routing key (for use with bind_to_exchange)
109    pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
110        self.routing_key = Some(routing_key.into());
111        self
112    }
113
114    /// Set concurrency level (same as prefetch count)
115    pub fn concurrency(mut self, count: u16) -> Self {
116        self.prefetch_count = Some(count);
117        self
118    }
119
120    /// Configure retry behavior
121    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
122        self.retry_config = Some(retry_config);
123        self
124    }
125
126    /// Set TTL for dead letter queue (auto-cleanup failed messages)
127    /// This is a convenience method that modifies the retry_config if it exists
128    ///
129    /// # Example
130    /// ```ignore
131    /// let consumer = Consumer::builder(connection, "orders")
132    ///     .with_retry(RetryConfig::exponential_default())
133    ///     .with_dlq_ttl(Duration::from_secs(86400))  // 1 day
134    ///     .build();
135    /// ```
136    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
137        if let Some(retry_config) = self.retry_config.as_mut() {
138            retry_config.dlq_ttl = Some(ttl);
139        }
140        self
141    }
142
143    /// Set prefetch count
144    pub fn with_prefetch(mut self, count: u16) -> Self {
145        self.prefetch_count = Some(count);
146        self
147    }
148
149    /// Disable auto-acknowledge (manual ack required)
150    pub fn manual_ack(mut self) -> Self {
151        self.auto_ack = false;
152        self
153    }
154
155    /// Build the consumer
156    pub fn build(self) -> Consumer {
157        Consumer {
158            connection: self.connection,
159            queue_name: self.queue_name,
160            exchange_name: self.exchange_name,
161            routing_key: self.routing_key,
162            retry_config: self.retry_config,
163            prefetch_count: self.prefetch_count.unwrap_or(10),
164            auto_ack: self.auto_ack,
165        }
166    }
167}
168
169/// Simplified Consumer for message consumption
170pub struct Consumer {
171    connection: Arc<Connection>,
172    queue_name: String,
173    exchange_name: Option<String>,
174    routing_key: Option<String>,
175    retry_config: Option<RetryConfig>,
176    prefetch_count: u16,
177    auto_ack: bool,
178}
179
180impl Consumer {
181    /// Create a new consumer builder
182    pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
183        ConsumerBuilder::new(connection, queue_name)
184    }
185
186    /// Create retry queue with TTL
187    async fn create_retry_queue(
188        &self,
189        channel: &Channel,
190        retry_attempt: u32,
191        delay: std::time::Duration,
192    ) -> Result<String, RustRabbitError> {
193        let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
194        let delay_ms = delay.as_millis() as i64;
195
196        // Create retry queue with TTL that routes back to original queue
197        let mut args = FieldTable::default();
198        args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
199        args.insert(
200            "x-dead-letter-exchange".into(),
201            AMQPValue::LongString("".into()),
202        ); // Default exchange
203        args.insert(
204            "x-dead-letter-routing-key".into(),
205            AMQPValue::LongString(self.queue_name.clone().into()),
206        );
207
208        channel
209            .queue_declare(
210                &retry_queue_name,
211                QueueDeclareOptions {
212                    durable: true,
213                    ..Default::default()
214                },
215                args,
216            )
217            .await?;
218
219        debug!(
220            "Created retry queue: {} with TTL: {}ms",
221            retry_queue_name, delay_ms
222        );
223        Ok(retry_queue_name)
224    }
225
226    /// Create DLQ (Dead Letter Queue) with optional TTL
227    async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
228        let dlq_name = format!("{}.dlq", self.queue_name);
229
230        // Build queue arguments with optional TTL
231        let mut args = FieldTable::default();
232        if let Some(retry_config) = &self.retry_config {
233            if let Some(ttl) = &retry_config.dlq_ttl {
234                let ttl_ms = ttl.as_millis() as i64;
235                args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(ttl_ms));
236                debug!("DLQ TTL: {}ms", ttl_ms);
237            }
238        }
239
240        channel
241            .queue_declare(
242                &dlq_name,
243                QueueDeclareOptions {
244                    durable: true,
245                    ..Default::default()
246                },
247                args,
248            )
249            .await?;
250
251        debug!("Created DLQ: {}", dlq_name);
252        Ok(dlq_name)
253    }
254
255    /// Send message to retry queue with delay
256    async fn send_to_retry_queue(
257        &self,
258        channel: &Channel,
259        message_data: &[u8],
260        retry_attempt: u32,
261        delay: std::time::Duration,
262    ) -> Result<(), RustRabbitError> {
263        let retry_queue_name = self
264            .create_retry_queue(channel, retry_attempt, delay)
265            .await?;
266
267        // Publish to retry queue
268        channel
269            .basic_publish(
270                "", // Default exchange
271                &retry_queue_name,
272                BasicPublishOptions::default(),
273                message_data,
274                BasicProperties::default()
275                    .with_content_type("application/json".into())
276                    .with_delivery_mode(2), // Persistent
277            )
278            .await?
279            .await?;
280
281        debug!("Sent message to retry queue: {}", retry_queue_name);
282        Ok(())
283    }
284
285    /// Send message to DLQ
286    async fn send_to_dlq_simple(
287        &self,
288        channel: &Channel,
289        message_data: &[u8],
290    ) -> Result<(), RustRabbitError> {
291        let dlq_name = self.create_dlq(channel).await?;
292
293        // Publish to DLQ
294        channel
295            .basic_publish(
296                "", // Default exchange
297                &dlq_name,
298                BasicPublishOptions::default(),
299                message_data,
300                BasicProperties::default()
301                    .with_content_type("application/json".into())
302                    .with_delivery_mode(2), // Persistent
303            )
304            .await?
305            .await?;
306
307        debug!("Sent message to DLQ: {}", dlq_name);
308        Ok(())
309    }
310
311    /// Create delay exchange using RabbitMQ delayed message exchange plugin
312    /// Requires rabbitmq_delayed_message_exchange plugin to be installed on RabbitMQ
313    async fn create_delay_exchange(&self, channel: &Channel) -> Result<String, RustRabbitError> {
314        if let Some(retry_config) = &self.retry_config {
315            let delay_exchange = retry_config.get_delay_exchange(&self.queue_name);
316
317            // Declare delay exchange with x-delayed-type argument
318            let mut args = FieldTable::default();
319            args.insert(
320                "x-delayed-type".into(),
321                AMQPValue::LongString("direct".into()),
322            );
323
324            channel
325                .exchange_declare(
326                    &delay_exchange,
327                    lapin::ExchangeKind::Custom("x-delayed-message".to_string()),
328                    lapin::options::ExchangeDeclareOptions {
329                        durable: true,
330                        ..Default::default()
331                    },
332                    args,
333                )
334                .await?;
335
336            debug!(
337                "Created delay exchange: {} (x-delayed-message type)",
338                delay_exchange
339            );
340            Ok(delay_exchange)
341        } else {
342            Err(RustRabbitError::Retry(
343                "Retry config not configured".to_string(),
344            ))
345        }
346    }
347
348    /// Send message to delay exchange with x-delay header for retry
349    /// Message will be automatically routed back to the original queue after delay
350    async fn send_to_delay_exchange(
351        &self,
352        channel: &Channel,
353        message_data: &[u8],
354        delay: std::time::Duration,
355    ) -> Result<(), RustRabbitError> {
356        let delay_exchange = self.create_delay_exchange(channel).await?;
357        let delay_ms = delay.as_millis() as i64;
358
359        // Publish to delay exchange with x-delay header
360        // The message will be re-delivered to original queue after delay
361        channel
362            .basic_publish(
363                &delay_exchange,
364                &self.queue_name, // Routing key: original queue name
365                BasicPublishOptions::default(),
366                message_data,
367                BasicProperties::default()
368                    .with_content_type("application/json".into())
369                    .with_delivery_mode(2) // Persistent
370                    .with_headers({
371                        let mut headers = FieldTable::default();
372                        headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
373                        headers
374                    }),
375            )
376            .await?
377            .await?;
378
379        debug!(
380            "Sent message to delay exchange: {} with delay: {}ms",
381            delay_exchange, delay_ms
382        );
383        Ok(())
384    }
385
386    /// Start consuming messages
387    pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
388    where
389        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
390        H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
391        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
392    {
393        let channel = self.connection.create_channel().await?;
394
395        // Set prefetch count
396        channel
397            .basic_qos(
398                self.prefetch_count,
399                lapin::options::BasicQosOptions::default(),
400            )
401            .await?;
402
403        // Setup infrastructure (queues, exchanges)
404        self.setup_infrastructure(&channel).await?;
405
406        // Start consuming
407        let mut consumer = channel
408            .basic_consume(
409                &self.queue_name,
410                "",
411                BasicConsumeOptions::default(),
412                FieldTable::default(),
413            )
414            .await?;
415
416        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
417
418        debug!("Started consuming from queue: {}", self.queue_name);
419
420        // Process messages
421        while let Some(delivery_result) = consumer.next().await {
422            let delivery = delivery_result?;
423            let permit = semaphore.clone().acquire_owned().await.unwrap();
424            let handler_clone = handler.clone();
425            let auto_ack = self.auto_ack;
426            let channel_clone = Arc::new(channel.clone());
427            let retry_config = self.retry_config.clone();
428            let consumer_self = Consumer {
429                connection: self.connection.clone(),
430                queue_name: self.queue_name.clone(),
431                exchange_name: self.exchange_name.clone(),
432                routing_key: self.routing_key.clone(),
433                retry_config: self.retry_config.clone(),
434                prefetch_count: self.prefetch_count,
435                auto_ack: self.auto_ack,
436            };
437
438            tokio::spawn(async move {
439                let _permit = permit;
440
441                // Deserialize as WireMessage format
442                match serde_json::from_slice::<crate::message::WireMessage<T>>(&delivery.data) {
443                    Ok(wire_msg) => {
444                        let message = Message {
445                            data: wire_msg.data,
446                            retry_attempt: wire_msg.retry_attempt,
447                            tag: delivery.delivery_tag,
448                            channel: channel_clone.clone(),
449                        };
450
451                        // Process message
452                        match handler_clone(message.clone()).await {
453                            Ok(()) => {
454                                if auto_ack {
455                                    if let Err(e) = message.ack().await {
456                                        error!("Failed to ack message: {}", e);
457                                    }
458                                }
459                                debug!("Message processed successfully");
460                            }
461                            Err(e) => {
462                                error!("Handler error: {}", e);
463                                if auto_ack {
464                                    // Check if retry is configured
465                                    if let Some(retry_cfg) = &retry_config {
466                                        if message.retry_attempt < retry_cfg.max_retries {
467                                            // Calculate delay for next retry
468                                            if let Some(delay) =
469                                                retry_cfg.calculate_delay(message.retry_attempt)
470                                            {
471                                                warn!(
472                                                    "Scheduling retry {} with delay {:?} for message",
473                                                    message.retry_attempt + 1,
474                                                    delay
475                                                );
476
477                                                // Update retry attempt in wire message
478                                                let wire_msg = WireMessage {
479                                                    data: message.data.clone(),
480                                                    retry_attempt: message.retry_attempt + 1,
481                                                };
482
483                                                let retry_payload =
484                                                    match serde_json::to_vec(&wire_msg) {
485                                                        Ok(payload) => payload,
486                                                        Err(e) => {
487                                                            error!(
488                                                            "Failed to serialize retry message: {}",
489                                                            e
490                                                        );
491                                                            if let Err(e) =
492                                                                message.nack(false).await
493                                                            {
494                                                                error!(
495                                                                    "Failed to nack message: {}",
496                                                                    e
497                                                                );
498                                                            }
499                                                            return;
500                                                        }
501                                                    };
502
503                                                // Send via appropriate strategy (TTL or DelayedExchange)
504                                                let send_result = if matches!(
505                                                    retry_cfg.delay_strategy,
506                                                    crate::retry::DelayStrategy::DelayedExchange
507                                                ) {
508                                                    consumer_self
509                                                        .send_to_delay_exchange(
510                                                            &channel_clone,
511                                                            &retry_payload,
512                                                            delay,
513                                                        )
514                                                        .await
515                                                } else {
516                                                    consumer_self
517                                                        .send_to_retry_queue(
518                                                            &channel_clone,
519                                                            &retry_payload,
520                                                            message.retry_attempt + 1,
521                                                            delay,
522                                                        )
523                                                        .await
524                                                };
525
526                                                if let Err(e) = send_result {
527                                                    error!("Failed to send retry message: {}", e);
528                                                    if let Err(e) = message.nack(false).await {
529                                                        error!("Failed to nack message: {}", e);
530                                                    }
531                                                    return;
532                                                }
533
534                                                // ACK original message (it's now queued for retry)
535                                                if let Err(e) = message.ack().await {
536                                                    error!(
537                                                        "Failed to ack message after retry: {}",
538                                                        e
539                                                    );
540                                                }
541                                            } else {
542                                                // No more retries, send to DLQ
543                                                warn!("Retry exhausted, sending to DLQ");
544                                                if let Err(e) = consumer_self
545                                                    .send_to_dlq_simple(
546                                                        &channel_clone,
547                                                        &delivery.data,
548                                                    )
549                                                    .await
550                                                {
551                                                    error!("Failed to send to DLQ: {}", e);
552                                                }
553                                                if let Err(e) = message.ack().await {
554                                                    error!(
555                                                        "Failed to ack message after DLQ: {}",
556                                                        e
557                                                    );
558                                                }
559                                            }
560                                        } else {
561                                            // Retry exhausted, send to DLQ
562                                            warn!("Max retries reached, sending to DLQ");
563                                            if let Err(e) = consumer_self
564                                                .send_to_dlq_simple(&channel_clone, &delivery.data)
565                                                .await
566                                            {
567                                                error!("Failed to send to DLQ: {}", e);
568                                            }
569                                            if let Err(e) = message.ack().await {
570                                                error!("Failed to ack message after DLQ: {}", e);
571                                            }
572                                        }
573                                    } else {
574                                        // No retry config, just nack
575                                        if let Err(e) = message.nack(false).await {
576                                            error!("Failed to nack message: {}", e);
577                                        }
578                                    }
579                                }
580                            }
581                        }
582                    }
583                    Err(e) => {
584                        error!("Failed to deserialize message: {}", e);
585                        if auto_ack {
586                            // Reject malformed messages
587                            if let Err(e) = channel_clone
588                                .basic_nack(
589                                    delivery.delivery_tag,
590                                    lapin::options::BasicNackOptions {
591                                        multiple: false,
592                                        requeue: false,
593                                    },
594                                )
595                                .await
596                            {
597                                error!("Failed to nack malformed message: {}", e);
598                            }
599                        }
600                    }
601                }
602            });
603        }
604
605        Ok(())
606    }
607
608    /// Setup queue and exchange infrastructure
609    async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
610        // Declare queue
611        channel
612            .queue_declare(
613                &self.queue_name,
614                QueueDeclareOptions {
615                    durable: true,
616                    ..Default::default()
617                },
618                FieldTable::default(),
619            )
620            .await?;
621
622        // Bind to exchange if specified
623        if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
624            channel
625                .queue_bind(
626                    &self.queue_name,
627                    exchange,
628                    routing_key,
629                    lapin::options::QueueBindOptions::default(),
630                    FieldTable::default(),
631                )
632                .await?;
633        }
634
635        // Setup delay exchange if using DelayedExchange strategy
636        if let Some(retry_config) = &self.retry_config {
637            if matches!(
638                retry_config.delay_strategy,
639                crate::retry::DelayStrategy::DelayedExchange
640            ) {
641                let delay_exchange = self.create_delay_exchange(channel).await?;
642
643                // Bind delay exchange to original queue
644                channel
645                    .queue_bind(
646                        &self.queue_name,
647                        &delay_exchange,
648                        &self.queue_name, // Routing key: original queue name
649                        lapin::options::QueueBindOptions::default(),
650                        FieldTable::default(),
651                    )
652                    .await?;
653
654                debug!(
655                    "Bound queue {} to delay exchange {}",
656                    self.queue_name, delay_exchange
657                );
658            }
659        }
660
661        Ok(())
662    }
663
664    /// Start consuming message envelopes with full retry support
665    pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
666    where
667        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
668        H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
669        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
670    {
671        let channel = self.connection.create_channel().await?;
672        let retry_config = self.retry_config.clone();
673
674        // Set prefetch count
675        channel
676            .basic_qos(
677                self.prefetch_count,
678                lapin::options::BasicQosOptions::default(),
679            )
680            .await?;
681
682        // Setup queue and exchange
683        self.setup_infrastructure(&channel).await?;
684
685        // Create consumer
686        let mut consumer = channel
687            .basic_consume(
688                &self.queue_name,
689                "rust-rabbit-envelope-consumer",
690                BasicConsumeOptions::default(),
691                FieldTable::default(),
692            )
693            .await?;
694
695        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
696
697        debug!(
698            "Started consuming envelopes from queue: {}",
699            self.queue_name
700        );
701
702        // Process message envelopes with retry support
703        while let Some(delivery_result) = consumer.next().await {
704            let delivery = delivery_result?;
705            let permit = semaphore.clone().acquire_owned().await.unwrap();
706            let handler_clone = handler.clone();
707            let auto_ack = self.auto_ack;
708            let channel_clone = Arc::new(channel.clone());
709            let retry_config_clone = retry_config.clone();
710            let queue_name = self.queue_name.clone();
711            let connection = self.connection.clone();
712
713            tokio::spawn(async move {
714                let _permit = permit;
715
716                // Try to deserialize as MessageEnvelope
717                match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
718                    Ok(mut envelope) => {
719                        debug!(
720                            "Processing envelope {} (attempt {}/{})",
721                            envelope.metadata.message_id,
722                            envelope.metadata.retry_attempt + 1,
723                            envelope.metadata.max_retries + 1
724                        );
725
726                        // Process message
727                        match handler_clone(envelope.clone()).await {
728                            Ok(()) => {
729                                if auto_ack {
730                                    if let Err(e) = channel_clone
731                                        .basic_ack(
732                                            delivery.delivery_tag,
733                                            BasicAckOptions::default(),
734                                        )
735                                        .await
736                                    {
737                                        error!("Failed to ack message: {}", e);
738                                    }
739                                }
740                                debug!(
741                                    "Envelope {} processed successfully",
742                                    envelope.metadata.message_id
743                                );
744                            }
745                            Err(e) => {
746                                error!(
747                                    "Handler error for envelope {}: {}",
748                                    envelope.metadata.message_id, e
749                                );
750
751                                // Determine error type (simplified classification)
752                                let error_type = classify_error(e.as_ref());
753
754                                // Add error to envelope
755                                envelope = envelope.with_error(
756                                    &e.to_string(),
757                                    error_type,
758                                    Some(&format!("Queue: {}", queue_name)),
759                                );
760
761                                if auto_ack {
762                                    // Check if we should retry
763                                    if let Some(retry_cfg) = &retry_config_clone {
764                                        if !envelope.is_retry_exhausted() {
765                                            // Calculate delay and schedule retry
766                                            if let Some(delay) = retry_cfg
767                                                .calculate_delay(envelope.metadata.retry_attempt)
768                                            {
769                                                warn!(
770                                                    "Scheduling retry {} for envelope {} with delay {:?}",
771                                                    envelope.metadata.retry_attempt + 1,
772                                                    envelope.metadata.message_id,
773                                                    delay
774                                                );
775
776                                                // Increment retry attempt in envelope
777                                                envelope.metadata.retry_attempt += 1;
778
779                                                // Serialize updated envelope
780                                                match serde_json::to_vec(&envelope) {
781                                                    Ok(retry_payload) => {
782                                                        // Create consumer instance for access to methods
783                                                        let consumer_self = Consumer {
784                                                            connection: connection.clone(),
785                                                            queue_name: queue_name.clone(),
786                                                            exchange_name: None,
787                                                            routing_key: None,
788                                                            retry_config: retry_config_clone
789                                                                .clone(),
790                                                            prefetch_count: 10,
791                                                            auto_ack: true,
792                                                        };
793
794                                                        // Send via appropriate strategy (TTL or DelayedExchange)
795                                                        let send_result = if matches!(
796                                                            retry_config_clone
797                                                                .as_ref()
798                                                                .map(|c| c.delay_strategy),
799                                                            Some(crate::retry::DelayStrategy::DelayedExchange)
800                                                        ) {
801                                                            consumer_self
802                                                                .send_to_delay_exchange(
803                                                                    &channel_clone,
804                                                                    &retry_payload,
805                                                                    delay,
806                                                                )
807                                                                .await
808                                                        } else {
809                                                            consumer_self
810                                                                .send_to_retry_queue(
811                                                                    &channel_clone,
812                                                                    &retry_payload,
813                                                                    envelope.metadata.retry_attempt,
814                                                                    delay,
815                                                                )
816                                                                .await
817                                                        };
818
819                                                        if let Err(e) = send_result {
820                                                            error!("Failed to send envelope for retry: {}", e);
821                                                            // Fallback to simple nack
822                                                            if let Err(e) = channel_clone
823                                                                .basic_nack(
824                                                                    delivery.delivery_tag,
825                                                                    lapin::options::BasicNackOptions {
826                                                                        multiple: false,
827                                                                        requeue: false,
828                                                                    },
829                                                                )
830                                                                .await
831                                                            {
832                                                                error!("Failed to nack message: {}", e);
833                                                            }
834                                                            return;
835                                                        }
836
837                                                        // ACK original message (it's now queued for retry)
838                                                        if let Err(e) = channel_clone
839                                                            .basic_ack(
840                                                                delivery.delivery_tag,
841                                                                BasicAckOptions::default(),
842                                                            )
843                                                            .await
844                                                        {
845                                                            error!("Failed to ack message after retry: {}", e);
846                                                        }
847                                                    }
848                                                    Err(e) => {
849                                                        error!("Failed to serialize envelope for retry: {}", e);
850                                                        // Fallback to simple nack
851                                                        if let Err(e) = channel_clone
852                                                            .basic_nack(
853                                                                delivery.delivery_tag,
854                                                                lapin::options::BasicNackOptions {
855                                                                    multiple: false,
856                                                                    requeue: false,
857                                                                },
858                                                            )
859                                                            .await
860                                                        {
861                                                            error!("Failed to nack message: {}", e);
862                                                        }
863                                                    }
864                                                }
865                                            } else {
866                                                // No more retries, send to DLQ
867                                                Self::send_to_dlq(
868                                                    &envelope,
869                                                    retry_cfg,
870                                                    &connection,
871                                                    &queue_name,
872                                                )
873                                                .await;
874
875                                                // ACK original message
876                                                if let Err(e) = channel_clone
877                                                    .basic_ack(
878                                                        delivery.delivery_tag,
879                                                        BasicAckOptions::default(),
880                                                    )
881                                                    .await
882                                                {
883                                                    error!(
884                                                        "Failed to ack message after DLQ: {}",
885                                                        e
886                                                    );
887                                                }
888                                            }
889                                        } else {
890                                            // Retry exhausted, send to DLQ
891                                            warn!(
892                                                "Retry exhausted for envelope {}",
893                                                envelope.metadata.message_id
894                                            );
895                                            Self::send_to_dlq(
896                                                &envelope,
897                                                retry_cfg,
898                                                &connection,
899                                                &queue_name,
900                                            )
901                                            .await;
902
903                                            // ACK original message
904                                            if let Err(e) = channel_clone
905                                                .basic_ack(
906                                                    delivery.delivery_tag,
907                                                    BasicAckOptions::default(),
908                                                )
909                                                .await
910                                            {
911                                                error!("Failed to ack message after DLQ: {}", e);
912                                            }
913                                        }
914                                    } else {
915                                        // No retry config, just nack
916                                        if let Err(e) = channel_clone
917                                            .basic_nack(
918                                                delivery.delivery_tag,
919                                                lapin::options::BasicNackOptions {
920                                                    multiple: false,
921                                                    requeue: false,
922                                                },
923                                            )
924                                            .await
925                                        {
926                                            error!("Failed to nack message: {}", e);
927                                        }
928                                    }
929                                }
930                            }
931                        }
932                    }
933                    Err(e) => {
934                        error!("Failed to deserialize message envelope: {}", e);
935                        if auto_ack {
936                            // Reject malformed messages
937                            if let Err(e) = channel_clone
938                                .basic_nack(
939                                    delivery.delivery_tag,
940                                    lapin::options::BasicNackOptions {
941                                        multiple: false,
942                                        requeue: false,
943                                    },
944                                )
945                                .await
946                            {
947                                error!("Failed to nack malformed envelope: {}", e);
948                            }
949                        }
950                    }
951                }
952            });
953        }
954
955        Ok(())
956    }
957
958    /// Send failed message to Dead Letter Queue
959    async fn send_to_dlq<T>(
960        envelope: &MessageEnvelope<T>,
961        retry_config: &RetryConfig,
962        connection: &Arc<Connection>,
963        queue_name: &str,
964    ) where
965        T: serde::Serialize,
966    {
967        match connection.create_channel().await {
968            Ok(dlq_channel) => {
969                let dlq_name = retry_config.get_dead_letter_queue(queue_name);
970
971                // Declare DLQ
972                if let Err(e) = dlq_channel
973                    .queue_declare(
974                        &dlq_name,
975                        QueueDeclareOptions {
976                            durable: true,
977                            ..Default::default()
978                        },
979                        FieldTable::default(),
980                    )
981                    .await
982                {
983                    error!("Failed to declare DLQ {}: {}", dlq_name, e);
984                    return;
985                }
986
987                // Publish to DLQ with failure summary
988                let failure_summary = envelope.get_failure_summary();
989                let dlq_payload = serde_json::json!({
990                    "envelope": envelope,
991                    "failure_summary": failure_summary,
992                    "sent_to_dlq_at": chrono::Utc::now(),
993                });
994
995                if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
996                    if let Err(e) = dlq_channel
997                        .basic_publish(
998                            "",
999                            &dlq_name,
1000                            lapin::options::BasicPublishOptions::default(),
1001                            &payload_bytes,
1002                            lapin::BasicProperties::default(),
1003                        )
1004                        .await
1005                    {
1006                        error!("Failed to publish to DLQ {}: {}", dlq_name, e);
1007                    } else {
1008                        warn!(
1009                            "Sent envelope {} to DLQ: {}",
1010                            envelope.metadata.message_id, failure_summary
1011                        );
1012                    }
1013                }
1014            }
1015            Err(e) => {
1016                error!("Failed to create DLQ channel: {}", e);
1017            }
1018        }
1019    }
1020}
1021
1022/// Classify error type based on error message (simplified heuristics)
1023fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
1024    let error_msg = error.to_string().to_lowercase();
1025
1026    if error_msg.contains("timeout")
1027        || error_msg.contains("connection")
1028        || error_msg.contains("network")
1029        || error_msg.contains("temporary")
1030    {
1031        ErrorType::Transient
1032    } else if error_msg.contains("rate limit")
1033        || error_msg.contains("quota")
1034        || error_msg.contains("resource")
1035    {
1036        ErrorType::Resource
1037    } else if error_msg.contains("validation")
1038        || error_msg.contains("authentication")
1039        || error_msg.contains("authorization")
1040        || error_msg.contains("invalid")
1041        || error_msg.contains("bad request")
1042    {
1043        ErrorType::Permanent
1044    } else {
1045        ErrorType::Unknown
1046    }
1047}