rust_rabbit/
consumer.rs

1use crate::{
2    connection::Connection,
3    error::RustRabbitError,
4    message::{ErrorType, MassTransitEnvelope, MessageEnvelope},
5    retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9    options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10    types::{AMQPValue, FieldTable},
11    BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::Semaphore;
19use tracing::{debug, error, warn};
20
// Header keys shared by the helper functions below that read and write AMQP headers.

/// Key of the header that stores the retry attempt counter of a message.
const HEADER_RETRY_ATTEMPT: &str = "x-retry-attempt";
/// Key of the header that stores the correlation id of a message.
const HEADER_CORRELATION_ID: &str = "x-correlation-id";
24
25/// Read retry_attempt from AMQP headers, defaulting to 0 if not present
26fn read_retry_attempt(properties: &BasicProperties) -> u32 {
27    if let Some(headers) = properties.headers() {
28        // Iterate over headers to find the key
29        for (header_key, value) in headers {
30            if header_key.as_str() == HEADER_RETRY_ATTEMPT {
31                if let AMQPValue::LongLongInt(attempt) = value {
32                    return *attempt as u32;
33                }
34            }
35        }
36    }
37    0
38}
39
40/// Read correlation_id from AMQP headers
41fn read_correlation_id(properties: &BasicProperties) -> Option<String> {
42    if let Some(headers) = properties.headers() {
43        // Iterate over headers to find the key
44        for (header_key, value) in headers {
45            if header_key.as_str() == HEADER_CORRELATION_ID {
46                if let AMQPValue::LongString(corr_id) = value {
47                    return Some(corr_id.to_string());
48                }
49            }
50        }
51    }
52    None
53}
54
55/// Create headers with retry_attempt and correlation_id
56///
57/// This function is kept for potential future use or testing scenarios
58/// where header creation is needed outside of the main message processing flow.
59#[allow(dead_code)]
60fn create_headers(retry_attempt: u32, correlation_id: Option<&str>) -> FieldTable {
61    let mut headers = FieldTable::default();
62    headers.insert(
63        HEADER_RETRY_ATTEMPT.into(),
64        AMQPValue::LongLongInt(retry_attempt as i64),
65    );
66    if let Some(corr_id) = correlation_id {
67        headers.insert(
68            HEADER_CORRELATION_ID.into(),
69            AMQPValue::LongString(corr_id.into()),
70        );
71    }
72    headers
73}
74
75/// Update headers with new retry_attempt, preserving existing headers
76fn update_headers_with_retry(
77    existing_headers: Option<&FieldTable>,
78    retry_attempt: u32,
79) -> FieldTable {
80    let mut headers = match existing_headers {
81        Some(h) => h.clone(),
82        None => FieldTable::default(),
83    };
84    headers.insert(
85        HEADER_RETRY_ATTEMPT.into(),
86        AMQPValue::LongLongInt(retry_attempt as i64),
87    );
88    headers
89}
90
91/// Consumer configuration builder
92pub struct ConsumerBuilder {
93    connection: Arc<Connection>,
94    queue_name: String,
95    exchange_name: Option<String>,
96    routing_key: Option<String>,
97    retry_config: Option<RetryConfig>,
98    prefetch_count: Option<u16>,
99    auto_ack: bool,
100}
101
102impl ConsumerBuilder {
103    pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
104        Self {
105            connection,
106            queue_name: queue_name.into(),
107            exchange_name: None,
108            routing_key: None,
109            retry_config: None,
110            prefetch_count: Some(10),
111            auto_ack: true,
112        }
113    }
114
115    /// Bind to an exchange with routing key
116    pub fn bind_to_exchange(
117        mut self,
118        exchange: impl Into<String>,
119        routing_key: impl Into<String>,
120    ) -> Self {
121        self.exchange_name = Some(exchange.into());
122        self.routing_key = Some(routing_key.into());
123        self
124    }
125
126    /// Set routing key (for use with bind_to_exchange)
127    pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
128        self.routing_key = Some(routing_key.into());
129        self
130    }
131
132    /// Configure retry behavior
133    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
134        self.retry_config = Some(retry_config);
135        self
136    }
137
138    /// Set TTL for dead letter queue (auto-cleanup failed messages)
139    /// This is a convenience method that modifies the retry_config if it exists
140    ///
141    /// # Example
142    /// ```ignore
143    /// let consumer = Consumer::builder(connection, "orders")
144    ///     .with_retry(RetryConfig::exponential_default())
145    ///     .with_dlq_ttl(Duration::from_secs(86400))  // 1 day
146    ///     .build();
147    /// ```
148    pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
149        if let Some(retry_config) = self.retry_config.as_mut() {
150            retry_config.dlq_ttl = Some(ttl);
151        }
152        self
153    }
154
155    /// Set prefetch count
156    pub fn with_prefetch(mut self, count: u16) -> Self {
157        self.prefetch_count = Some(count);
158        self
159    }
160
161    /// Disable auto-acknowledge (manual ack required)
162    pub fn manual_ack(mut self) -> Self {
163        self.auto_ack = false;
164        self
165    }
166
167    /// Build the consumer
168    pub fn build(self) -> Consumer {
169        Consumer {
170            connection: self.connection,
171            queue_name: self.queue_name,
172            exchange_name: self.exchange_name,
173            routing_key: self.routing_key,
174            retry_config: self.retry_config,
175            prefetch_count: self.prefetch_count.unwrap_or(10),
176            auto_ack: self.auto_ack,
177        }
178    }
179}
180
181/// Simplified Consumer for message consumption
182pub struct Consumer {
183    connection: Arc<Connection>,
184    queue_name: String,
185    exchange_name: Option<String>,
186    routing_key: Option<String>,
187    retry_config: Option<RetryConfig>,
188    prefetch_count: u16,
189    auto_ack: bool,
190}
191
192impl Consumer {
193    /// Create a new consumer builder
194    pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
195        ConsumerBuilder::new(connection, queue_name)
196    }
197
198    /// Create retry queue with TTL
199    async fn create_retry_queue(
200        &self,
201        channel: &Channel,
202        retry_attempt: u32,
203        delay: std::time::Duration,
204    ) -> Result<String, RustRabbitError> {
205        let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
206        let delay_ms = delay.as_millis() as i64;
207
208        // Create retry queue with TTL that routes back to original queue
209        let mut args = FieldTable::default();
210        args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
211        args.insert(
212            "x-dead-letter-exchange".into(),
213            AMQPValue::LongString("".into()),
214        ); // Default exchange
215        args.insert(
216            "x-dead-letter-routing-key".into(),
217            AMQPValue::LongString(self.queue_name.clone().into()),
218        );
219
220        channel
221            .queue_declare(
222                &retry_queue_name,
223                QueueDeclareOptions {
224                    durable: true,
225                    ..Default::default()
226                },
227                args,
228            )
229            .await?;
230
231        debug!(
232            "Created retry queue: {} with TTL: {}ms",
233            retry_queue_name, delay_ms
234        );
235        Ok(retry_queue_name)
236    }
237
238    /// Create DLQ (Dead Letter Queue) with optional TTL
239    async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
240        let dlq_name = format!("{}.dlq", self.queue_name);
241
242        // Build queue arguments with optional TTL
243        let mut args = FieldTable::default();
244        if let Some(retry_config) = &self.retry_config {
245            if let Some(ttl) = &retry_config.dlq_ttl {
246                let ttl_ms = ttl.as_millis() as i64;
247                args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(ttl_ms));
248                debug!("DLQ TTL: {}ms", ttl_ms);
249            }
250        }
251
252        channel
253            .queue_declare(
254                &dlq_name,
255                QueueDeclareOptions {
256                    durable: true,
257                    ..Default::default()
258                },
259                args,
260            )
261            .await?;
262
263        debug!("Created DLQ: {}", dlq_name);
264        Ok(dlq_name)
265    }
266
267    /// Send message to retry queue with delay
268    async fn send_to_retry_queue(
269        &self,
270        channel: &Channel,
271        message_data: &[u8],
272        retry_attempt: u32,
273        delay: std::time::Duration,
274    ) -> Result<(), RustRabbitError> {
275        let retry_queue_name = self
276            .create_retry_queue(channel, retry_attempt, delay)
277            .await?;
278
279        // Publish to retry queue
280        channel
281            .basic_publish(
282                "", // Default exchange
283                &retry_queue_name,
284                BasicPublishOptions::default(),
285                message_data,
286                BasicProperties::default()
287                    .with_content_type("application/json".into())
288                    .with_delivery_mode(2), // Persistent
289            )
290            .await?
291            .await?;
292
293        debug!("Sent message to retry queue: {}", retry_queue_name);
294        Ok(())
295    }
296
297    /// Send message to retry queue with delay and custom headers
298    async fn send_to_retry_queue_with_headers(
299        &self,
300        channel: &Channel,
301        message_data: &[u8],
302        retry_attempt: u32,
303        delay: std::time::Duration,
304        headers: FieldTable,
305    ) -> Result<(), RustRabbitError> {
306        let retry_queue_name = self
307            .create_retry_queue(channel, retry_attempt, delay)
308            .await?;
309
310        // Publish to retry queue with headers
311        channel
312            .basic_publish(
313                "", // Default exchange
314                &retry_queue_name,
315                BasicPublishOptions::default(),
316                message_data,
317                BasicProperties::default()
318                    .with_content_type("application/json".into())
319                    .with_delivery_mode(2) // Persistent
320                    .with_headers(headers),
321            )
322            .await?
323            .await?;
324
325        debug!(
326            "Sent message to retry queue with headers: {}",
327            retry_queue_name
328        );
329        Ok(())
330    }
331
332    /// Send message to DLQ
333    async fn send_to_dlq_simple(
334        &self,
335        channel: &Channel,
336        message_data: &[u8],
337    ) -> Result<(), RustRabbitError> {
338        let dlq_name = self.create_dlq(channel).await?;
339
340        // Publish to DLQ
341        channel
342            .basic_publish(
343                "", // Default exchange
344                &dlq_name,
345                BasicPublishOptions::default(),
346                message_data,
347                BasicProperties::default()
348                    .with_content_type("application/json".into())
349                    .with_delivery_mode(2), // Persistent
350            )
351            .await?
352            .await?;
353
354        debug!("Sent message to DLQ: {}", dlq_name);
355        Ok(())
356    }
357
358    /// Create delay exchange using RabbitMQ delayed message exchange plugin
359    /// Requires rabbitmq_delayed_message_exchange plugin to be installed on RabbitMQ
360    async fn create_delay_exchange(&self, channel: &Channel) -> Result<String, RustRabbitError> {
361        if let Some(retry_config) = &self.retry_config {
362            let delay_exchange = retry_config.get_delay_exchange(&self.queue_name);
363
364            // Declare delay exchange with x-delayed-type argument
365            let mut args = FieldTable::default();
366            args.insert(
367                "x-delayed-type".into(),
368                AMQPValue::LongString("direct".into()),
369            );
370
371            channel
372                .exchange_declare(
373                    &delay_exchange,
374                    lapin::ExchangeKind::Custom("x-delayed-message".to_string()),
375                    lapin::options::ExchangeDeclareOptions {
376                        durable: true,
377                        ..Default::default()
378                    },
379                    args,
380                )
381                .await?;
382
383            debug!(
384                "Created delay exchange: {} (x-delayed-message type)",
385                delay_exchange
386            );
387            Ok(delay_exchange)
388        } else {
389            Err(RustRabbitError::Retry(
390                "Retry config not configured".to_string(),
391            ))
392        }
393    }
394
395    /// Send message to delay exchange with x-delay header for retry
396    /// Message will be automatically routed back to the original queue after delay
397    async fn send_to_delay_exchange(
398        &self,
399        channel: &Channel,
400        message_data: &[u8],
401        delay: std::time::Duration,
402    ) -> Result<(), RustRabbitError> {
403        let delay_exchange = self.create_delay_exchange(channel).await?;
404        let delay_ms = delay.as_millis() as i64;
405
406        // Publish to delay exchange with x-delay header
407        // The message will be re-delivered to original queue after delay
408        channel
409            .basic_publish(
410                &delay_exchange,
411                &self.queue_name, // Routing key: original queue name
412                BasicPublishOptions::default(),
413                message_data,
414                BasicProperties::default()
415                    .with_content_type("application/json".into())
416                    .with_delivery_mode(2) // Persistent
417                    .with_headers({
418                        let mut headers = FieldTable::default();
419                        headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
420                        headers
421                    }),
422            )
423            .await?
424            .await?;
425
426        debug!(
427            "Sent message to delay exchange: {} with delay: {}ms",
428            delay_exchange, delay_ms
429        );
430        Ok(())
431    }
432
433    /// Send message to delay exchange with x-delay header and custom headers for retry
434    /// Message will be automatically routed back to the original queue after delay
435    async fn send_to_delay_exchange_with_headers(
436        &self,
437        channel: &Channel,
438        message_data: &[u8],
439        delay: std::time::Duration,
440        mut headers: FieldTable,
441    ) -> Result<(), RustRabbitError> {
442        let delay_exchange = self.create_delay_exchange(channel).await?;
443        let delay_ms = delay.as_millis() as i64;
444
445        // Add x-delay header to existing headers
446        headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
447
448        // Publish to delay exchange with headers
449        // The message will be re-delivered to original queue after delay
450        channel
451            .basic_publish(
452                &delay_exchange,
453                &self.queue_name, // Routing key: original queue name
454                BasicPublishOptions::default(),
455                message_data,
456                BasicProperties::default()
457                    .with_content_type("application/json".into())
458                    .with_delivery_mode(2) // Persistent
459                    .with_headers(headers),
460            )
461            .await?
462            .await?;
463
464        debug!(
465            "Sent message to delay exchange with headers: {} with delay: {}ms",
466            delay_exchange, delay_ms
467        );
468        Ok(())
469    }
470
471    /// Start consuming messages with smart MassTransit detection
472    /// Handler receives just the payload type T (no wrapper)
473    pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
474    where
475        T: DeserializeOwned + Send + Clone + Sync + 'static + Serialize,
476        H: Fn(T) -> Fut + Send + Sync + Clone + 'static,
477        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
478    {
479        let channel = self.connection.create_channel().await?;
480
481        // Set prefetch count
482        channel
483            .basic_qos(
484                self.prefetch_count,
485                lapin::options::BasicQosOptions::default(),
486            )
487            .await?;
488
489        // Setup infrastructure (queues, exchanges)
490        self.setup_infrastructure(&channel).await?;
491
492        // Start consuming
493        let mut consumer = channel
494            .basic_consume(
495                &self.queue_name,
496                "",
497                BasicConsumeOptions::default(),
498                FieldTable::default(),
499            )
500            .await?;
501
502        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
503
504        debug!("Started consuming from queue: {}", self.queue_name);
505
506        // Process messages
507        while let Some(delivery_result) = consumer.next().await {
508            let delivery = delivery_result?;
509            let permit = semaphore.clone().acquire_owned().await.unwrap();
510            let handler_clone = handler.clone();
511            let auto_ack = self.auto_ack;
512            let channel_clone = Arc::new(channel.clone());
513            let retry_config = self.retry_config.clone();
514            let consumer_self = Consumer {
515                connection: self.connection.clone(),
516                queue_name: self.queue_name.clone(),
517                exchange_name: self.exchange_name.clone(),
518                routing_key: self.routing_key.clone(),
519                retry_config: self.retry_config.clone(),
520                prefetch_count: self.prefetch_count,
521                auto_ack: self.auto_ack,
522            };
523
524            tokio::spawn(async move {
525                let _permit = permit;
526                let delivery_tag = delivery.delivery_tag;
527                let properties = delivery.properties;
528
529                // Smart detection: Try MassTransit format first
530                let (payload, correlation_id_from_mt) =
531                    match MassTransitEnvelope::from_slice(&delivery.data) {
532                        Ok(mt_envelope) => {
533                            // MassTransit format detected - extract payload
534                            match mt_envelope.extract_message::<T>() {
535                                Ok(data) => {
536                                    debug!("Detected MassTransit format, extracted payload");
537                                    (data, mt_envelope.correlation_id().map(|s| s.to_string()))
538                                }
539                                Err(e) => {
540                                    error!(
541                                        "Failed to extract payload from MassTransit envelope: {}",
542                                        e
543                                    );
544                                    if auto_ack {
545                                        if let Err(e) = channel_clone
546                                            .basic_nack(
547                                                delivery_tag,
548                                                lapin::options::BasicNackOptions {
549                                                    multiple: false,
550                                                    requeue: false,
551                                                },
552                                            )
553                                            .await
554                                        {
555                                            error!("Failed to nack malformed message: {}", e);
556                                        }
557                                    }
558                                    return;
559                                }
560                            }
561                        }
562                        Err(_) => {
563                            // Not MassTransit format - try direct deserialization
564                            match serde_json::from_slice::<T>(&delivery.data) {
565                                Ok(data) => {
566                                    debug!("Direct format detected");
567                                    (data, None)
568                                }
569                                Err(e) => {
570                                    error!("Failed to deserialize message: {}", e);
571                                    if auto_ack {
572                                        if let Err(e) = channel_clone
573                                            .basic_nack(
574                                                delivery_tag,
575                                                lapin::options::BasicNackOptions {
576                                                    multiple: false,
577                                                    requeue: false,
578                                                },
579                                            )
580                                            .await
581                                        {
582                                            error!("Failed to nack malformed message: {}", e);
583                                        }
584                                    }
585                                    return;
586                                }
587                            }
588                        }
589                    };
590
591                // Read metadata from headers
592                let retry_attempt = read_retry_attempt(&properties);
593                let correlation_id =
594                    correlation_id_from_mt.or_else(|| read_correlation_id(&properties));
595
596                // If we got correlation_id from MassTransit but it's not in headers, we should add it
597                // But for now, we'll just use it for retries
598
599                // Process message - handler receives just T
600                match handler_clone(payload.clone()).await {
601                    Ok(()) => {
602                        if auto_ack {
603                            if let Err(e) = channel_clone
604                                .basic_ack(delivery_tag, BasicAckOptions::default())
605                                .await
606                            {
607                                error!("Failed to ack message: {}", e);
608                            }
609                        }
610                        debug!("Message processed successfully");
611                    }
612                    Err(e) => {
613                        error!("Handler error: {}", e);
614                        if auto_ack {
615                            // Check if retry is configured
616                            if let Some(retry_cfg) = &retry_config {
617                                if retry_attempt < retry_cfg.max_retries {
618                                    // Calculate delay for next retry
619                                    if let Some(delay) = retry_cfg.calculate_delay(retry_attempt) {
620                                        warn!(
621                                            "Scheduling retry {} with delay {:?} for message",
622                                            retry_attempt + 1,
623                                            delay
624                                        );
625
626                                        // Serialize payload (raw, no wrapper)
627                                        let payload_bytes = match serde_json::to_vec(&payload) {
628                                            Ok(bytes) => bytes,
629                                            Err(e) => {
630                                                error!(
631                                                    "Failed to serialize payload for retry: {}",
632                                                    e
633                                                );
634                                                if let Err(e) = channel_clone
635                                                    .basic_nack(
636                                                        delivery_tag,
637                                                        lapin::options::BasicNackOptions {
638                                                            multiple: false,
639                                                            requeue: false,
640                                                        },
641                                                    )
642                                                    .await
643                                                {
644                                                    error!("Failed to nack message: {}", e);
645                                                }
646                                                return;
647                                            }
648                                        };
649
650                                        // Update headers with new retry_attempt, preserving existing headers
651                                        let mut updated_headers = update_headers_with_retry(
652                                            properties.headers().as_ref(),
653                                            retry_attempt + 1,
654                                        );
655
656                                        // Preserve correlation_id in headers if present
657                                        if let Some(corr_id) = &correlation_id {
658                                            updated_headers.insert(
659                                                HEADER_CORRELATION_ID.into(),
660                                                AMQPValue::LongString(corr_id.clone().into()),
661                                            );
662                                        }
663
664                                        // Send via appropriate strategy (TTL or DelayedExchange)
665                                        let send_result = if matches!(
666                                            retry_cfg.delay_strategy,
667                                            crate::retry::DelayStrategy::DelayedExchange
668                                        ) {
669                                            consumer_self
670                                                .send_to_delay_exchange_with_headers(
671                                                    &channel_clone,
672                                                    &payload_bytes,
673                                                    delay,
674                                                    updated_headers,
675                                                )
676                                                .await
677                                        } else {
678                                            consumer_self
679                                                .send_to_retry_queue_with_headers(
680                                                    &channel_clone,
681                                                    &payload_bytes,
682                                                    retry_attempt + 1,
683                                                    delay,
684                                                    updated_headers,
685                                                )
686                                                .await
687                                        };
688
689                                        if let Err(e) = send_result {
690                                            error!("Failed to send retry message: {}", e);
691                                            if let Err(e) = channel_clone
692                                                .basic_nack(
693                                                    delivery_tag,
694                                                    lapin::options::BasicNackOptions {
695                                                        multiple: false,
696                                                        requeue: false,
697                                                    },
698                                                )
699                                                .await
700                                            {
701                                                error!("Failed to nack message: {}", e);
702                                            }
703                                            return;
704                                        }
705
706                                        // ACK original message (it's now queued for retry)
707                                        if let Err(e) = channel_clone
708                                            .basic_ack(delivery_tag, BasicAckOptions::default())
709                                            .await
710                                        {
711                                            error!("Failed to ack message after retry: {}", e);
712                                        }
713                                    } else {
714                                        // No more retries, send to DLQ
715                                        warn!("Retry exhausted, sending to DLQ");
716                                        let payload_bytes = match serde_json::to_vec(&payload) {
717                                            Ok(bytes) => bytes,
718                                            Err(e) => {
719                                                error!(
720                                                    "Failed to serialize payload for DLQ: {}. Creating error payload.",
721                                                    e
722                                                );
723                                                // Create descriptive error payload instead of using raw delivery.data
724                                                serde_json::to_vec(&serde_json::json!({
725                                                    "error": "Failed to serialize payload for DLQ",
726                                                    "serialization_error": e.to_string(),
727                                                    "retry_attempt": retry_attempt,
728                                                    "original_message_size": delivery.data.len()
729                                                }))
730                                                .unwrap_or_else(|_| {
731                                                    // Last resort: minimal error message
732                                                    format!(
733                                                        r#"{{"error":"Serialization failed for DLQ","attempt":{}}}"#,
734                                                        retry_attempt
735                                                    )
736                                                    .into_bytes()
737                                                })
738                                            }
739                                        };
740                                        if let Err(e) = consumer_self
741                                            .send_to_dlq_simple(&channel_clone, &payload_bytes)
742                                            .await
743                                        {
744                                            error!("Failed to send to DLQ: {}", e);
745                                        }
746                                        if let Err(e) = channel_clone
747                                            .basic_ack(delivery_tag, BasicAckOptions::default())
748                                            .await
749                                        {
750                                            error!("Failed to ack message after DLQ: {}", e);
751                                        }
752                                    }
753                                } else {
754                                    // Retry exhausted, send to DLQ
755                                    warn!("Max retries reached, sending to DLQ");
756                                    let payload_bytes = match serde_json::to_vec(&payload) {
757                                        Ok(bytes) => bytes,
758                                        Err(e) => {
759                                            error!(
760                                                "Failed to serialize payload for DLQ: {}. Creating error payload.",
761                                                e
762                                            );
763                                            // Create descriptive error payload instead of using raw delivery.data
764                                            serde_json::to_vec(&serde_json::json!({
765                                                "error": "Failed to serialize payload for DLQ",
766                                                "serialization_error": e.to_string(),
767                                                "retry_attempt": retry_attempt,
768                                                "original_message_size": delivery.data.len()
769                                            }))
770                                            .unwrap_or_else(|_| {
771                                                // Last resort: minimal error message
772                                                format!(
773                                                    r#"{{"error":"Serialization failed for DLQ","attempt":{}}}"#,
774                                                    retry_attempt
775                                                )
776                                                .into_bytes()
777                                            })
778                                        }
779                                    };
780                                    if let Err(e) = consumer_self
781                                        .send_to_dlq_simple(&channel_clone, &payload_bytes)
782                                        .await
783                                    {
784                                        error!("Failed to send to DLQ: {}", e);
785                                    }
786                                    if let Err(e) = channel_clone
787                                        .basic_ack(delivery_tag, BasicAckOptions::default())
788                                        .await
789                                    {
790                                        error!("Failed to ack message after DLQ: {}", e);
791                                    }
792                                }
793                            } else {
794                                // No retry config, just nack
795                                if let Err(e) = channel_clone
796                                    .basic_nack(
797                                        delivery_tag,
798                                        lapin::options::BasicNackOptions {
799                                            multiple: false,
800                                            requeue: false,
801                                        },
802                                    )
803                                    .await
804                                {
805                                    error!("Failed to nack message: {}", e);
806                                }
807                            }
808                        }
809                    }
810                }
811            });
812        }
813
814        Ok(())
815    }
816
817    /// Setup queue and exchange infrastructure
818    async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
819        // Declare queue
820        channel
821            .queue_declare(
822                &self.queue_name,
823                QueueDeclareOptions {
824                    durable: true,
825                    ..Default::default()
826                },
827                FieldTable::default(),
828            )
829            .await?;
830
831        // Bind to exchange if specified
832        if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
833            channel
834                .queue_bind(
835                    &self.queue_name,
836                    exchange,
837                    routing_key,
838                    lapin::options::QueueBindOptions::default(),
839                    FieldTable::default(),
840                )
841                .await?;
842        }
843
844        // Setup delay exchange if using DelayedExchange strategy
845        if let Some(retry_config) = &self.retry_config {
846            if matches!(
847                retry_config.delay_strategy,
848                crate::retry::DelayStrategy::DelayedExchange
849            ) {
850                let delay_exchange = self.create_delay_exchange(channel).await?;
851
852                // Bind delay exchange to original queue
853                channel
854                    .queue_bind(
855                        &self.queue_name,
856                        &delay_exchange,
857                        &self.queue_name, // Routing key: original queue name
858                        lapin::options::QueueBindOptions::default(),
859                        FieldTable::default(),
860                    )
861                    .await?;
862
863                debug!(
864                    "Bound queue {} to delay exchange {}",
865                    self.queue_name, delay_exchange
866                );
867            }
868        }
869
870        Ok(())
871    }
872
873    /// Start consuming message envelopes with full retry support
874    pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
875    where
876        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
877        H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
878        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
879    {
880        let channel = self.connection.create_channel().await?;
881        let retry_config = self.retry_config.clone();
882
883        // Set prefetch count
884        channel
885            .basic_qos(
886                self.prefetch_count,
887                lapin::options::BasicQosOptions::default(),
888            )
889            .await?;
890
891        // Setup queue and exchange
892        self.setup_infrastructure(&channel).await?;
893
894        // Create consumer
895        let mut consumer = channel
896            .basic_consume(
897                &self.queue_name,
898                "rust-rabbit-envelope-consumer",
899                BasicConsumeOptions::default(),
900                FieldTable::default(),
901            )
902            .await?;
903
904        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
905
906        debug!(
907            "Started consuming envelopes from queue: {}",
908            self.queue_name
909        );
910
911        // Process message envelopes with retry support
912        while let Some(delivery_result) = consumer.next().await {
913            let delivery = delivery_result?;
914            let permit = semaphore.clone().acquire_owned().await.unwrap();
915            let handler_clone = handler.clone();
916            let auto_ack = self.auto_ack;
917            let channel_clone = Arc::new(channel.clone());
918            let retry_config_clone = retry_config.clone();
919            let queue_name = self.queue_name.clone();
920            let connection = self.connection.clone();
921
922            tokio::spawn(async move {
923                let _permit = permit;
924
925                // Try to deserialize as MessageEnvelope
926                match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
927                    Ok(mut envelope) => {
928                        debug!(
929                            "Processing envelope {} (attempt {}/{})",
930                            envelope.metadata.message_id,
931                            envelope.metadata.retry_attempt + 1,
932                            envelope.metadata.max_retries + 1
933                        );
934
935                        // Process message
936                        match handler_clone(envelope.clone()).await {
937                            Ok(()) => {
938                                if auto_ack {
939                                    if let Err(e) = channel_clone
940                                        .basic_ack(
941                                            delivery.delivery_tag,
942                                            BasicAckOptions::default(),
943                                        )
944                                        .await
945                                    {
946                                        error!("Failed to ack message: {}", e);
947                                    }
948                                }
949                                debug!(
950                                    "Envelope {} processed successfully",
951                                    envelope.metadata.message_id
952                                );
953                            }
954                            Err(e) => {
955                                error!(
956                                    "Handler error for envelope {}: {}",
957                                    envelope.metadata.message_id, e
958                                );
959
960                                // Determine error type (simplified classification)
961                                let error_type = classify_error(e.as_ref());
962
963                                // Add error to envelope
964                                envelope = envelope.with_error(
965                                    &e.to_string(),
966                                    error_type,
967                                    Some(&format!("Queue: {}", queue_name)),
968                                );
969
970                                if auto_ack {
971                                    // Check if we should retry
972                                    if let Some(retry_cfg) = &retry_config_clone {
973                                        if !envelope.is_retry_exhausted() {
974                                            // Calculate delay and schedule retry
975                                            if let Some(delay) = retry_cfg
976                                                .calculate_delay(envelope.metadata.retry_attempt)
977                                            {
978                                                warn!(
979                                                    "Scheduling retry {} for envelope {} with delay {:?}",
980                                                    envelope.metadata.retry_attempt + 1,
981                                                    envelope.metadata.message_id,
982                                                    delay
983                                                );
984
985                                                // Increment retry attempt in envelope
986                                                envelope.metadata.retry_attempt += 1;
987
988                                                // Serialize updated envelope
989                                                match serde_json::to_vec(&envelope) {
990                                                    Ok(retry_payload) => {
991                                                        // Create consumer instance for access to methods
992                                                        let consumer_self = Consumer {
993                                                            connection: connection.clone(),
994                                                            queue_name: queue_name.clone(),
995                                                            exchange_name: None,
996                                                            routing_key: None,
997                                                            retry_config: retry_config_clone
998                                                                .clone(),
999                                                            prefetch_count: 10,
1000                                                            auto_ack: true,
1001                                                        };
1002
1003                                                        // Send via appropriate strategy (TTL or DelayedExchange)
1004                                                        let send_result = if matches!(
1005                                                            retry_config_clone
1006                                                                .as_ref()
1007                                                                .map(|c| c.delay_strategy),
1008                                                            Some(crate::retry::DelayStrategy::DelayedExchange)
1009                                                        ) {
1010                                                            consumer_self
1011                                                                .send_to_delay_exchange(
1012                                                                    &channel_clone,
1013                                                                    &retry_payload,
1014                                                                    delay,
1015                                                                )
1016                                                                .await
1017                                                        } else {
1018                                                            consumer_self
1019                                                                .send_to_retry_queue(
1020                                                                    &channel_clone,
1021                                                                    &retry_payload,
1022                                                                    envelope.metadata.retry_attempt,
1023                                                                    delay,
1024                                                                )
1025                                                                .await
1026                                                        };
1027
1028                                                        if let Err(e) = send_result {
1029                                                            error!("Failed to send envelope for retry: {}", e);
1030                                                            // Fallback to simple nack
1031                                                            if let Err(e) = channel_clone
1032                                                                .basic_nack(
1033                                                                    delivery.delivery_tag,
1034                                                                    lapin::options::BasicNackOptions {
1035                                                                        multiple: false,
1036                                                                        requeue: false,
1037                                                                    },
1038                                                                )
1039                                                                .await
1040                                                            {
1041                                                                error!("Failed to nack message: {}", e);
1042                                                            }
1043                                                            return;
1044                                                        }
1045
1046                                                        // ACK original message (it's now queued for retry)
1047                                                        if let Err(e) = channel_clone
1048                                                            .basic_ack(
1049                                                                delivery.delivery_tag,
1050                                                                BasicAckOptions::default(),
1051                                                            )
1052                                                            .await
1053                                                        {
1054                                                            error!("Failed to ack message after retry: {}", e);
1055                                                        }
1056                                                    }
1057                                                    Err(e) => {
1058                                                        error!("Failed to serialize envelope for retry: {}", e);
1059                                                        // Fallback to simple nack
1060                                                        if let Err(e) = channel_clone
1061                                                            .basic_nack(
1062                                                                delivery.delivery_tag,
1063                                                                lapin::options::BasicNackOptions {
1064                                                                    multiple: false,
1065                                                                    requeue: false,
1066                                                                },
1067                                                            )
1068                                                            .await
1069                                                        {
1070                                                            error!("Failed to nack message: {}", e);
1071                                                        }
1072                                                    }
1073                                                }
1074                                            } else {
1075                                                // No more retries, send to DLQ
1076                                                Self::send_to_dlq(
1077                                                    &envelope,
1078                                                    retry_cfg,
1079                                                    &connection,
1080                                                    &queue_name,
1081                                                )
1082                                                .await;
1083
1084                                                // ACK original message
1085                                                if let Err(e) = channel_clone
1086                                                    .basic_ack(
1087                                                        delivery.delivery_tag,
1088                                                        BasicAckOptions::default(),
1089                                                    )
1090                                                    .await
1091                                                {
1092                                                    error!(
1093                                                        "Failed to ack message after DLQ: {}",
1094                                                        e
1095                                                    );
1096                                                }
1097                                            }
1098                                        } else {
1099                                            // Retry exhausted, send to DLQ
1100                                            warn!(
1101                                                "Retry exhausted for envelope {}",
1102                                                envelope.metadata.message_id
1103                                            );
1104                                            Self::send_to_dlq(
1105                                                &envelope,
1106                                                retry_cfg,
1107                                                &connection,
1108                                                &queue_name,
1109                                            )
1110                                            .await;
1111
1112                                            // ACK original message
1113                                            if let Err(e) = channel_clone
1114                                                .basic_ack(
1115                                                    delivery.delivery_tag,
1116                                                    BasicAckOptions::default(),
1117                                                )
1118                                                .await
1119                                            {
1120                                                error!("Failed to ack message after DLQ: {}", e);
1121                                            }
1122                                        }
1123                                    } else {
1124                                        // No retry config, just nack
1125                                        if let Err(e) = channel_clone
1126                                            .basic_nack(
1127                                                delivery.delivery_tag,
1128                                                lapin::options::BasicNackOptions {
1129                                                    multiple: false,
1130                                                    requeue: false,
1131                                                },
1132                                            )
1133                                            .await
1134                                        {
1135                                            error!("Failed to nack message: {}", e);
1136                                        }
1137                                    }
1138                                }
1139                            }
1140                        }
1141                    }
1142                    Err(e) => {
1143                        error!("Failed to deserialize message envelope: {}", e);
1144                        if auto_ack {
1145                            // Reject malformed messages
1146                            if let Err(e) = channel_clone
1147                                .basic_nack(
1148                                    delivery.delivery_tag,
1149                                    lapin::options::BasicNackOptions {
1150                                        multiple: false,
1151                                        requeue: false,
1152                                    },
1153                                )
1154                                .await
1155                            {
1156                                error!("Failed to nack malformed envelope: {}", e);
1157                            }
1158                        }
1159                    }
1160                }
1161            });
1162        }
1163
1164        Ok(())
1165    }
1166
1167    /// Send failed message to Dead Letter Queue
1168    async fn send_to_dlq<T>(
1169        envelope: &MessageEnvelope<T>,
1170        retry_config: &RetryConfig,
1171        connection: &Arc<Connection>,
1172        queue_name: &str,
1173    ) where
1174        T: serde::Serialize,
1175    {
1176        match connection.create_channel().await {
1177            Ok(dlq_channel) => {
1178                let dlq_name = retry_config.get_dead_letter_queue(queue_name);
1179
1180                // Declare DLQ
1181                if let Err(e) = dlq_channel
1182                    .queue_declare(
1183                        &dlq_name,
1184                        QueueDeclareOptions {
1185                            durable: true,
1186                            ..Default::default()
1187                        },
1188                        FieldTable::default(),
1189                    )
1190                    .await
1191                {
1192                    error!("Failed to declare DLQ {}: {}", dlq_name, e);
1193                    return;
1194                }
1195
1196                // Publish to DLQ with failure summary
1197                let failure_summary = envelope.get_failure_summary();
1198                let dlq_payload = serde_json::json!({
1199                    "envelope": envelope,
1200                    "failure_summary": failure_summary,
1201                    "sent_to_dlq_at": chrono::Utc::now(),
1202                });
1203
1204                if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
1205                    if let Err(e) = dlq_channel
1206                        .basic_publish(
1207                            "",
1208                            &dlq_name,
1209                            lapin::options::BasicPublishOptions::default(),
1210                            &payload_bytes,
1211                            lapin::BasicProperties::default(),
1212                        )
1213                        .await
1214                    {
1215                        error!("Failed to publish to DLQ {}: {}", dlq_name, e);
1216                    } else {
1217                        warn!(
1218                            "Sent envelope {} to DLQ: {}",
1219                            envelope.metadata.message_id, failure_summary
1220                        );
1221                    }
1222                }
1223            }
1224            Err(e) => {
1225                error!("Failed to create DLQ channel: {}", e);
1226            }
1227        }
1228    }
1229}
1230
1231/// Classify error type based on error message (simplified heuristics)
1232fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
1233    let error_msg = error.to_string().to_lowercase();
1234
1235    if error_msg.contains("timeout")
1236        || error_msg.contains("connection")
1237        || error_msg.contains("network")
1238        || error_msg.contains("temporary")
1239    {
1240        ErrorType::Transient
1241    } else if error_msg.contains("rate limit")
1242        || error_msg.contains("quota")
1243        || error_msg.contains("resource")
1244    {
1245        ErrorType::Resource
1246    } else if error_msg.contains("validation")
1247        || error_msg.contains("authentication")
1248        || error_msg.contains("authorization")
1249        || error_msg.contains("invalid")
1250        || error_msg.contains("bad request")
1251    {
1252        ErrorType::Permanent
1253    } else {
1254        ErrorType::Unknown
1255    }
1256}