rust_rabbit/
consumer.rs

1use crate::{
2    connection::Connection,
3    error::RustRabbitError,
4    message::{ErrorType, MassTransitEnvelope, MessageEnvelope},
5    retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9    options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10    types::{AMQPValue, FieldTable},
11    BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::Semaphore;
19use tracing::{debug, error, warn};
20
// Header names and helper functions for reading/writing AMQP message headers

/// Name of the AMQP header that carries a message's retry attempt counter.
const HEADER_RETRY_ATTEMPT: &str = "x-retry-attempt";
/// Name of the AMQP header that carries a message's correlation ID.
const HEADER_CORRELATION_ID: &str = "x-correlation-id";
24
25/// Read retry_attempt from AMQP headers, defaulting to 0 if not present
26fn read_retry_attempt(properties: &BasicProperties) -> u32 {
27    if let Some(headers) = properties.headers() {
28        // Iterate over headers to find the key
29        for (header_key, value) in headers {
30            if header_key.as_str() == HEADER_RETRY_ATTEMPT {
31                if let AMQPValue::LongLongInt(attempt) = value {
32                    return *attempt as u32;
33                }
34            }
35        }
36    }
37    0
38}
39
40/// Read correlation_id from AMQP headers
41fn read_correlation_id(properties: &BasicProperties) -> Option<String> {
42    if let Some(headers) = properties.headers() {
43        // Iterate over headers to find the key
44        for (header_key, value) in headers {
45            if header_key.as_str() == HEADER_CORRELATION_ID {
46                if let AMQPValue::LongString(corr_id) = value {
47                    return Some(corr_id.to_string());
48                }
49            }
50        }
51    }
52    None
53}
54
55/// Create headers with retry_attempt and correlation_id
56///
57/// This function is kept for potential future use or testing scenarios
58/// where header creation is needed outside of the main message processing flow.
59#[allow(dead_code)]
60fn create_headers(retry_attempt: u32, correlation_id: Option<&str>) -> FieldTable {
61    let mut headers = FieldTable::default();
62    headers.insert(
63        HEADER_RETRY_ATTEMPT.into(),
64        AMQPValue::LongLongInt(retry_attempt as i64),
65    );
66    if let Some(corr_id) = correlation_id {
67        headers.insert(
68            HEADER_CORRELATION_ID.into(),
69            AMQPValue::LongString(corr_id.into()),
70        );
71    }
72    headers
73}
74
75/// Update headers with new retry_attempt, preserving existing headers
76fn update_headers_with_retry(
77    existing_headers: Option<&FieldTable>,
78    retry_attempt: u32,
79) -> FieldTable {
80    let mut headers = match existing_headers {
81        Some(h) => h.clone(),
82        None => FieldTable::default(),
83    };
84    headers.insert(
85        HEADER_RETRY_ATTEMPT.into(),
86        AMQPValue::LongLongInt(retry_attempt as i64),
87    );
88    headers
89}
90
91/// Consumer configuration builder
92pub struct ConsumerBuilder {
93    connection: Arc<Connection>,
94    queue_name: String,
95    exchange_name: Option<String>,
96    routing_key: Option<String>,
97    retry_config: Option<RetryConfig>,
98    prefetch_count: Option<u16>,
99    auto_ack: bool,
100}
101
102impl ConsumerBuilder {
103    pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
104        Self {
105            connection,
106            queue_name: queue_name.into(),
107            exchange_name: None,
108            routing_key: None,
109            retry_config: None,
110            prefetch_count: Some(10),
111            auto_ack: true,
112        }
113    }
114
115    /// Bind to an exchange with routing key
116    pub fn bind_to_exchange(
117        mut self,
118        exchange: impl Into<String>,
119        routing_key: impl Into<String>,
120    ) -> Self {
121        self.exchange_name = Some(exchange.into());
122        self.routing_key = Some(routing_key.into());
123        self
124    }
125
126    /// Set routing key (for use with bind_to_exchange)
127    pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
128        self.routing_key = Some(routing_key.into());
129        self
130    }
131
132    /// Configure retry behavior
133    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
134        self.retry_config = Some(retry_config);
135        self
136    }
137
138    /// Set TTL for dead letter queue (auto-cleanup failed messages)
139    /// This is a convenience method that modifies the retry_config if it exists
140    ///
141    /// # Example
142    /// ```rust,no_run
143    /// # use rust_rabbit::{Connection, Consumer, RetryConfig};
144    /// # use std::time::Duration;
145    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
146    /// # let connection = Connection::new("amqp://localhost").await?;
147    /// let consumer = Consumer::builder(connection, "orders")
148    ///     .with_retry(RetryConfig::exponential_default())
149    ///     .with_dlq_ttl(Duration::from_secs(86400))  // 1 day
150    ///     .build();
151    /// # Ok(())
152    /// # }
153    /// ```
154    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
155        if let Some(retry_config) = self.retry_config.as_mut() {
156            retry_config.dlq_ttl = Some(ttl);
157        }
158        self
159    }
160
161    /// Set prefetch count
162    pub fn with_prefetch(mut self, count: u16) -> Self {
163        self.prefetch_count = Some(count);
164        self
165    }
166
167    /// Disable auto-acknowledge (manual ack required)
168    pub fn manual_ack(mut self) -> Self {
169        self.auto_ack = false;
170        self
171    }
172
173    /// Build the consumer
174    pub fn build(self) -> Consumer {
175        Consumer {
176            connection: self.connection,
177            queue_name: self.queue_name,
178            exchange_name: self.exchange_name,
179            routing_key: self.routing_key,
180            retry_config: self.retry_config,
181            prefetch_count: self.prefetch_count.unwrap_or(10),
182            auto_ack: self.auto_ack,
183        }
184    }
185}
186
187/// Simplified Consumer for message consumption
188pub struct Consumer {
189    connection: Arc<Connection>,
190    queue_name: String,
191    exchange_name: Option<String>,
192    routing_key: Option<String>,
193    retry_config: Option<RetryConfig>,
194    prefetch_count: u16,
195    auto_ack: bool,
196}
197
198impl Consumer {
199    /// Create a new consumer builder
200    pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
201        ConsumerBuilder::new(connection, queue_name)
202    }
203
204    /// Create retry queue with TTL
205    async fn create_retry_queue(
206        &self,
207        channel: &Channel,
208        retry_attempt: u32,
209        delay: std::time::Duration,
210    ) -> Result<String, RustRabbitError> {
211        let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
212        let delay_ms = delay.as_millis() as i64;
213
214        // Create retry queue with TTL that routes back to original queue
215        let mut args = FieldTable::default();
216        args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
217        args.insert(
218            "x-dead-letter-exchange".into(),
219            AMQPValue::LongString("".into()),
220        ); // Default exchange
221        args.insert(
222            "x-dead-letter-routing-key".into(),
223            AMQPValue::LongString(self.queue_name.clone().into()),
224        );
225
226        channel
227            .queue_declare(
228                &retry_queue_name,
229                QueueDeclareOptions {
230                    durable: true,
231                    ..Default::default()
232                },
233                args,
234            )
235            .await?;
236
237        debug!(
238            "Created retry queue: {} with TTL: {}ms",
239            retry_queue_name, delay_ms
240        );
241        Ok(retry_queue_name)
242    }
243
244    /// Create DLQ (Dead Letter Queue) with optional TTL
245    async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
246        let dlq_name = format!("{}.dlq", self.queue_name);
247
248        // Build queue arguments with optional TTL
249        let mut args = FieldTable::default();
250        if let Some(retry_config) = &self.retry_config {
251            if let Some(ttl) = &retry_config.dlq_ttl {
252                let ttl_ms = ttl.as_millis() as i64;
253                args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(ttl_ms));
254                debug!("DLQ TTL: {}ms", ttl_ms);
255            }
256        }
257
258        channel
259            .queue_declare(
260                &dlq_name,
261                QueueDeclareOptions {
262                    durable: true,
263                    ..Default::default()
264                },
265                args,
266            )
267            .await?;
268
269        debug!("Created DLQ: {}", dlq_name);
270        Ok(dlq_name)
271    }
272
273    /// Send message to retry queue with delay
274    async fn send_to_retry_queue(
275        &self,
276        channel: &Channel,
277        message_data: &[u8],
278        retry_attempt: u32,
279        delay: std::time::Duration,
280    ) -> Result<(), RustRabbitError> {
281        let retry_queue_name = self
282            .create_retry_queue(channel, retry_attempt, delay)
283            .await?;
284
285        // Publish to retry queue
286        channel
287            .basic_publish(
288                "", // Default exchange
289                &retry_queue_name,
290                BasicPublishOptions::default(),
291                message_data,
292                BasicProperties::default()
293                    .with_content_type("application/json".into())
294                    .with_delivery_mode(2), // Persistent
295            )
296            .await?
297            .await?;
298
299        debug!("Sent message to retry queue: {}", retry_queue_name);
300        Ok(())
301    }
302
303    /// Send message to retry queue with delay and custom headers
304    async fn send_to_retry_queue_with_headers(
305        &self,
306        channel: &Channel,
307        message_data: &[u8],
308        retry_attempt: u32,
309        delay: std::time::Duration,
310        headers: FieldTable,
311    ) -> Result<(), RustRabbitError> {
312        let retry_queue_name = self
313            .create_retry_queue(channel, retry_attempt, delay)
314            .await?;
315
316        // Publish to retry queue with headers
317        channel
318            .basic_publish(
319                "", // Default exchange
320                &retry_queue_name,
321                BasicPublishOptions::default(),
322                message_data,
323                BasicProperties::default()
324                    .with_content_type("application/json".into())
325                    .with_delivery_mode(2) // Persistent
326                    .with_headers(headers),
327            )
328            .await?
329            .await?;
330
331        debug!(
332            "Sent message to retry queue with headers: {}",
333            retry_queue_name
334        );
335        Ok(())
336    }
337
338    /// Send message to DLQ
339    async fn send_to_dlq_simple(
340        &self,
341        channel: &Channel,
342        message_data: &[u8],
343    ) -> Result<(), RustRabbitError> {
344        let dlq_name = self.create_dlq(channel).await?;
345
346        // Publish to DLQ
347        channel
348            .basic_publish(
349                "", // Default exchange
350                &dlq_name,
351                BasicPublishOptions::default(),
352                message_data,
353                BasicProperties::default()
354                    .with_content_type("application/json".into())
355                    .with_delivery_mode(2), // Persistent
356            )
357            .await?
358            .await?;
359
360        debug!("Sent message to DLQ: {}", dlq_name);
361        Ok(())
362    }
363
364    /// Create delay exchange using RabbitMQ delayed message exchange plugin
365    /// Requires rabbitmq_delayed_message_exchange plugin to be installed on RabbitMQ
366    async fn create_delay_exchange(&self, channel: &Channel) -> Result<String, RustRabbitError> {
367        if let Some(retry_config) = &self.retry_config {
368            let delay_exchange = retry_config.get_delay_exchange(&self.queue_name);
369
370            // Declare delay exchange with x-delayed-type argument
371            let mut args = FieldTable::default();
372            args.insert(
373                "x-delayed-type".into(),
374                AMQPValue::LongString("direct".into()),
375            );
376
377            channel
378                .exchange_declare(
379                    &delay_exchange,
380                    lapin::ExchangeKind::Custom("x-delayed-message".to_string()),
381                    lapin::options::ExchangeDeclareOptions {
382                        durable: true,
383                        ..Default::default()
384                    },
385                    args,
386                )
387                .await?;
388
389            debug!(
390                "Created delay exchange: {} (x-delayed-message type)",
391                delay_exchange
392            );
393            Ok(delay_exchange)
394        } else {
395            Err(RustRabbitError::Retry(
396                "Retry config not configured".to_string(),
397            ))
398        }
399    }
400
401    /// Send message to delay exchange with x-delay header for retry
402    /// Message will be automatically routed back to the original queue after delay
403    async fn send_to_delay_exchange(
404        &self,
405        channel: &Channel,
406        message_data: &[u8],
407        delay: std::time::Duration,
408    ) -> Result<(), RustRabbitError> {
409        let delay_exchange = self.create_delay_exchange(channel).await?;
410        let delay_ms = delay.as_millis() as i64;
411
412        // Publish to delay exchange with x-delay header
413        // The message will be re-delivered to original queue after delay
414        channel
415            .basic_publish(
416                &delay_exchange,
417                &self.queue_name, // Routing key: original queue name
418                BasicPublishOptions::default(),
419                message_data,
420                BasicProperties::default()
421                    .with_content_type("application/json".into())
422                    .with_delivery_mode(2) // Persistent
423                    .with_headers({
424                        let mut headers = FieldTable::default();
425                        headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
426                        headers
427                    }),
428            )
429            .await?
430            .await?;
431
432        debug!(
433            "Sent message to delay exchange: {} with delay: {}ms",
434            delay_exchange, delay_ms
435        );
436        Ok(())
437    }
438
439    /// Send message to delay exchange with x-delay header and custom headers for retry
440    /// Message will be automatically routed back to the original queue after delay
441    async fn send_to_delay_exchange_with_headers(
442        &self,
443        channel: &Channel,
444        message_data: &[u8],
445        delay: std::time::Duration,
446        mut headers: FieldTable,
447    ) -> Result<(), RustRabbitError> {
448        let delay_exchange = self.create_delay_exchange(channel).await?;
449        let delay_ms = delay.as_millis() as i64;
450
451        // Add x-delay header to existing headers
452        headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
453
454        // Publish to delay exchange with headers
455        // The message will be re-delivered to original queue after delay
456        channel
457            .basic_publish(
458                &delay_exchange,
459                &self.queue_name, // Routing key: original queue name
460                BasicPublishOptions::default(),
461                message_data,
462                BasicProperties::default()
463                    .with_content_type("application/json".into())
464                    .with_delivery_mode(2) // Persistent
465                    .with_headers(headers),
466            )
467            .await?
468            .await?;
469
470        debug!(
471            "Sent message to delay exchange with headers: {} with delay: {}ms",
472            delay_exchange, delay_ms
473        );
474        Ok(())
475    }
476
477    /// Start consuming messages with smart MassTransit detection
478    /// Handler receives just the payload type T (no wrapper)
479    pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
480    where
481        T: DeserializeOwned + Send + Clone + Sync + 'static + Serialize,
482        H: Fn(T) -> Fut + Send + Sync + Clone + 'static,
483        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
484    {
485        let channel = self.connection.create_channel().await?;
486
487        // Set prefetch count
488        channel
489            .basic_qos(
490                self.prefetch_count,
491                lapin::options::BasicQosOptions::default(),
492            )
493            .await?;
494
495        // Setup infrastructure (queues, exchanges)
496        self.setup_infrastructure(&channel).await?;
497
498        // Start consuming
499        let mut consumer = channel
500            .basic_consume(
501                &self.queue_name,
502                "",
503                BasicConsumeOptions::default(),
504                FieldTable::default(),
505            )
506            .await?;
507
508        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
509
510        debug!("Started consuming from queue: {}", self.queue_name);
511
512        // Process messages
513        while let Some(delivery_result) = consumer.next().await {
514            let delivery = delivery_result?;
515            let permit = semaphore.clone().acquire_owned().await.unwrap();
516            let handler_clone = handler.clone();
517            let auto_ack = self.auto_ack;
518            let channel_clone = Arc::new(channel.clone());
519            let retry_config = self.retry_config.clone();
520            let consumer_self = Consumer {
521                connection: self.connection.clone(),
522                queue_name: self.queue_name.clone(),
523                exchange_name: self.exchange_name.clone(),
524                routing_key: self.routing_key.clone(),
525                retry_config: self.retry_config.clone(),
526                prefetch_count: self.prefetch_count,
527                auto_ack: self.auto_ack,
528            };
529
530            tokio::spawn(async move {
531                let _permit = permit;
532                let delivery_tag = delivery.delivery_tag;
533                let properties = delivery.properties;
534
535                // Smart detection: Try MassTransit format first
536                let (payload, correlation_id_from_mt) =
537                    match MassTransitEnvelope::from_slice(&delivery.data) {
538                        Ok(mt_envelope) => {
539                            // MassTransit format detected - extract payload
540                            match mt_envelope.extract_message::<T>() {
541                                Ok(data) => {
542                                    debug!("Detected MassTransit format, extracted payload");
543                                    (data, mt_envelope.correlation_id().map(|s| s.to_string()))
544                                }
545                                Err(e) => {
546                                    error!(
547                                        "Failed to extract payload from MassTransit envelope: {}",
548                                        e
549                                    );
550                                    if auto_ack {
551                                        if let Err(e) = channel_clone
552                                            .basic_nack(
553                                                delivery_tag,
554                                                lapin::options::BasicNackOptions {
555                                                    multiple: false,
556                                                    requeue: false,
557                                                },
558                                            )
559                                            .await
560                                        {
561                                            error!("Failed to nack malformed message: {}", e);
562                                        }
563                                    }
564                                    return;
565                                }
566                            }
567                        }
568                        Err(_) => {
569                            // Not MassTransit format - try direct deserialization
570                            match serde_json::from_slice::<T>(&delivery.data) {
571                                Ok(data) => {
572                                    debug!("Direct format detected");
573                                    (data, None)
574                                }
575                                Err(e) => {
576                                    error!("Failed to deserialize message: {}", e);
577                                    if auto_ack {
578                                        if let Err(e) = channel_clone
579                                            .basic_nack(
580                                                delivery_tag,
581                                                lapin::options::BasicNackOptions {
582                                                    multiple: false,
583                                                    requeue: false,
584                                                },
585                                            )
586                                            .await
587                                        {
588                                            error!("Failed to nack malformed message: {}", e);
589                                        }
590                                    }
591                                    return;
592                                }
593                            }
594                        }
595                    };
596
597                // Read metadata from headers
598                let retry_attempt = read_retry_attempt(&properties);
599                let correlation_id =
600                    correlation_id_from_mt.or_else(|| read_correlation_id(&properties));
601
602                // If we got correlation_id from MassTransit but it's not in headers, we should add it
603                // But for now, we'll just use it for retries
604
605                // Process message - handler receives just T
606                match handler_clone(payload.clone()).await {
607                    Ok(()) => {
608                        if auto_ack {
609                            if let Err(e) = channel_clone
610                                .basic_ack(delivery_tag, BasicAckOptions::default())
611                                .await
612                            {
613                                error!("Failed to ack message: {}", e);
614                            }
615                        }
616                        debug!("Message processed successfully");
617                    }
618                    Err(e) => {
619                        error!("Handler error: {}", e);
620                        if auto_ack {
621                            // Check if retry is configured
622                            if let Some(retry_cfg) = &retry_config {
623                                if retry_attempt < retry_cfg.max_retries {
624                                    // Calculate delay for next retry
625                                    if let Some(delay) = retry_cfg.calculate_delay(retry_attempt) {
626                                        warn!(
627                                            "Scheduling retry {} with delay {:?} for message",
628                                            retry_attempt + 1,
629                                            delay
630                                        );
631
632                                        // Serialize payload (raw, no wrapper)
633                                        let payload_bytes = match serde_json::to_vec(&payload) {
634                                            Ok(bytes) => bytes,
635                                            Err(e) => {
636                                                error!(
637                                                    "Failed to serialize payload for retry: {}",
638                                                    e
639                                                );
640                                                if let Err(e) = channel_clone
641                                                    .basic_nack(
642                                                        delivery_tag,
643                                                        lapin::options::BasicNackOptions {
644                                                            multiple: false,
645                                                            requeue: false,
646                                                        },
647                                                    )
648                                                    .await
649                                                {
650                                                    error!("Failed to nack message: {}", e);
651                                                }
652                                                return;
653                                            }
654                                        };
655
656                                        // Update headers with new retry_attempt, preserving existing headers
657                                        let mut updated_headers = update_headers_with_retry(
658                                            properties.headers().as_ref(),
659                                            retry_attempt + 1,
660                                        );
661
662                                        // Preserve correlation_id in headers if present
663                                        if let Some(corr_id) = &correlation_id {
664                                            updated_headers.insert(
665                                                HEADER_CORRELATION_ID.into(),
666                                                AMQPValue::LongString(corr_id.clone().into()),
667                                            );
668                                        }
669
670                                        // Send via appropriate strategy (TTL or DelayedExchange)
671                                        let send_result = if matches!(
672                                            retry_cfg.delay_strategy,
673                                            crate::retry::DelayStrategy::DelayedExchange
674                                        ) {
675                                            consumer_self
676                                                .send_to_delay_exchange_with_headers(
677                                                    &channel_clone,
678                                                    &payload_bytes,
679                                                    delay,
680                                                    updated_headers,
681                                                )
682                                                .await
683                                        } else {
684                                            consumer_self
685                                                .send_to_retry_queue_with_headers(
686                                                    &channel_clone,
687                                                    &payload_bytes,
688                                                    retry_attempt + 1,
689                                                    delay,
690                                                    updated_headers,
691                                                )
692                                                .await
693                                        };
694
695                                        if let Err(e) = send_result {
696                                            error!("Failed to send retry message: {}", e);
697                                            if let Err(e) = channel_clone
698                                                .basic_nack(
699                                                    delivery_tag,
700                                                    lapin::options::BasicNackOptions {
701                                                        multiple: false,
702                                                        requeue: false,
703                                                    },
704                                                )
705                                                .await
706                                            {
707                                                error!("Failed to nack message: {}", e);
708                                            }
709                                            return;
710                                        }
711
712                                        // ACK original message (it's now queued for retry)
713                                        if let Err(e) = channel_clone
714                                            .basic_ack(delivery_tag, BasicAckOptions::default())
715                                            .await
716                                        {
717                                            error!("Failed to ack message after retry: {}", e);
718                                        }
719                                    } else {
720                                        // No more retries, send to DLQ
721                                        warn!("Retry exhausted, sending to DLQ");
722                                        let payload_bytes = match serde_json::to_vec(&payload) {
723                                            Ok(bytes) => bytes,
724                                            Err(e) => {
725                                                error!(
726                                                    "Failed to serialize payload for DLQ: {}. Creating error payload.",
727                                                    e
728                                                );
729                                                // Create descriptive error payload instead of using raw delivery.data
730                                                serde_json::to_vec(&serde_json::json!({
731                                                    "error": "Failed to serialize payload for DLQ",
732                                                    "serialization_error": e.to_string(),
733                                                    "retry_attempt": retry_attempt,
734                                                    "original_message_size": delivery.data.len()
735                                                }))
736                                                .unwrap_or_else(|_| {
737                                                    // Last resort: minimal error message
738                                                    format!(
739                                                        r#"{{"error":"Serialization failed for DLQ","attempt":{}}}"#,
740                                                        retry_attempt
741                                                    )
742                                                    .into_bytes()
743                                                })
744                                            }
745                                        };
746                                        if let Err(e) = consumer_self
747                                            .send_to_dlq_simple(&channel_clone, &payload_bytes)
748                                            .await
749                                        {
750                                            error!("Failed to send to DLQ: {}", e);
751                                        }
752                                        if let Err(e) = channel_clone
753                                            .basic_ack(delivery_tag, BasicAckOptions::default())
754                                            .await
755                                        {
756                                            error!("Failed to ack message after DLQ: {}", e);
757                                        }
758                                    }
759                                } else {
760                                    // Retry exhausted, send to DLQ
761                                    warn!("Max retries reached, sending to DLQ");
762                                    let payload_bytes = match serde_json::to_vec(&payload) {
763                                        Ok(bytes) => bytes,
764                                        Err(e) => {
765                                            error!(
766                                                "Failed to serialize payload for DLQ: {}. Creating error payload.",
767                                                e
768                                            );
769                                            // Create descriptive error payload instead of using raw delivery.data
770                                            serde_json::to_vec(&serde_json::json!({
771                                                "error": "Failed to serialize payload for DLQ",
772                                                "serialization_error": e.to_string(),
773                                                "retry_attempt": retry_attempt,
774                                                "original_message_size": delivery.data.len()
775                                            }))
776                                            .unwrap_or_else(|_| {
777                                                // Last resort: minimal error message
778                                                format!(
779                                                    r#"{{"error":"Serialization failed for DLQ","attempt":{}}}"#,
780                                                    retry_attempt
781                                                )
782                                                .into_bytes()
783                                            })
784                                        }
785                                    };
786                                    if let Err(e) = consumer_self
787                                        .send_to_dlq_simple(&channel_clone, &payload_bytes)
788                                        .await
789                                    {
790                                        error!("Failed to send to DLQ: {}", e);
791                                    }
792                                    if let Err(e) = channel_clone
793                                        .basic_ack(delivery_tag, BasicAckOptions::default())
794                                        .await
795                                    {
796                                        error!("Failed to ack message after DLQ: {}", e);
797                                    }
798                                }
799                            } else {
800                                // No retry config, just nack
801                                if let Err(e) = channel_clone
802                                    .basic_nack(
803                                        delivery_tag,
804                                        lapin::options::BasicNackOptions {
805                                            multiple: false,
806                                            requeue: false,
807                                        },
808                                    )
809                                    .await
810                                {
811                                    error!("Failed to nack message: {}", e);
812                                }
813                            }
814                        }
815                    }
816                }
817            });
818        }
819
820        Ok(())
821    }
822
823    /// Setup queue and exchange infrastructure
824    async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
825        // Declare queue
826        channel
827            .queue_declare(
828                &self.queue_name,
829                QueueDeclareOptions {
830                    durable: true,
831                    ..Default::default()
832                },
833                FieldTable::default(),
834            )
835            .await?;
836
837        // Bind to exchange if specified
838        if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
839            channel
840                .queue_bind(
841                    &self.queue_name,
842                    exchange,
843                    routing_key,
844                    lapin::options::QueueBindOptions::default(),
845                    FieldTable::default(),
846                )
847                .await?;
848        }
849
850        // Setup delay exchange if using DelayedExchange strategy
851        if let Some(retry_config) = &self.retry_config {
852            if matches!(
853                retry_config.delay_strategy,
854                crate::retry::DelayStrategy::DelayedExchange
855            ) {
856                let delay_exchange = self.create_delay_exchange(channel).await?;
857
858                // Bind delay exchange to original queue
859                channel
860                    .queue_bind(
861                        &self.queue_name,
862                        &delay_exchange,
863                        &self.queue_name, // Routing key: original queue name
864                        lapin::options::QueueBindOptions::default(),
865                        FieldTable::default(),
866                    )
867                    .await?;
868
869                debug!(
870                    "Bound queue {} to delay exchange {}",
871                    self.queue_name, delay_exchange
872                );
873            }
874        }
875
876        Ok(())
877    }
878
879    /// Start consuming message envelopes with full retry support
880    pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
881    where
882        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
883        H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
884        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
885    {
886        let channel = self.connection.create_channel().await?;
887        let retry_config = self.retry_config.clone();
888
889        // Set prefetch count
890        channel
891            .basic_qos(
892                self.prefetch_count,
893                lapin::options::BasicQosOptions::default(),
894            )
895            .await?;
896
897        // Setup queue and exchange
898        self.setup_infrastructure(&channel).await?;
899
900        // Create consumer
901        let mut consumer = channel
902            .basic_consume(
903                &self.queue_name,
904                "rust-rabbit-envelope-consumer",
905                BasicConsumeOptions::default(),
906                FieldTable::default(),
907            )
908            .await?;
909
910        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
911
912        debug!(
913            "Started consuming envelopes from queue: {}",
914            self.queue_name
915        );
916
917        // Process message envelopes with retry support
918        while let Some(delivery_result) = consumer.next().await {
919            let delivery = delivery_result?;
920            let permit = semaphore.clone().acquire_owned().await.unwrap();
921            let handler_clone = handler.clone();
922            let auto_ack = self.auto_ack;
923            let channel_clone = Arc::new(channel.clone());
924            let retry_config_clone = retry_config.clone();
925            let queue_name = self.queue_name.clone();
926            let connection = self.connection.clone();
927
928            tokio::spawn(async move {
929                let _permit = permit;
930
931                // Try to deserialize as MessageEnvelope
932                match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
933                    Ok(mut envelope) => {
934                        debug!(
935                            "Processing envelope {} (attempt {}/{})",
936                            envelope.metadata.message_id,
937                            envelope.metadata.retry_attempt + 1,
938                            envelope.metadata.max_retries + 1
939                        );
940
941                        // Process message
942                        match handler_clone(envelope.clone()).await {
943                            Ok(()) => {
944                                if auto_ack {
945                                    if let Err(e) = channel_clone
946                                        .basic_ack(
947                                            delivery.delivery_tag,
948                                            BasicAckOptions::default(),
949                                        )
950                                        .await
951                                    {
952                                        error!("Failed to ack message: {}", e);
953                                    }
954                                }
955                                debug!(
956                                    "Envelope {} processed successfully",
957                                    envelope.metadata.message_id
958                                );
959                            }
960                            Err(e) => {
961                                error!(
962                                    "Handler error for envelope {}: {}",
963                                    envelope.metadata.message_id, e
964                                );
965
966                                // Determine error type (simplified classification)
967                                let error_type = classify_error(e.as_ref());
968
969                                // Add error to envelope
970                                envelope = envelope.with_error(
971                                    &e.to_string(),
972                                    error_type,
973                                    Some(&format!("Queue: {}", queue_name)),
974                                );
975
976                                if auto_ack {
977                                    // Check if we should retry
978                                    if let Some(retry_cfg) = &retry_config_clone {
979                                        if !envelope.is_retry_exhausted() {
980                                            // Calculate delay and schedule retry
981                                            if let Some(delay) = retry_cfg
982                                                .calculate_delay(envelope.metadata.retry_attempt)
983                                            {
984                                                warn!(
985                                                    "Scheduling retry {} for envelope {} with delay {:?}",
986                                                    envelope.metadata.retry_attempt + 1,
987                                                    envelope.metadata.message_id,
988                                                    delay
989                                                );
990
991                                                // Increment retry attempt in envelope
992                                                envelope.metadata.retry_attempt += 1;
993
994                                                // Serialize updated envelope
995                                                match serde_json::to_vec(&envelope) {
996                                                    Ok(retry_payload) => {
997                                                        // Create consumer instance for access to methods
998                                                        let consumer_self = Consumer {
999                                                            connection: connection.clone(),
1000                                                            queue_name: queue_name.clone(),
1001                                                            exchange_name: None,
1002                                                            routing_key: None,
1003                                                            retry_config: retry_config_clone
1004                                                                .clone(),
1005                                                            prefetch_count: 10,
1006                                                            auto_ack: true,
1007                                                        };
1008
1009                                                        // Send via appropriate strategy (TTL or DelayedExchange)
1010                                                        let send_result = if matches!(
1011                                                            retry_config_clone
1012                                                                .as_ref()
1013                                                                .map(|c| c.delay_strategy),
1014                                                            Some(crate::retry::DelayStrategy::DelayedExchange)
1015                                                        ) {
1016                                                            consumer_self
1017                                                                .send_to_delay_exchange(
1018                                                                    &channel_clone,
1019                                                                    &retry_payload,
1020                                                                    delay,
1021                                                                )
1022                                                                .await
1023                                                        } else {
1024                                                            consumer_self
1025                                                                .send_to_retry_queue(
1026                                                                    &channel_clone,
1027                                                                    &retry_payload,
1028                                                                    envelope.metadata.retry_attempt,
1029                                                                    delay,
1030                                                                )
1031                                                                .await
1032                                                        };
1033
1034                                                        if let Err(e) = send_result {
1035                                                            error!("Failed to send envelope for retry: {}", e);
1036                                                            // Fallback to simple nack
1037                                                            if let Err(e) = channel_clone
1038                                                                .basic_nack(
1039                                                                    delivery.delivery_tag,
1040                                                                    lapin::options::BasicNackOptions {
1041                                                                        multiple: false,
1042                                                                        requeue: false,
1043                                                                    },
1044                                                                )
1045                                                                .await
1046                                                            {
1047                                                                error!("Failed to nack message: {}", e);
1048                                                            }
1049                                                            return;
1050                                                        }
1051
1052                                                        // ACK original message (it's now queued for retry)
1053                                                        if let Err(e) = channel_clone
1054                                                            .basic_ack(
1055                                                                delivery.delivery_tag,
1056                                                                BasicAckOptions::default(),
1057                                                            )
1058                                                            .await
1059                                                        {
1060                                                            error!("Failed to ack message after retry: {}", e);
1061                                                        }
1062                                                    }
1063                                                    Err(e) => {
1064                                                        error!("Failed to serialize envelope for retry: {}", e);
1065                                                        // Fallback to simple nack
1066                                                        if let Err(e) = channel_clone
1067                                                            .basic_nack(
1068                                                                delivery.delivery_tag,
1069                                                                lapin::options::BasicNackOptions {
1070                                                                    multiple: false,
1071                                                                    requeue: false,
1072                                                                },
1073                                                            )
1074                                                            .await
1075                                                        {
1076                                                            error!("Failed to nack message: {}", e);
1077                                                        }
1078                                                    }
1079                                                }
1080                                            } else {
1081                                                // No more retries, send to DLQ
1082                                                Self::send_to_dlq(
1083                                                    &envelope,
1084                                                    retry_cfg,
1085                                                    &connection,
1086                                                    &queue_name,
1087                                                )
1088                                                .await;
1089
1090                                                // ACK original message
1091                                                if let Err(e) = channel_clone
1092                                                    .basic_ack(
1093                                                        delivery.delivery_tag,
1094                                                        BasicAckOptions::default(),
1095                                                    )
1096                                                    .await
1097                                                {
1098                                                    error!(
1099                                                        "Failed to ack message after DLQ: {}",
1100                                                        e
1101                                                    );
1102                                                }
1103                                            }
1104                                        } else {
1105                                            // Retry exhausted, send to DLQ
1106                                            warn!(
1107                                                "Retry exhausted for envelope {}",
1108                                                envelope.metadata.message_id
1109                                            );
1110                                            Self::send_to_dlq(
1111                                                &envelope,
1112                                                retry_cfg,
1113                                                &connection,
1114                                                &queue_name,
1115                                            )
1116                                            .await;
1117
1118                                            // ACK original message
1119                                            if let Err(e) = channel_clone
1120                                                .basic_ack(
1121                                                    delivery.delivery_tag,
1122                                                    BasicAckOptions::default(),
1123                                                )
1124                                                .await
1125                                            {
1126                                                error!("Failed to ack message after DLQ: {}", e);
1127                                            }
1128                                        }
1129                                    } else {
1130                                        // No retry config, just nack
1131                                        if let Err(e) = channel_clone
1132                                            .basic_nack(
1133                                                delivery.delivery_tag,
1134                                                lapin::options::BasicNackOptions {
1135                                                    multiple: false,
1136                                                    requeue: false,
1137                                                },
1138                                            )
1139                                            .await
1140                                        {
1141                                            error!("Failed to nack message: {}", e);
1142                                        }
1143                                    }
1144                                }
1145                            }
1146                        }
1147                    }
1148                    Err(e) => {
1149                        error!("Failed to deserialize message envelope: {}", e);
1150                        if auto_ack {
1151                            // Reject malformed messages
1152                            if let Err(e) = channel_clone
1153                                .basic_nack(
1154                                    delivery.delivery_tag,
1155                                    lapin::options::BasicNackOptions {
1156                                        multiple: false,
1157                                        requeue: false,
1158                                    },
1159                                )
1160                                .await
1161                            {
1162                                error!("Failed to nack malformed envelope: {}", e);
1163                            }
1164                        }
1165                    }
1166                }
1167            });
1168        }
1169
1170        Ok(())
1171    }
1172
1173    /// Send failed message to Dead Letter Queue
1174    async fn send_to_dlq<T>(
1175        envelope: &MessageEnvelope<T>,
1176        retry_config: &RetryConfig,
1177        connection: &Arc<Connection>,
1178        queue_name: &str,
1179    ) where
1180        T: serde::Serialize,
1181    {
1182        match connection.create_channel().await {
1183            Ok(dlq_channel) => {
1184                let dlq_name = retry_config.get_dead_letter_queue(queue_name);
1185
1186                // Declare DLQ
1187                if let Err(e) = dlq_channel
1188                    .queue_declare(
1189                        &dlq_name,
1190                        QueueDeclareOptions {
1191                            durable: true,
1192                            ..Default::default()
1193                        },
1194                        FieldTable::default(),
1195                    )
1196                    .await
1197                {
1198                    error!("Failed to declare DLQ {}: {}", dlq_name, e);
1199                    return;
1200                }
1201
1202                // Publish to DLQ with failure summary
1203                let failure_summary = envelope.get_failure_summary();
1204                let dlq_payload = serde_json::json!({
1205                    "envelope": envelope,
1206                    "failure_summary": failure_summary,
1207                    "sent_to_dlq_at": chrono::Utc::now(),
1208                });
1209
1210                if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
1211                    if let Err(e) = dlq_channel
1212                        .basic_publish(
1213                            "",
1214                            &dlq_name,
1215                            lapin::options::BasicPublishOptions::default(),
1216                            &payload_bytes,
1217                            lapin::BasicProperties::default(),
1218                        )
1219                        .await
1220                    {
1221                        error!("Failed to publish to DLQ {}: {}", dlq_name, e);
1222                    } else {
1223                        warn!(
1224                            "Sent envelope {} to DLQ: {}",
1225                            envelope.metadata.message_id, failure_summary
1226                        );
1227                    }
1228                }
1229            }
1230            Err(e) => {
1231                error!("Failed to create DLQ channel: {}", e);
1232            }
1233        }
1234    }
1235}
1236
1237/// Classify error type based on error message (simplified heuristics)
1238fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
1239    let error_msg = error.to_string().to_lowercase();
1240
1241    if error_msg.contains("timeout")
1242        || error_msg.contains("connection")
1243        || error_msg.contains("network")
1244        || error_msg.contains("temporary")
1245    {
1246        ErrorType::Transient
1247    } else if error_msg.contains("rate limit")
1248        || error_msg.contains("quota")
1249        || error_msg.contains("resource")
1250    {
1251        ErrorType::Resource
1252    } else if error_msg.contains("validation")
1253        || error_msg.contains("authentication")
1254        || error_msg.contains("authorization")
1255        || error_msg.contains("invalid")
1256        || error_msg.contains("bad request")
1257    {
1258        ErrorType::Permanent
1259    } else {
1260        ErrorType::Unknown
1261    }
1262}