1use crate::{
2 connection::ConnectionManager,
3 error::{ProcessingError, RabbitError, Result},
4 metrics::RustRabbitMetrics,
5 publisher::{CustomExchangeDeclareOptions, CustomQueueDeclareOptions, Publisher},
6 retry::{DelayedMessageExchange, RetryPolicy},
7};
8use async_trait::async_trait;
9use futures::StreamExt;
10use lapin::{
11 message::Delivery,
12 options::{
13 BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
14 BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions,
15 QueueDeclareOptions as LapinQueueDeclareOptions,
16 },
17 types::FieldTable,
18 BasicProperties, Channel, ExchangeKind,
19};
20use serde::de::DeserializeOwned;
21use std::sync::Arc;
22use tokio::sync::Semaphore;
23use tracing::{debug, error, info, warn};
24
25#[async_trait]
32pub trait BaseConsumer<T>: Send + Sync + 'static
33where
34 T: DeserializeOwned + Send + Sync,
35{
36 async fn handle(
43 &self,
44 message: T,
45 context: MessageContext,
46 ) -> std::result::Result<(), ProcessingError>;
47}
48
49#[async_trait]
51pub trait MessageHandler<T>: Send + Sync + 'static
52where
53 T: DeserializeOwned + Send + Sync,
54{
55 async fn handle(&self, message: T, context: MessageContext) -> MessageResult;
57}
58
59#[derive(Debug, Clone)]
61pub struct MessageContext {
62 pub message_id: Option<String>,
63 pub correlation_id: Option<String>,
64 pub reply_to: Option<String>,
65 pub delivery_tag: u64,
66 pub redelivered: bool,
67 pub exchange: String,
68 pub routing_key: String,
69 pub headers: FieldTable,
70 pub timestamp: Option<u64>,
71 pub retry_count: u32,
72}
73
74#[derive(Debug)]
76pub enum MessageResult {
77 Ack,
79 Retry,
81 Reject,
83 Requeue,
85}
86
87#[derive(Debug, Clone)]
89pub struct ConsumerOptions {
90 pub queue_name: String,
92
93 pub consumer_tag: Option<String>,
95
96 pub concurrency: usize,
98
99 pub prefetch_count: Option<u16>,
101
102 pub auto_declare_queue: bool,
104
105 pub queue_options: CustomQueueDeclareOptions,
107
108 pub auto_declare_exchange: bool,
110
111 pub exchange_name: Option<String>,
113
114 pub exchange_options: CustomExchangeDeclareOptions,
116
117 pub routing_key: Option<String>,
119
120 pub retry_policy: Option<RetryPolicy>,
122
123 pub dead_letter_exchange: Option<String>,
125
126 pub auto_ack: bool,
128
129 pub exclusive: bool,
131
132 pub arguments: FieldTable,
134}
135
136impl ConsumerOptions {
137 pub fn builder<S: Into<String>>(queue_name: S) -> ConsumerOptionsBuilder {
139 ConsumerOptionsBuilder::new(queue_name.into())
140 }
141}
142
143#[derive(Debug, Clone)]
145pub struct ConsumerOptionsBuilder {
146 queue_name: String,
147 consumer_tag: Option<String>,
148 concurrency: usize,
149 prefetch_count: Option<u16>,
150 auto_declare_queue: bool,
151 queue_options: CustomQueueDeclareOptions,
152 auto_declare_exchange: bool,
153 exchange_name: Option<String>,
154 exchange_options: CustomExchangeDeclareOptions,
155 routing_key: Option<String>,
156 retry_policy: Option<RetryPolicy>,
157 dead_letter_exchange: Option<String>,
158 auto_ack: bool,
159 exclusive: bool,
160 arguments: FieldTable,
161}
162
163impl ConsumerOptionsBuilder {
164 pub fn new(queue_name: String) -> Self {
166 Self {
167 queue_name,
168 consumer_tag: None,
169 concurrency: 1,
170 prefetch_count: Some(10),
171 auto_declare_queue: false,
172 queue_options: CustomQueueDeclareOptions::default(),
173 auto_declare_exchange: false,
174 exchange_name: None,
175 exchange_options: CustomExchangeDeclareOptions::default(),
176 routing_key: None,
177 retry_policy: None,
178 dead_letter_exchange: None,
179 auto_ack: false,
180 exclusive: false,
181 arguments: FieldTable::default(),
182 }
183 }
184
185 pub fn consumer_tag<S: Into<String>>(mut self, tag: S) -> Self {
187 self.consumer_tag = Some(tag.into());
188 self
189 }
190
191 pub fn concurrency(mut self, concurrency: usize) -> Self {
193 self.concurrency = concurrency;
194 self
195 }
196
197 pub fn prefetch_count(mut self, count: u16) -> Self {
199 self.prefetch_count = Some(count);
200 self
201 }
202
203 pub fn no_prefetch_limit(mut self) -> Self {
205 self.prefetch_count = None;
206 self
207 }
208
209 pub fn auto_declare_queue(mut self) -> Self {
211 self.auto_declare_queue = true;
212 self
213 }
214
215 pub fn auto_declare_exchange(mut self) -> Self {
217 self.auto_declare_exchange = true;
218 self
219 }
220
221 pub fn exchange_name<S: Into<String>>(mut self, name: S) -> Self {
223 self.exchange_name = Some(name.into());
224 self
225 }
226
227 pub fn exchange_options(mut self, options: CustomExchangeDeclareOptions) -> Self {
229 self.exchange_options = options;
230 self
231 }
232
233 pub fn routing_key<S: Into<String>>(mut self, key: S) -> Self {
235 self.routing_key = Some(key.into());
236 self
237 }
238
239 pub fn queue_options(mut self, options: CustomQueueDeclareOptions) -> Self {
241 self.queue_options = options;
242 self
243 }
244
245 pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
247 self.retry_policy = Some(policy);
248 self
249 }
250
251 pub fn dead_letter_exchange<S: Into<String>>(mut self, exchange: S) -> Self {
253 self.dead_letter_exchange = Some(exchange.into());
254 self
255 }
256
257 pub fn auto_ack(mut self) -> Self {
259 self.auto_ack = true;
260 self
261 }
262
263 pub fn manual_ack(mut self) -> Self {
265 self.auto_ack = false;
266 self
267 }
268
269 pub fn exclusive(mut self) -> Self {
271 self.exclusive = true;
272 self
273 }
274
275 pub fn high_throughput(mut self) -> Self {
277 self.concurrency = 20;
278 self.prefetch_count = Some(50);
279 self.auto_ack = false;
280 self
281 }
282
283 pub fn reliable(mut self) -> Self {
285 self.concurrency = 1;
286 self.prefetch_count = Some(1);
287 self.auto_ack = false;
288 self
289 }
290
291 pub fn development(mut self) -> Self {
293 self.concurrency = 1;
294 self.prefetch_count = Some(1);
295 self.auto_ack = true;
296 self.auto_declare_queue = true;
297 self.auto_declare_exchange = true; self
299 }
300
301 pub fn minutes_retry(mut self) -> Self {
308 let queue_name = self.queue_name.clone();
309
310 self.auto_declare_queue = true;
311 self.auto_declare_exchange = true;
312 self.retry_policy = Some(RetryPolicy::minutes_exponential_for_queue(&queue_name));
313 self.concurrency = 1; self.prefetch_count = Some(1); self.auto_ack = false; self
317 }
318
319 pub fn build(self) -> ConsumerOptions {
321 ConsumerOptions {
322 queue_name: self.queue_name,
323 consumer_tag: self.consumer_tag,
324 concurrency: self.concurrency,
325 prefetch_count: self.prefetch_count,
326 auto_declare_queue: self.auto_declare_queue,
327 queue_options: self.queue_options,
328 auto_declare_exchange: self.auto_declare_exchange,
329 exchange_name: self.exchange_name,
330 exchange_options: self.exchange_options,
331 routing_key: self.routing_key,
332 retry_policy: self.retry_policy,
333 dead_letter_exchange: self.dead_letter_exchange,
334 auto_ack: self.auto_ack,
335 exclusive: self.exclusive,
336 arguments: self.arguments,
337 }
338 }
339}
340
341impl Default for ConsumerOptions {
342 fn default() -> Self {
343 Self {
344 queue_name: String::new(),
345 consumer_tag: None,
346 concurrency: 1,
347 prefetch_count: Some(10),
348 auto_declare_queue: false,
349 queue_options: CustomQueueDeclareOptions::default(),
350 auto_declare_exchange: false,
351 exchange_name: None,
352 exchange_options: CustomExchangeDeclareOptions::default(),
353 routing_key: None,
354 retry_policy: None,
355 dead_letter_exchange: None,
356 auto_ack: false,
357 exclusive: false,
358 arguments: FieldTable::default(),
359 }
360 }
361}
362
363pub struct Consumer {
365 #[allow(dead_code)] connection_manager: ConnectionManager,
367 options: ConsumerOptions,
368 channel: Channel,
369 semaphore: Arc<Semaphore>,
370 metrics: Option<RustRabbitMetrics>,
371 publisher: Publisher,
372}
373
374impl Consumer {
375 pub async fn new(
377 connection_manager: ConnectionManager,
378 options: ConsumerOptions,
379 ) -> Result<Self> {
380 let connection = connection_manager.get_connection().await?;
381 let channel = connection.create_channel().await?;
382
383 if let Some(prefetch_count) = options.prefetch_count {
385 debug!("Setting prefetch_count: {}", prefetch_count);
386 channel
387 .basic_qos(
388 prefetch_count,
389 lapin::options::BasicQosOptions { global: false },
390 )
391 .await
392 .map_err(|e| {
393 error!("Failed to set QoS prefetch_count={}: {}", prefetch_count, e);
394 RabbitError::Connection(e)
395 })?;
396 debug!("Successfully set prefetch_count: {}", prefetch_count);
397 }
398
399 if options.auto_declare_queue {
401 Self::declare_queue_and_exchange(&channel, &options).await?;
402 }
403
404 let semaphore = Arc::new(Semaphore::new(options.concurrency));
405
406 if options.retry_policy.is_some() {
408 Self::setup_retry_infrastructure(&connection_manager, &options).await?;
409 }
410
411 let publisher = Publisher::new(connection_manager.clone());
412
413 Ok(Self {
414 connection_manager,
415 options,
416 channel,
417 semaphore,
418 metrics: None,
419 publisher,
420 })
421 }
422
423 pub fn set_metrics(&mut self, metrics: RustRabbitMetrics) {
425 self.metrics = Some(metrics);
426 }
427
428 pub async fn consume_with_base_consumer<T, H>(&self, handler: Arc<H>) -> Result<()>
430 where
431 T: DeserializeOwned + Send + Sync + 'static,
432 H: BaseConsumer<T>,
433 {
434 let connection = self.connection_manager.get_connection().await?;
435 let channel = connection.create_channel().await?;
436 let publisher = Publisher::new(self.connection_manager.clone());
437
438 if let Some(prefetch_count) = self.options.prefetch_count {
440 channel
441 .basic_qos(prefetch_count, BasicQosOptions::default())
442 .await?;
443 }
444
445 let semaphore = Arc::new(Semaphore::new(self.options.concurrency));
446
447 let mut consumer = channel
449 .basic_consume(
450 &self.options.queue_name,
451 self.options.consumer_tag.as_deref().unwrap_or(""),
452 BasicConsumeOptions {
453 no_local: false,
454 no_ack: self.options.auto_ack,
455 exclusive: self.options.exclusive,
456 nowait: false,
457 },
458 self.options.arguments.clone(),
459 )
460 .await?;
461
462 info!(
463 "Started consuming from queue: {} with BaseConsumer",
464 self.options.queue_name
465 );
466
467 while let Some(delivery) = consumer.next().await {
469 let delivery = delivery?;
470 let permit = semaphore.clone().acquire_owned().await.map_err(|e| {
471 RabbitError::Generic(anyhow::anyhow!("Semaphore acquire error: {}", e))
472 })?;
473
474 let handler_clone = handler.clone();
475 let retry_policy = self.options.retry_policy.clone();
476 let dead_letter_exchange = self.options.dead_letter_exchange.clone();
477 let channel_clone = channel.clone();
478 let publisher_clone = publisher.clone();
479 let exchange_name = self
480 .options
481 .exchange_name
482 .clone()
483 .unwrap_or_else(|| self.options.queue_name.clone());
484
485 tokio::spawn(async move {
486 let _permit = permit;
487 if let Err(e) = Self::process_message_with_base_consumer(
488 delivery,
489 handler_clone,
490 retry_policy,
491 dead_letter_exchange,
492 channel_clone,
493 publisher_clone,
494 exchange_name,
495 )
496 .await
497 {
498 error!("Error processing message with BaseConsumer: {}", e);
499 }
500 });
501 }
502
503 Ok(())
504 }
505
506 async fn process_message_with_base_consumer<T, H>(
508 delivery: Delivery,
509 handler: Arc<H>,
510 retry_policy: Option<RetryPolicy>,
511 dead_letter_exchange: Option<String>,
512 channel: Channel,
513 publisher: Publisher,
514 exchange_name: String,
515 ) -> Result<()>
516 where
517 T: DeserializeOwned + Send + Sync,
518 H: BaseConsumer<T>,
519 {
520 let context = Self::build_message_context(&delivery);
521
522 let message: T = match serde_json::from_slice(&delivery.data) {
524 Ok(msg) => msg,
525 Err(e) => {
526 error!("Failed to deserialize message: {}", e);
527 Self::reject_message(&delivery, &channel, false).await?;
528 return Ok(());
529 }
530 };
531
532 match handler.handle(message, context.clone()).await {
534 Ok(()) => {
535 Self::ack_message(&delivery, &channel).await?;
537 debug!(
538 "Message processed successfully and ACK'd: {}",
539 delivery.delivery_tag
540 );
541 }
542 Err(ProcessingError::Retryable {
543 message: error_msg,
544 custom_delay,
545 }) => {
546 if let Some(ref policy) = retry_policy {
548 info!("Retryable error occurred: {}. Scheduling retry.", error_msg);
549
550 let delay = custom_delay
552 .unwrap_or_else(|| policy.calculate_delay(context.retry_count + 1));
553
554 Self::handle_retry_with_delay(
555 &delivery,
556 &channel,
557 &context,
558 policy,
559 &publisher,
560 &exchange_name,
561 delay,
562 )
563 .await?;
564 } else {
565 warn!(
566 "Retryable error but no retry policy configured. Rejecting message: {}",
567 error_msg
568 );
569 Self::reject_message(&delivery, &channel, false).await?;
570 }
571 }
572 Err(ProcessingError::NonRetryable {
573 message: error_msg,
574 send_to_dlq,
575 }) => {
576 error!("Non-retryable error occurred: {}", error_msg);
578
579 if send_to_dlq {
580 if let Some(ref dle) = dead_letter_exchange {
581 Self::send_to_dead_letter(&delivery, dle, &context, &publisher).await?;
582 } else {
583 warn!("Error should go to DLQ but no dead letter exchange configured. Rejecting message.");
584 Self::reject_message(&delivery, &channel, false).await?;
585 }
586 } else {
587 info!(
589 "Discarding message due to non-retryable error: {}",
590 error_msg
591 );
592 Self::reject_message(&delivery, &channel, false).await?;
593 }
594 }
595 }
596
597 Ok(())
598 }
599
600 async fn handle_retry_with_delay(
602 delivery: &Delivery,
603 channel: &Channel,
604 context: &MessageContext,
605 retry_policy: &RetryPolicy,
606 publisher: &Publisher,
607 exchange_name: &str,
608 delay: std::time::Duration,
609 ) -> Result<()> {
610 let max_retries = retry_policy.max_retries;
611 let current_retry = context.retry_count;
612
613 if current_retry >= max_retries {
614 warn!(
615 "Max retries ({}) exceeded for message, sending to dead letter",
616 max_retries
617 );
618
619 if let Some(dlx) = &retry_policy.dead_letter_exchange {
621 Self::send_to_dead_letter(delivery, dlx, context, publisher).await?;
622 } else {
623 Self::reject_message(delivery, channel, false).await?;
624 }
625 return Ok(());
626 }
627
628 let delayed_exchange_name = format!("{}.retry", exchange_name);
630
631 let mut headers = delivery.properties.headers().clone().unwrap_or_default();
633 headers.insert(
634 "x-retry-count".into(),
635 lapin::types::AMQPValue::LongInt((current_retry + 1) as i32),
636 );
637 headers.insert(
638 "x-original-exchange".into(),
639 lapin::types::AMQPValue::LongString(exchange_name.into()),
640 );
641 headers.insert(
642 "x-original-routing-key".into(),
643 lapin::types::AMQPValue::LongString(delivery.routing_key.to_string().into()),
644 );
645
646 headers.insert(
648 "x-delay".into(),
649 lapin::types::AMQPValue::LongInt(delay.as_millis() as i32),
650 );
651
652 let properties = BasicProperties::default()
653 .with_content_type("application/json".into())
654 .with_delivery_mode(2)
655 .with_headers(headers);
656
657 let connection = publisher.get_connection().await?;
659 let retry_channel = connection.create_channel().await?;
660
661 retry_channel
662 .basic_publish(
663 &delayed_exchange_name,
664 delivery.routing_key.as_str(), BasicPublishOptions::default(),
666 &delivery.data,
667 properties,
668 )
669 .await?;
670
671 Self::ack_message(delivery, channel).await?;
673
674 info!(
675 "Message scheduled for retry #{} with delay {:?}ms",
676 current_retry + 1,
677 delay.as_millis()
678 );
679
680 Ok(())
681 }
682
683 pub async fn consume<T, H>(&self, handler: Arc<H>) -> Result<()>
685 where
686 T: DeserializeOwned + Send + Sync + 'static,
687 H: MessageHandler<T>,
688 {
689 let consumer_tag = self
690 .options
691 .consumer_tag
692 .clone()
693 .unwrap_or_else(|| format!("rust-rabbit-{}", uuid::Uuid::new_v4()));
694
695 let consume_options = BasicConsumeOptions {
696 no_local: false,
697 no_ack: self.options.auto_ack,
698 exclusive: self.options.exclusive,
699 nowait: false,
700 };
701
702 let mut consumer = self
703 .channel
704 .basic_consume(
705 &self.options.queue_name,
706 &consumer_tag,
707 consume_options,
708 self.options.arguments.clone(),
709 )
710 .await?;
711
712 info!(
713 "Started consuming from queue: {} with tag: {}",
714 self.options.queue_name, consumer_tag
715 );
716
717 while let Some(delivery) = consumer.next().await {
718 let delivery = delivery?;
719 let permit = self
720 .semaphore
721 .clone()
722 .acquire_owned()
723 .await
724 .map_err(|e| RabbitError::Generic(e.into()))?;
725
726 let handler = handler.clone();
727 let retry_policy = self.options.retry_policy.clone();
728 let dead_letter_exchange = self.options.dead_letter_exchange.clone();
729 let channel = self.channel.clone();
730 let publisher = self.publisher.clone();
731 let exchange_name = self
732 .options
733 .exchange_name
734 .clone()
735 .unwrap_or_else(|| self.options.queue_name.clone());
736
737 tokio::spawn(async move {
739 let _permit = permit; if let Err(e) = Self::process_message::<T, H>(
742 delivery,
743 handler,
744 retry_policy,
745 dead_letter_exchange,
746 channel,
747 publisher,
748 exchange_name,
749 )
750 .await
751 {
752 error!("Error processing message: {}", e);
753 }
754 });
755 }
756
757 warn!(
758 "Consumer stream ended for queue: {}",
759 self.options.queue_name
760 );
761 Ok(())
762 }
763
764 async fn process_message<T, H>(
766 delivery: Delivery,
767 handler: Arc<H>,
768 retry_policy: Option<RetryPolicy>,
769 dead_letter_exchange: Option<String>,
770 channel: Channel,
771 publisher: Publisher,
772 exchange_name: String,
773 ) -> Result<()>
774 where
775 T: DeserializeOwned + Send + Sync,
776 H: MessageHandler<T>,
777 {
778 let context = Self::build_message_context(&delivery);
779
780 let message: T = match serde_json::from_slice(&delivery.data) {
782 Ok(msg) => msg,
783 Err(e) => {
784 error!("Failed to deserialize message: {}", e);
785 Self::reject_message(&delivery, &channel, false).await?;
786 return Ok(());
787 }
788 };
789
790 let result = handler.handle(message, context.clone()).await;
792
793 match result {
794 MessageResult::Ack => {
795 Self::ack_message(&delivery, &channel).await?;
796 debug!("Message acknowledged: {}", delivery.delivery_tag);
797 }
798 MessageResult::Retry => {
799 if let Some(ref policy) = retry_policy {
800 Self::handle_retry(
801 &delivery,
802 &channel,
803 &context,
804 policy,
805 &publisher,
806 &exchange_name,
807 )
808 .await?;
809 } else {
810 Self::reject_message(&delivery, &channel, true).await?;
811 }
812 }
813 MessageResult::Reject => {
814 if let Some(ref dle) = dead_letter_exchange {
815 Self::send_to_dead_letter(&delivery, dle, &context, &publisher).await?;
816 } else {
817 Self::reject_message(&delivery, &channel, false).await?;
818 }
819 }
820 MessageResult::Requeue => {
821 Self::reject_message(&delivery, &channel, true).await?;
822 }
823 }
824
825 Ok(())
826 }
827
828 fn build_message_context(delivery: &Delivery) -> MessageContext {
830 let properties = &delivery.properties;
831
832 MessageContext {
833 message_id: properties.message_id().as_ref().map(|s| s.to_string()),
834 correlation_id: properties.correlation_id().as_ref().map(|s| s.to_string()),
835 reply_to: properties.reply_to().as_ref().map(|s| s.to_string()),
836 delivery_tag: delivery.delivery_tag,
837 redelivered: delivery.redelivered,
838 exchange: delivery.exchange.to_string(),
839 routing_key: delivery.routing_key.to_string(),
840 headers: properties.headers().clone().unwrap_or_default(),
841 timestamp: *properties.timestamp(),
842 retry_count: Self::get_retry_count_from_headers(
843 properties
844 .headers()
845 .as_ref()
846 .unwrap_or(&FieldTable::default()),
847 ),
848 }
849 }
850
851 fn get_retry_count_from_headers(headers: &FieldTable) -> u32 {
853 headers
854 .inner()
855 .get("x-retry-count")
856 .and_then(|v| match v {
857 lapin::types::AMQPValue::LongInt(count) => Some(*count as u32),
858 lapin::types::AMQPValue::LongLongInt(count) => Some(*count as u32),
859 _ => None,
860 })
861 .unwrap_or(0)
862 }
863
864 async fn ack_message(delivery: &Delivery, channel: &Channel) -> Result<()> {
866 channel
867 .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
868 .await?;
869 Ok(())
870 }
871
872 async fn reject_message(delivery: &Delivery, channel: &Channel, requeue: bool) -> Result<()> {
874 channel
875 .basic_nack(
876 delivery.delivery_tag,
877 BasicNackOptions {
878 multiple: false,
879 requeue,
880 },
881 )
882 .await?;
883 Ok(())
884 }
885
886 async fn handle_retry(
888 delivery: &Delivery,
889 channel: &Channel,
890 context: &MessageContext,
891 retry_policy: &RetryPolicy,
892 publisher: &Publisher,
893 exchange_name: &str,
894 ) -> Result<()> {
895 if context.retry_count >= retry_policy.max_retries {
896 warn!(
897 "Max retries exceeded for message: {}",
898 delivery.delivery_tag
899 );
900
901 if let Some(ref dle) = retry_policy.dead_letter_exchange {
903 Self::send_to_dead_letter(delivery, dle, context, publisher).await?;
904 } else {
905 Self::reject_message(delivery, channel, false).await?;
906 }
907 return Ok(());
908 }
909
910 let delay = retry_policy.calculate_delay(context.retry_count);
912 let delayed_exchange_name = format!("{}.retry", exchange_name);
913
914 let mut headers = delivery.properties.headers().clone().unwrap_or_default();
916 headers.insert(
917 "x-retry-count".into(),
918 lapin::types::AMQPValue::LongInt((context.retry_count + 1) as i32),
919 );
920 headers.insert(
921 "x-original-queue".into(),
922 lapin::types::AMQPValue::LongString(context.routing_key.clone().into()),
923 );
924
925 let mut properties = BasicProperties::default()
927 .with_content_type("application/json".into())
928 .with_delivery_mode(2)
929 .with_headers(headers);
930
931 let mut delay_headers = properties.headers().clone().unwrap_or_default();
933 delay_headers.insert(
934 "x-delay".into(),
935 lapin::types::AMQPValue::LongLongInt(delay.as_millis() as i64),
936 );
937 properties = properties.with_headers(delay_headers);
938
939 channel
941 .basic_publish(
942 &delayed_exchange_name,
943 &context.routing_key,
944 BasicPublishOptions::default(),
945 &delivery.data,
946 properties,
947 )
948 .await?;
949
950 info!(
951 "Retrying message after {:?} (attempt {})",
952 delay,
953 context.retry_count + 1
954 );
955
956 Self::ack_message(delivery, channel).await?;
958
959 Ok(())
960 }
961
962 async fn send_to_dead_letter(
964 delivery: &Delivery,
965 dead_letter_exchange: &str,
966 _context: &MessageContext,
967 publisher: &Publisher,
968 ) -> Result<()> {
969 let mut headers = delivery.properties.headers().clone().unwrap_or_default();
971 headers.insert(
972 "x-death-reason".into(),
973 lapin::types::AMQPValue::LongString("max-retries-exceeded".into()),
974 );
975 headers.insert(
976 "x-death-time".into(),
977 lapin::types::AMQPValue::LongLongInt(chrono::Utc::now().timestamp_millis()),
978 );
979
980 let properties = BasicProperties::default()
982 .with_content_type("application/json".into())
983 .with_delivery_mode(2)
984 .with_headers(headers);
985
986 let connection = publisher.get_connection().await?;
988 let dlx_channel = connection.create_channel().await?;
989
990 dlx_channel
991 .basic_publish(
992 dead_letter_exchange,
993 "dead-letter", BasicPublishOptions::default(),
995 &delivery.data,
996 properties,
997 )
998 .await?;
999
1000 warn!(
1001 "Sent message to dead letter exchange: {}",
1002 dead_letter_exchange
1003 );
1004
1005 Ok(())
1006 }
1007
1008 pub async fn stop(&self) -> Result<()> {
1010 info!("Stopping consumer for queue: {}", self.options.queue_name);
1013 Ok(())
1014 }
1015
1016 async fn declare_queue_and_exchange(
1018 channel: &Channel,
1019 options: &ConsumerOptions,
1020 ) -> Result<()> {
1021 let queue_options = LapinQueueDeclareOptions {
1023 passive: options.queue_options.passive,
1024 durable: options.queue_options.durable,
1025 exclusive: options.queue_options.exclusive,
1026 auto_delete: options.queue_options.auto_delete,
1027 nowait: false,
1028 };
1029
1030 channel
1031 .queue_declare(
1032 &options.queue_name,
1033 queue_options,
1034 options.queue_options.arguments.clone(),
1035 )
1036 .await?;
1037
1038 debug!("Declared queue: {}", options.queue_name);
1039
1040 if options.auto_declare_exchange {
1042 let exchange_name = options
1043 .exchange_name
1044 .as_ref()
1045 .unwrap_or(&options.queue_name);
1046
1047 let exchange_options = ExchangeDeclareOptions {
1049 passive: options.exchange_options.passive,
1050 durable: options.exchange_options.durable,
1051 auto_delete: options.exchange_options.auto_delete,
1052 internal: options.exchange_options.internal,
1053 nowait: false,
1054 };
1055
1056 let mut arguments = options.exchange_options.arguments.clone();
1058 if matches!(options.exchange_options.exchange_type, ExchangeKind::Custom(ref kind) if kind == "x-delayed-message")
1059 {
1060 arguments.insert(
1061 "x-delayed-type".into(),
1062 lapin::types::AMQPValue::LongString(
1063 match options.exchange_options.original_type {
1064 ExchangeKind::Direct => "direct".into(),
1065 ExchangeKind::Fanout => "fanout".into(),
1066 ExchangeKind::Topic => "topic".into(),
1067 ExchangeKind::Headers => "headers".into(),
1068 ExchangeKind::Custom(ref s) => s.clone().into(),
1069 },
1070 ),
1071 );
1072 }
1073
1074 channel
1075 .exchange_declare(
1076 exchange_name,
1077 options.exchange_options.exchange_type.clone(),
1078 exchange_options,
1079 arguments,
1080 )
1081 .await?;
1082
1083 debug!("Declared exchange: {}", exchange_name);
1084
1085 let routing_key = options.routing_key.as_ref().unwrap_or(&options.queue_name);
1087
1088 channel
1089 .queue_bind(
1090 &options.queue_name,
1091 exchange_name,
1092 routing_key,
1093 QueueBindOptions::default(),
1094 FieldTable::default(),
1095 )
1096 .await?;
1097
1098 debug!(
1099 "Bound queue '{}' to exchange '{}' with routing key '{}'",
1100 options.queue_name, exchange_name, routing_key
1101 );
1102 }
1103
1104 Ok(())
1105 }
1106
1107 async fn setup_retry_infrastructure(
1109 connection_manager: &ConnectionManager,
1110 options: &ConsumerOptions,
1111 ) -> Result<()> {
1112 if let Some(ref retry_policy) = options.retry_policy {
1113 let delayed_exchange_name = format!(
1115 "{}.retry",
1116 options
1117 .exchange_name
1118 .as_ref()
1119 .unwrap_or(&options.queue_name)
1120 );
1121
1122 let delayed_exchange = DelayedMessageExchange::new(
1124 connection_manager.clone(),
1125 delayed_exchange_name.clone(),
1126 retry_policy.clone(),
1127 );
1128
1129 delayed_exchange.setup().await?;
1131
1132 delayed_exchange
1134 .setup_queue_retry(&options.queue_name)
1135 .await?;
1136
1137 debug!(
1138 "Setup retry infrastructure for queue: {} with delayed exchange: {}",
1139 options.queue_name, delayed_exchange_name
1140 );
1141 }
1142
1143 Ok(())
1144 }
1145}
1146
1147pub struct SimpleMessageHandler<F, T>
1149where
1150 F: Fn(T, MessageContext) -> MessageResult + Send + Sync,
1151 T: DeserializeOwned + Send + Sync,
1152{
1153 handler_fn: F,
1154 _phantom: std::marker::PhantomData<T>,
1155}
1156
1157impl<F, T> SimpleMessageHandler<F, T>
1158where
1159 F: Fn(T, MessageContext) -> MessageResult + Send + Sync + 'static,
1160 T: DeserializeOwned + Send + Sync + 'static,
1161{
1162 pub fn new(handler_fn: F) -> Self {
1163 Self {
1164 handler_fn,
1165 _phantom: std::marker::PhantomData,
1166 }
1167 }
1168}
1169
1170#[async_trait]
1171impl<F, T> MessageHandler<T> for SimpleMessageHandler<F, T>
1172where
1173 F: Fn(T, MessageContext) -> MessageResult + Send + Sync + 'static,
1174 T: DeserializeOwned + Send + Sync + 'static,
1175{
1176 async fn handle(&self, message: T, context: MessageContext) -> MessageResult {
1177 (self.handler_fn)(message, context)
1178 }
1179}