rust_rabbit/
consumer.rs

1use crate::{
2    connection::Connection, 
3    error::RustRabbitError, 
4    message::{ErrorType, MessageEnvelope},
5    retry::RetryConfig
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9    options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions},
10    types::FieldTable,
11    Channel,
12};
13use serde::de::DeserializeOwned;
14use std::future::Future;
15use std::sync::Arc;
16use tokio::sync::Semaphore;
17use tracing::{debug, error, warn};
18
19/// Message wrapper with retry tracking
20#[derive(Debug)]
21pub struct Message<T>
22where
23    T: Clone,
24{
25    pub data: T,
26    pub retry_attempt: u32,
27    tag: u64,
28    channel: Arc<Channel>,
29}
30
31impl<T> Clone for Message<T>
32where
33    T: Clone,
34{
35    fn clone(&self) -> Self {
36        Self {
37            data: self.data.clone(),
38            retry_attempt: self.retry_attempt,
39            tag: self.tag,
40            channel: Arc::clone(&self.channel),
41        }
42    }
43}
44
45impl<T> Message<T>
46where
47    T: Clone,
48{
49    /// Acknowledge the message
50    pub async fn ack(&self) -> Result<(), RustRabbitError> {
51        self.channel
52            .basic_ack(self.tag, BasicAckOptions::default())
53            .await
54            .map_err(RustRabbitError::from)
55    }
56
57    /// Reject and requeue the message
58    pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
59        self.channel
60            .basic_nack(
61                self.tag,
62                lapin::options::BasicNackOptions {
63                    multiple: false,
64                    requeue,
65                },
66            )
67            .await
68            .map_err(RustRabbitError::from)
69    }
70}
71
72/// Consumer configuration builder
73pub struct ConsumerBuilder {
74    connection: Arc<Connection>,
75    queue_name: String,
76    exchange_name: Option<String>,
77    routing_key: Option<String>,
78    retry_config: Option<RetryConfig>,
79    prefetch_count: Option<u16>,
80    auto_ack: bool,
81}
82
83impl ConsumerBuilder {
84    pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
85        Self {
86            connection,
87            queue_name: queue_name.into(),
88            exchange_name: None,
89            routing_key: None,
90            retry_config: None,
91            prefetch_count: Some(10),
92            auto_ack: true,
93        }
94    }
95
96    /// Bind to an exchange with routing key
97    pub fn bind_to_exchange(
98        mut self,
99        exchange: impl Into<String>,
100        routing_key: impl Into<String>,
101    ) -> Self {
102        self.exchange_name = Some(exchange.into());
103        self.routing_key = Some(routing_key.into());
104        self
105    }
106
107    /// Set routing key (for use with bind_to_exchange)
108    pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
109        self.routing_key = Some(routing_key.into());
110        self
111    }
112
113    /// Set concurrency level (same as prefetch count)
114    pub fn concurrency(mut self, count: u16) -> Self {
115        self.prefetch_count = Some(count);
116        self
117    }
118
119    /// Configure retry behavior
120    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
121        self.retry_config = Some(retry_config);
122        self
123    }
124
125    /// Set prefetch count
126    pub fn with_prefetch(mut self, count: u16) -> Self {
127        self.prefetch_count = Some(count);
128        self
129    }
130
131    /// Disable auto-acknowledge (manual ack required)
132    pub fn manual_ack(mut self) -> Self {
133        self.auto_ack = false;
134        self
135    }
136
137    /// Build the consumer
138    pub fn build(self) -> Consumer {
139        Consumer {
140            connection: self.connection,
141            queue_name: self.queue_name,
142            exchange_name: self.exchange_name,
143            routing_key: self.routing_key,
144            retry_config: self.retry_config,
145            prefetch_count: self.prefetch_count.unwrap_or(10),
146            auto_ack: self.auto_ack,
147        }
148    }
149}
150
151/// Simplified Consumer for message consumption
152pub struct Consumer {
153    connection: Arc<Connection>,
154    queue_name: String,
155    exchange_name: Option<String>,
156    routing_key: Option<String>,
157    #[allow(dead_code)]
158    retry_config: Option<RetryConfig>,
159    prefetch_count: u16,
160    auto_ack: bool,
161}
162
163impl Consumer {
164    /// Create a new consumer builder
165    pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
166        ConsumerBuilder::new(connection, queue_name)
167    }
168
169    /// Start consuming messages
170    pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
171    where
172        T: DeserializeOwned + Send + Clone + Sync + 'static,
173        H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
174        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
175    {
176        let channel = self.connection.create_channel().await?;
177
178        // Set prefetch count
179        channel
180            .basic_qos(
181                self.prefetch_count,
182                lapin::options::BasicQosOptions::default(),
183            )
184            .await?;
185
186        // Setup infrastructure (queues, exchanges)
187        self.setup_infrastructure(&channel).await?;
188
189        // Start consuming
190        let mut consumer = channel
191            .basic_consume(
192                &self.queue_name,
193                "",
194                BasicConsumeOptions::default(),
195                FieldTable::default(),
196            )
197            .await?;
198
199        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
200
201        debug!("Started consuming from queue: {}", self.queue_name);
202
203        // Process messages (simplified - no retry for now)
204        while let Some(delivery_result) = consumer.next().await {
205            let delivery = delivery_result?;
206            let permit = semaphore.clone().acquire_owned().await.unwrap();
207            let handler_clone = handler.clone();
208            let auto_ack = self.auto_ack;
209            let channel_clone = Arc::new(channel.clone());
210
211            tokio::spawn(async move {
212                let _permit = permit;
213
214                // Deserialize message
215                match serde_json::from_slice::<T>(&delivery.data) {
216                    Ok(data) => {
217                        let message = Message {
218                            data,
219                            retry_attempt: 0, // Simplified for now
220                            tag: delivery.delivery_tag,
221                            channel: channel_clone.clone(),
222                        };
223
224                        // Process message
225                        match handler_clone(message.clone()).await {
226                            Ok(()) => {
227                                if auto_ack {
228                                    if let Err(e) = message.ack().await {
229                                        error!("Failed to ack message: {}", e);
230                                    }
231                                }
232                                debug!("Message processed successfully");
233                            }
234                            Err(e) => {
235                                error!("Handler error: {}", e);
236                                if auto_ack {
237                                    // Simple reject without retry for now
238                                    if let Err(e) = message.nack(false).await {
239                                        error!("Failed to nack message: {}", e);
240                                    }
241                                }
242                            }
243                        }
244                    }
245                    Err(e) => {
246                        error!("Failed to deserialize message: {}", e);
247                        if auto_ack {
248                            // Reject malformed messages
249                            if let Err(e) = channel_clone
250                                .basic_nack(
251                                    delivery.delivery_tag,
252                                    lapin::options::BasicNackOptions {
253                                        multiple: false,
254                                        requeue: false,
255                                    },
256                                )
257                                .await
258                            {
259                                error!("Failed to nack malformed message: {}", e);
260                            }
261                        }
262                    }
263                }
264            });
265        }
266
267        Ok(())
268    }
269
270
271
272    /// Setup queue and exchange infrastructure
273    async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
274        // Declare queue
275        channel
276            .queue_declare(
277                &self.queue_name,
278                QueueDeclareOptions {
279                    durable: true,
280                    ..Default::default()
281                },
282                FieldTable::default(),
283            )
284            .await?;
285
286        // Bind to exchange if specified
287        if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
288            channel
289                .queue_bind(
290                    &self.queue_name,
291                    exchange,
292                    routing_key,
293                    lapin::options::QueueBindOptions::default(),
294                    FieldTable::default(),
295                )
296                .await?;
297        }
298
299        Ok(())
300    }
301
302    /// Start consuming message envelopes with full retry support
303    pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
304    where
305        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
306        H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
307        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
308    {
309        let channel = self.connection.create_channel().await?;
310        let retry_config = self.retry_config.clone();
311
312        // Set prefetch count
313        channel
314            .basic_qos(
315                self.prefetch_count,
316                lapin::options::BasicQosOptions::default(),
317            )
318            .await?;
319
320        // Setup queue and exchange
321        self.setup_infrastructure(&channel).await?;
322
323        // Create consumer
324        let mut consumer = channel
325            .basic_consume(
326                &self.queue_name,
327                "rust-rabbit-envelope-consumer",
328                BasicConsumeOptions::default(),
329                FieldTable::default(),
330            )
331            .await?;
332
333        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
334
335        debug!("Started consuming envelopes from queue: {}", self.queue_name);
336
337        // Process message envelopes with retry support
338        while let Some(delivery_result) = consumer.next().await {
339            let delivery = delivery_result?;
340            let permit = semaphore.clone().acquire_owned().await.unwrap();
341            let handler_clone = handler.clone();
342            let auto_ack = self.auto_ack;
343            let channel_clone = Arc::new(channel.clone());
344            let retry_config_clone = retry_config.clone();
345            let queue_name = self.queue_name.clone();
346            let connection = self.connection.clone();
347
348            tokio::spawn(async move {
349                let _permit = permit;
350
351                // Try to deserialize as MessageEnvelope
352                match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
353                    Ok(mut envelope) => {
354                        debug!(
355                            "Processing envelope {} (attempt {}/{})",
356                            envelope.metadata.message_id,
357                            envelope.metadata.retry_attempt + 1,
358                            envelope.metadata.max_retries + 1
359                        );
360
361                        // Process message
362                        match handler_clone(envelope.clone()).await {
363                            Ok(()) => {
364                                if auto_ack {
365                                    if let Err(e) = channel_clone
366                                        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
367                                        .await
368                                    {
369                                        error!("Failed to ack message: {}", e);
370                                    }
371                                }
372                                debug!("Envelope {} processed successfully", envelope.metadata.message_id);
373                            }
374                            Err(e) => {
375                                error!("Handler error for envelope {}: {}", envelope.metadata.message_id, e);
376                                
377                                // Determine error type (simplified classification)
378                                let error_type = classify_error(e.as_ref());
379                                
380                                // Add error to envelope
381                                envelope = envelope.with_error(
382                                    &e.to_string(),
383                                    error_type,
384                                    Some(&format!("Queue: {}", queue_name))
385                                );
386
387                                if auto_ack {
388                                    // Check if we should retry
389                                    if let Some(retry_cfg) = &retry_config_clone {
390                                        if !envelope.is_retry_exhausted() {
391                                            // Calculate delay and schedule retry
392                                            if let Some(_delay) = retry_cfg.calculate_delay(envelope.metadata.retry_attempt - 1) {
393                                                warn!(
394                                                    "Scheduling retry {} for envelope {} (simple requeue for now)",
395                                                    envelope.metadata.retry_attempt,
396                                                    envelope.metadata.message_id,
397                                                );
398                                                
399                                                // TODO: Implement proper delay-based retry scheduling
400                                                // For now, just nack and requeue
401                                                if let Err(e) = channel_clone
402                                                    .basic_nack(
403                                                        delivery.delivery_tag,
404                                                        lapin::options::BasicNackOptions {
405                                                            multiple: false,
406                                                            requeue: true, // Simple requeue for now
407                                                        },
408                                                    )
409                                                    .await
410                                                {
411                                                    error!("Failed to nack message for retry: {}", e);
412                                                }
413                                            } else {
414                                                // No more retries, send to DLQ
415                                                Self::send_to_dlq(&envelope, retry_cfg, &connection, &queue_name).await;
416                                                
417                                                // ACK original message
418                                                if let Err(e) = channel_clone
419                                                    .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
420                                                    .await
421                                                {
422                                                    error!("Failed to ack message after DLQ: {}", e);
423                                                }
424                                            }
425                                        } else {
426                                            // Retry exhausted, send to DLQ
427                                            warn!("Retry exhausted for envelope {}", envelope.metadata.message_id);
428                                            Self::send_to_dlq(&envelope, retry_cfg, &connection, &queue_name).await;
429                                            
430                                            // ACK original message
431                                            if let Err(e) = channel_clone
432                                                .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
433                                                .await
434                                            {
435                                                error!("Failed to ack message after DLQ: {}", e);
436                                            }
437                                        }
438                                    } else {
439                                        // No retry config, just nack
440                                        if let Err(e) = channel_clone
441                                            .basic_nack(
442                                                delivery.delivery_tag,
443                                                lapin::options::BasicNackOptions {
444                                                    multiple: false,
445                                                    requeue: false,
446                                                },
447                                            )
448                                            .await
449                                        {
450                                            error!("Failed to nack message: {}", e);
451                                        }
452                                    }
453                                }
454                            }
455                        }
456                    }
457                    Err(e) => {
458                        error!("Failed to deserialize message envelope: {}", e);
459                        if auto_ack {
460                            // Reject malformed messages
461                            if let Err(e) = channel_clone
462                                .basic_nack(
463                                    delivery.delivery_tag,
464                                    lapin::options::BasicNackOptions {
465                                        multiple: false,
466                                        requeue: false,
467                                    },
468                                )
469                                .await
470                            {
471                                error!("Failed to nack malformed envelope: {}", e);
472                            }
473                        }
474                    }
475                }
476            });
477        }
478
479        Ok(())
480    }
481
482    /// Send failed message to Dead Letter Queue
483    async fn send_to_dlq<T>(
484        envelope: &MessageEnvelope<T>, 
485        retry_config: &RetryConfig,
486        connection: &Arc<Connection>,
487        queue_name: &str,
488    ) where
489        T: serde::Serialize,
490    {
491        match connection.create_channel().await {
492            Ok(dlq_channel) => {
493                let dlq_name = retry_config.get_dead_letter_queue(queue_name);
494                
495                // Declare DLQ
496                if let Err(e) = dlq_channel
497                    .queue_declare(
498                        &dlq_name,
499                        QueueDeclareOptions {
500                            durable: true,
501                            ..Default::default()
502                        },
503                        FieldTable::default(),
504                    )
505                    .await
506                {
507                    error!("Failed to declare DLQ {}: {}", dlq_name, e);
508                    return;
509                }
510
511                // Publish to DLQ with failure summary
512                let failure_summary = envelope.get_failure_summary();
513                let dlq_payload = serde_json::json!({
514                    "envelope": envelope,
515                    "failure_summary": failure_summary,
516                    "sent_to_dlq_at": chrono::Utc::now(),
517                });
518
519                if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
520                    if let Err(e) = dlq_channel
521                        .basic_publish(
522                            "",
523                            &dlq_name,
524                            lapin::options::BasicPublishOptions::default(),
525                            &payload_bytes,
526                            lapin::BasicProperties::default(),
527                        )
528                        .await
529                    {
530                        error!("Failed to publish to DLQ {}: {}", dlq_name, e);
531                    } else {
532                        warn!("Sent envelope {} to DLQ: {}", envelope.metadata.message_id, failure_summary);
533                    }
534                }
535            }
536            Err(e) => {
537                error!("Failed to create DLQ channel: {}", e);
538            }
539        }
540    }
541}
542
543/// Classify error type based on error message (simplified heuristics)
544fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
545    let error_msg = error.to_string().to_lowercase();
546    
547    if error_msg.contains("timeout") 
548        || error_msg.contains("connection") 
549        || error_msg.contains("network") 
550        || error_msg.contains("temporary") {
551        ErrorType::Transient
552    } else if error_msg.contains("rate limit") 
553        || error_msg.contains("quota") 
554        || error_msg.contains("resource") {
555        ErrorType::Resource
556    } else if error_msg.contains("validation") 
557        || error_msg.contains("authentication") 
558        || error_msg.contains("authorization") 
559        || error_msg.contains("invalid") 
560        || error_msg.contains("bad request") {
561        ErrorType::Permanent
562    } else {
563        ErrorType::Unknown
564    }
565}