rust_rabbit/consumer.rs
1use crate::{
2 connection::Connection,
3 error::RustRabbitError,
4 message::{ErrorType, MessageEnvelope, WireMessage},
5 retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9 options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10 types::{AMQPValue, FieldTable},
11 BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use std::future::Future;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::Semaphore;
18use tracing::{debug, error, warn};
19
20/// Message wrapper with retry tracking
21#[derive(Debug)]
22pub struct Message<T>
23where
24 T: Clone,
25{
26 pub data: T,
27 pub retry_attempt: u32,
28 tag: u64,
29 channel: Arc<Channel>,
30}
31
32impl<T> Clone for Message<T>
33where
34 T: Clone,
35{
36 fn clone(&self) -> Self {
37 Self {
38 data: self.data.clone(),
39 retry_attempt: self.retry_attempt,
40 tag: self.tag,
41 channel: Arc::clone(&self.channel),
42 }
43 }
44}
45
46impl<T> Message<T>
47where
48 T: Clone,
49{
50 /// Acknowledge the message
51 pub async fn ack(&self) -> Result<(), RustRabbitError> {
52 self.channel
53 .basic_ack(self.tag, BasicAckOptions::default())
54 .await
55 .map_err(RustRabbitError::from)
56 }
57
58 /// Reject and requeue the message
59 pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
60 self.channel
61 .basic_nack(
62 self.tag,
63 lapin::options::BasicNackOptions {
64 multiple: false,
65 requeue,
66 },
67 )
68 .await
69 .map_err(RustRabbitError::from)
70 }
71}
72
/// Consumer configuration builder
///
/// Accumulates the settings for a [`Consumer`]. Every setter takes the builder by
/// value and returns it, so calls can be chained; `build` produces the consumer.
pub struct ConsumerBuilder {
    /// Connection handed over to the built consumer.
    connection: Arc<Connection>,
    /// Name of the queue to consume from.
    queue_name: String,
    /// Exchange to bind the queue to; set by `bind_to_exchange`.
    exchange_name: Option<String>,
    /// Routing key for the binding; set by `bind_to_exchange` or `routing_key`.
    routing_key: Option<String>,
    /// Retry behavior; `None` until `with_retry` is called.
    retry_config: Option<RetryConfig>,
    /// Prefetch count; `new` initializes it to `Some(10)` and `build` falls back to 10.
    prefetch_count: Option<u16>,
    /// Starts as `true`; `manual_ack` sets it to `false`.
    auto_ack: bool,
}
83
84impl ConsumerBuilder {
85 pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
86 Self {
87 connection,
88 queue_name: queue_name.into(),
89 exchange_name: None,
90 routing_key: None,
91 retry_config: None,
92 prefetch_count: Some(10),
93 auto_ack: true,
94 }
95 }
96
97 /// Bind to an exchange with routing key
98 pub fn bind_to_exchange(
99 mut self,
100 exchange: impl Into<String>,
101 routing_key: impl Into<String>,
102 ) -> Self {
103 self.exchange_name = Some(exchange.into());
104 self.routing_key = Some(routing_key.into());
105 self
106 }
107
108 /// Set routing key (for use with bind_to_exchange)
109 pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
110 self.routing_key = Some(routing_key.into());
111 self
112 }
113
114 /// Set concurrency level (same as prefetch count)
115 pub fn concurrency(mut self, count: u16) -> Self {
116 self.prefetch_count = Some(count);
117 self
118 }
119
120 /// Configure retry behavior
121 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
122 self.retry_config = Some(retry_config);
123 self
124 }
125
126 /// Set TTL for dead letter queue (auto-cleanup failed messages)
127 /// This is a convenience method that modifies the retry_config if it exists
128 ///
129 /// # Example
130 /// ```ignore
131 /// let consumer = Consumer::builder(connection, "orders")
132 /// .with_retry(RetryConfig::exponential_default())
133 /// .with_dlq_ttl(Duration::from_secs(86400)) // 1 day
134 /// .build();
135 /// ```
136 pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
137 if let Some(retry_config) = self.retry_config.as_mut() {
138 retry_config.dlq_ttl = Some(ttl);
139 }
140 self
141 }
142
143 /// Set prefetch count
144 pub fn with_prefetch(mut self, count: u16) -> Self {
145 self.prefetch_count = Some(count);
146 self
147 }
148
149 /// Disable auto-acknowledge (manual ack required)
150 pub fn manual_ack(mut self) -> Self {
151 self.auto_ack = false;
152 self
153 }
154
155 /// Build the consumer
156 pub fn build(self) -> Consumer {
157 Consumer {
158 connection: self.connection,
159 queue_name: self.queue_name,
160 exchange_name: self.exchange_name,
161 routing_key: self.routing_key,
162 retry_config: self.retry_config,
163 prefetch_count: self.prefetch_count.unwrap_or(10),
164 auto_ack: self.auto_ack,
165 }
166 }
167}
168
/// Simplified Consumer for message consumption
///
/// Usually created through [`Consumer::builder`]. The `consume*` methods open a
/// channel on `connection`, declare the queue (and optional bindings) and then
/// process deliveries concurrently.
pub struct Consumer {
    /// Connection used to create channels.
    connection: Arc<Connection>,
    /// Queue consumed from; also the base name for the `.retry.N` and `.dlq` queues.
    queue_name: String,
    /// Exchange the queue is bound to; the binding is made only when `routing_key` is set too.
    exchange_name: Option<String>,
    /// Routing key for the exchange binding.
    routing_key: Option<String>,
    /// Retry/DLQ behavior; without it a failed message is nacked without requeue.
    retry_config: Option<RetryConfig>,
    /// Passed to `basic_qos` and used as the number of concurrent handler tasks.
    prefetch_count: u16,
    /// When `true` the consumer acks/nacks after the handler returns; when `false`
    /// it sends no acknowledgements at all. Note this is not the AMQP `no_ack`
    /// consume flag: deliveries are consumed with default consume options either way.
    auto_ack: bool,
}
179
180impl Consumer {
    /// Create a new consumer builder
    ///
    /// Shorthand for `ConsumerBuilder::new(connection, queue_name)`.
    pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
        ConsumerBuilder::new(connection, queue_name)
    }
