1use crate::{
2 connection::Connection,
3 error::RustRabbitError,
4 message::{ErrorType, MessageEnvelope},
5 retry::RetryConfig
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9 options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions},
10 types::FieldTable,
11 Channel,
12};
13use serde::de::DeserializeOwned;
14use std::future::Future;
15use std::sync::Arc;
16use tokio::sync::Semaphore;
17use tracing::{debug, error, warn};
18
19#[derive(Debug)]
21pub struct Message<T>
22where
23 T: Clone,
24{
25 pub data: T,
26 pub retry_attempt: u32,
27 tag: u64,
28 channel: Arc<Channel>,
29}
30
31impl<T> Clone for Message<T>
32where
33 T: Clone,
34{
35 fn clone(&self) -> Self {
36 Self {
37 data: self.data.clone(),
38 retry_attempt: self.retry_attempt,
39 tag: self.tag,
40 channel: Arc::clone(&self.channel),
41 }
42 }
43}
44
45impl<T> Message<T>
46where
47 T: Clone,
48{
49 pub async fn ack(&self) -> Result<(), RustRabbitError> {
51 self.channel
52 .basic_ack(self.tag, BasicAckOptions::default())
53 .await
54 .map_err(RustRabbitError::from)
55 }
56
57 pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
59 self.channel
60 .basic_nack(
61 self.tag,
62 lapin::options::BasicNackOptions {
63 multiple: false,
64 requeue,
65 },
66 )
67 .await
68 .map_err(RustRabbitError::from)
69 }
70}
71
72pub struct ConsumerBuilder {
74 connection: Arc<Connection>,
75 queue_name: String,
76 exchange_name: Option<String>,
77 routing_key: Option<String>,
78 retry_config: Option<RetryConfig>,
79 prefetch_count: Option<u16>,
80 auto_ack: bool,
81}
82
83impl ConsumerBuilder {
84 pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
85 Self {
86 connection,
87 queue_name: queue_name.into(),
88 exchange_name: None,
89 routing_key: None,
90 retry_config: None,
91 prefetch_count: Some(10),
92 auto_ack: true,
93 }
94 }
95
96 pub fn bind_to_exchange(
98 mut self,
99 exchange: impl Into<String>,
100 routing_key: impl Into<String>,
101 ) -> Self {
102 self.exchange_name = Some(exchange.into());
103 self.routing_key = Some(routing_key.into());
104 self
105 }
106
107 pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
109 self.routing_key = Some(routing_key.into());
110 self
111 }
112
113 pub fn concurrency(mut self, count: u16) -> Self {
115 self.prefetch_count = Some(count);
116 self
117 }
118
119 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
121 self.retry_config = Some(retry_config);
122 self
123 }
124
125 pub fn with_prefetch(mut self, count: u16) -> Self {
127 self.prefetch_count = Some(count);
128 self
129 }
130
131 pub fn manual_ack(mut self) -> Self {
133 self.auto_ack = false;
134 self
135 }
136
137 pub fn build(self) -> Consumer {
139 Consumer {
140 connection: self.connection,
141 queue_name: self.queue_name,
142 exchange_name: self.exchange_name,
143 routing_key: self.routing_key,
144 retry_config: self.retry_config,
145 prefetch_count: self.prefetch_count.unwrap_or(10),
146 auto_ack: self.auto_ack,
147 }
148 }
149}
150
151pub struct Consumer {
153 connection: Arc<Connection>,
154 queue_name: String,
155 exchange_name: Option<String>,
156 routing_key: Option<String>,
157 #[allow(dead_code)]
158 retry_config: Option<RetryConfig>,
159 prefetch_count: u16,
160 auto_ack: bool,
161}
162
163impl Consumer {
164 pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
166 ConsumerBuilder::new(connection, queue_name)
167 }
168
169 pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
171 where
172 T: DeserializeOwned + Send + Clone + Sync + 'static,
173 H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
174 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
175 {
176 let channel = self.connection.create_channel().await?;
177
178 channel
180 .basic_qos(
181 self.prefetch_count,
182 lapin::options::BasicQosOptions::default(),
183 )
184 .await?;
185
186 self.setup_infrastructure(&channel).await?;
188
189 let mut consumer = channel
191 .basic_consume(
192 &self.queue_name,
193 "",
194 BasicConsumeOptions::default(),
195 FieldTable::default(),
196 )
197 .await?;
198
199 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
200
201 debug!("Started consuming from queue: {}", self.queue_name);
202
203 while let Some(delivery_result) = consumer.next().await {
205 let delivery = delivery_result?;
206 let permit = semaphore.clone().acquire_owned().await.unwrap();
207 let handler_clone = handler.clone();
208 let auto_ack = self.auto_ack;
209 let channel_clone = Arc::new(channel.clone());
210
211 tokio::spawn(async move {
212 let _permit = permit;
213
214 match serde_json::from_slice::<T>(&delivery.data) {
216 Ok(data) => {
217 let message = Message {
218 data,
219 retry_attempt: 0, tag: delivery.delivery_tag,
221 channel: channel_clone.clone(),
222 };
223
224 match handler_clone(message.clone()).await {
226 Ok(()) => {
227 if auto_ack {
228 if let Err(e) = message.ack().await {
229 error!("Failed to ack message: {}", e);
230 }
231 }
232 debug!("Message processed successfully");
233 }
234 Err(e) => {
235 error!("Handler error: {}", e);
236 if auto_ack {
237 if let Err(e) = message.nack(false).await {
239 error!("Failed to nack message: {}", e);
240 }
241 }
242 }
243 }
244 }
245 Err(e) => {
246 error!("Failed to deserialize message: {}", e);
247 if auto_ack {
248 if let Err(e) = channel_clone
250 .basic_nack(
251 delivery.delivery_tag,
252 lapin::options::BasicNackOptions {
253 multiple: false,
254 requeue: false,
255 },
256 )
257 .await
258 {
259 error!("Failed to nack malformed message: {}", e);
260 }
261 }
262 }
263 }
264 });
265 }
266
267 Ok(())
268 }
269
270
271
272 async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
274 channel
276 .queue_declare(
277 &self.queue_name,
278 QueueDeclareOptions {
279 durable: true,
280 ..Default::default()
281 },
282 FieldTable::default(),
283 )
284 .await?;
285
286 if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
288 channel
289 .queue_bind(
290 &self.queue_name,
291 exchange,
292 routing_key,
293 lapin::options::QueueBindOptions::default(),
294 FieldTable::default(),
295 )
296 .await?;
297 }
298
299 Ok(())
300 }
301
302 pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
304 where
305 T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
306 H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
307 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
308 {
309 let channel = self.connection.create_channel().await?;
310 let retry_config = self.retry_config.clone();
311
312 channel
314 .basic_qos(
315 self.prefetch_count,
316 lapin::options::BasicQosOptions::default(),
317 )
318 .await?;
319
320 self.setup_infrastructure(&channel).await?;
322
323 let mut consumer = channel
325 .basic_consume(
326 &self.queue_name,
327 "rust-rabbit-envelope-consumer",
328 BasicConsumeOptions::default(),
329 FieldTable::default(),
330 )
331 .await?;
332
333 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
334
335 debug!("Started consuming envelopes from queue: {}", self.queue_name);
336
337 while let Some(delivery_result) = consumer.next().await {
339 let delivery = delivery_result?;
340 let permit = semaphore.clone().acquire_owned().await.unwrap();
341 let handler_clone = handler.clone();
342 let auto_ack = self.auto_ack;
343 let channel_clone = Arc::new(channel.clone());
344 let retry_config_clone = retry_config.clone();
345 let queue_name = self.queue_name.clone();
346 let connection = self.connection.clone();
347
348 tokio::spawn(async move {
349 let _permit = permit;
350
351 match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
353 Ok(mut envelope) => {
354 debug!(
355 "Processing envelope {} (attempt {}/{})",
356 envelope.metadata.message_id,
357 envelope.metadata.retry_attempt + 1,
358 envelope.metadata.max_retries + 1
359 );
360
361 match handler_clone(envelope.clone()).await {
363 Ok(()) => {
364 if auto_ack {
365 if let Err(e) = channel_clone
366 .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
367 .await
368 {
369 error!("Failed to ack message: {}", e);
370 }
371 }
372 debug!("Envelope {} processed successfully", envelope.metadata.message_id);
373 }
374 Err(e) => {
375 error!("Handler error for envelope {}: {}", envelope.metadata.message_id, e);
376
377 let error_type = classify_error(e.as_ref());
379
380 envelope = envelope.with_error(
382 &e.to_string(),
383 error_type,
384 Some(&format!("Queue: {}", queue_name))
385 );
386
387 if auto_ack {
388 if let Some(retry_cfg) = &retry_config_clone {
390 if !envelope.is_retry_exhausted() {
391 if let Some(_delay) = retry_cfg.calculate_delay(envelope.metadata.retry_attempt - 1) {
393 warn!(
394 "Scheduling retry {} for envelope {} (simple requeue for now)",
395 envelope.metadata.retry_attempt,
396 envelope.metadata.message_id,
397 );
398
399 if let Err(e) = channel_clone
402 .basic_nack(
403 delivery.delivery_tag,
404 lapin::options::BasicNackOptions {
405 multiple: false,
406 requeue: true, },
408 )
409 .await
410 {
411 error!("Failed to nack message for retry: {}", e);
412 }
413 } else {
414 Self::send_to_dlq(&envelope, retry_cfg, &connection, &queue_name).await;
416
417 if let Err(e) = channel_clone
419 .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
420 .await
421 {
422 error!("Failed to ack message after DLQ: {}", e);
423 }
424 }
425 } else {
426 warn!("Retry exhausted for envelope {}", envelope.metadata.message_id);
428 Self::send_to_dlq(&envelope, retry_cfg, &connection, &queue_name).await;
429
430 if let Err(e) = channel_clone
432 .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
433 .await
434 {
435 error!("Failed to ack message after DLQ: {}", e);
436 }
437 }
438 } else {
439 if let Err(e) = channel_clone
441 .basic_nack(
442 delivery.delivery_tag,
443 lapin::options::BasicNackOptions {
444 multiple: false,
445 requeue: false,
446 },
447 )
448 .await
449 {
450 error!("Failed to nack message: {}", e);
451 }
452 }
453 }
454 }
455 }
456 }
457 Err(e) => {
458 error!("Failed to deserialize message envelope: {}", e);
459 if auto_ack {
460 if let Err(e) = channel_clone
462 .basic_nack(
463 delivery.delivery_tag,
464 lapin::options::BasicNackOptions {
465 multiple: false,
466 requeue: false,
467 },
468 )
469 .await
470 {
471 error!("Failed to nack malformed envelope: {}", e);
472 }
473 }
474 }
475 }
476 });
477 }
478
479 Ok(())
480 }
481
482 async fn send_to_dlq<T>(
484 envelope: &MessageEnvelope<T>,
485 retry_config: &RetryConfig,
486 connection: &Arc<Connection>,
487 queue_name: &str,
488 ) where
489 T: serde::Serialize,
490 {
491 match connection.create_channel().await {
492 Ok(dlq_channel) => {
493 let dlq_name = retry_config.get_dead_letter_queue(queue_name);
494
495 if let Err(e) = dlq_channel
497 .queue_declare(
498 &dlq_name,
499 QueueDeclareOptions {
500 durable: true,
501 ..Default::default()
502 },
503 FieldTable::default(),
504 )
505 .await
506 {
507 error!("Failed to declare DLQ {}: {}", dlq_name, e);
508 return;
509 }
510
511 let failure_summary = envelope.get_failure_summary();
513 let dlq_payload = serde_json::json!({
514 "envelope": envelope,
515 "failure_summary": failure_summary,
516 "sent_to_dlq_at": chrono::Utc::now(),
517 });
518
519 if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
520 if let Err(e) = dlq_channel
521 .basic_publish(
522 "",
523 &dlq_name,
524 lapin::options::BasicPublishOptions::default(),
525 &payload_bytes,
526 lapin::BasicProperties::default(),
527 )
528 .await
529 {
530 error!("Failed to publish to DLQ {}: {}", dlq_name, e);
531 } else {
532 warn!("Sent envelope {} to DLQ: {}", envelope.metadata.message_id, failure_summary);
533 }
534 }
535 }
536 Err(e) => {
537 error!("Failed to create DLQ channel: {}", e);
538 }
539 }
540 }
541}
542
543fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
545 let error_msg = error.to_string().to_lowercase();
546
547 if error_msg.contains("timeout")
548 || error_msg.contains("connection")
549 || error_msg.contains("network")
550 || error_msg.contains("temporary") {
551 ErrorType::Transient
552 } else if error_msg.contains("rate limit")
553 || error_msg.contains("quota")
554 || error_msg.contains("resource") {
555 ErrorType::Resource
556 } else if error_msg.contains("validation")
557 || error_msg.contains("authentication")
558 || error_msg.contains("authorization")
559 || error_msg.contains("invalid")
560 || error_msg.contains("bad request") {
561 ErrorType::Permanent
562 } else {
563 ErrorType::Unknown
564 }
565}