1use crate::{connection::Connection, error::RustRabbitError, retry::RetryConfig};
2use futures_lite::stream::StreamExt;
3use lapin::{
4 options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions},
5 types::FieldTable,
6 Channel,
7};
8use serde::de::DeserializeOwned;
9use std::future::Future;
10use std::sync::Arc;
11use tokio::sync::Semaphore;
12use tracing::{debug, error};
13
14#[derive(Debug)]
16pub struct Message<T>
17where
18 T: Clone,
19{
20 pub data: T,
21 pub retry_attempt: u32,
22 tag: u64,
23 channel: Arc<Channel>,
24}
25
26impl<T> Clone for Message<T>
27where
28 T: Clone,
29{
30 fn clone(&self) -> Self {
31 Self {
32 data: self.data.clone(),
33 retry_attempt: self.retry_attempt,
34 tag: self.tag,
35 channel: Arc::clone(&self.channel),
36 }
37 }
38}
39
40impl<T> Message<T>
41where
42 T: Clone,
43{
44 pub async fn ack(&self) -> Result<(), RustRabbitError> {
46 self.channel
47 .basic_ack(self.tag, BasicAckOptions::default())
48 .await
49 .map_err(RustRabbitError::from)
50 }
51
52 pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
54 self.channel
55 .basic_nack(
56 self.tag,
57 lapin::options::BasicNackOptions {
58 multiple: false,
59 requeue,
60 },
61 )
62 .await
63 .map_err(RustRabbitError::from)
64 }
65}
66
67pub struct ConsumerBuilder {
69 connection: Arc<Connection>,
70 queue_name: String,
71 exchange_name: Option<String>,
72 routing_key: Option<String>,
73 retry_config: Option<RetryConfig>,
74 prefetch_count: Option<u16>,
75 auto_ack: bool,
76}
77
78impl ConsumerBuilder {
79 pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
80 Self {
81 connection,
82 queue_name: queue_name.into(),
83 exchange_name: None,
84 routing_key: None,
85 retry_config: None,
86 prefetch_count: Some(10),
87 auto_ack: true,
88 }
89 }
90
91 pub fn bind_to_exchange(
93 mut self,
94 exchange: impl Into<String>,
95 routing_key: impl Into<String>,
96 ) -> Self {
97 self.exchange_name = Some(exchange.into());
98 self.routing_key = Some(routing_key.into());
99 self
100 }
101
102 pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
104 self.retry_config = Some(retry_config);
105 self
106 }
107
108 pub fn with_prefetch(mut self, count: u16) -> Self {
110 self.prefetch_count = Some(count);
111 self
112 }
113
114 pub fn manual_ack(mut self) -> Self {
116 self.auto_ack = false;
117 self
118 }
119
120 pub fn build(self) -> Consumer {
122 Consumer {
123 connection: self.connection,
124 queue_name: self.queue_name,
125 exchange_name: self.exchange_name,
126 routing_key: self.routing_key,
127 retry_config: self.retry_config,
128 prefetch_count: self.prefetch_count.unwrap_or(10),
129 auto_ack: self.auto_ack,
130 }
131 }
132}
133
134pub struct Consumer {
136 connection: Arc<Connection>,
137 queue_name: String,
138 exchange_name: Option<String>,
139 routing_key: Option<String>,
140 #[allow(dead_code)]
141 retry_config: Option<RetryConfig>,
142 prefetch_count: u16,
143 auto_ack: bool,
144}
145
146impl Consumer {
147 pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
149 ConsumerBuilder::new(connection, queue_name)
150 }
151
152 pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
154 where
155 T: DeserializeOwned + Send + Clone + Sync + 'static,
156 H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
157 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
158 {
159 let channel = self.connection.create_channel().await?;
160
161 channel
163 .basic_qos(
164 self.prefetch_count,
165 lapin::options::BasicQosOptions::default(),
166 )
167 .await?;
168
169 self.setup_infrastructure(&channel).await?;
171
172 let mut consumer = channel
174 .basic_consume(
175 &self.queue_name,
176 "rust-rabbit-consumer",
177 BasicConsumeOptions::default(),
178 FieldTable::default(),
179 )
180 .await?;
181
182 let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
183
184 debug!("Started consuming from queue: {}", self.queue_name);
185
186 while let Some(delivery_result) = consumer.next().await {
188 let delivery = delivery_result?;
189 let permit = semaphore.clone().acquire_owned().await.unwrap();
190 let handler_clone = handler.clone();
191 let auto_ack = self.auto_ack;
192 let channel_clone = Arc::new(channel.clone());
193
194 tokio::spawn(async move {
195 let _permit = permit;
196
197 match serde_json::from_slice::<T>(&delivery.data) {
199 Ok(data) => {
200 let message = Message {
201 data,
202 retry_attempt: 0, tag: delivery.delivery_tag,
204 channel: channel_clone.clone(),
205 };
206
207 match handler_clone(message.clone()).await {
209 Ok(()) => {
210 if auto_ack {
211 if let Err(e) = message.ack().await {
212 error!("Failed to ack message: {}", e);
213 }
214 }
215 debug!("Message processed successfully");
216 }
217 Err(e) => {
218 error!("Handler error: {}", e);
219 if auto_ack {
220 if let Err(e) = message.nack(false).await {
222 error!("Failed to nack message: {}", e);
223 }
224 }
225 }
226 }
227 }
228 Err(e) => {
229 error!("Failed to deserialize message: {}", e);
230 if auto_ack {
231 if let Err(e) = channel_clone
233 .basic_nack(
234 delivery.delivery_tag,
235 lapin::options::BasicNackOptions {
236 multiple: false,
237 requeue: false,
238 },
239 )
240 .await
241 {
242 error!("Failed to nack malformed message: {}", e);
243 }
244 }
245 }
246 }
247 });
248 }
249
250 Ok(())
251 }
252
253 async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
255 channel
257 .queue_declare(
258 &self.queue_name,
259 QueueDeclareOptions {
260 durable: true,
261 ..Default::default()
262 },
263 FieldTable::default(),
264 )
265 .await?;
266
267 if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
269 channel
270 .queue_bind(
271 &self.queue_name,
272 exchange,
273 routing_key,
274 lapin::options::QueueBindOptions::default(),
275 FieldTable::default(),
276 )
277 .await?;
278 }
279
280 Ok(())
281 }
282}