rust_rabbit/
consumer.rs

1use crate::{connection::Connection, error::RustRabbitError, retry::RetryConfig};
2use futures_lite::stream::StreamExt;
3use lapin::{
4    options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions},
5    types::FieldTable,
6    Channel,
7};
8use serde::de::DeserializeOwned;
9use std::future::Future;
10use std::sync::Arc;
11use tokio::sync::Semaphore;
12use tracing::{debug, error};
13
14/// Message wrapper with retry tracking
15#[derive(Debug)]
16pub struct Message<T>
17where
18    T: Clone,
19{
20    pub data: T,
21    pub retry_attempt: u32,
22    tag: u64,
23    channel: Arc<Channel>,
24}
25
26impl<T> Clone for Message<T>
27where
28    T: Clone,
29{
30    fn clone(&self) -> Self {
31        Self {
32            data: self.data.clone(),
33            retry_attempt: self.retry_attempt,
34            tag: self.tag,
35            channel: Arc::clone(&self.channel),
36        }
37    }
38}
39
40impl<T> Message<T>
41where
42    T: Clone,
43{
44    /// Acknowledge the message
45    pub async fn ack(&self) -> Result<(), RustRabbitError> {
46        self.channel
47            .basic_ack(self.tag, BasicAckOptions::default())
48            .await
49            .map_err(RustRabbitError::from)
50    }
51
52    /// Reject and requeue the message
53    pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
54        self.channel
55            .basic_nack(
56                self.tag,
57                lapin::options::BasicNackOptions {
58                    multiple: false,
59                    requeue,
60                },
61            )
62            .await
63            .map_err(RustRabbitError::from)
64    }
65}
66
67/// Consumer configuration builder
68pub struct ConsumerBuilder {
69    connection: Arc<Connection>,
70    queue_name: String,
71    exchange_name: Option<String>,
72    routing_key: Option<String>,
73    retry_config: Option<RetryConfig>,
74    prefetch_count: Option<u16>,
75    auto_ack: bool,
76}
77
78impl ConsumerBuilder {
79    pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
80        Self {
81            connection,
82            queue_name: queue_name.into(),
83            exchange_name: None,
84            routing_key: None,
85            retry_config: None,
86            prefetch_count: Some(10),
87            auto_ack: true,
88        }
89    }
90
91    /// Bind to an exchange with routing key
92    pub fn bind_to_exchange(
93        mut self,
94        exchange: impl Into<String>,
95        routing_key: impl Into<String>,
96    ) -> Self {
97        self.exchange_name = Some(exchange.into());
98        self.routing_key = Some(routing_key.into());
99        self
100    }
101
102    /// Configure retry behavior
103    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
104        self.retry_config = Some(retry_config);
105        self
106    }
107
108    /// Set prefetch count
109    pub fn with_prefetch(mut self, count: u16) -> Self {
110        self.prefetch_count = Some(count);
111        self
112    }
113
114    /// Disable auto-acknowledge (manual ack required)
115    pub fn manual_ack(mut self) -> Self {
116        self.auto_ack = false;
117        self
118    }
119
120    /// Build the consumer
121    pub fn build(self) -> Consumer {
122        Consumer {
123            connection: self.connection,
124            queue_name: self.queue_name,
125            exchange_name: self.exchange_name,
126            routing_key: self.routing_key,
127            retry_config: self.retry_config,
128            prefetch_count: self.prefetch_count.unwrap_or(10),
129            auto_ack: self.auto_ack,
130        }
131    }
132}
133
134/// Simplified Consumer for message consumption
135pub struct Consumer {
136    connection: Arc<Connection>,
137    queue_name: String,
138    exchange_name: Option<String>,
139    routing_key: Option<String>,
140    #[allow(dead_code)]
141    retry_config: Option<RetryConfig>,
142    prefetch_count: u16,
143    auto_ack: bool,
144}
145
146impl Consumer {
147    /// Create a new consumer builder
148    pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
149        ConsumerBuilder::new(connection, queue_name)
150    }
151
152    /// Start consuming messages
153    pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
154    where
155        T: DeserializeOwned + Send + Clone + Sync + 'static,
156        H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
157        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
158    {
159        let channel = self.connection.create_channel().await?;
160
161        // Set prefetch count
162        channel
163            .basic_qos(
164                self.prefetch_count,
165                lapin::options::BasicQosOptions::default(),
166            )
167            .await?;
168
169        // Setup queue and exchange
170        self.setup_infrastructure(&channel).await?;
171
172        // Create consumer
173        let mut consumer = channel
174            .basic_consume(
175                &self.queue_name,
176                "rust-rabbit-consumer",
177                BasicConsumeOptions::default(),
178                FieldTable::default(),
179            )
180            .await?;
181
182        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
183
184        debug!("Started consuming from queue: {}", self.queue_name);
185
186        // Process messages (simplified - no retry for now)
187        while let Some(delivery_result) = consumer.next().await {
188            let delivery = delivery_result?;
189            let permit = semaphore.clone().acquire_owned().await.unwrap();
190            let handler_clone = handler.clone();
191            let auto_ack = self.auto_ack;
192            let channel_clone = Arc::new(channel.clone());
193
194            tokio::spawn(async move {
195                let _permit = permit;
196
197                // Deserialize message
198                match serde_json::from_slice::<T>(&delivery.data) {
199                    Ok(data) => {
200                        let message = Message {
201                            data,
202                            retry_attempt: 0, // Simplified for now
203                            tag: delivery.delivery_tag,
204                            channel: channel_clone.clone(),
205                        };
206
207                        // Process message
208                        match handler_clone(message.clone()).await {
209                            Ok(()) => {
210                                if auto_ack {
211                                    if let Err(e) = message.ack().await {
212                                        error!("Failed to ack message: {}", e);
213                                    }
214                                }
215                                debug!("Message processed successfully");
216                            }
217                            Err(e) => {
218                                error!("Handler error: {}", e);
219                                if auto_ack {
220                                    // Simple reject without retry for now
221                                    if let Err(e) = message.nack(false).await {
222                                        error!("Failed to nack message: {}", e);
223                                    }
224                                }
225                            }
226                        }
227                    }
228                    Err(e) => {
229                        error!("Failed to deserialize message: {}", e);
230                        if auto_ack {
231                            // Reject malformed messages
232                            if let Err(e) = channel_clone
233                                .basic_nack(
234                                    delivery.delivery_tag,
235                                    lapin::options::BasicNackOptions {
236                                        multiple: false,
237                                        requeue: false,
238                                    },
239                                )
240                                .await
241                            {
242                                error!("Failed to nack malformed message: {}", e);
243                            }
244                        }
245                    }
246                }
247            });
248        }
249
250        Ok(())
251    }
252
253    /// Setup queue and exchange infrastructure
254    async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
255        // Declare queue
256        channel
257            .queue_declare(
258                &self.queue_name,
259                QueueDeclareOptions {
260                    durable: true,
261                    ..Default::default()
262                },
263                FieldTable::default(),
264            )
265            .await?;
266
267        // Bind to exchange if specified
268        if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
269            channel
270                .queue_bind(
271                    &self.queue_name,
272                    exchange,
273                    routing_key,
274                    lapin::options::QueueBindOptions::default(),
275                    FieldTable::default(),
276                )
277                .await?;
278        }
279
280        Ok(())
281    }
282}