rust_rabbit/consumer.rs
1use crate::{
2 connection::Connection,
3 error::RustRabbitError,
4 message::{ErrorType, MassTransitEnvelope, MessageEnvelope},
5 retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9 options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10 types::{AMQPValue, FieldTable},
11 BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::Semaphore;
19use tracing::{debug, error, warn};
20
21// Helper functions for reading/writing headers
22const HEADER_RETRY_ATTEMPT: &str = "x-retry-attempt";
23const HEADER_CORRELATION_ID: &str = "x-correlation-id";
24
25/// Read retry_attempt from AMQP headers, defaulting to 0 if not present
26fn read_retry_attempt(properties: &BasicProperties) -> u32 {
27 if let Some(headers) = properties.headers() {
28 // Iterate over headers to find the key
29 for (header_key, value) in headers {
30 if header_key.as_str() == HEADER_RETRY_ATTEMPT {
31 if let AMQPValue::LongLongInt(attempt) = value {
32 return *attempt as u32;
33 }
34 }
35 }
36 }
37 0
38}
39
40/// Read correlation_id from AMQP headers
41fn read_correlation_id(properties: &BasicProperties) -> Option<String> {
42 if let Some(headers) = properties.headers() {
43 // Iterate over headers to find the key
44 for (header_key, value) in headers {
45 if header_key.as_str() == HEADER_CORRELATION_ID {
46 if let AMQPValue::LongString(corr_id) = value {
47 return Some(corr_id.to_string());
48 }
49 }
50 }
51 }
52 None
53}
54
55/// Create headers with retry_attempt and correlation_id
56///
57/// This function is kept for potential future use or testing scenarios
58/// where header creation is needed outside of the main message processing flow.
59#[allow(dead_code)]
60fn create_headers(retry_attempt: u32, correlation_id: Option<&str>) -> FieldTable {
61 let mut headers = FieldTable::default();
62 headers.insert(
63 HEADER_RETRY_ATTEMPT.into(),
64 AMQPValue::LongLongInt(retry_attempt as i64),
65 );
66 if let Some(corr_id) = correlation_id {
67 headers.insert(
68 HEADER_CORRELATION_ID.into(),
69 AMQPValue::LongString(corr_id.into()),
70 );
71 }
72 headers
73}
74
75/// Update headers with new retry_attempt, preserving existing headers
76fn update_headers_with_retry(
77 existing_headers: Option<&FieldTable>,
78 retry_attempt: u32,
79) -> FieldTable {
80 let mut headers = match existing_headers {
81 Some(h) => h.clone(),
82 None => FieldTable::default(),
83 };
84 headers.insert(
85 HEADER_RETRY_ATTEMPT.into(),
86 AMQPValue::LongLongInt(retry_attempt as i64),
87 );
88 headers
89}
90
91/// Consumer configuration builder
92pub struct ConsumerBuilder {
93 connection: Arc<Connection>,
94 queue_name: String,
95 exchange_name: Option<String>,
96 routing_key: Option<String>,
97 retry_config: Option<RetryConfig>,
98 prefetch_count: Option<u16>,
99 auto_ack: bool,
100}
101
102impl ConsumerBuilder {
103 pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
104 Self {
105 connection,
106 queue_name: queue_name.into(),
107 exchange_name: None,
108 routing_key: None,
109 retry_config: None,
110 prefetch_count: Some(10),
111 auto_ack: true,
112 }
113 }
114
115 /// Bind to an exchange with routing key
116 pub fn bind_to_exchange(
117 mut self,
118 exchange: impl Into<String>,
119 routing_key: impl Into<String>,
120 ) -> Self {
121 self.exchange_name = Some(exchange.into());
122 self.routing_key = Some(routing_key.into());
123 self
124 }
125
126 /// Set routing key (for use with bind_to_exchange)
127 pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
128 self.routing_key = Some(routing_key.into());
129 self
130 }
131
132 /// Configure retry behavior
133 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
134 self.retry_config = Some(retry_config);
135 self
136 }
137
138 /// Set TTL for dead letter queue (auto-cleanup failed messages)
139 /// This is a convenience method that modifies the retry_config if it exists
140 ///
141 /// # Example
142 /// ```ignore
143 /// let consumer = Consumer::builder(connection, "orders")
144 /// .with_retry(RetryConfig::exponential_default())
145 /// .with_dlq_ttl(Duration::from_secs(86400)) // 1 day
146 /// .build();
147 /// ```
148 pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
149 if let Some(retry_config) = self.retry_config.as_mut() {
150 retry_config.dlq_ttl = Some(ttl);
151 }
152 self
153 }
154
155 /// Set prefetch count
156 pub fn with_prefetch(mut self, count: u16) -> Self {
157 self.prefetch_count = Some(count);
158 self
159 }
160
161 /// Disable auto-acknowledge (manual ack required)
162 pub fn manual_ack(mut self) -> Self {
163 self.auto_ack = false;
164 self
165 }
166
167 /// Build the consumer
168 pub fn build(self) -> Consumer {
169 Consumer {
170 connection: self.connection,
171 queue_name: self.queue_name,
172 exchange_name: self.exchange_name,
173 routing_key: self.routing_key,
174 retry_config: self.retry_config,
175 prefetch_count: self.prefetch_count.unwrap_or(10),
176 auto_ack: self.auto_ack,
177 }
178 }
179}
180
181/// Simplified Consumer for message consumption
182pub struct Consumer {
183 connection: Arc<Connection>,
184 queue_name: String,
185 exchange_name: Option<String>,
186 routing_key: Option<String>,
187 retry_config: Option<RetryConfig>,
188 prefetch_count: u16,
189 auto_ack: bool,
190}
191
192impl Consumer {
193 /// Create a new consumer builder
194 pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
195 ConsumerBuilder::new(connection, queue_name)
196 }
197
198 /// Create retry queue with TTL
199 async fn create_retry_queue(
200 &self,
201 channel: &Channel,
202 retry_attempt: u32,
203 delay: std::time::Duration,
204 ) -> Result<String, RustRabbitError> {
205 let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
206 let delay_ms = delay.as_millis() as i64;
207
208 // Create retry queue with TTL that routes back to original queue
209 let mut args = FieldTable::default();
210 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
211 args.insert(
212 "x-dead-letter-exchange".into(),
213 AMQPValue::LongString("".into()),
214 ); // Default exchange
215 args.insert(
216 "x-dead-letter-routing-key".into(),
217 AMQPValue::LongString(self.queue_name.clone().into()),
218 );
219
220 channel
221 .queue_declare(
222 &retry_queue_name,
223 QueueDeclareOptions {
224 durable: true,
225 ..Default::default()
226 },
227 args,
228 )
229 .await?;
230
231 debug!(
232 "Created retry queue: {} with TTL: {}ms",
233 retry_queue_name, delay_ms
234 );
235 Ok(retry_queue_name)
236 }
237
238 /// Create DLQ (Dead Letter Queue) with optional TTL
239 async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
240 let dlq_name = format!("{}.dlq", self.queue_name);
241
242 // Build queue arguments with optional TTL
243 let mut args = FieldTable::default();
244 if let Some(retry_config) = &self.retry_config {
245 if let Some(ttl) = &retry_config.dlq_ttl {
246 let ttl_ms = ttl.as_millis() as i64;
247 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(ttl_ms));
248 debug!("DLQ TTL: {}ms", ttl_ms);
249 }
250 }
251
252 channel
253 .queue_declare(
254 &dlq_name,
255 QueueDeclareOptions {
256 durable: true,
257 ..Default::default()
258 },
259 args,
260 )
261 .await?;
262
263 debug!("Created DLQ: {}", dlq_name);
264 Ok(dlq_name)
265 }
266
267 /// Send message to retry queue with delay
268 async fn send_to_retry_queue(
269 &self,
270 channel: &Channel,
271 message_data: &[u8],
272 retry_attempt: u32,
273 delay: std::time::Duration,
274 ) -> Result<(), RustRabbitError> {
275 let retry_queue_name = self
276 .create_retry_queue(channel, retry_attempt, delay)
277 .await?;
278
279 // Publish to retry queue
280 channel
281 .basic_publish(
282 "", // Default exchange
283 &retry_queue_name,
284 BasicPublishOptions::default(),
285 message_data,
286 BasicProperties::default()
287 .with_content_type("application/json".into())
288 .with_delivery_mode(2), // Persistent
289 )
290 .await?
291 .await?;
292
293 debug!("Sent message to retry queue: {}", retry_queue_name);
294 Ok(())
295 }
296
297 /// Send message to retry queue with delay and custom headers
298 async fn send_to_retry_queue_with_headers(
299 &self,
300 channel: &Channel,
301 message_data: &[u8],
302 retry_attempt: u32,
303 delay: std::time::Duration,
304 headers: FieldTable,
305 ) -> Result<(), RustRabbitError> {
306 let retry_queue_name = self
307 .create_retry_queue(channel, retry_attempt, delay)
308 .await?;
309
310 // Publish to retry queue with headers
311 channel
312 .basic_publish(
313 "", // Default exchange
314 &retry_queue_name,
315 BasicPublishOptions::default(),
316 message_data,
317 BasicProperties::default()
318 .with_content_type("application/json".into())
319 .with_delivery_mode(2) // Persistent
320 .with_headers(headers),
321 )
322 .await?
323 .await?;
324
325 debug!(
326 "Sent message to retry queue with headers: {}",
327 retry_queue_name
328 );
329 Ok(())
330 }
331
332 /// Send message to DLQ
333 async fn send_to_dlq_simple(
334 &self,
335 channel: &Channel,
336 message_data: &[u8],
337 ) -> Result<(), RustRabbitError> {
338 let dlq_name = self.create_dlq(channel).await?;
339
340 // Publish to DLQ
341 channel
342 .basic_publish(
343 "", // Default exchange
344 &dlq_name,
345 BasicPublishOptions::default(),
346 message_data,
347 BasicProperties::default()
348 .with_content_type("application/json".into())
349 .with_delivery_mode(2), // Persistent
350 )
351 .await?
352 .await?;
353
354 debug!("Sent message to DLQ: {}", dlq_name);
355 Ok(())
356 }
357
358 /// Create delay exchange using RabbitMQ delayed message exchange plugin
359 /// Requires rabbitmq_delayed_message_exchange plugin to be installed on RabbitMQ
360 async fn create_delay_exchange(&self, channel: &Channel) -> Result<String, RustRabbitError> {
361 if let Some(retry_config) = &self.retry_config {
362 let delay_exchange = retry_config.get_delay_exchange(&self.queue_name);
363
364 // Declare delay exchange with x-delayed-type argument
365 let mut args = FieldTable::default();
366 args.insert(
367 "x-delayed-type".into(),
368 AMQPValue::LongString("direct".into()),
369 );
370
371 channel
372 .exchange_declare(
373 &delay_exchange,
374 lapin::ExchangeKind::Custom("x-delayed-message".to_string()),
375 lapin::options::ExchangeDeclareOptions {
376 durable: true,
377 ..Default::default()
378 },
379 args,
380 )
381 .await?;
382
383 debug!(
384 "Created delay exchange: {} (x-delayed-message type)",
385 delay_exchange
386 );
387 Ok(delay_exchange)
388 } else {
389 Err(RustRabbitError::Retry(
390 "Retry config not configured".to_string(),
391 ))
392 }
393 }
394
395 /// Send message to delay exchange with x-delay header for retry
396 /// Message will be automatically routed back to the original queue after delay
397 async fn send_to_delay_exchange(
398 &self,
399 channel: &Channel,
400 message_data: &[u8],
401 delay: std::time::Duration,
402 ) -> Result<(), RustRabbitError> {
403 let delay_exchange = self.create_delay_exchange(channel).await?;
404 let delay_ms = delay.as_millis() as i64;
405
406 // Publish to delay exchange with x-delay header
407 // The message will be re-delivered to original queue after delay
408 channel
409 .basic_publish(
410 &delay_exchange,
411 &self.queue_name, // Routing key: original queue name
412 BasicPublishOptions::default(),
413 message_data,
414 BasicProperties::default()
415 .with_content_type("application/json".into())
416 .with_delivery_mode(2) // Persistent
417 .with_headers({
418 let mut headers = FieldTable::default();
419 headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
420 headers
421 }),
422 )
423 .await?
424 .await?;
425
426 debug!(
427 "Sent message to delay exchange: {} with delay: {}ms",
428 delay_exchange, delay_ms
429 );
430 Ok(())
431 }
432
433 /// Send message to delay exchange with x-delay header and custom headers for retry
434 /// Message will be automatically routed back to the original queue after delay
435 async fn send_to_delay_exchange_with_headers(
436 &self,
437 channel: &Channel,
438 message_data: &[u8],
439 delay: std::time::Duration,
440 mut headers: FieldTable,
441 ) -> Result<(), RustRabbitError> {
442 let delay_exchange = self.create_delay_exchange(channel).await?;
443 let delay_ms = delay.as_millis() as i64;
444
445 // Add x-delay header to existing headers
446 headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
447
448 // Publish to delay exchange with headers
449 // The message will be re-delivered to original queue after delay
450 channel
451 .basic_publish(
452 &delay_exchange,
453 &self.queue_name, // Routing key: original queue name
454 BasicPublishOptions::default(),
455 message_data,
456 BasicProperties::default()
457 .with_content_type("application/json".into())
458 .with_delivery_mode(2) // Persistent
459 .with_headers(headers),
460 )
461 .await?
462 .await?;
463
464 debug!(
465 "Sent message to delay exchange with headers: {} with delay: {}ms",
466 delay_exchange, delay_ms
467 );
468 Ok(())
469 }
470
471 /// Start consuming messages with smart MassTransit detection
472 /// Handler receives just the payload type T (no wrapper)
473 pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
474 where
475 T: DeserializeOwned + Send + Clone + Sync + 'static + Serialize,
476 H: Fn(T) -> Fut + Send + Sync + Clone + 'static,
477 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
478 {
479 let channel = self.connection.create_channel().await?;
480
481 // Set prefetch count
482 channel
483 .basic_qos(
484 self.prefetch_count,
485 lapin::options::BasicQosOptions::default(),
486 )
487 .await?;
488
489 // Setup infrastructure (queues, exchanges)
490 self.setup_infrastructure(&channel).await?;
491
492 // Start consuming
493 let mut consumer = channel
494 .basic_consume(
495 &self.queue_name,
496 "",
497 BasicConsumeOptions::default(),
498 FieldTable::default(),
499 )
500 .await?;
501
502 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
503
504 debug!("Started consuming from queue: {}", self.queue_name);
505
506 // Process messages
507 while let Some(delivery_result) = consumer.next().await {
508 let delivery = delivery_result?;
509 let permit = semaphore.clone().acquire_owned().await.unwrap();
510 let handler_clone = handler.clone();
511 let auto_ack = self.auto_ack;
512 let channel_clone = Arc::new(channel.clone());
513 let retry_config = self.retry_config.clone();
514 let consumer_self = Consumer {
515 connection: self.connection.clone(),
516 queue_name: self.queue_name.clone(),
517 exchange_name: self.exchange_name.clone(),
518 routing_key: self.routing_key.clone(),
519 retry_config: self.retry_config.clone(),
520 prefetch_count: self.prefetch_count,
521 auto_ack: self.auto_ack,
522 };
523
524 tokio::spawn(async move {
525 let _permit = permit;
526 let delivery_tag = delivery.delivery_tag;
527 let properties = delivery.properties;
528
529 // Smart detection: Try MassTransit format first
530 let (payload, correlation_id_from_mt) =
531 match MassTransitEnvelope::from_slice(&delivery.data) {
532 Ok(mt_envelope) => {
533 // MassTransit format detected - extract payload
534 match mt_envelope.extract_message::<T>() {
535 Ok(data) => {
536 debug!("Detected MassTransit format, extracted payload");
537 (data, mt_envelope.correlation_id().map(|s| s.to_string()))
538 }
539 Err(e) => {
540 error!(
541 "Failed to extract payload from MassTransit envelope: {}",
542 e
543 );
544 if auto_ack {
545 if let Err(e) = channel_clone
546 .basic_nack(
547 delivery_tag,
548 lapin::options::BasicNackOptions {
549 multiple: false,
550 requeue: false,
551 },
552 )
553 .await
554 {
555 error!("Failed to nack malformed message: {}", e);
556 }
557 }
558 return;
559 }
560 }
561 }
562 Err(_) => {
563 // Not MassTransit format - try direct deserialization
564 match serde_json::from_slice::<T>(&delivery.data) {
565 Ok(data) => {
566 debug!("Direct format detected");
567 (data, None)
568 }
569 Err(e) => {
570 error!("Failed to deserialize message: {}", e);
571 if auto_ack {
572 if let Err(e) = channel_clone
573 .basic_nack(
574 delivery_tag,
575 lapin::options::BasicNackOptions {
576 multiple: false,
577 requeue: false,
578 },
579 )
580 .await
581 {
582 error!("Failed to nack malformed message: {}", e);
583 }
584 }
585 return;
586 }
587 }
588 }
589 };
590
591 // Read metadata from headers
592 let retry_attempt = read_retry_attempt(&properties);
593 let correlation_id =
594 correlation_id_from_mt.or_else(|| read_correlation_id(&properties));
595
596 // If we got correlation_id from MassTransit but it's not in headers, we should add it
597 // But for now, we'll just use it for retries
598
599 // Process message - handler receives just T
600 match handler_clone(payload.clone()).await {
601 Ok(()) => {
602 if auto_ack {
603 if let Err(e) = channel_clone
604 .basic_ack(delivery_tag, BasicAckOptions::default())
605 .await
606 {
607 error!("Failed to ack message: {}", e);
608 }
609 }
610 debug!("Message processed successfully");
611 }
612 Err(e) => {
613 error!("Handler error: {}", e);
614 if auto_ack {
615 // Check if retry is configured
616 if let Some(retry_cfg) = &retry_config {
617 if retry_attempt < retry_cfg.max_retries {
618 // Calculate delay for next retry
619 if let Some(delay) = retry_cfg.calculate_delay(retry_attempt) {
620 warn!(
621 "Scheduling retry {} with delay {:?} for message",
622 retry_attempt + 1,
623 delay
624 );
625
626 // Serialize payload (raw, no wrapper)
627 let payload_bytes = match serde_json::to_vec(&payload) {
628 Ok(bytes) => bytes,
629 Err(e) => {
630 error!(
631 "Failed to serialize payload for retry: {}",
632 e
633 );
634 if let Err(e) = channel_clone
635 .basic_nack(
636 delivery_tag,
637 lapin::options::BasicNackOptions {
638 multiple: false,
639 requeue: false,
640 },
641 )
642 .await
643 {
644 error!("Failed to nack message: {}", e);
645 }
646 return;
647 }
648 };
649
650 // Update headers with new retry_attempt, preserving existing headers
651 let mut updated_headers = update_headers_with_retry(
652 properties.headers().as_ref(),
653 retry_attempt + 1,
654 );
655
656 // Preserve correlation_id in headers if present
657 if let Some(corr_id) = &correlation_id {
658 updated_headers.insert(
659 HEADER_CORRELATION_ID.into(),
660 AMQPValue::LongString(corr_id.clone().into()),
661 );
662 }
663
664 // Send via appropriate strategy (TTL or DelayedExchange)
665 let send_result = if matches!(
666 retry_cfg.delay_strategy,
667 crate::retry::DelayStrategy::DelayedExchange
668 ) {
669 consumer_self
670 .send_to_delay_exchange_with_headers(
671 &channel_clone,
672 &payload_bytes,
673 delay,
674 updated_headers,
675 )
676 .await
677 } else {
678 consumer_self
679 .send_to_retry_queue_with_headers(
680 &channel_clone,
681 &payload_bytes,
682 retry_attempt + 1,
683 delay,
684 updated_headers,
685 )
686 .await
687 };
688
689 if let Err(e) = send_result {
690 error!("Failed to send retry message: {}", e);
691 if let Err(e) = channel_clone
692 .basic_nack(
693 delivery_tag,
694 lapin::options::BasicNackOptions {
695 multiple: false,
696 requeue: false,
697 },
698 )
699 .await
700 {
701 error!("Failed to nack message: {}", e);
702 }
703 return;
704 }
705
706 // ACK original message (it's now queued for retry)
707 if let Err(e) = channel_clone
708 .basic_ack(delivery_tag, BasicAckOptions::default())
709 .await
710 {
711 error!("Failed to ack message after retry: {}", e);
712 }
713 } else {
714 // No more retries, send to DLQ
715 warn!("Retry exhausted, sending to DLQ");
716 let payload_bytes = match serde_json::to_vec(&payload) {
717 Ok(bytes) => bytes,
718 Err(e) => {
719 error!(
720 "Failed to serialize payload for DLQ: {}. Creating error payload.",
721 e
722 );
723 // Create descriptive error payload instead of using raw delivery.data
724 serde_json::to_vec(&serde_json::json!({
725 "error": "Failed to serialize payload for DLQ",
726 "serialization_error": e.to_string(),
727 "retry_attempt": retry_attempt,
728 "original_message_size": delivery.data.len()
729 }))
730 .unwrap_or_else(|_| {
731 // Last resort: minimal error message
732 format!(
733 r#"{{"error":"Serialization failed for DLQ","attempt":{}}}"#,
734 retry_attempt
735 )
736 .into_bytes()
737 })
738 }
739 };
740 if let Err(e) = consumer_self
741 .send_to_dlq_simple(&channel_clone, &payload_bytes)
742 .await
743 {
744 error!("Failed to send to DLQ: {}", e);
745 }
746 if let Err(e) = channel_clone
747 .basic_ack(delivery_tag, BasicAckOptions::default())
748 .await
749 {
750 error!("Failed to ack message after DLQ: {}", e);
751 }
752 }
753 } else {
754 // Retry exhausted, send to DLQ
755 warn!("Max retries reached, sending to DLQ");
756 let payload_bytes = match serde_json::to_vec(&payload) {
757 Ok(bytes) => bytes,
758 Err(e) => {
759 error!(
760 "Failed to serialize payload for DLQ: {}. Creating error payload.",
761 e
762 );
763 // Create descriptive error payload instead of using raw delivery.data
764 serde_json::to_vec(&serde_json::json!({
765 "error": "Failed to serialize payload for DLQ",
766 "serialization_error": e.to_string(),
767 "retry_attempt": retry_attempt,
768 "original_message_size": delivery.data.len()
769 }))
770 .unwrap_or_else(|_| {
771 // Last resort: minimal error message
772 format!(
773 r#"{{"error":"Serialization failed for DLQ","attempt":{}}}"#,
774 retry_attempt
775 )
776 .into_bytes()
777 })
778 }
779 };
780 if let Err(e) = consumer_self
781 .send_to_dlq_simple(&channel_clone, &payload_bytes)
782 .await
783 {
784 error!("Failed to send to DLQ: {}", e);
785 }
786 if let Err(e) = channel_clone
787 .basic_ack(delivery_tag, BasicAckOptions::default())
788 .await
789 {
790 error!("Failed to ack message after DLQ: {}", e);
791 }
792 }
793 } else {
794 // No retry config, just nack
795 if let Err(e) = channel_clone
796 .basic_nack(
797 delivery_tag,
798 lapin::options::BasicNackOptions {
799 multiple: false,
800 requeue: false,
801 },
802 )
803 .await
804 {
805 error!("Failed to nack message: {}", e);
806 }
807 }
808 }
809 }
810 }
811 });
812 }
813
814 Ok(())
815 }
816
817 /// Setup queue and exchange infrastructure
818 async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
819 // Declare queue
820 channel
821 .queue_declare(
822 &self.queue_name,
823 QueueDeclareOptions {
824 durable: true,
825 ..Default::default()
826 },
827 FieldTable::default(),
828 )
829 .await?;
830
831 // Bind to exchange if specified
832 if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
833 channel
834 .queue_bind(
835 &self.queue_name,
836 exchange,
837 routing_key,
838 lapin::options::QueueBindOptions::default(),
839 FieldTable::default(),
840 )
841 .await?;
842 }
843
844 // Setup delay exchange if using DelayedExchange strategy
845 if let Some(retry_config) = &self.retry_config {
846 if matches!(
847 retry_config.delay_strategy,
848 crate::retry::DelayStrategy::DelayedExchange
849 ) {
850 let delay_exchange = self.create_delay_exchange(channel).await?;
851
852 // Bind delay exchange to original queue
853 channel
854 .queue_bind(
855 &self.queue_name,
856 &delay_exchange,
857 &self.queue_name, // Routing key: original queue name
858 lapin::options::QueueBindOptions::default(),
859 FieldTable::default(),
860 )
861 .await?;
862
863 debug!(
864 "Bound queue {} to delay exchange {}",
865 self.queue_name, delay_exchange
866 );
867 }
868 }
869
870 Ok(())
871 }
872
873 /// Start consuming message envelopes with full retry support
874 pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
875 where
876 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
877 H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
878 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
879 {
880 let channel = self.connection.create_channel().await?;
881 let retry_config = self.retry_config.clone();
882
883 // Set prefetch count
884 channel
885 .basic_qos(
886 self.prefetch_count,
887 lapin::options::BasicQosOptions::default(),
888 )
889 .await?;
890
891 // Setup queue and exchange
892 self.setup_infrastructure(&channel).await?;
893
894 // Create consumer
895 let mut consumer = channel
896 .basic_consume(
897 &self.queue_name,
898 "rust-rabbit-envelope-consumer",
899 BasicConsumeOptions::default(),
900 FieldTable::default(),
901 )
902 .await?;
903
904 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
905
906 debug!(
907 "Started consuming envelopes from queue: {}",
908 self.queue_name
909 );
910
911 // Process message envelopes with retry support
912 while let Some(delivery_result) = consumer.next().await {
913 let delivery = delivery_result?;
914 let permit = semaphore.clone().acquire_owned().await.unwrap();
915 let handler_clone = handler.clone();
916 let auto_ack = self.auto_ack;
917 let channel_clone = Arc::new(channel.clone());
918 let retry_config_clone = retry_config.clone();
919 let queue_name = self.queue_name.clone();
920 let connection = self.connection.clone();
921
922 tokio::spawn(async move {
923 let _permit = permit;
924
925 // Try to deserialize as MessageEnvelope
926 match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
927 Ok(mut envelope) => {
928 debug!(
929 "Processing envelope {} (attempt {}/{})",
930 envelope.metadata.message_id,
931 envelope.metadata.retry_attempt + 1,
932 envelope.metadata.max_retries + 1
933 );
934
935 // Process message
936 match handler_clone(envelope.clone()).await {
937 Ok(()) => {
938 if auto_ack {
939 if let Err(e) = channel_clone
940 .basic_ack(
941 delivery.delivery_tag,
942 BasicAckOptions::default(),
943 )
944 .await
945 {
946 error!("Failed to ack message: {}", e);
947 }
948 }
949 debug!(
950 "Envelope {} processed successfully",
951 envelope.metadata.message_id
952 );
953 }
954 Err(e) => {
955 error!(
956 "Handler error for envelope {}: {}",
957 envelope.metadata.message_id, e
958 );
959
960 // Determine error type (simplified classification)
961 let error_type = classify_error(e.as_ref());
962
963 // Add error to envelope
964 envelope = envelope.with_error(
965 &e.to_string(),
966 error_type,
967 Some(&format!("Queue: {}", queue_name)),
968 );
969
970 if auto_ack {
971 // Check if we should retry
972 if let Some(retry_cfg) = &retry_config_clone {
973 if !envelope.is_retry_exhausted() {
974 // Calculate delay and schedule retry
975 if let Some(delay) = retry_cfg
976 .calculate_delay(envelope.metadata.retry_attempt)
977 {
978 warn!(
979 "Scheduling retry {} for envelope {} with delay {:?}",
980 envelope.metadata.retry_attempt + 1,
981 envelope.metadata.message_id,
982 delay
983 );
984
985 // Increment retry attempt in envelope
986 envelope.metadata.retry_attempt += 1;
987
988 // Serialize updated envelope
989 match serde_json::to_vec(&envelope) {
990 Ok(retry_payload) => {
991 // Create consumer instance for access to methods
992 let consumer_self = Consumer {
993 connection: connection.clone(),
994 queue_name: queue_name.clone(),
995 exchange_name: None,
996 routing_key: None,
997 retry_config: retry_config_clone
998 .clone(),
999 prefetch_count: 10,
1000 auto_ack: true,
1001 };
1002
1003 // Send via appropriate strategy (TTL or DelayedExchange)
1004 let send_result = if matches!(
1005 retry_config_clone
1006 .as_ref()
1007 .map(|c| c.delay_strategy),
1008 Some(crate::retry::DelayStrategy::DelayedExchange)
1009 ) {
1010 consumer_self
1011 .send_to_delay_exchange(
1012 &channel_clone,
1013 &retry_payload,
1014 delay,
1015 )
1016 .await
1017 } else {
1018 consumer_self
1019 .send_to_retry_queue(
1020 &channel_clone,
1021 &retry_payload,
1022 envelope.metadata.retry_attempt,
1023 delay,
1024 )
1025 .await
1026 };
1027
1028 if let Err(e) = send_result {
1029 error!("Failed to send envelope for retry: {}", e);
1030 // Fallback to simple nack
1031 if let Err(e) = channel_clone
1032 .basic_nack(
1033 delivery.delivery_tag,
1034 lapin::options::BasicNackOptions {
1035 multiple: false,
1036 requeue: false,
1037 },
1038 )
1039 .await
1040 {
1041 error!("Failed to nack message: {}", e);
1042 }
1043 return;
1044 }
1045
1046 // ACK original message (it's now queued for retry)
1047 if let Err(e) = channel_clone
1048 .basic_ack(
1049 delivery.delivery_tag,
1050 BasicAckOptions::default(),
1051 )
1052 .await
1053 {
1054 error!("Failed to ack message after retry: {}", e);
1055 }
1056 }
1057 Err(e) => {
1058 error!("Failed to serialize envelope for retry: {}", e);
1059 // Fallback to simple nack
1060 if let Err(e) = channel_clone
1061 .basic_nack(
1062 delivery.delivery_tag,
1063 lapin::options::BasicNackOptions {
1064 multiple: false,
1065 requeue: false,
1066 },
1067 )
1068 .await
1069 {
1070 error!("Failed to nack message: {}", e);
1071 }
1072 }
1073 }
1074 } else {
1075 // No more retries, send to DLQ
1076 Self::send_to_dlq(
1077 &envelope,
1078 retry_cfg,
1079 &connection,
1080 &queue_name,
1081 )
1082 .await;
1083
1084 // ACK original message
1085 if let Err(e) = channel_clone
1086 .basic_ack(
1087 delivery.delivery_tag,
1088 BasicAckOptions::default(),
1089 )
1090 .await
1091 {
1092 error!(
1093 "Failed to ack message after DLQ: {}",
1094 e
1095 );
1096 }
1097 }
1098 } else {
1099 // Retry exhausted, send to DLQ
1100 warn!(
1101 "Retry exhausted for envelope {}",
1102 envelope.metadata.message_id
1103 );
1104 Self::send_to_dlq(
1105 &envelope,
1106 retry_cfg,
1107 &connection,
1108 &queue_name,
1109 )
1110 .await;
1111
1112 // ACK original message
1113 if let Err(e) = channel_clone
1114 .basic_ack(
1115 delivery.delivery_tag,
1116 BasicAckOptions::default(),
1117 )
1118 .await
1119 {
1120 error!("Failed to ack message after DLQ: {}", e);
1121 }
1122 }
1123 } else {
1124 // No retry config, just nack
1125 if let Err(e) = channel_clone
1126 .basic_nack(
1127 delivery.delivery_tag,
1128 lapin::options::BasicNackOptions {
1129 multiple: false,
1130 requeue: false,
1131 },
1132 )
1133 .await
1134 {
1135 error!("Failed to nack message: {}", e);
1136 }
1137 }
1138 }
1139 }
1140 }
1141 }
1142 Err(e) => {
1143 error!("Failed to deserialize message envelope: {}", e);
1144 if auto_ack {
1145 // Reject malformed messages
1146 if let Err(e) = channel_clone
1147 .basic_nack(
1148 delivery.delivery_tag,
1149 lapin::options::BasicNackOptions {
1150 multiple: false,
1151 requeue: false,
1152 },
1153 )
1154 .await
1155 {
1156 error!("Failed to nack malformed envelope: {}", e);
1157 }
1158 }
1159 }
1160 }
1161 });
1162 }
1163
1164 Ok(())
1165 }
1166
1167 /// Send failed message to Dead Letter Queue
1168 async fn send_to_dlq<T>(
1169 envelope: &MessageEnvelope<T>,
1170 retry_config: &RetryConfig,
1171 connection: &Arc<Connection>,
1172 queue_name: &str,
1173 ) where
1174 T: serde::Serialize,
1175 {
1176 match connection.create_channel().await {
1177 Ok(dlq_channel) => {
1178 let dlq_name = retry_config.get_dead_letter_queue(queue_name);
1179
1180 // Declare DLQ
1181 if let Err(e) = dlq_channel
1182 .queue_declare(
1183 &dlq_name,
1184 QueueDeclareOptions {
1185 durable: true,
1186 ..Default::default()
1187 },
1188 FieldTable::default(),
1189 )
1190 .await
1191 {
1192 error!("Failed to declare DLQ {}: {}", dlq_name, e);
1193 return;
1194 }
1195
1196 // Publish to DLQ with failure summary
1197 let failure_summary = envelope.get_failure_summary();
1198 let dlq_payload = serde_json::json!({
1199 "envelope": envelope,
1200 "failure_summary": failure_summary,
1201 "sent_to_dlq_at": chrono::Utc::now(),
1202 });
1203
1204 if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
1205 if let Err(e) = dlq_channel
1206 .basic_publish(
1207 "",
1208 &dlq_name,
1209 lapin::options::BasicPublishOptions::default(),
1210 &payload_bytes,
1211 lapin::BasicProperties::default(),
1212 )
1213 .await
1214 {
1215 error!("Failed to publish to DLQ {}: {}", dlq_name, e);
1216 } else {
1217 warn!(
1218 "Sent envelope {} to DLQ: {}",
1219 envelope.metadata.message_id, failure_summary
1220 );
1221 }
1222 }
1223 }
1224 Err(e) => {
1225 error!("Failed to create DLQ channel: {}", e);
1226 }
1227 }
1228 }
1229}
1230
1231/// Classify error type based on error message (simplified heuristics)
1232fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
1233 let error_msg = error.to_string().to_lowercase();
1234
1235 if error_msg.contains("timeout")
1236 || error_msg.contains("connection")
1237 || error_msg.contains("network")
1238 || error_msg.contains("temporary")
1239 {
1240 ErrorType::Transient
1241 } else if error_msg.contains("rate limit")
1242 || error_msg.contains("quota")
1243 || error_msg.contains("resource")
1244 {
1245 ErrorType::Resource
1246 } else if error_msg.contains("validation")
1247 || error_msg.contains("authentication")
1248 || error_msg.contains("authorization")
1249 || error_msg.contains("invalid")
1250 || error_msg.contains("bad request")
1251 {
1252 ErrorType::Permanent
1253 } else {
1254 ErrorType::Unknown
1255 }
1256}