rust_rabbit/consumer.rs
1use crate::{
2 connection::Connection,
3 error::RustRabbitError,
4 message::{ErrorType, MassTransitEnvelope, MessageEnvelope},
5 retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9 options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10 types::{AMQPValue, FieldTable},
11 BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::Semaphore;
19use tracing::{debug, error, warn};
20
// AMQP header keys used to carry per-message metadata across retries.

/// Header that stores the retry attempt number of a delivery.
const HEADER_RETRY_ATTEMPT: &str = "x-retry-attempt";
/// Header that stores the correlation id of a message.
const HEADER_CORRELATION_ID: &str = "x-correlation-id";
24
25/// Read retry_attempt from AMQP headers, defaulting to 0 if not present
26fn read_retry_attempt(properties: &BasicProperties) -> u32 {
27 if let Some(headers) = properties.headers() {
28 // Iterate over headers to find the key
29 for (header_key, value) in headers {
30 if header_key.as_str() == HEADER_RETRY_ATTEMPT {
31 if let AMQPValue::LongLongInt(attempt) = value {
32 return *attempt as u32;
33 }
34 }
35 }
36 }
37 0
38}
39
40/// Read correlation_id from AMQP headers
41fn read_correlation_id(properties: &BasicProperties) -> Option<String> {
42 if let Some(headers) = properties.headers() {
43 // Iterate over headers to find the key
44 for (header_key, value) in headers {
45 if header_key.as_str() == HEADER_CORRELATION_ID {
46 if let AMQPValue::LongString(corr_id) = value {
47 return Some(corr_id.to_string());
48 }
49 }
50 }
51 }
52 None
53}
54
55/// Create headers with retry_attempt and correlation_id
56///
57/// This function is kept for potential future use or testing scenarios
58/// where header creation is needed outside of the main message processing flow.
59#[allow(dead_code)]
60fn create_headers(retry_attempt: u32, correlation_id: Option<&str>) -> FieldTable {
61 let mut headers = FieldTable::default();
62 headers.insert(
63 HEADER_RETRY_ATTEMPT.into(),
64 AMQPValue::LongLongInt(retry_attempt as i64),
65 );
66 if let Some(corr_id) = correlation_id {
67 headers.insert(
68 HEADER_CORRELATION_ID.into(),
69 AMQPValue::LongString(corr_id.into()),
70 );
71 }
72 headers
73}
74
75/// Update headers with new retry_attempt, preserving existing headers
76fn update_headers_with_retry(
77 existing_headers: Option<&FieldTable>,
78 retry_attempt: u32,
79) -> FieldTable {
80 let mut headers = match existing_headers {
81 Some(h) => h.clone(),
82 None => FieldTable::default(),
83 };
84 headers.insert(
85 HEADER_RETRY_ATTEMPT.into(),
86 AMQPValue::LongLongInt(retry_attempt as i64),
87 );
88 headers
89}
90
91/// Consumer configuration builder
92pub struct ConsumerBuilder {
93 connection: Arc<Connection>,
94 queue_name: String,
95 exchange_name: Option<String>,
96 routing_key: Option<String>,
97 retry_config: Option<RetryConfig>,
98 prefetch_count: Option<u16>,
99 auto_ack: bool,
100}
101
102impl ConsumerBuilder {
103 pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
104 Self {
105 connection,
106 queue_name: queue_name.into(),
107 exchange_name: None,
108 routing_key: None,
109 retry_config: None,
110 prefetch_count: Some(10),
111 auto_ack: true,
112 }
113 }
114
115 /// Bind to an exchange with routing key
116 pub fn bind_to_exchange(
117 mut self,
118 exchange: impl Into<String>,
119 routing_key: impl Into<String>,
120 ) -> Self {
121 self.exchange_name = Some(exchange.into());
122 self.routing_key = Some(routing_key.into());
123 self
124 }
125
126 /// Set routing key (for use with bind_to_exchange)
127 pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
128 self.routing_key = Some(routing_key.into());
129 self
130 }
131
132 /// Configure retry behavior
133 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
134 self.retry_config = Some(retry_config);
135 self
136 }
137
138 /// Set TTL for dead letter queue (auto-cleanup failed messages)
139 /// This is a convenience method that modifies the retry_config if it exists
140 ///
141 /// # Example
142 /// ```rust,no_run
143 /// # use rust_rabbit::{Connection, Consumer, RetryConfig};
144 /// # use std::time::Duration;
145 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
146 /// # let connection = Connection::new("amqp://localhost").await?;
147 /// let consumer = Consumer::builder(connection, "orders")
148 /// .with_retry(RetryConfig::exponential_default())
149 /// .with_dlq_ttl(Duration::from_secs(86400)) // 1 day
150 /// .build();
151 /// # Ok(())
152 /// # }
153 /// ```
154 pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
155 if let Some(retry_config) = self.retry_config.as_mut() {
156 retry_config.dlq_ttl = Some(ttl);
157 }
158 self
159 }
160
161 /// Set prefetch count
162 pub fn with_prefetch(mut self, count: u16) -> Self {
163 self.prefetch_count = Some(count);
164 self
165 }
166
167 /// Disable auto-acknowledge (manual ack required)
168 pub fn manual_ack(mut self) -> Self {
169 self.auto_ack = false;
170 self
171 }
172
173 /// Build the consumer
174 pub fn build(self) -> Consumer {
175 Consumer {
176 connection: self.connection,
177 queue_name: self.queue_name,
178 exchange_name: self.exchange_name,
179 routing_key: self.routing_key,
180 retry_config: self.retry_config,
181 prefetch_count: self.prefetch_count.unwrap_or(10),
182 auto_ack: self.auto_ack,
183 }
184 }
185}
186
187/// Simplified Consumer for message consumption
188pub struct Consumer {
189 connection: Arc<Connection>,
190 queue_name: String,
191 exchange_name: Option<String>,
192 routing_key: Option<String>,
193 retry_config: Option<RetryConfig>,
194 prefetch_count: u16,
195 auto_ack: bool,
196}
197
198impl Consumer {
199 /// Create a new consumer builder
200 pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
201 ConsumerBuilder::new(connection, queue_name)
202 }
203
204 /// Create retry queue with TTL
205 async fn create_retry_queue(
206 &self,
207 channel: &Channel,
208 retry_attempt: u32,
209 delay: std::time::Duration,
210 ) -> Result<String, RustRabbitError> {
211 let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
212 let delay_ms = delay.as_millis() as i64;
213
214 // Create retry queue with TTL that routes back to original queue
215 let mut args = FieldTable::default();
216 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
217 args.insert(
218 "x-dead-letter-exchange".into(),
219 AMQPValue::LongString("".into()),
220 ); // Default exchange
221 args.insert(
222 "x-dead-letter-routing-key".into(),
223 AMQPValue::LongString(self.queue_name.clone().into()),
224 );
225
226 channel
227 .queue_declare(
228 &retry_queue_name,
229 QueueDeclareOptions {
230 durable: true,
231 ..Default::default()
232 },
233 args,
234 )
235 .await?;
236
237 debug!(
238 "Created retry queue: {} with TTL: {}ms",
239 retry_queue_name, delay_ms
240 );
241 Ok(retry_queue_name)
242 }
243
244 /// Create DLQ (Dead Letter Queue) with optional TTL
245 async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
246 let dlq_name = format!("{}.dlq", self.queue_name);
247
248 // Build queue arguments with optional TTL
249 let mut args = FieldTable::default();
250 if let Some(retry_config) = &self.retry_config {
251 if let Some(ttl) = &retry_config.dlq_ttl {
252 let ttl_ms = ttl.as_millis() as i64;
253 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(ttl_ms));
254 debug!("DLQ TTL: {}ms", ttl_ms);
255 }
256 }
257
258 channel
259 .queue_declare(
260 &dlq_name,
261 QueueDeclareOptions {
262 durable: true,
263 ..Default::default()
264 },
265 args,
266 )
267 .await?;
268
269 debug!("Created DLQ: {}", dlq_name);
270 Ok(dlq_name)
271 }
272
273 /// Send message to retry queue with delay
274 async fn send_to_retry_queue(
275 &self,
276 channel: &Channel,
277 message_data: &[u8],
278 retry_attempt: u32,
279 delay: std::time::Duration,
280 ) -> Result<(), RustRabbitError> {
281 let retry_queue_name = self
282 .create_retry_queue(channel, retry_attempt, delay)
283 .await?;
284
285 // Publish to retry queue
286 channel
287 .basic_publish(
288 "", // Default exchange
289 &retry_queue_name,
290 BasicPublishOptions::default(),
291 message_data,
292 BasicProperties::default()
293 .with_content_type("application/json".into())
294 .with_delivery_mode(2), // Persistent
295 )
296 .await?
297 .await?;
298
299 debug!("Sent message to retry queue: {}", retry_queue_name);
300 Ok(())
301 }
302
303 /// Send message to retry queue with delay and custom headers
304 async fn send_to_retry_queue_with_headers(
305 &self,
306 channel: &Channel,
307 message_data: &[u8],
308 retry_attempt: u32,
309 delay: std::time::Duration,
310 headers: FieldTable,
311 ) -> Result<(), RustRabbitError> {
312 let retry_queue_name = self
313 .create_retry_queue(channel, retry_attempt, delay)
314 .await?;
315
316 // Publish to retry queue with headers
317 channel
318 .basic_publish(
319 "", // Default exchange
320 &retry_queue_name,
321 BasicPublishOptions::default(),
322 message_data,
323 BasicProperties::default()
324 .with_content_type("application/json".into())
325 .with_delivery_mode(2) // Persistent
326 .with_headers(headers),
327 )
328 .await?
329 .await?;
330
331 debug!(
332 "Sent message to retry queue with headers: {}",
333 retry_queue_name
334 );
335 Ok(())
336 }
337
338 /// Send message to DLQ
339 async fn send_to_dlq_simple(
340 &self,
341 channel: &Channel,
342 message_data: &[u8],
343 ) -> Result<(), RustRabbitError> {
344 let dlq_name = self.create_dlq(channel).await?;
345
346 // Publish to DLQ
347 channel
348 .basic_publish(
349 "", // Default exchange
350 &dlq_name,
351 BasicPublishOptions::default(),
352 message_data,
353 BasicProperties::default()
354 .with_content_type("application/json".into())
355 .with_delivery_mode(2), // Persistent
356 )
357 .await?
358 .await?;
359
360 debug!("Sent message to DLQ: {}", dlq_name);
361 Ok(())
362 }
363
364 /// Create delay exchange using RabbitMQ delayed message exchange plugin
365 /// Requires rabbitmq_delayed_message_exchange plugin to be installed on RabbitMQ
366 async fn create_delay_exchange(&self, channel: &Channel) -> Result<String, RustRabbitError> {
367 if let Some(retry_config) = &self.retry_config {
368 let delay_exchange = retry_config.get_delay_exchange(&self.queue_name);
369
370 // Declare delay exchange with x-delayed-type argument
371 let mut args = FieldTable::default();
372 args.insert(
373 "x-delayed-type".into(),
374 AMQPValue::LongString("direct".into()),
375 );
376
377 channel
378 .exchange_declare(
379 &delay_exchange,
380 lapin::ExchangeKind::Custom("x-delayed-message".to_string()),
381 lapin::options::ExchangeDeclareOptions {
382 durable: true,
383 ..Default::default()
384 },
385 args,
386 )
387 .await?;
388
389 debug!(
390 "Created delay exchange: {} (x-delayed-message type)",
391 delay_exchange
392 );
393 Ok(delay_exchange)
394 } else {
395 Err(RustRabbitError::Retry(
396 "Retry config not configured".to_string(),
397 ))
398 }
399 }
400
401 /// Send message to delay exchange with x-delay header for retry
402 /// Message will be automatically routed back to the original queue after delay
403 async fn send_to_delay_exchange(
404 &self,
405 channel: &Channel,
406 message_data: &[u8],
407 delay: std::time::Duration,
408 ) -> Result<(), RustRabbitError> {
409 let delay_exchange = self.create_delay_exchange(channel).await?;
410 let delay_ms = delay.as_millis() as i64;
411
412 // Publish to delay exchange with x-delay header
413 // The message will be re-delivered to original queue after delay
414 channel
415 .basic_publish(
416 &delay_exchange,
417 &self.queue_name, // Routing key: original queue name
418 BasicPublishOptions::default(),
419 message_data,
420 BasicProperties::default()
421 .with_content_type("application/json".into())
422 .with_delivery_mode(2) // Persistent
423 .with_headers({
424 let mut headers = FieldTable::default();
425 headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
426 headers
427 }),
428 )
429 .await?
430 .await?;
431
432 debug!(
433 "Sent message to delay exchange: {} with delay: {}ms",
434 delay_exchange, delay_ms
435 );
436 Ok(())
437 }
438
439 /// Send message to delay exchange with x-delay header and custom headers for retry
440 /// Message will be automatically routed back to the original queue after delay
441 async fn send_to_delay_exchange_with_headers(
442 &self,
443 channel: &Channel,
444 message_data: &[u8],
445 delay: std::time::Duration,
446 mut headers: FieldTable,
447 ) -> Result<(), RustRabbitError> {
448 let delay_exchange = self.create_delay_exchange(channel).await?;
449 let delay_ms = delay.as_millis() as i64;
450
451 // Add x-delay header to existing headers
452 headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
453
454 // Publish to delay exchange with headers
455 // The message will be re-delivered to original queue after delay
456 channel
457 .basic_publish(
458 &delay_exchange,
459 &self.queue_name, // Routing key: original queue name
460 BasicPublishOptions::default(),
461 message_data,
462 BasicProperties::default()
463 .with_content_type("application/json".into())
464 .with_delivery_mode(2) // Persistent
465 .with_headers(headers),
466 )
467 .await?
468 .await?;
469
470 debug!(
471 "Sent message to delay exchange with headers: {} with delay: {}ms",
472 delay_exchange, delay_ms
473 );
474 Ok(())
475 }
476
477 /// Start consuming messages with smart MassTransit detection
478 /// Handler receives just the payload type T (no wrapper)
479 pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
480 where
481 T: DeserializeOwned + Send + Clone + Sync + 'static + Serialize,
482 H: Fn(T) -> Fut + Send + Sync + Clone + 'static,
483 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
484 {
485 let channel = self.connection.create_channel().await?;
486
487 // Set prefetch count
488 channel
489 .basic_qos(
490 self.prefetch_count,
491 lapin::options::BasicQosOptions::default(),
492 )
493 .await?;
494
495 // Setup infrastructure (queues, exchanges)
496 self.setup_infrastructure(&channel).await?;
497
498 // Start consuming
499 let mut consumer = channel
500 .basic_consume(
501 &self.queue_name,
502 "",
503 BasicConsumeOptions::default(),
504 FieldTable::default(),
505 )
506 .await?;
507
508 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
509
510 debug!("Started consuming from queue: {}", self.queue_name);
511
512 // Process messages
513 while let Some(delivery_result) = consumer.next().await {
514 let delivery = delivery_result?;
515 let permit = semaphore.clone().acquire_owned().await.unwrap();
516 let handler_clone = handler.clone();
517 let auto_ack = self.auto_ack;
518 let channel_clone = Arc::new(channel.clone());
519 let retry_config = self.retry_config.clone();
520 let consumer_self = Consumer {
521 connection: self.connection.clone(),
522 queue_name: self.queue_name.clone(),
523 exchange_name: self.exchange_name.clone(),
524 routing_key: self.routing_key.clone(),
525 retry_config: self.retry_config.clone(),
526 prefetch_count: self.prefetch_count,
527 auto_ack: self.auto_ack,
528 };
529
530 tokio::spawn(async move {
531 let _permit = permit;
532 let delivery_tag = delivery.delivery_tag;
533 let properties = delivery.properties;
534
535 // Smart detection: Try MassTransit format first
536 let (payload, correlation_id_from_mt) =
537 match MassTransitEnvelope::from_slice(&delivery.data) {
538 Ok(mt_envelope) => {
539 // MassTransit format detected - extract payload
540 match mt_envelope.extract_message::<T>() {
541 Ok(data) => {
542 debug!("Detected MassTransit format, extracted payload");
543 (data, mt_envelope.correlation_id().map(|s| s.to_string()))
544 }
545 Err(e) => {
546 error!(
547 "Failed to extract payload from MassTransit envelope: {}",
548 e
549 );
550 if auto_ack {
551 if let Err(e) = channel_clone
552 .basic_nack(
553 delivery_tag,
554 lapin::options::BasicNackOptions {
555 multiple: false,
556 requeue: false,
557 },
558 )
559 .await
560 {
561 error!("Failed to nack malformed message: {}", e);
562 }
563 }
564 return;
565 }
566 }
567 }
568 Err(_) => {
569 // Not MassTransit format - try direct deserialization
570 match serde_json::from_slice::<T>(&delivery.data) {
571 Ok(data) => {
572 debug!("Direct format detected");
573 (data, None)
574 }
575 Err(e) => {
576 error!("Failed to deserialize message: {}", e);
577 if auto_ack {
578 if let Err(e) = channel_clone
579 .basic_nack(
580 delivery_tag,
581 lapin::options::BasicNackOptions {
582 multiple: false,
583 requeue: false,
584 },
585 )
586 .await
587 {
588 error!("Failed to nack malformed message: {}", e);
589 }
590 }
591 return;
592 }
593 }
594 }
595 };
596
597 // Read metadata from headers
598 let retry_attempt = read_retry_attempt(&properties);
599 let correlation_id =
600 correlation_id_from_mt.or_else(|| read_correlation_id(&properties));
601
602 // If we got correlation_id from MassTransit but it's not in headers, we should add it
603 // But for now, we'll just use it for retries
604
605 // Process message - handler receives just T
606 match handler_clone(payload.clone()).await {
607 Ok(()) => {
608 if auto_ack {
609 if let Err(e) = channel_clone
610 .basic_ack(delivery_tag, BasicAckOptions::default())
611 .await
612 {
613 error!("Failed to ack message: {}", e);
614 }
615 }
616 debug!("Message processed successfully");
617 }
618 Err(e) => {
619 error!("Handler error: {}", e);
620 if auto_ack {
621 // Check if retry is configured
622 if let Some(retry_cfg) = &retry_config {
623 if retry_attempt < retry_cfg.max_retries {
624 // Calculate delay for next retry
625 if let Some(delay) = retry_cfg.calculate_delay(retry_attempt) {
626 warn!(
627 "Scheduling retry {} with delay {:?} for message",
628 retry_attempt + 1,
629 delay
630 );
631
632 // Serialize payload (raw, no wrapper)
633 let payload_bytes = match serde_json::to_vec(&payload) {
634 Ok(bytes) => bytes,
635 Err(e) => {
636 error!(
637 "Failed to serialize payload for retry: {}",
638 e
639 );
640 if let Err(e) = channel_clone
641 .basic_nack(
642 delivery_tag,
643 lapin::options::BasicNackOptions {
644 multiple: false,
645 requeue: false,
646 },
647 )
648 .await
649 {
650 error!("Failed to nack message: {}", e);
651 }
652 return;
653 }
654 };
655
656 // Update headers with new retry_attempt, preserving existing headers
657 let mut updated_headers = update_headers_with_retry(
658 properties.headers().as_ref(),
659 retry_attempt + 1,
660 );
661
662 // Preserve correlation_id in headers if present
663 if let Some(corr_id) = &correlation_id {
664 updated_headers.insert(
665 HEADER_CORRELATION_ID.into(),
666 AMQPValue::LongString(corr_id.clone().into()),
667 );
668 }
669
670 // Send via appropriate strategy (TTL or DelayedExchange)
671 let send_result = if matches!(
672 retry_cfg.delay_strategy,
673 crate::retry::DelayStrategy::DelayedExchange
674 ) {
675 consumer_self
676 .send_to_delay_exchange_with_headers(
677 &channel_clone,
678 &payload_bytes,
679 delay,
680 updated_headers,
681 )
682 .await
683 } else {
684 consumer_self
685 .send_to_retry_queue_with_headers(
686 &channel_clone,
687 &payload_bytes,
688 retry_attempt + 1,
689 delay,
690 updated_headers,
691 )
692 .await
693 };
694
695 if let Err(e) = send_result {
696 error!("Failed to send retry message: {}", e);
697 if let Err(e) = channel_clone
698 .basic_nack(
699 delivery_tag,
700 lapin::options::BasicNackOptions {
701 multiple: false,
702 requeue: false,
703 },
704 )
705 .await
706 {
707 error!("Failed to nack message: {}", e);
708 }
709 return;
710 }
711
712 // ACK original message (it's now queued for retry)
713 if let Err(e) = channel_clone
714 .basic_ack(delivery_tag, BasicAckOptions::default())
715 .await
716 {
717 error!("Failed to ack message after retry: {}", e);
718 }
719 } else {
720 // No more retries, send to DLQ
721 warn!("Retry exhausted, sending to DLQ");
722 let payload_bytes = match serde_json::to_vec(&payload) {
723 Ok(bytes) => bytes,
724 Err(e) => {
725 error!(
726 "Failed to serialize payload for DLQ: {}. Creating error payload.",
727 e
728 );
729 // Create descriptive error payload instead of using raw delivery.data
730 serde_json::to_vec(&serde_json::json!({
731 "error": "Failed to serialize payload for DLQ",
732 "serialization_error": e.to_string(),
733 "retry_attempt": retry_attempt,
734 "original_message_size": delivery.data.len()
735 }))
736 .unwrap_or_else(|_| {
737 // Last resort: minimal error message
738 format!(
739 r#"{{"error":"Serialization failed for DLQ","attempt":{}}}"#,
740 retry_attempt
741 )
742 .into_bytes()
743 })
744 }
745 };
746 if let Err(e) = consumer_self
747 .send_to_dlq_simple(&channel_clone, &payload_bytes)
748 .await
749 {
750 error!("Failed to send to DLQ: {}", e);
751 }
752 if let Err(e) = channel_clone
753 .basic_ack(delivery_tag, BasicAckOptions::default())
754 .await
755 {
756 error!("Failed to ack message after DLQ: {}", e);
757 }
758 }
759 } else {
760 // Retry exhausted, send to DLQ
761 warn!("Max retries reached, sending to DLQ");
762 let payload_bytes = match serde_json::to_vec(&payload) {
763 Ok(bytes) => bytes,
764 Err(e) => {
765 error!(
766 "Failed to serialize payload for DLQ: {}. Creating error payload.",
767 e
768 );
769 // Create descriptive error payload instead of using raw delivery.data
770 serde_json::to_vec(&serde_json::json!({
771 "error": "Failed to serialize payload for DLQ",
772 "serialization_error": e.to_string(),
773 "retry_attempt": retry_attempt,
774 "original_message_size": delivery.data.len()
775 }))
776 .unwrap_or_else(|_| {
777 // Last resort: minimal error message
778 format!(
779 r#"{{"error":"Serialization failed for DLQ","attempt":{}}}"#,
780 retry_attempt
781 )
782 .into_bytes()
783 })
784 }
785 };
786 if let Err(e) = consumer_self
787 .send_to_dlq_simple(&channel_clone, &payload_bytes)
788 .await
789 {
790 error!("Failed to send to DLQ: {}", e);
791 }
792 if let Err(e) = channel_clone
793 .basic_ack(delivery_tag, BasicAckOptions::default())
794 .await
795 {
796 error!("Failed to ack message after DLQ: {}", e);
797 }
798 }
799 } else {
800 // No retry config, just nack
801 if let Err(e) = channel_clone
802 .basic_nack(
803 delivery_tag,
804 lapin::options::BasicNackOptions {
805 multiple: false,
806 requeue: false,
807 },
808 )
809 .await
810 {
811 error!("Failed to nack message: {}", e);
812 }
813 }
814 }
815 }
816 }
817 });
818 }
819
820 Ok(())
821 }
822
823 /// Setup queue and exchange infrastructure
824 async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
825 // Declare queue
826 channel
827 .queue_declare(
828 &self.queue_name,
829 QueueDeclareOptions {
830 durable: true,
831 ..Default::default()
832 },
833 FieldTable::default(),
834 )
835 .await?;
836
837 // Bind to exchange if specified
838 if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
839 channel
840 .queue_bind(
841 &self.queue_name,
842 exchange,
843 routing_key,
844 lapin::options::QueueBindOptions::default(),
845 FieldTable::default(),
846 )
847 .await?;
848 }
849
850 // Setup delay exchange if using DelayedExchange strategy
851 if let Some(retry_config) = &self.retry_config {
852 if matches!(
853 retry_config.delay_strategy,
854 crate::retry::DelayStrategy::DelayedExchange
855 ) {
856 let delay_exchange = self.create_delay_exchange(channel).await?;
857
858 // Bind delay exchange to original queue
859 channel
860 .queue_bind(
861 &self.queue_name,
862 &delay_exchange,
863 &self.queue_name, // Routing key: original queue name
864 lapin::options::QueueBindOptions::default(),
865 FieldTable::default(),
866 )
867 .await?;
868
869 debug!(
870 "Bound queue {} to delay exchange {}",
871 self.queue_name, delay_exchange
872 );
873 }
874 }
875
876 Ok(())
877 }
878
879 /// Start consuming message envelopes with full retry support
880 pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
881 where
882 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
883 H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
884 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
885 {
886 let channel = self.connection.create_channel().await?;
887 let retry_config = self.retry_config.clone();
888
889 // Set prefetch count
890 channel
891 .basic_qos(
892 self.prefetch_count,
893 lapin::options::BasicQosOptions::default(),
894 )
895 .await?;
896
897 // Setup queue and exchange
898 self.setup_infrastructure(&channel).await?;
899
900 // Create consumer
901 let mut consumer = channel
902 .basic_consume(
903 &self.queue_name,
904 "rust-rabbit-envelope-consumer",
905 BasicConsumeOptions::default(),
906 FieldTable::default(),
907 )
908 .await?;
909
910 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
911
912 debug!(
913 "Started consuming envelopes from queue: {}",
914 self.queue_name
915 );
916
917 // Process message envelopes with retry support
918 while let Some(delivery_result) = consumer.next().await {
919 let delivery = delivery_result?;
920 let permit = semaphore.clone().acquire_owned().await.unwrap();
921 let handler_clone = handler.clone();
922 let auto_ack = self.auto_ack;
923 let channel_clone = Arc::new(channel.clone());
924 let retry_config_clone = retry_config.clone();
925 let queue_name = self.queue_name.clone();
926 let connection = self.connection.clone();
927
928 tokio::spawn(async move {
929 let _permit = permit;
930
931 // Try to deserialize as MessageEnvelope
932 match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
933 Ok(mut envelope) => {
934 debug!(
935 "Processing envelope {} (attempt {}/{})",
936 envelope.metadata.message_id,
937 envelope.metadata.retry_attempt + 1,
938 envelope.metadata.max_retries + 1
939 );
940
941 // Process message
942 match handler_clone(envelope.clone()).await {
943 Ok(()) => {
944 if auto_ack {
945 if let Err(e) = channel_clone
946 .basic_ack(
947 delivery.delivery_tag,
948 BasicAckOptions::default(),
949 )
950 .await
951 {
952 error!("Failed to ack message: {}", e);
953 }
954 }
955 debug!(
956 "Envelope {} processed successfully",
957 envelope.metadata.message_id
958 );
959 }
960 Err(e) => {
961 error!(
962 "Handler error for envelope {}: {}",
963 envelope.metadata.message_id, e
964 );
965
966 // Determine error type (simplified classification)
967 let error_type = classify_error(e.as_ref());
968
969 // Add error to envelope
970 envelope = envelope.with_error(
971 &e.to_string(),
972 error_type,
973 Some(&format!("Queue: {}", queue_name)),
974 );
975
976 if auto_ack {
977 // Check if we should retry
978 if let Some(retry_cfg) = &retry_config_clone {
979 if !envelope.is_retry_exhausted() {
980 // Calculate delay and schedule retry
981 if let Some(delay) = retry_cfg
982 .calculate_delay(envelope.metadata.retry_attempt)
983 {
984 warn!(
985 "Scheduling retry {} for envelope {} with delay {:?}",
986 envelope.metadata.retry_attempt + 1,
987 envelope.metadata.message_id,
988 delay
989 );
990
991 // Increment retry attempt in envelope
992 envelope.metadata.retry_attempt += 1;
993
994 // Serialize updated envelope
995 match serde_json::to_vec(&envelope) {
996 Ok(retry_payload) => {
997 // Create consumer instance for access to methods
998 let consumer_self = Consumer {
999 connection: connection.clone(),
1000 queue_name: queue_name.clone(),
1001 exchange_name: None,
1002 routing_key: None,
1003 retry_config: retry_config_clone
1004 .clone(),
1005 prefetch_count: 10,
1006 auto_ack: true,
1007 };
1008
1009 // Send via appropriate strategy (TTL or DelayedExchange)
1010 let send_result = if matches!(
1011 retry_config_clone
1012 .as_ref()
1013 .map(|c| c.delay_strategy),
1014 Some(crate::retry::DelayStrategy::DelayedExchange)
1015 ) {
1016 consumer_self
1017 .send_to_delay_exchange(
1018 &channel_clone,
1019 &retry_payload,
1020 delay,
1021 )
1022 .await
1023 } else {
1024 consumer_self
1025 .send_to_retry_queue(
1026 &channel_clone,
1027 &retry_payload,
1028 envelope.metadata.retry_attempt,
1029 delay,
1030 )
1031 .await
1032 };
1033
1034 if let Err(e) = send_result {
1035 error!("Failed to send envelope for retry: {}", e);
1036 // Fallback to simple nack
1037 if let Err(e) = channel_clone
1038 .basic_nack(
1039 delivery.delivery_tag,
1040 lapin::options::BasicNackOptions {
1041 multiple: false,
1042 requeue: false,
1043 },
1044 )
1045 .await
1046 {
1047 error!("Failed to nack message: {}", e);
1048 }
1049 return;
1050 }
1051
1052 // ACK original message (it's now queued for retry)
1053 if let Err(e) = channel_clone
1054 .basic_ack(
1055 delivery.delivery_tag,
1056 BasicAckOptions::default(),
1057 )
1058 .await
1059 {
1060 error!("Failed to ack message after retry: {}", e);
1061 }
1062 }
1063 Err(e) => {
1064 error!("Failed to serialize envelope for retry: {}", e);
1065 // Fallback to simple nack
1066 if let Err(e) = channel_clone
1067 .basic_nack(
1068 delivery.delivery_tag,
1069 lapin::options::BasicNackOptions {
1070 multiple: false,
1071 requeue: false,
1072 },
1073 )
1074 .await
1075 {
1076 error!("Failed to nack message: {}", e);
1077 }
1078 }
1079 }
1080 } else {
1081 // No more retries, send to DLQ
1082 Self::send_to_dlq(
1083 &envelope,
1084 retry_cfg,
1085 &connection,
1086 &queue_name,
1087 )
1088 .await;
1089
1090 // ACK original message
1091 if let Err(e) = channel_clone
1092 .basic_ack(
1093 delivery.delivery_tag,
1094 BasicAckOptions::default(),
1095 )
1096 .await
1097 {
1098 error!(
1099 "Failed to ack message after DLQ: {}",
1100 e
1101 );
1102 }
1103 }
1104 } else {
1105 // Retry exhausted, send to DLQ
1106 warn!(
1107 "Retry exhausted for envelope {}",
1108 envelope.metadata.message_id
1109 );
1110 Self::send_to_dlq(
1111 &envelope,
1112 retry_cfg,
1113 &connection,
1114 &queue_name,
1115 )
1116 .await;
1117
1118 // ACK original message
1119 if let Err(e) = channel_clone
1120 .basic_ack(
1121 delivery.delivery_tag,
1122 BasicAckOptions::default(),
1123 )
1124 .await
1125 {
1126 error!("Failed to ack message after DLQ: {}", e);
1127 }
1128 }
1129 } else {
1130 // No retry config, just nack
1131 if let Err(e) = channel_clone
1132 .basic_nack(
1133 delivery.delivery_tag,
1134 lapin::options::BasicNackOptions {
1135 multiple: false,
1136 requeue: false,
1137 },
1138 )
1139 .await
1140 {
1141 error!("Failed to nack message: {}", e);
1142 }
1143 }
1144 }
1145 }
1146 }
1147 }
1148 Err(e) => {
1149 error!("Failed to deserialize message envelope: {}", e);
1150 if auto_ack {
1151 // Reject malformed messages
1152 if let Err(e) = channel_clone
1153 .basic_nack(
1154 delivery.delivery_tag,
1155 lapin::options::BasicNackOptions {
1156 multiple: false,
1157 requeue: false,
1158 },
1159 )
1160 .await
1161 {
1162 error!("Failed to nack malformed envelope: {}", e);
1163 }
1164 }
1165 }
1166 }
1167 });
1168 }
1169
1170 Ok(())
1171 }
1172
1173 /// Send failed message to Dead Letter Queue
1174 async fn send_to_dlq<T>(
1175 envelope: &MessageEnvelope<T>,
1176 retry_config: &RetryConfig,
1177 connection: &Arc<Connection>,
1178 queue_name: &str,
1179 ) where
1180 T: serde::Serialize,
1181 {
1182 match connection.create_channel().await {
1183 Ok(dlq_channel) => {
1184 let dlq_name = retry_config.get_dead_letter_queue(queue_name);
1185
1186 // Declare DLQ
1187 if let Err(e) = dlq_channel
1188 .queue_declare(
1189 &dlq_name,
1190 QueueDeclareOptions {
1191 durable: true,
1192 ..Default::default()
1193 },
1194 FieldTable::default(),
1195 )
1196 .await
1197 {
1198 error!("Failed to declare DLQ {}: {}", dlq_name, e);
1199 return;
1200 }
1201
1202 // Publish to DLQ with failure summary
1203 let failure_summary = envelope.get_failure_summary();
1204 let dlq_payload = serde_json::json!({
1205 "envelope": envelope,
1206 "failure_summary": failure_summary,
1207 "sent_to_dlq_at": chrono::Utc::now(),
1208 });
1209
1210 if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
1211 if let Err(e) = dlq_channel
1212 .basic_publish(
1213 "",
1214 &dlq_name,
1215 lapin::options::BasicPublishOptions::default(),
1216 &payload_bytes,
1217 lapin::BasicProperties::default(),
1218 )
1219 .await
1220 {
1221 error!("Failed to publish to DLQ {}: {}", dlq_name, e);
1222 } else {
1223 warn!(
1224 "Sent envelope {} to DLQ: {}",
1225 envelope.metadata.message_id, failure_summary
1226 );
1227 }
1228 }
1229 }
1230 Err(e) => {
1231 error!("Failed to create DLQ channel: {}", e);
1232 }
1233 }
1234 }
1235}
1236
1237/// Classify error type based on error message (simplified heuristics)
1238fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
1239 let error_msg = error.to_string().to_lowercase();
1240
1241 if error_msg.contains("timeout")
1242 || error_msg.contains("connection")
1243 || error_msg.contains("network")
1244 || error_msg.contains("temporary")
1245 {
1246 ErrorType::Transient
1247 } else if error_msg.contains("rate limit")
1248 || error_msg.contains("quota")
1249 || error_msg.contains("resource")
1250 {
1251 ErrorType::Resource
1252 } else if error_msg.contains("validation")
1253 || error_msg.contains("authentication")
1254 || error_msg.contains("authorization")
1255 || error_msg.contains("invalid")
1256 || error_msg.contains("bad request")
1257 {
1258 ErrorType::Permanent
1259 } else {
1260 ErrorType::Unknown
1261 }
1262}