rust_rabbit/consumer.rs
1use crate::{
2 connection::Connection,
3 error::RustRabbitError,
4 message::{ErrorType, MessageEnvelope, WireMessage},
5 retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9 options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10 types::{AMQPValue, FieldTable},
11 BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use std::future::Future;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::Semaphore;
18use tracing::{debug, error, warn};
19
20/// Message wrapper with retry tracking
21#[derive(Debug)]
22pub struct Message<T>
23where
24 T: Clone,
25{
26 pub data: T,
27 pub retry_attempt: u32,
28 tag: u64,
29 channel: Arc<Channel>,
30}
31
32impl<T> Clone for Message<T>
33where
34 T: Clone,
35{
36 fn clone(&self) -> Self {
37 Self {
38 data: self.data.clone(),
39 retry_attempt: self.retry_attempt,
40 tag: self.tag,
41 channel: Arc::clone(&self.channel),
42 }
43 }
44}
45
46impl<T> Message<T>
47where
48 T: Clone,
49{
50 /// Acknowledge the message
51 pub async fn ack(&self) -> Result<(), RustRabbitError> {
52 self.channel
53 .basic_ack(self.tag, BasicAckOptions::default())
54 .await
55 .map_err(RustRabbitError::from)
56 }
57
58 /// Reject and requeue the message
59 pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
60 self.channel
61 .basic_nack(
62 self.tag,
63 lapin::options::BasicNackOptions {
64 multiple: false,
65 requeue,
66 },
67 )
68 .await
69 .map_err(RustRabbitError::from)
70 }
71}
72
/// Consumer configuration builder
///
/// Collects the settings that `build` copies into a [`Consumer`].
pub struct ConsumerBuilder {
    /// Connection the built consumer will open its channel on.
    connection: Arc<Connection>,
    /// Name of the queue to declare and consume from.
    queue_name: String,
    /// Exchange to bind the queue to; the bind only happens when a routing
    /// key is set as well.
    exchange_name: Option<String>,
    /// Routing key used for the exchange binding.
    routing_key: Option<String>,
    /// Retry policy; `None` means failed messages are simply nacked.
    retry_config: Option<RetryConfig>,
    /// Prefetch count; `build` falls back to 10 when this is `None`.
    prefetch_count: Option<u16>,
    /// When `true` the consumer acks/nacks deliveries itself after the
    /// handler returns; when `false` it leaves that to the caller.
    auto_ack: bool,
}
83
84impl ConsumerBuilder {
85 pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
86 Self {
87 connection,
88 queue_name: queue_name.into(),
89 exchange_name: None,
90 routing_key: None,
91 retry_config: None,
92 prefetch_count: Some(10),
93 auto_ack: true,
94 }
95 }
96
97 /// Bind to an exchange with routing key
98 pub fn bind_to_exchange(
99 mut self,
100 exchange: impl Into<String>,
101 routing_key: impl Into<String>,
102 ) -> Self {
103 self.exchange_name = Some(exchange.into());
104 self.routing_key = Some(routing_key.into());
105 self
106 }
107
108 /// Set routing key (for use with bind_to_exchange)
109 pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
110 self.routing_key = Some(routing_key.into());
111 self
112 }
113
114 /// Configure retry behavior
115 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
116 self.retry_config = Some(retry_config);
117 self
118 }
119
120 /// Set TTL for dead letter queue (auto-cleanup failed messages)
121 /// This is a convenience method that modifies the retry_config if it exists
122 ///
123 /// # Example
124 /// ```ignore
125 /// let consumer = Consumer::builder(connection, "orders")
126 /// .with_retry(RetryConfig::exponential_default())
127 /// .with_dlq_ttl(Duration::from_secs(86400)) // 1 day
128 /// .build();
129 /// ```
130 pub fn with_dlq_ttl(mut self, ttl: Duration) -> Self {
131 if let Some(retry_config) = self.retry_config.as_mut() {
132 retry_config.dlq_ttl = Some(ttl);
133 }
134 self
135 }
136
137 /// Set prefetch count
138 pub fn with_prefetch(mut self, count: u16) -> Self {
139 self.prefetch_count = Some(count);
140 self
141 }
142
143 /// Disable auto-acknowledge (manual ack required)
144 pub fn manual_ack(mut self) -> Self {
145 self.auto_ack = false;
146 self
147 }
148
149 /// Build the consumer
150 pub fn build(self) -> Consumer {
151 Consumer {
152 connection: self.connection,
153 queue_name: self.queue_name,
154 exchange_name: self.exchange_name,
155 routing_key: self.routing_key,
156 retry_config: self.retry_config,
157 prefetch_count: self.prefetch_count.unwrap_or(10),
158 auto_ack: self.auto_ack,
159 }
160 }
161}
162
/// Simplified Consumer for message consumption
///
/// Created through [`Consumer::builder`]; holds the resolved configuration
/// used by `consume` and `consume_envelopes`.
pub struct Consumer {
    /// Connection used to open channels.
    connection: Arc<Connection>,
    /// Queue that is declared and consumed; also the base name for the
    /// retry queues (`<queue>.retry.<n>`) and the DLQ (`<queue>.dlq`).
    queue_name: String,
    /// Exchange the queue is bound to when a routing key is also present.
    exchange_name: Option<String>,
    /// Routing key for the exchange binding.
    routing_key: Option<String>,
    /// Retry policy; `None` means a failed message is nacked without requeue.
    retry_config: Option<RetryConfig>,
    /// Value passed to `basic_qos`; also sizes the semaphore that bounds the
    /// number of concurrently running handler tasks.
    prefetch_count: u16,
    /// Whether the consumer settles (acks/nacks) deliveries itself after the
    /// handler returns.
    auto_ack: bool,
}
173
174impl Consumer {
/// Create a new consumer builder
///
/// Shorthand for [`ConsumerBuilder::new`]: `connection` and `queue_name` are
/// forwarded unchanged and every other setting starts at the builder default.
pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
    ConsumerBuilder::new(connection, queue_name)
}
179
180 /// Create retry queue with TTL
181 async fn create_retry_queue(
182 &self,
183 channel: &Channel,
184 retry_attempt: u32,
185 delay: std::time::Duration,
186 ) -> Result<String, RustRabbitError> {
187 let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
188 let delay_ms = delay.as_millis() as i64;
189
190 // Create retry queue with TTL that routes back to original queue
191 let mut args = FieldTable::default();
192 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
193 args.insert(
194 "x-dead-letter-exchange".into(),
195 AMQPValue::LongString("".into()),
196 ); // Default exchange
197 args.insert(
198 "x-dead-letter-routing-key".into(),
199 AMQPValue::LongString(self.queue_name.clone().into()),
200 );
201
202 channel
203 .queue_declare(
204 &retry_queue_name,
205 QueueDeclareOptions {
206 durable: true,
207 ..Default::default()
208 },
209 args,
210 )
211 .await?;
212
213 debug!(
214 "Created retry queue: {} with TTL: {}ms",
215 retry_queue_name, delay_ms
216 );
217 Ok(retry_queue_name)
218 }
219
220 /// Create DLQ (Dead Letter Queue) with optional TTL
221 async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
222 let dlq_name = format!("{}.dlq", self.queue_name);
223
224 // Build queue arguments with optional TTL
225 let mut args = FieldTable::default();
226 if let Some(retry_config) = &self.retry_config {
227 if let Some(ttl) = &retry_config.dlq_ttl {
228 let ttl_ms = ttl.as_millis() as i64;
229 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(ttl_ms));
230 debug!("DLQ TTL: {}ms", ttl_ms);
231 }
232 }
233
234 channel
235 .queue_declare(
236 &dlq_name,
237 QueueDeclareOptions {
238 durable: true,
239 ..Default::default()
240 },
241 args,
242 )
243 .await?;
244
245 debug!("Created DLQ: {}", dlq_name);
246 Ok(dlq_name)
247 }
248
249 /// Send message to retry queue with delay
250 async fn send_to_retry_queue(
251 &self,
252 channel: &Channel,
253 message_data: &[u8],
254 retry_attempt: u32,
255 delay: std::time::Duration,
256 ) -> Result<(), RustRabbitError> {
257 let retry_queue_name = self
258 .create_retry_queue(channel, retry_attempt, delay)
259 .await?;
260
261 // Publish to retry queue
262 channel
263 .basic_publish(
264 "", // Default exchange
265 &retry_queue_name,
266 BasicPublishOptions::default(),
267 message_data,
268 BasicProperties::default()
269 .with_content_type("application/json".into())
270 .with_delivery_mode(2), // Persistent
271 )
272 .await?
273 .await?;
274
275 debug!("Sent message to retry queue: {}", retry_queue_name);
276 Ok(())
277 }
278
279 /// Send message to DLQ
280 async fn send_to_dlq_simple(
281 &self,
282 channel: &Channel,
283 message_data: &[u8],
284 ) -> Result<(), RustRabbitError> {
285 let dlq_name = self.create_dlq(channel).await?;
286
287 // Publish to DLQ
288 channel
289 .basic_publish(
290 "", // Default exchange
291 &dlq_name,
292 BasicPublishOptions::default(),
293 message_data,
294 BasicProperties::default()
295 .with_content_type("application/json".into())
296 .with_delivery_mode(2), // Persistent
297 )
298 .await?
299 .await?;
300
301 debug!("Sent message to DLQ: {}", dlq_name);
302 Ok(())
303 }
304
305 /// Create delay exchange using RabbitMQ delayed message exchange plugin
306 /// Requires rabbitmq_delayed_message_exchange plugin to be installed on RabbitMQ
307 async fn create_delay_exchange(&self, channel: &Channel) -> Result<String, RustRabbitError> {
308 if let Some(retry_config) = &self.retry_config {
309 let delay_exchange = retry_config.get_delay_exchange(&self.queue_name);
310
311 // Declare delay exchange with x-delayed-type argument
312 let mut args = FieldTable::default();
313 args.insert(
314 "x-delayed-type".into(),
315 AMQPValue::LongString("direct".into()),
316 );
317
318 channel
319 .exchange_declare(
320 &delay_exchange,
321 lapin::ExchangeKind::Custom("x-delayed-message".to_string()),
322 lapin::options::ExchangeDeclareOptions {
323 durable: true,
324 ..Default::default()
325 },
326 args,
327 )
328 .await?;
329
330 debug!(
331 "Created delay exchange: {} (x-delayed-message type)",
332 delay_exchange
333 );
334 Ok(delay_exchange)
335 } else {
336 Err(RustRabbitError::Retry(
337 "Retry config not configured".to_string(),
338 ))
339 }
340 }
341
342 /// Send message to delay exchange with x-delay header for retry
343 /// Message will be automatically routed back to the original queue after delay
344 async fn send_to_delay_exchange(
345 &self,
346 channel: &Channel,
347 message_data: &[u8],
348 delay: std::time::Duration,
349 ) -> Result<(), RustRabbitError> {
350 let delay_exchange = self.create_delay_exchange(channel).await?;
351 let delay_ms = delay.as_millis() as i64;
352
353 // Publish to delay exchange with x-delay header
354 // The message will be re-delivered to original queue after delay
355 channel
356 .basic_publish(
357 &delay_exchange,
358 &self.queue_name, // Routing key: original queue name
359 BasicPublishOptions::default(),
360 message_data,
361 BasicProperties::default()
362 .with_content_type("application/json".into())
363 .with_delivery_mode(2) // Persistent
364 .with_headers({
365 let mut headers = FieldTable::default();
366 headers.insert("x-delay".into(), AMQPValue::LongLongInt(delay_ms));
367 headers
368 }),
369 )
370 .await?
371 .await?;
372
373 debug!(
374 "Sent message to delay exchange: {} with delay: {}ms",
375 delay_exchange, delay_ms
376 );
377 Ok(())
378 }
379
380 /// Start consuming messages
381 pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
382 where
383 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
384 H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
385 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
386 {
387 let channel = self.connection.create_channel().await?;
388
389 // Set prefetch count
390 channel
391 .basic_qos(
392 self.prefetch_count,
393 lapin::options::BasicQosOptions::default(),
394 )
395 .await?;
396
397 // Setup infrastructure (queues, exchanges)
398 self.setup_infrastructure(&channel).await?;
399
400 // Start consuming
401 let mut consumer = channel
402 .basic_consume(
403 &self.queue_name,
404 "",
405 BasicConsumeOptions::default(),
406 FieldTable::default(),
407 )
408 .await?;
409
410 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
411
412 debug!("Started consuming from queue: {}", self.queue_name);
413
414 // Process messages
415 while let Some(delivery_result) = consumer.next().await {
416 let delivery = delivery_result?;
417 let permit = semaphore.clone().acquire_owned().await.unwrap();
418 let handler_clone = handler.clone();
419 let auto_ack = self.auto_ack;
420 let channel_clone = Arc::new(channel.clone());
421 let retry_config = self.retry_config.clone();
422 let consumer_self = Consumer {
423 connection: self.connection.clone(),
424 queue_name: self.queue_name.clone(),
425 exchange_name: self.exchange_name.clone(),
426 routing_key: self.routing_key.clone(),
427 retry_config: self.retry_config.clone(),
428 prefetch_count: self.prefetch_count,
429 auto_ack: self.auto_ack,
430 };
431
432 tokio::spawn(async move {
433 let _permit = permit;
434
435 // Deserialize as WireMessage format
436 match serde_json::from_slice::<crate::message::WireMessage<T>>(&delivery.data) {
437 Ok(wire_msg) => {
438 let message = Message {
439 data: wire_msg.data,
440 retry_attempt: wire_msg.retry_attempt,
441 tag: delivery.delivery_tag,
442 channel: channel_clone.clone(),
443 };
444
445 // Process message
446 match handler_clone(message.clone()).await {
447 Ok(()) => {
448 if auto_ack {
449 if let Err(e) = message.ack().await {
450 error!("Failed to ack message: {}", e);
451 }
452 }
453 debug!("Message processed successfully");
454 }
455 Err(e) => {
456 error!("Handler error: {}", e);
457 if auto_ack {
458 // Check if retry is configured
459 if let Some(retry_cfg) = &retry_config {
460 if message.retry_attempt < retry_cfg.max_retries {
461 // Calculate delay for next retry
462 if let Some(delay) =
463 retry_cfg.calculate_delay(message.retry_attempt)
464 {
465 warn!(
466 "Scheduling retry {} with delay {:?} for message",
467 message.retry_attempt + 1,
468 delay
469 );
470
471 // Update retry attempt in wire message
472 let wire_msg = WireMessage {
473 data: message.data.clone(),
474 retry_attempt: message.retry_attempt + 1,
475 };
476
477 let retry_payload =
478 match serde_json::to_vec(&wire_msg) {
479 Ok(payload) => payload,
480 Err(e) => {
481 error!(
482 "Failed to serialize retry message: {}",
483 e
484 );
485 if let Err(e) =
486 message.nack(false).await
487 {
488 error!(
489 "Failed to nack message: {}",
490 e
491 );
492 }
493 return;
494 }
495 };
496
497 // Send via appropriate strategy (TTL or DelayedExchange)
498 let send_result = if matches!(
499 retry_cfg.delay_strategy,
500 crate::retry::DelayStrategy::DelayedExchange
501 ) {
502 consumer_self
503 .send_to_delay_exchange(
504 &channel_clone,
505 &retry_payload,
506 delay,
507 )
508 .await
509 } else {
510 consumer_self
511 .send_to_retry_queue(
512 &channel_clone,
513 &retry_payload,
514 message.retry_attempt + 1,
515 delay,
516 )
517 .await
518 };
519
520 if let Err(e) = send_result {
521 error!("Failed to send retry message: {}", e);
522 if let Err(e) = message.nack(false).await {
523 error!("Failed to nack message: {}", e);
524 }
525 return;
526 }
527
528 // ACK original message (it's now queued for retry)
529 if let Err(e) = message.ack().await {
530 error!(
531 "Failed to ack message after retry: {}",
532 e
533 );
534 }
535 } else {
536 // No more retries, send to DLQ
537 warn!("Retry exhausted, sending to DLQ");
538 if let Err(e) = consumer_self
539 .send_to_dlq_simple(
540 &channel_clone,
541 &delivery.data,
542 )
543 .await
544 {
545 error!("Failed to send to DLQ: {}", e);
546 }
547 if let Err(e) = message.ack().await {
548 error!(
549 "Failed to ack message after DLQ: {}",
550 e
551 );
552 }
553 }
554 } else {
555 // Retry exhausted, send to DLQ
556 warn!("Max retries reached, sending to DLQ");
557 if let Err(e) = consumer_self
558 .send_to_dlq_simple(&channel_clone, &delivery.data)
559 .await
560 {
561 error!("Failed to send to DLQ: {}", e);
562 }
563 if let Err(e) = message.ack().await {
564 error!("Failed to ack message after DLQ: {}", e);
565 }
566 }
567 } else {
568 // No retry config, just nack
569 if let Err(e) = message.nack(false).await {
570 error!("Failed to nack message: {}", e);
571 }
572 }
573 }
574 }
575 }
576 }
577 Err(e) => {
578 error!("Failed to deserialize message: {}", e);
579 if auto_ack {
580 // Reject malformed messages
581 if let Err(e) = channel_clone
582 .basic_nack(
583 delivery.delivery_tag,
584 lapin::options::BasicNackOptions {
585 multiple: false,
586 requeue: false,
587 },
588 )
589 .await
590 {
591 error!("Failed to nack malformed message: {}", e);
592 }
593 }
594 }
595 }
596 });
597 }
598
599 Ok(())
600 }
601
602 /// Setup queue and exchange infrastructure
603 async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
604 // Declare queue
605 channel
606 .queue_declare(
607 &self.queue_name,
608 QueueDeclareOptions {
609 durable: true,
610 ..Default::default()
611 },
612 FieldTable::default(),
613 )
614 .await?;
615
616 // Bind to exchange if specified
617 if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
618 channel
619 .queue_bind(
620 &self.queue_name,
621 exchange,
622 routing_key,
623 lapin::options::QueueBindOptions::default(),
624 FieldTable::default(),
625 )
626 .await?;
627 }
628
629 // Setup delay exchange if using DelayedExchange strategy
630 if let Some(retry_config) = &self.retry_config {
631 if matches!(
632 retry_config.delay_strategy,
633 crate::retry::DelayStrategy::DelayedExchange
634 ) {
635 let delay_exchange = self.create_delay_exchange(channel).await?;
636
637 // Bind delay exchange to original queue
638 channel
639 .queue_bind(
640 &self.queue_name,
641 &delay_exchange,
642 &self.queue_name, // Routing key: original queue name
643 lapin::options::QueueBindOptions::default(),
644 FieldTable::default(),
645 )
646 .await?;
647
648 debug!(
649 "Bound queue {} to delay exchange {}",
650 self.queue_name, delay_exchange
651 );
652 }
653 }
654
655 Ok(())
656 }
657
658 /// Start consuming message envelopes with full retry support
659 pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
660 where
661 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
662 H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
663 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
664 {
665 let channel = self.connection.create_channel().await?;
666 let retry_config = self.retry_config.clone();
667
668 // Set prefetch count
669 channel
670 .basic_qos(
671 self.prefetch_count,
672 lapin::options::BasicQosOptions::default(),
673 )
674 .await?;
675
676 // Setup queue and exchange
677 self.setup_infrastructure(&channel).await?;
678
679 // Create consumer
680 let mut consumer = channel
681 .basic_consume(
682 &self.queue_name,
683 "rust-rabbit-envelope-consumer",
684 BasicConsumeOptions::default(),
685 FieldTable::default(),
686 )
687 .await?;
688
689 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
690
691 debug!(
692 "Started consuming envelopes from queue: {}",
693 self.queue_name
694 );
695
696 // Process message envelopes with retry support
697 while let Some(delivery_result) = consumer.next().await {
698 let delivery = delivery_result?;
699 let permit = semaphore.clone().acquire_owned().await.unwrap();
700 let handler_clone = handler.clone();
701 let auto_ack = self.auto_ack;
702 let channel_clone = Arc::new(channel.clone());
703 let retry_config_clone = retry_config.clone();
704 let queue_name = self.queue_name.clone();
705 let connection = self.connection.clone();
706
707 tokio::spawn(async move {
708 let _permit = permit;
709
710 // Try to deserialize as MessageEnvelope
711 match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
712 Ok(mut envelope) => {
713 debug!(
714 "Processing envelope {} (attempt {}/{})",
715 envelope.metadata.message_id,
716 envelope.metadata.retry_attempt + 1,
717 envelope.metadata.max_retries + 1
718 );
719
720 // Process message
721 match handler_clone(envelope.clone()).await {
722 Ok(()) => {
723 if auto_ack {
724 if let Err(e) = channel_clone
725 .basic_ack(
726 delivery.delivery_tag,
727 BasicAckOptions::default(),
728 )
729 .await
730 {
731 error!("Failed to ack message: {}", e);
732 }
733 }
734 debug!(
735 "Envelope {} processed successfully",
736 envelope.metadata.message_id
737 );
738 }
739 Err(e) => {
740 error!(
741 "Handler error for envelope {}: {}",
742 envelope.metadata.message_id, e
743 );
744
745 // Determine error type (simplified classification)
746 let error_type = classify_error(e.as_ref());
747
748 // Add error to envelope
749 envelope = envelope.with_error(
750 &e.to_string(),
751 error_type,
752 Some(&format!("Queue: {}", queue_name)),
753 );
754
755 if auto_ack {
756 // Check if we should retry
757 if let Some(retry_cfg) = &retry_config_clone {
758 if !envelope.is_retry_exhausted() {
759 // Calculate delay and schedule retry
760 if let Some(delay) = retry_cfg
761 .calculate_delay(envelope.metadata.retry_attempt)
762 {
763 warn!(
764 "Scheduling retry {} for envelope {} with delay {:?}",
765 envelope.metadata.retry_attempt + 1,
766 envelope.metadata.message_id,
767 delay
768 );
769
770 // Increment retry attempt in envelope
771 envelope.metadata.retry_attempt += 1;
772
773 // Serialize updated envelope
774 match serde_json::to_vec(&envelope) {
775 Ok(retry_payload) => {
776 // Create consumer instance for access to methods
777 let consumer_self = Consumer {
778 connection: connection.clone(),
779 queue_name: queue_name.clone(),
780 exchange_name: None,
781 routing_key: None,
782 retry_config: retry_config_clone
783 .clone(),
784 prefetch_count: 10,
785 auto_ack: true,
786 };
787
788 // Send via appropriate strategy (TTL or DelayedExchange)
789 let send_result = if matches!(
790 retry_config_clone
791 .as_ref()
792 .map(|c| c.delay_strategy),
793 Some(crate::retry::DelayStrategy::DelayedExchange)
794 ) {
795 consumer_self
796 .send_to_delay_exchange(
797 &channel_clone,
798 &retry_payload,
799 delay,
800 )
801 .await
802 } else {
803 consumer_self
804 .send_to_retry_queue(
805 &channel_clone,
806 &retry_payload,
807 envelope.metadata.retry_attempt,
808 delay,
809 )
810 .await
811 };
812
813 if let Err(e) = send_result {
814 error!("Failed to send envelope for retry: {}", e);
815 // Fallback to simple nack
816 if let Err(e) = channel_clone
817 .basic_nack(
818 delivery.delivery_tag,
819 lapin::options::BasicNackOptions {
820 multiple: false,
821 requeue: false,
822 },
823 )
824 .await
825 {
826 error!("Failed to nack message: {}", e);
827 }
828 return;
829 }
830
831 // ACK original message (it's now queued for retry)
832 if let Err(e) = channel_clone
833 .basic_ack(
834 delivery.delivery_tag,
835 BasicAckOptions::default(),
836 )
837 .await
838 {
839 error!("Failed to ack message after retry: {}", e);
840 }
841 }
842 Err(e) => {
843 error!("Failed to serialize envelope for retry: {}", e);
844 // Fallback to simple nack
845 if let Err(e) = channel_clone
846 .basic_nack(
847 delivery.delivery_tag,
848 lapin::options::BasicNackOptions {
849 multiple: false,
850 requeue: false,
851 },
852 )
853 .await
854 {
855 error!("Failed to nack message: {}", e);
856 }
857 }
858 }
859 } else {
860 // No more retries, send to DLQ
861 Self::send_to_dlq(
862 &envelope,
863 retry_cfg,
864 &connection,
865 &queue_name,
866 )
867 .await;
868
869 // ACK original message
870 if let Err(e) = channel_clone
871 .basic_ack(
872 delivery.delivery_tag,
873 BasicAckOptions::default(),
874 )
875 .await
876 {
877 error!(
878 "Failed to ack message after DLQ: {}",
879 e
880 );
881 }
882 }
883 } else {
884 // Retry exhausted, send to DLQ
885 warn!(
886 "Retry exhausted for envelope {}",
887 envelope.metadata.message_id
888 );
889 Self::send_to_dlq(
890 &envelope,
891 retry_cfg,
892 &connection,
893 &queue_name,
894 )
895 .await;
896
897 // ACK original message
898 if let Err(e) = channel_clone
899 .basic_ack(
900 delivery.delivery_tag,
901 BasicAckOptions::default(),
902 )
903 .await
904 {
905 error!("Failed to ack message after DLQ: {}", e);
906 }
907 }
908 } else {
909 // No retry config, just nack
910 if let Err(e) = channel_clone
911 .basic_nack(
912 delivery.delivery_tag,
913 lapin::options::BasicNackOptions {
914 multiple: false,
915 requeue: false,
916 },
917 )
918 .await
919 {
920 error!("Failed to nack message: {}", e);
921 }
922 }
923 }
924 }
925 }
926 }
927 Err(e) => {
928 error!("Failed to deserialize message envelope: {}", e);
929 if auto_ack {
930 // Reject malformed messages
931 if let Err(e) = channel_clone
932 .basic_nack(
933 delivery.delivery_tag,
934 lapin::options::BasicNackOptions {
935 multiple: false,
936 requeue: false,
937 },
938 )
939 .await
940 {
941 error!("Failed to nack malformed envelope: {}", e);
942 }
943 }
944 }
945 }
946 });
947 }
948
949 Ok(())
950 }
951
952 /// Send failed message to Dead Letter Queue
953 async fn send_to_dlq<T>(
954 envelope: &MessageEnvelope<T>,
955 retry_config: &RetryConfig,
956 connection: &Arc<Connection>,
957 queue_name: &str,
958 ) where
959 T: serde::Serialize,
960 {
961 match connection.create_channel().await {
962 Ok(dlq_channel) => {
963 let dlq_name = retry_config.get_dead_letter_queue(queue_name);
964
965 // Declare DLQ
966 if let Err(e) = dlq_channel
967 .queue_declare(
968 &dlq_name,
969 QueueDeclareOptions {
970 durable: true,
971 ..Default::default()
972 },
973 FieldTable::default(),
974 )
975 .await
976 {
977 error!("Failed to declare DLQ {}: {}", dlq_name, e);
978 return;
979 }
980
981 // Publish to DLQ with failure summary
982 let failure_summary = envelope.get_failure_summary();
983 let dlq_payload = serde_json::json!({
984 "envelope": envelope,
985 "failure_summary": failure_summary,
986 "sent_to_dlq_at": chrono::Utc::now(),
987 });
988
989 if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
990 if let Err(e) = dlq_channel
991 .basic_publish(
992 "",
993 &dlq_name,
994 lapin::options::BasicPublishOptions::default(),
995 &payload_bytes,
996 lapin::BasicProperties::default(),
997 )
998 .await
999 {
1000 error!("Failed to publish to DLQ {}: {}", dlq_name, e);
1001 } else {
1002 warn!(
1003 "Sent envelope {} to DLQ: {}",
1004 envelope.metadata.message_id, failure_summary
1005 );
1006 }
1007 }
1008 }
1009 Err(e) => {
1010 error!("Failed to create DLQ channel: {}", e);
1011 }
1012 }
1013 }
1014}
1015
1016/// Classify error type based on error message (simplified heuristics)
1017fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
1018 let error_msg = error.to_string().to_lowercase();
1019
1020 if error_msg.contains("timeout")
1021 || error_msg.contains("connection")
1022 || error_msg.contains("network")
1023 || error_msg.contains("temporary")
1024 {
1025 ErrorType::Transient
1026 } else if error_msg.contains("rate limit")
1027 || error_msg.contains("quota")
1028 || error_msg.contains("resource")
1029 {
1030 ErrorType::Resource
1031 } else if error_msg.contains("validation")
1032 || error_msg.contains("authentication")
1033 || error_msg.contains("authorization")
1034 || error_msg.contains("invalid")
1035 || error_msg.contains("bad request")
1036 {
1037 ErrorType::Permanent
1038 } else {
1039 ErrorType::Unknown
1040 }
1041}