185
186 /// Create retry queue with TTL
187 async fn create_retry_queue(
188 &self,
189 channel: &Channel,
190 retry_attempt: u32,
191 delay: std::time::Duration,
192 ) -> Result<String, RustRabbitError> {
193 let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
194 let delay_ms = delay.as_millis() as i64;
195
196 // Create retry queue with TTL that routes back to original queue
197 let mut args = FieldTable::default();
198 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
199 args.insert(
200 "x-dead-letter-exchange".into(),
201 AMQPValue::LongString("".into()),
202 ); // Default exchange
203 args.insert(
204 "x-dead-letter-routing-key".into(),
205 AMQPValue::LongString(self.queue_name.clone().into()),
206 );
207
208 channel
209 .queue_declare(
210 &retry_queue_name,
211 QueueDeclareOptions {
212 durable: true,
213 ..Default::default()
214 },
215 args,
216 )
217 .await?;
218
219 debug!(
220 "Created retry queue: {} with TTL: {}ms",
221 retry_queue_name, delay_ms
222 );
223 Ok(retry_queue_name)
224 }
225
226 /// Create DLQ (Dead Letter Queue) with optional TTL
227 async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
228 let dlq_name = format!("{}.dlq", self.queue_name);
229
230 // Build queue arguments with optional TTL
231 let mut args = FieldTable::default();
232 if let Some(retry_config) = &self.retry_config {
233 if let Some(ttl) = &retry_config.dlq_ttl {
234 let ttl_ms = ttl.as_millis() as i64;
235 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(ttl_ms));
236 debug!("DLQ TTL: {}ms", ttl_ms);
237 }
238 }
239
240 channel
241 .queue_declare(
242 &dlq_name,
243 QueueDeclareOptions {
244 durable: true,
245 ..Default::default()
246 },
247 args,
248 )
249 .await?;
250
251 debug!("Created DLQ: {}", dlq_name);
252 Ok(dlq_name)
253 }
254
255 /// Send message to retry queue with delay
256 async fn send_to_retry_queue(
257 &self,
258 channel: &Channel,
259 message_data: &[u8],
260 retry_attempt: u32,
261 delay: std::time::Duration,
262 ) -> Result<(), RustRabbitError> {
263 let retry_queue_name = self
264 .create_retry_queue(channel, retry_attempt, delay)
265 .await?;
266
267 // Publish to retry queue
268 channel
269 .basic_publish(
270 "", // Default exchange
271 &retry_queue_name,
272 BasicPublishOptions::default(),
273 message_data,
274 BasicProperties::default()
275 .with_content_type("application/json".into())
276 .with_delivery_mode(2), // Persistent
277 )
278 .await?
279 .await?;
280
281 debug!("Sent message to retry queue: {}", retry_queue_name);
282 Ok(())
283 }
284
285 /// Send message to DLQ
286 async fn send_to_dlq_simple(
287 &self,
288 channel: &Channel,
289 message_data: &[u8],
290 ) -> Result<(), RustRabbitError> {
291 let dlq_name = self.create_dlq(channel).await?;
292
293 // Publish to DLQ
294 channel
295 .basic_publish(
296 "", // Default exchange
297 &dlq_name,
298 BasicPublishOptions::default(),
299 message_data,
300 BasicProperties::default()
301 .with_content_type("application/json".into())
302 .with_delivery_mode(2), // Persistent
303 )
304 .await?
305 .await?;
306
307 debug!("Sent message to DLQ: {}", dlq_name);
308 Ok(())
309 }
310
311 /// Create delay exchange using RabbitMQ delayed message exchange plugin
312 /// Requires rabbitmq_delayed_message_exchange plugin to be installed on RabbitMQ
313 async fn create_delay_exchange(&self, channel: &Channel) -> Result<String, RustRabbitError> {
314 if let Some(retry_config) = &self.retry_config {
315 let delay_exchange = retry_config.get_delay_exchange(&self.queue_name);
316
317 // Declare delay exchange with x-delayed-type argument
318 let mut args = FieldTable::default();
319 args.insert(
320 "x-delayed-type".into(),
321 AMQPValue::LongString("direct".into()),
322 );
323
324 channel
325 .exchange_declare(
326 &delay_exchange,
327 lapin::ExchangeKind::Custom("x-delayed-message".to_string()),
328 lapin::options::ExchangeDeclareOptions {
329 durable: true,
330 ..Default::default()
331 },
332 args,
333 )
334 .await?;
335
336 debug!(
337 "Created delay exchange: {} (x-delayed-message type)",
338 delay_exchange
339 );
340 Ok(delay_exchange)
341 } else {
342 Err(RustRabbitError::Retry(
343 "Retry config not configured".to_string(),
344 ))
345 }
346 }
347
348 /// Send message to delay exchange with x-delay header for retry
349 /// Message will be automatically routed back to the original queue after delay
350 async fn send_to_delay_exchange(
351 &self,
352 channel: &Channel,
353 message_data: &[u8],
354 delay: std::time::Duration,
355 ) -> Result<(), RustRabbitError> {
356 let delay_exchange = self.create_delay_exchange(channel).await?;
357 let delay_ms = delay.as_millis() as i64;
358
359 // Publish to delay exchange with x-delay header
360 // The message will be re-delivered to original queue after delay
361 channel
362 .basic_publish(
363 &delay_exchange,
364 &self.queue_name, // Routing key: original queue name
365 BasicPublishOptions::default(),
366 message_data,
367 BasicProperties::default()
368 .with_content_type("application/json".into())
369 .with_delivery_mode(2) // Persistent
370 .with_headers({
371 let mut headers = FieldTable::default();
372 headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
373 headers
374 }),
375 )
376 .await?
377 .await?;
378
379 debug!(
380 "Sent message to delay exchange: {} with delay: {}ms",
381 delay_exchange, delay_ms
382 );
383 Ok(())
384 }
385
386 /// Start consuming messages
387 pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
388 where
389 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
390 H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
391 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
392 {
393 let channel = self.connection.create_channel().await?;
394
395 // Set prefetch count
396 channel
397 .basic_qos(
398 self.prefetch_count,
399 lapin::options::BasicQosOptions::default(),
400 )
401 .await?;
402
403 // Setup infrastructure (queues, exchanges)
404 self.setup_infrastructure(&channel).await?;
405
406 // Start consuming
407 let mut consumer = channel
408 .basic_consume(
409 &self.queue_name,
410 "",
411 BasicConsumeOptions::default(),
412 FieldTable::default(),
413 )
414 .await?;
415
416 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
417
418 debug!("Started consuming from queue: {}", self.queue_name);
419
420 // Process messages
421 while let Some(delivery_result) = consumer.next().await {
422 let delivery = delivery_result?;
423 let permit = semaphore.clone().acquire_owned().await.unwrap();
424 let handler_clone = handler.clone();
425 let auto_ack = self.auto_ack;
426 let channel_clone = Arc::new(channel.clone());
427 let retry_config = self.retry_config.clone();
428 let consumer_self = Consumer {
429 connection: self.connection.clone(),
430 queue_name: self.queue_name.clone(),
431 exchange_name: self.exchange_name.clone(),
432 routing_key: self.routing_key.clone(),
433 retry_config: self.retry_config.clone(),
434 prefetch_count: self.prefetch_count,
435 auto_ack: self.auto_ack,
436 };
437
438 tokio::spawn(async move {
439 let _permit = permit;
440
441 // Deserialize as WireMessage format
442 match serde_json::from_slice::<crate::message::WireMessage<T>>(&delivery.data) {
443 Ok(wire_msg) => {
444 let message = Message {
445 data: wire_msg.data,
446 retry_attempt: wire_msg.retry_attempt,
447 tag: delivery.delivery_tag,
448 channel: channel_clone.clone(),
449 };
450
451 // Process message
452 match handler_clone(message.clone()).await {
453 Ok(()) => {
454 if auto_ack {
455 if let Err(e) = message.ack().await {
456 error!("Failed to ack message: {}", e);
457 }
458 }
459 debug!("Message processed successfully");
460 }
461 Err(e) => {
462 error!("Handler error: {}", e);
463 if auto_ack {
464 // Check if retry is configured
465 if let Some(retry_cfg) = &retry_config {
466 if message.retry_attempt < retry_cfg.max_retries {
467 // Calculate delay for next retry
468 if let Some(delay) =
469 retry_cfg.calculate_delay(message.retry_attempt)
470 {
471 warn!(
472 "Scheduling retry {} with delay {:?} for message",
473 message.retry_attempt + 1,
474 delay
475 );
476
477 // Update retry attempt in wire message
478 let wire_msg = WireMessage {
479 data: message.data.clone(),
480 retry_attempt: message.retry_attempt + 1,
481 };
482
483 let retry_payload =
484 match serde_json::to_vec(&wire_msg) {
485 Ok(payload) => payload,
486 Err(e) => {
487 error!(
488 "Failed to serialize retry message: {}",
489 e
490 );
491 if let Err(e) =
492 message.nack(false).await
493 {
494 error!(
495 "Failed to nack message: {}",
496 e
497 );
498 }
499 return;
500 }
501 };
502
503 // Send via appropriate strategy (TTL or DelayedExchange)
504 let send_result = if matches!(
505 retry_cfg.delay_strategy,
506 crate::retry::DelayStrategy::DelayedExchange
507 ) {
508 consumer_self
509 .send_to_delay_exchange(
510 &channel_clone,
511 &retry_payload,
512 delay,
513 )
514 .await
515 } else {
516 consumer_self
517 .send_to_retry_queue(
518 &channel_clone,
519 &retry_payload,
520 message.retry_attempt + 1,
521 delay,
522 )
523 .await
524 };
525
526 if let Err(e) = send_result {
527 error!("Failed to send retry message: {}", e);
528 if let Err(e) = message.nack(false).await {
529 error!("Failed to nack message: {}", e);
530 }
531 return;
532 }
533
534 // ACK original message (it's now queued for retry)
535 if let Err(e) = message.ack().await {
536 error!(
537 "Failed to ack message after retry: {}",
538 e
539 );
540 }
541 } else {
542 // No more retries, send to DLQ
543 warn!("Retry exhausted, sending to DLQ");
544 if let Err(e) = consumer_self
545 .send_to_dlq_simple(
546 &channel_clone,
547 &delivery.data,
548 )
549 .await
550 {
551 error!("Failed to send to DLQ: {}", e);
552 }
553 if let Err(e) = message.ack().await {
554 error!(
555 "Failed to ack message after DLQ: {}",
556 e
557 );
558 }
559 }
560 } else {
561 // Retry exhausted, send to DLQ
562 warn!("Max retries reached, sending to DLQ");
563 if let Err(e) = consumer_self
564 .send_to_dlq_simple(&channel_clone, &delivery.data)
565 .await
566 {
567 error!("Failed to send to DLQ: {}", e);
568 }
569 if let Err(e) = message.ack().await {
570 error!("Failed to ack message after DLQ: {}", e);
571 }
572 }
573 } else {
574 // No retry config, just nack
575 if let Err(e) = message.nack(false).await {
576 error!("Failed to nack message: {}", e);
577 }
578 }
579 }
580 }
581 }
582 }
583 Err(e) => {
584 error!("Failed to deserialize message: {}", e);
585 if auto_ack {
586 // Reject malformed messages
587 if let Err(e) = channel_clone
588 .basic_nack(
589 delivery.delivery_tag,
590 lapin::options::BasicNackOptions {
591 multiple: false,
592 requeue: false,
593 },
594 )
595 .await
596 {
597 error!("Failed to nack malformed message: {}", e);
598 }
599 }
600 }
601 }
602 });
603 }
604
605 Ok(())
606 }
607
608 /// Setup queue and exchange infrastructure
609 async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
610 // Declare queue
611 channel
612 .queue_declare(
613 &self.queue_name,
614 QueueDeclareOptions {
615 durable: true,
616 ..Default::default()
617 },
618 FieldTable::default(),
619 )
620 .await?;
621
622 // Bind to exchange if specified
623 if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
624 channel
625 .queue_bind(
626 &self.queue_name,
627 exchange,
628 routing_key,
629 lapin::options::QueueBindOptions::default(),
630 FieldTable::default(),
631 )
632 .await?;
633 }
634
635 // Setup delay exchange if using DelayedExchange strategy
636 if let Some(retry_config) = &self.retry_config {
637 if matches!(
638 retry_config.delay_strategy,
639 crate::retry::DelayStrategy::DelayedExchange
640 ) {
641 let delay_exchange = self.create_delay_exchange(channel).await?;
642
643 // Bind delay exchange to original queue
644 channel
645 .queue_bind(
646 &self.queue_name,
647 &delay_exchange,
648 &self.queue_name, // Routing key: original queue name
649 lapin::options::QueueBindOptions::default(),
650 FieldTable::default(),
651 )
652 .await?;
653
654 debug!(
655 "Bound queue {} to delay exchange {}",
656 self.queue_name, delay_exchange
657 );
658 }
659 }
660
661 Ok(())
662 }
663
664 /// Start consuming message envelopes with full retry support
665 pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
666 where
667 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
668 H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
669 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
670 {
671 let channel = self.connection.create_channel().await?;
672 let retry_config = self.retry_config.clone();
673
674 // Set prefetch count
675 channel
676 .basic_qos(
677 self.prefetch_count,
678 lapin::options::BasicQosOptions::default(),
679 )
680 .await?;
681
682 // Setup queue and exchange
683 self.setup_infrastructure(&channel).await?;
684
685 // Create consumer
686 let mut consumer = channel
687 .basic_consume(
688 &self.queue_name,
689 "rust-rabbit-envelope-consumer",
690 BasicConsumeOptions::default(),
691 FieldTable::default(),
692 )
693 .await?;
694
695 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
696
697 debug!(
698 "Started consuming envelopes from queue: {}",
699 self.queue_name
700 );
701
702 // Process message envelopes with retry support
703 while let Some(delivery_result) = consumer.next().await {
704 let delivery = delivery_result?;
705 let permit = semaphore.clone().acquire_owned().await.unwrap();
706 let handler_clone = handler.clone();
707 let auto_ack = self.auto_ack;
708 let channel_clone = Arc::new(channel.clone());
709 let retry_config_clone = retry_config.clone();
710 let queue_name = self.queue_name.clone();
711 let connection = self.connection.clone();
712
713 tokio::spawn(async move {
714 let _permit = permit;
715
716 // Try to deserialize as MessageEnvelope
717 match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
718 Ok(mut envelope) => {
719 debug!(
720 "Processing envelope {} (attempt {}/{})",
721 envelope.metadata.message_id,
722 envelope.metadata.retry_attempt + 1,
723 envelope.metadata.max_retries + 1
724 );
725
726 // Process message
727 match handler_clone(envelope.clone()).await {
728 Ok(()) => {
729 if auto_ack {
730 if let Err(e) = channel_clone
731 .basic_ack(
732 delivery.delivery_tag,
733 BasicAckOptions::default(),
734 )
735 .await
736 {
737 error!("Failed to ack message: {}", e);
738 }
739 }
740 debug!(
741 "Envelope {} processed successfully",
742 envelope.metadata.message_id
743 );
744 }
745 Err(e) => {
746 error!(
747 "Handler error for envelope {}: {}",
748 envelope.metadata.message_id, e
749 );
750
751 // Determine error type (simplified classification)
752 let error_type = classify_error(e.as_ref());
753
754 // Add error to envelope
755 envelope = envelope.with_error(
756 &e.to_string(),
757 error_type,
758 Some(&format!("Queue: {}", queue_name)),
759 );
760
761 if auto_ack {
762 // Check if we should retry
763 if let Some(retry_cfg) = &retry_config_clone {
764 if !envelope.is_retry_exhausted() {
765 // Calculate delay and schedule retry
766 if let Some(delay) = retry_cfg
767 .calculate_delay(envelope.metadata.retry_attempt)
768 {
769 warn!(
770 "Scheduling retry {} for envelope {} with delay {:?}",
771 envelope.metadata.retry_attempt + 1,
772 envelope.metadata.message_id,
773 delay
774 );
775
776 // Increment retry attempt in envelope
777 envelope.metadata.retry_attempt += 1;
778
779 // Serialize updated envelope
780 match serde_json::to_vec(&envelope) {
781 Ok(retry_payload) => {
782 // Create consumer instance for access to methods
783 let consumer_self = Consumer {
784 connection: connection.clone(),
785 queue_name: queue_name.clone(),
786 exchange_name: None,
787 routing_key: None,
788 retry_config: retry_config_clone
789 .clone(),
790 prefetch_count: 10,
791 auto_ack: true,
792 };
793
794 // Send via appropriate strategy (TTL or DelayedExchange)
795 let send_result = if matches!(
796 retry_config_clone
797 .as_ref()
798 .map(|c| c.delay_strategy),
799 Some(crate::retry::DelayStrategy::DelayedExchange)
800 ) {
801 consumer_self
802 .send_to_delay_exchange(
803 &channel_clone,
804 &retry_payload,
805 delay,
806 )
807 .await
808 } else {
809 consumer_self
810 .send_to_retry_queue(
811 &channel_clone,
812 &retry_payload,
813 envelope.metadata.retry_attempt,
814 delay,
815 )
816 .await
817 };
818
819 if let Err(e) = send_result {
820 error!("Failed to send envelope for retry: {}", e);
821 // Fallback to simple nack
822 if let Err(e) = channel_clone
823 .basic_nack(
824 delivery.delivery_tag,
825 lapin::options::BasicNackOptions {
826 multiple: false,
827 requeue: false,
828 },
829 )
830 .await
831 {
832 error!("Failed to nack message: {}", e);
833 }
834 return;
835 }
836
837 // ACK original message (it's now queued for retry)
838 if let Err(e) = channel_clone
839 .basic_ack(
840 delivery.delivery_tag,
841 BasicAckOptions::default(),
842 )
843 .await
844 {
845 error!("Failed to ack message after retry: {}", e);
846 }
847 }
848 Err(e) => {
849 error!("Failed to serialize envelope for retry: {}", e);
850 // Fallback to simple nack
851 if let Err(e) = channel_clone
852 .basic_nack(
853 delivery.delivery_tag,
854 lapin::options::BasicNackOptions {
855 multiple: false,
856 requeue: false,
857 },
858 )
859 .await
860 {
861 error!("Failed to nack message: {}", e);
862 }
863 }
864 }
865 } else {
866 // No more retries, send to DLQ
867 Self::send_to_dlq(
868 &envelope,
869 retry_cfg,
870 &connection,
871 &queue_name,
872 )
873 .await;
874
875 // ACK original message
876 if let Err(e) = channel_clone
877 .basic_ack(
878 delivery.delivery_tag,
879 BasicAckOptions::default(),
880 )
881 .await
882 {
883 error!(
884 "Failed to ack message after DLQ: {}",
885 e
886 );
887 }
888 }
889 } else {
890 // Retry exhausted, send to DLQ
891 warn!(
892 "Retry exhausted for envelope {}",
893 envelope.metadata.message_id
894 );
895 Self::send_to_dlq(
896 &envelope,
897 retry_cfg,
898 &connection,
899 &queue_name,
900 )
901 .await;
902
903 // ACK original message
904 if let Err(e) = channel_clone
905 .basic_ack(
906 delivery.delivery_tag,
907 BasicAckOptions::default(),
908 )
909 .await
910 {
911 error!("Failed to ack message after DLQ: {}", e);
912 }
913 }
914 } else {
915 // No retry config, just nack
916 if let Err(e) = channel_clone
917 .basic_nack(
918 delivery.delivery_tag,
919 lapin::options::BasicNackOptions {
920 multiple: false,
921 requeue: false,
922 },
923 )
924 .await
925 {
926 error!("Failed to nack message: {}", e);
927 }
928 }
929 }
930 }
931 }
932 }
933 Err(e) => {
934 error!("Failed to deserialize message envelope: {}", e);
935 if auto_ack {
936 // Reject malformed messages
937 if let Err(e) = channel_clone
938 .basic_nack(
939 delivery.delivery_tag,
940 lapin::options::BasicNackOptions {
941 multiple: false,
942 requeue: false,
943 },
944 )
945 .await
946 {
947 error!("Failed to nack malformed envelope: {}", e);
948 }
949 }
950 }
951 }
952 });
953 }
954
955 Ok(())
956 }
957
958 /// Send failed message to Dead Letter Queue
959 async fn send_to_dlq<T>(
960 envelope: &MessageEnvelope<T>,
961 retry_config: &RetryConfig,
962 connection: &Arc<Connection>,
963 queue_name: &str,
964 ) where
965 T: serde::Serialize,
966 {
967 match connection.create_channel().await {
968 Ok(dlq_channel) => {
969 let dlq_name = retry_config.get_dead_letter_queue(queue_name);
970
971 // Declare DLQ
972 if let Err(e) = dlq_channel
973 .queue_declare(
974 &dlq_name,
975 QueueDeclareOptions {
976 durable: true,
977 ..Default::default()
978 },
979 FieldTable::default(),
980 )
981 .await
982 {
983 error!("Failed to declare DLQ {}: {}", dlq_name, e);
984 return;
985 }
986
987 // Publish to DLQ with failure summary
988 let failure_summary = envelope.get_failure_summary();
989 let dlq_payload = serde_json::json!({
990 "envelope": envelope,
991 "failure_summary": failure_summary,
992 "sent_to_dlq_at": chrono::Utc::now(),
993 });
994
995 if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
996 if let Err(e) = dlq_channel
997 .basic_publish(
998 "",
999 &dlq_name,
1000 lapin::options::BasicPublishOptions::default(),
1001 &payload_bytes,
1002 lapin::BasicProperties::default(),
1003 )
1004 .await
1005 {
1006 error!("Failed to publish to DLQ {}: {}", dlq_name, e);
1007 } else {
1008 warn!(
1009 "Sent envelope {} to DLQ: {}",
1010 envelope.metadata.message_id, failure_summary
1011 );
1012 }
1013 }
1014 }
1015 Err(e) => {
1016 error!("Failed to create DLQ channel: {}", e);
1017 }
1018 }
1019 }
1020}
1021
1022/// Classify error type based on error message (simplified heuristics)
1023fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
1024 let error_msg = error.to_string().to_lowercase();
1025
1026 if error_msg.contains("timeout")
1027 || error_msg.contains("connection")
1028 || error_msg.contains("network")
1029 || error_msg.contains("temporary")
1030 {
1031 ErrorType::Transient
1032 } else if error_msg.contains("rate limit")
1033 || error_msg.contains("quota")
1034 || error_msg.contains("resource")
1035 {
1036 ErrorType::Resource
1037 } else if error_msg.contains("validation")
1038 || error_msg.contains("authentication")
1039 || error_msg.contains("authorization")
1040 || error_msg.contains("invalid")
1041 || error_msg.contains("bad request")
1042 {
1043 ErrorType::Permanent
1044 } else {
1045 ErrorType::Unknown
1046 }
1047}