rust_rabbit/consumer.rs
1use crate::{
2 connection::Connection,
3 error::RustRabbitError,
4 message::{ErrorType, MessageEnvelope, WireMessage},
5 retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9 options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10 types::{AMQPValue, FieldTable},
11 BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use std::future::Future;
15use std::sync::Arc;
16use tokio::sync::Semaphore;
17use tracing::{debug, error, warn};
18
19/// Message wrapper with retry tracking
20#[derive(Debug)]
21pub struct Message<T>
22where
23 T: Clone,
24{
25 pub data: T,
26 pub retry_attempt: u32,
27 tag: u64,
28 channel: Arc<Channel>,
29}
30
31impl<T> Clone for Message<T>
32where
33 T: Clone,
34{
35 fn clone(&self) -> Self {
36 Self {
37 data: self.data.clone(),
38 retry_attempt: self.retry_attempt,
39 tag: self.tag,
40 channel: Arc::clone(&self.channel),
41 }
42 }
43}
44
45impl<T> Message<T>
46where
47 T: Clone,
48{
49 /// Acknowledge the message
50 pub async fn ack(&self) -> Result<(), RustRabbitError> {
51 self.channel
52 .basic_ack(self.tag, BasicAckOptions::default())
53 .await
54 .map_err(RustRabbitError::from)
55 }
56
57 /// Reject and requeue the message
58 pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
59 self.channel
60 .basic_nack(
61 self.tag,
62 lapin::options::BasicNackOptions {
63 multiple: false,
64 requeue,
65 },
66 )
67 .await
68 .map_err(RustRabbitError::from)
69 }
70}
71
72/// Consumer configuration builder
73pub struct ConsumerBuilder {
74 connection: Arc<Connection>,
75 queue_name: String,
76 exchange_name: Option<String>,
77 routing_key: Option<String>,
78 retry_config: Option<RetryConfig>,
79 prefetch_count: Option<u16>,
80 auto_ack: bool,
81}
82
83impl ConsumerBuilder {
84 pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
85 Self {
86 connection,
87 queue_name: queue_name.into(),
88 exchange_name: None,
89 routing_key: None,
90 retry_config: None,
91 prefetch_count: Some(10),
92 auto_ack: true,
93 }
94 }
95
96 /// Bind to an exchange with routing key
97 pub fn bind_to_exchange(
98 mut self,
99 exchange: impl Into<String>,
100 routing_key: impl Into<String>,
101 ) -> Self {
102 self.exchange_name = Some(exchange.into());
103 self.routing_key = Some(routing_key.into());
104 self
105 }
106
107 /// Set routing key (for use with bind_to_exchange)
108 pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
109 self.routing_key = Some(routing_key.into());
110 self
111 }
112
113 /// Set concurrency level (same as prefetch count)
114 pub fn concurrency(mut self, count: u16) -> Self {
115 self.prefetch_count = Some(count);
116 self
117 }
118
119 /// Configure retry behavior
120 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
121 self.retry_config = Some(retry_config);
122 self
123 }
124
125 /// Set prefetch count
126 pub fn with_prefetch(mut self, count: u16) -> Self {
127 self.prefetch_count = Some(count);
128 self
129 }
130
131 /// Disable auto-acknowledge (manual ack required)
132 pub fn manual_ack(mut self) -> Self {
133 self.auto_ack = false;
134 self
135 }
136
137 /// Build the consumer
138 pub fn build(self) -> Consumer {
139 Consumer {
140 connection: self.connection,
141 queue_name: self.queue_name,
142 exchange_name: self.exchange_name,
143 routing_key: self.routing_key,
144 retry_config: self.retry_config,
145 prefetch_count: self.prefetch_count.unwrap_or(10),
146 auto_ack: self.auto_ack,
147 }
148 }
149}
150
151/// Simplified Consumer for message consumption
152pub struct Consumer {
153 connection: Arc<Connection>,
154 queue_name: String,
155 exchange_name: Option<String>,
156 routing_key: Option<String>,
157 retry_config: Option<RetryConfig>,
158 prefetch_count: u16,
159 auto_ack: bool,
160}
161
162impl Consumer {
163 /// Create a new consumer builder
164 pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
165 ConsumerBuilder::new(connection, queue_name)
166 }
167
168 /// Create retry queue with TTL
169 async fn create_retry_queue(
170 &self,
171 channel: &Channel,
172 retry_attempt: u32,
173 delay: std::time::Duration,
174 ) -> Result<String, RustRabbitError> {
175 let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
176 let delay_ms = delay.as_millis() as i64;
177
178 // Create retry queue with TTL that routes back to original queue
179 let mut args = FieldTable::default();
180 args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
181 args.insert(
182 "x-dead-letter-exchange".into(),
183 AMQPValue::LongString("".into()),
184 ); // Default exchange
185 args.insert(
186 "x-dead-letter-routing-key".into(),
187 AMQPValue::LongString(self.queue_name.clone().into()),
188 );
189
190 channel
191 .queue_declare(
192 &retry_queue_name,
193 QueueDeclareOptions {
194 durable: true,
195 ..Default::default()
196 },
197 args,
198 )
199 .await?;
200
201 debug!(
202 "Created retry queue: {} with TTL: {}ms",
203 retry_queue_name, delay_ms
204 );
205 Ok(retry_queue_name)
206 }
207
208 /// Create DLQ (Dead Letter Queue)
209 async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
210 let dlq_name = format!("{}.dlq", self.queue_name);
211
212 channel
213 .queue_declare(
214 &dlq_name,
215 QueueDeclareOptions {
216 durable: true,
217 ..Default::default()
218 },
219 FieldTable::default(),
220 )
221 .await?;
222
223 debug!("Created DLQ: {}", dlq_name);
224 Ok(dlq_name)
225 }
226
227 /// Send message to retry queue with delay
228 async fn send_to_retry_queue(
229 &self,
230 channel: &Channel,
231 message_data: &[u8],
232 retry_attempt: u32,
233 delay: std::time::Duration,
234 ) -> Result<(), RustRabbitError> {
235 let retry_queue_name = self
236 .create_retry_queue(channel, retry_attempt, delay)
237 .await?;
238
239 // Publish to retry queue
240 channel
241 .basic_publish(
242 "", // Default exchange
243 &retry_queue_name,
244 BasicPublishOptions::default(),
245 message_data,
246 BasicProperties::default()
247 .with_content_type("application/json".into())
248 .with_delivery_mode(2), // Persistent
249 )
250 .await?
251 .await?;
252
253 debug!("Sent message to retry queue: {}", retry_queue_name);
254 Ok(())
255 }
256
257 /// Send message to DLQ
258 async fn send_to_dlq_simple(
259 &self,
260 channel: &Channel,
261 message_data: &[u8],
262 ) -> Result<(), RustRabbitError> {
263 let dlq_name = self.create_dlq(channel).await?;
264
265 // Publish to DLQ
266 channel
267 .basic_publish(
268 "", // Default exchange
269 &dlq_name,
270 BasicPublishOptions::default(),
271 message_data,
272 BasicProperties::default()
273 .with_content_type("application/json".into())
274 .with_delivery_mode(2), // Persistent
275 )
276 .await?
277 .await?;
278
279 debug!("Sent message to DLQ: {}", dlq_name);
280 Ok(())
281 }
282
283 /// Start consuming messages
284 pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
285 where
286 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
287 H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
288 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
289 {
290 let channel = self.connection.create_channel().await?;
291
292 // Set prefetch count
293 channel
294 .basic_qos(
295 self.prefetch_count,
296 lapin::options::BasicQosOptions::default(),
297 )
298 .await?;
299
300 // Setup infrastructure (queues, exchanges)
301 self.setup_infrastructure(&channel).await?;
302
303 // Start consuming
304 let mut consumer = channel
305 .basic_consume(
306 &self.queue_name,
307 "",
308 BasicConsumeOptions::default(),
309 FieldTable::default(),
310 )
311 .await?;
312
313 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
314
315 debug!("Started consuming from queue: {}", self.queue_name);
316
317 // Process messages
318 while let Some(delivery_result) = consumer.next().await {
319 let delivery = delivery_result?;
320 let permit = semaphore.clone().acquire_owned().await.unwrap();
321 let handler_clone = handler.clone();
322 let auto_ack = self.auto_ack;
323 let channel_clone = Arc::new(channel.clone());
324 let retry_config = self.retry_config.clone();
325 let consumer_self = Consumer {
326 connection: self.connection.clone(),
327 queue_name: self.queue_name.clone(),
328 exchange_name: self.exchange_name.clone(),
329 routing_key: self.routing_key.clone(),
330 retry_config: self.retry_config.clone(),
331 prefetch_count: self.prefetch_count,
332 auto_ack: self.auto_ack,
333 };
334
335 tokio::spawn(async move {
336 let _permit = permit;
337
338 // Deserialize as WireMessage format
339 match serde_json::from_slice::<crate::message::WireMessage<T>>(&delivery.data) {
340 Ok(wire_msg) => {
341 let message = Message {
342 data: wire_msg.data,
343 retry_attempt: wire_msg.retry_attempt,
344 tag: delivery.delivery_tag,
345 channel: channel_clone.clone(),
346 };
347
348 // Process message
349 match handler_clone(message.clone()).await {
350 Ok(()) => {
351 if auto_ack {
352 if let Err(e) = message.ack().await {
353 error!("Failed to ack message: {}", e);
354 }
355 }
356 debug!("Message processed successfully");
357 }
358 Err(e) => {
359 error!("Handler error: {}", e);
360 if auto_ack {
361 // Check if retry is configured
362 if let Some(retry_cfg) = &retry_config {
363 if message.retry_attempt < retry_cfg.max_retries {
364 // Calculate delay for next retry
365 if let Some(delay) =
366 retry_cfg.calculate_delay(message.retry_attempt)
367 {
368 warn!(
369 "Scheduling retry {} with delay {:?} for message",
370 message.retry_attempt + 1,
371 delay
372 );
373
374 // Update retry attempt in wire message
375 let wire_msg = WireMessage {
376 data: message.data.clone(),
377 retry_attempt: message.retry_attempt + 1,
378 };
379
380 let retry_payload =
381 match serde_json::to_vec(&wire_msg) {
382 Ok(payload) => payload,
383 Err(e) => {
384 error!(
385 "Failed to serialize retry message: {}",
386 e
387 );
388 if let Err(e) =
389 message.nack(false).await
390 {
391 error!(
392 "Failed to nack message: {}",
393 e
394 );
395 }
396 return;
397 }
398 };
399
400 // Send to retry queue with delay
401 if let Err(e) = consumer_self
402 .send_to_retry_queue(
403 &channel_clone,
404 &retry_payload,
405 message.retry_attempt + 1,
406 delay,
407 )
408 .await
409 {
410 error!("Failed to send to retry queue: {}", e);
411 if let Err(e) = message.nack(false).await {
412 error!("Failed to nack message: {}", e);
413 }
414 return;
415 }
416
417 // ACK original message (it's now in retry queue)
418 if let Err(e) = message.ack().await {
419 error!(
420 "Failed to ack message after retry: {}",
421 e
422 );
423 }
424 } else {
425 // No more retries, send to DLQ
426 warn!("Retry exhausted, sending to DLQ");
427 if let Err(e) = consumer_self
428 .send_to_dlq_simple(
429 &channel_clone,
430 &delivery.data,
431 )
432 .await
433 {
434 error!("Failed to send to DLQ: {}", e);
435 }
436 if let Err(e) = message.ack().await {
437 error!(
438 "Failed to ack message after DLQ: {}",
439 e
440 );
441 }
442 }
443 } else {
444 // Retry exhausted, send to DLQ
445 warn!("Max retries reached, sending to DLQ");
446 if let Err(e) = consumer_self
447 .send_to_dlq_simple(&channel_clone, &delivery.data)
448 .await
449 {
450 error!("Failed to send to DLQ: {}", e);
451 }
452 if let Err(e) = message.ack().await {
453 error!("Failed to ack message after DLQ: {}", e);
454 }
455 }
456 } else {
457 // No retry config, just nack
458 if let Err(e) = message.nack(false).await {
459 error!("Failed to nack message: {}", e);
460 }
461 }
462 }
463 }
464 }
465 }
466 Err(e) => {
467 error!("Failed to deserialize message: {}", e);
468 if auto_ack {
469 // Reject malformed messages
470 if let Err(e) = channel_clone
471 .basic_nack(
472 delivery.delivery_tag,
473 lapin::options::BasicNackOptions {
474 multiple: false,
475 requeue: false,
476 },
477 )
478 .await
479 {
480 error!("Failed to nack malformed message: {}", e);
481 }
482 }
483 }
484 }
485 });
486 }
487
488 Ok(())
489 }
490
491 /// Setup queue and exchange infrastructure
492 async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
493 // Declare queue
494 channel
495 .queue_declare(
496 &self.queue_name,
497 QueueDeclareOptions {
498 durable: true,
499 ..Default::default()
500 },
501 FieldTable::default(),
502 )
503 .await?;
504
505 // Bind to exchange if specified
506 if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
507 channel
508 .queue_bind(
509 &self.queue_name,
510 exchange,
511 routing_key,
512 lapin::options::QueueBindOptions::default(),
513 FieldTable::default(),
514 )
515 .await?;
516 }
517
518 Ok(())
519 }
520
521 /// Start consuming message envelopes with full retry support
522 pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
523 where
524 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
525 H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
526 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
527 {
528 let channel = self.connection.create_channel().await?;
529 let retry_config = self.retry_config.clone();
530
531 // Set prefetch count
532 channel
533 .basic_qos(
534 self.prefetch_count,
535 lapin::options::BasicQosOptions::default(),
536 )
537 .await?;
538
539 // Setup queue and exchange
540 self.setup_infrastructure(&channel).await?;
541
542 // Create consumer
543 let mut consumer = channel
544 .basic_consume(
545 &self.queue_name,
546 "rust-rabbit-envelope-consumer",
547 BasicConsumeOptions::default(),
548 FieldTable::default(),
549 )
550 .await?;
551
552 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
553
554 debug!(
555 "Started consuming envelopes from queue: {}",
556 self.queue_name
557 );
558
559 // Process message envelopes with retry support
560 while let Some(delivery_result) = consumer.next().await {
561 let delivery = delivery_result?;
562 let permit = semaphore.clone().acquire_owned().await.unwrap();
563 let handler_clone = handler.clone();
564 let auto_ack = self.auto_ack;
565 let channel_clone = Arc::new(channel.clone());
566 let retry_config_clone = retry_config.clone();
567 let queue_name = self.queue_name.clone();
568 let connection = self.connection.clone();
569
570 tokio::spawn(async move {
571 let _permit = permit;
572
573 // Try to deserialize as MessageEnvelope
574 match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
575 Ok(mut envelope) => {
576 debug!(
577 "Processing envelope {} (attempt {}/{})",
578 envelope.metadata.message_id,
579 envelope.metadata.retry_attempt + 1,
580 envelope.metadata.max_retries + 1
581 );
582
583 // Process message
584 match handler_clone(envelope.clone()).await {
585 Ok(()) => {
586 if auto_ack {
587 if let Err(e) = channel_clone
588 .basic_ack(
589 delivery.delivery_tag,
590 BasicAckOptions::default(),
591 )
592 .await
593 {
594 error!("Failed to ack message: {}", e);
595 }
596 }
597 debug!(
598 "Envelope {} processed successfully",
599 envelope.metadata.message_id
600 );
601 }
602 Err(e) => {
603 error!(
604 "Handler error for envelope {}: {}",
605 envelope.metadata.message_id, e
606 );
607
608 // Determine error type (simplified classification)
609 let error_type = classify_error(e.as_ref());
610
611 // Add error to envelope
612 envelope = envelope.with_error(
613 &e.to_string(),
614 error_type,
615 Some(&format!("Queue: {}", queue_name)),
616 );
617
618 if auto_ack {
619 // Check if we should retry
620 if let Some(retry_cfg) = &retry_config_clone {
621 if !envelope.is_retry_exhausted() {
622 // Calculate delay and schedule retry
623 if let Some(delay) = retry_cfg
624 .calculate_delay(envelope.metadata.retry_attempt)
625 {
626 warn!(
627 "Scheduling retry {} for envelope {} with delay {:?}",
628 envelope.metadata.retry_attempt + 1,
629 envelope.metadata.message_id,
630 delay
631 );
632
633 // Increment retry attempt in envelope
634 envelope.metadata.retry_attempt += 1;
635
636 // Serialize updated envelope
637 match serde_json::to_vec(&envelope) {
638 Ok(retry_payload) => {
639 // Create consumer instance for access to methods
640 let consumer_self = Consumer {
641 connection: connection.clone(),
642 queue_name: queue_name.clone(),
643 exchange_name: None,
644 routing_key: None,
645 retry_config: retry_config_clone
646 .clone(),
647 prefetch_count: 10,
648 auto_ack: true,
649 };
650
651 // Send to retry queue with delay
652 if let Err(e) = consumer_self
653 .send_to_retry_queue(
654 &channel_clone,
655 &retry_payload,
656 envelope.metadata.retry_attempt,
657 delay,
658 )
659 .await
660 {
661 error!("Failed to send envelope to retry queue: {}", e);
662 // Fallback to simple nack
663 if let Err(e) = channel_clone
664 .basic_nack(
665 delivery.delivery_tag,
666 lapin::options::BasicNackOptions {
667 multiple: false,
668 requeue: false,
669 },
670 )
671 .await
672 {
673 error!("Failed to nack message: {}", e);
674 }
675 return;
676 }
677
678 // ACK original message (it's now in retry queue)
679 if let Err(e) = channel_clone
680 .basic_ack(
681 delivery.delivery_tag,
682 BasicAckOptions::default(),
683 )
684 .await
685 {
686 error!("Failed to ack message after retry: {}", e);
687 }
688 }
689 Err(e) => {
690 error!("Failed to serialize envelope for retry: {}", e);
691 // Fallback to simple nack
692 if let Err(e) = channel_clone
693 .basic_nack(
694 delivery.delivery_tag,
695 lapin::options::BasicNackOptions {
696 multiple: false,
697 requeue: false,
698 },
699 )
700 .await
701 {
702 error!("Failed to nack message: {}", e);
703 }
704 }
705 }
706 } else {
707 // No more retries, send to DLQ
708 Self::send_to_dlq(
709 &envelope,
710 retry_cfg,
711 &connection,
712 &queue_name,
713 )
714 .await;
715
716 // ACK original message
717 if let Err(e) = channel_clone
718 .basic_ack(
719 delivery.delivery_tag,
720 BasicAckOptions::default(),
721 )
722 .await
723 {
724 error!(
725 "Failed to ack message after DLQ: {}",
726 e
727 );
728 }
729 }
730 } else {
731 // Retry exhausted, send to DLQ
732 warn!(
733 "Retry exhausted for envelope {}",
734 envelope.metadata.message_id
735 );
736 Self::send_to_dlq(
737 &envelope,
738 retry_cfg,
739 &connection,
740 &queue_name,
741 )
742 .await;
743
744 // ACK original message
745 if let Err(e) = channel_clone
746 .basic_ack(
747 delivery.delivery_tag,
748 BasicAckOptions::default(),
749 )
750 .await
751 {
752 error!("Failed to ack message after DLQ: {}", e);
753 }
754 }
755 } else {
756 // No retry config, just nack
757 if let Err(e) = channel_clone
758 .basic_nack(
759 delivery.delivery_tag,
760 lapin::options::BasicNackOptions {
761 multiple: false,
762 requeue: false,
763 },
764 )
765 .await
766 {
767 error!("Failed to nack message: {}", e);
768 }
769 }
770 }
771 }
772 }
773 }
774 Err(e) => {
775 error!("Failed to deserialize message envelope: {}", e);
776 if auto_ack {
777 // Reject malformed messages
778 if let Err(e) = channel_clone
779 .basic_nack(
780 delivery.delivery_tag,
781 lapin::options::BasicNackOptions {
782 multiple: false,
783 requeue: false,
784 },
785 )
786 .await
787 {
788 error!("Failed to nack malformed envelope: {}", e);
789 }
790 }
791 }
792 }
793 });
794 }
795
796 Ok(())
797 }
798
799 /// Send failed message to Dead Letter Queue
800 async fn send_to_dlq<T>(
801 envelope: &MessageEnvelope<T>,
802 retry_config: &RetryConfig,
803 connection: &Arc<Connection>,
804 queue_name: &str,
805 ) where
806 T: serde::Serialize,
807 {
808 match connection.create_channel().await {
809 Ok(dlq_channel) => {
810 let dlq_name = retry_config.get_dead_letter_queue(queue_name);
811
812 // Declare DLQ
813 if let Err(e) = dlq_channel
814 .queue_declare(
815 &dlq_name,
816 QueueDeclareOptions {
817 durable: true,
818 ..Default::default()
819 },
820 FieldTable::default(),
821 )
822 .await
823 {
824 error!("Failed to declare DLQ {}: {}", dlq_name, e);
825 return;
826 }
827
828 // Publish to DLQ with failure summary
829 let failure_summary = envelope.get_failure_summary();
830 let dlq_payload = serde_json::json!({
831 "envelope": envelope,
832 "failure_summary": failure_summary,
833 "sent_to_dlq_at": chrono::Utc::now(),
834 });
835
836 if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
837 if let Err(e) = dlq_channel
838 .basic_publish(
839 "",
840 &dlq_name,
841 lapin::options::BasicPublishOptions::default(),
842 &payload_bytes,
843 lapin::BasicProperties::default(),
844 )
845 .await
846 {
847 error!("Failed to publish to DLQ {}: {}", dlq_name, e);
848 } else {
849 warn!(
850 "Sent envelope {} to DLQ: {}",
851 envelope.metadata.message_id, failure_summary
852 );
853 }
854 }
855 }
856 Err(e) => {
857 error!("Failed to create DLQ channel: {}", e);
858 }
859 }
860 }
861}
862
863/// Classify error type based on error message (simplified heuristics)
864fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
865 let error_msg = error.to_string().to_lowercase();
866
867 if error_msg.contains("timeout")
868 || error_msg.contains("connection")
869 || error_msg.contains("network")
870 || error_msg.contains("temporary")
871 {
872 ErrorType::Transient
873 } else if error_msg.contains("rate limit")
874 || error_msg.contains("quota")
875 || error_msg.contains("resource")
876 {
877 ErrorType::Resource
878 } else if error_msg.contains("validation")
879 || error_msg.contains("authentication")
880 || error_msg.contains("authorization")
881 || error_msg.contains("invalid")
882 || error_msg.contains("bad request")
883 {
884 ErrorType::Permanent
885 } else {
886 ErrorType::Unknown
887 }
888}