server/consumer.rs
1use azservicebus::receiver::DeadLetterOptions;
2use azservicebus::{ServiceBusClient, ServiceBusReceiver, ServiceBusReceiverOptions};
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::Mutex;
6
7use crate::model::MessageModel;
8
9/// A wrapper around Azure Service Bus receiver for consuming messages from queues.
10///
11/// The Consumer provides a high-level interface for receiving, processing, and managing
12/// messages from Azure Service Bus queues. It supports both peek operations (non-destructive)
13/// and receive operations (which lock messages for processing).
14///
15/// # Thread Safety
16///
17/// The Consumer is thread-safe and can be shared across async tasks. The underlying
18/// receiver is protected by a mutex to ensure safe concurrent access.
19///
20/// # Examples
21///
22/// ```no_run
23/// use quetty_server::consumer::Consumer;
24/// use azservicebus::{ServiceBusReceiver, ServiceBusReceiverOptions};
25///
26/// async fn example(receiver: ServiceBusReceiver) {
27/// let mut consumer = Consumer::new(receiver);
28///
29/// // Peek at messages without consuming them
30/// let messages = consumer.peek_messages(10, None).await?;
31///
32/// // Receive messages for processing
33/// let received = consumer.receive_messages_with_timeout(5, std::time::Duration::from_secs(10)).await?;
34///
35/// // Process and complete messages
36/// for message in &received {
37/// consumer.complete_message(message).await?;
38/// }
39/// }
40/// ```
41#[derive(Debug)]
42pub struct Consumer {
43 receiver: Arc<Mutex<Option<ServiceBusReceiver>>>,
44}
45
46impl PartialEq for Consumer {
47 fn eq(&self, other: &Self) -> bool {
48 Arc::ptr_eq(&self.receiver, &other.receiver)
49 }
50}
51
52impl Consumer {
53 /// Creates a new Consumer wrapping the provided Service Bus receiver.
54 ///
55 /// # Arguments
56 ///
57 /// * `receiver` - The Azure Service Bus receiver to wrap
58 pub fn new(receiver: ServiceBusReceiver) -> Self {
59 Self {
60 receiver: Arc::new(Mutex::new(Some(receiver))),
61 }
62 }
63
64 /// Peeks at messages in the queue without consuming them.
65 ///
66 /// This operation allows you to inspect messages without locking them
67 /// or affecting their delivery count. Useful for browsing queue contents.
68 ///
69 /// # Arguments
70 ///
71 /// * `max_count` - Maximum number of messages to peek at
72 /// * `from_sequence_number` - Optional starting sequence number
73 ///
74 /// # Returns
75 ///
76 /// Vector of MessageModel instances representing the peeked messages
77 ///
78 /// # Errors
79 ///
80 /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
81 pub async fn peek_messages(
82 &mut self,
83 max_count: u32,
84 from_sequence_number: Option<i64>,
85 ) -> Result<Vec<MessageModel>, Box<dyn std::error::Error>> {
86 let mut guard = self.receiver.lock().await;
87 if let Some(receiver) = guard.as_mut() {
88 let messages = receiver
89 .peek_messages(max_count, from_sequence_number)
90 .await?;
91 let result = MessageModel::try_convert_messages_collect(messages);
92 Ok(result)
93 } else {
94 Err("Receiver already disposed".into())
95 }
96 }
97
98 /// Receives messages from the queue with a timeout.
99 ///
100 /// This operation locks the received messages for processing. The messages
101 /// must be completed, abandoned, or dead-lettered to release the lock.
102 ///
103 /// # Arguments
104 ///
105 /// * `max_count` - Maximum number of messages to receive
106 /// * `timeout` - Maximum time to wait for messages
107 ///
108 /// # Returns
109 ///
110 /// Vector of received messages that are locked for processing.
111 /// Returns an empty vector if the timeout expires before messages are available.
112 ///
113 /// # Errors
114 ///
115 /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
116 pub async fn receive_messages_with_timeout(
117 &mut self,
118 max_count: u32,
119 timeout: Duration,
120 ) -> Result<Vec<azservicebus::ServiceBusReceivedMessage>, Box<dyn std::error::Error>> {
121 let mut guard = self.receiver.lock().await;
122 if let Some(receiver) = guard.as_mut() {
123 match tokio::time::timeout(timeout, receiver.receive_messages(max_count)).await {
124 Ok(result) => result.map_err(|e| e.into()),
125 Err(_) => {
126 // Timeout occurred - return empty vector instead of error
127 log::debug!(
128 "receive_messages timed out after {timeout:?}, returning empty result"
129 );
130 Ok(Vec::new())
131 }
132 }
133 } else {
134 Err("Receiver already disposed".into())
135 }
136 }
137
138 /// Abandons a received message, returning it to the queue.
139 ///
140 /// The message becomes available for redelivery and its delivery count is incremented.
141 ///
142 /// # Arguments
143 ///
144 /// * `message` - The message to abandon
145 ///
146 /// # Errors
147 ///
148 /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
149 pub async fn abandon_message(
150 &mut self,
151 message: &azservicebus::ServiceBusReceivedMessage,
152 ) -> Result<(), Box<dyn std::error::Error>> {
153 let mut guard = self.receiver.lock().await;
154 if let Some(receiver) = guard.as_mut() {
155 receiver.abandon_message(message, None).await?;
156 Ok(())
157 } else {
158 Err("Receiver already disposed".into())
159 }
160 }
161
162 /// Moves a message to the dead letter queue.
163 ///
164 /// Dead lettered messages are permanently removed from the main queue
165 /// and can be inspected in the dead letter queue for debugging.
166 ///
167 /// # Arguments
168 ///
169 /// * `message` - The message to dead letter
170 /// * `reason` - Optional reason for dead lettering
171 /// * `error_description` - Optional error description
172 ///
173 /// # Errors
174 ///
175 /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
176 pub async fn dead_letter_message(
177 &mut self,
178 message: &azservicebus::ServiceBusReceivedMessage,
179 reason: Option<String>,
180 error_description: Option<String>,
181 ) -> Result<(), Box<dyn std::error::Error>> {
182 let mut guard = self.receiver.lock().await;
183 if let Some(receiver) = guard.as_mut() {
184 let options = DeadLetterOptions {
185 dead_letter_reason: reason,
186 dead_letter_error_description: error_description,
187 properties_to_modify: None,
188 };
189 receiver.dead_letter_message(message, options).await?;
190 Ok(())
191 } else {
192 Err("Receiver already disposed".into())
193 }
194 }
195
196 /// Completes a message, removing it from the queue.
197 ///
198 /// This indicates successful processing of the message.
199 ///
200 /// # Arguments
201 ///
202 /// * `message` - The message to complete
203 ///
204 /// # Errors
205 ///
206 /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
207 pub async fn complete_message(
208 &mut self,
209 message: &azservicebus::ServiceBusReceivedMessage,
210 ) -> Result<(), Box<dyn std::error::Error>> {
211 let mut guard = self.receiver.lock().await;
212 if let Some(receiver) = guard.as_mut() {
213 receiver.complete_message(message).await?;
214 Ok(())
215 } else {
216 Err("Receiver already disposed".into())
217 }
218 }
219
220 /// Completes multiple messages in a batch for better performance.
221 ///
222 /// Attempts to complete all provided messages, logging results for each.
223 /// If any message fails to complete, the operation continues with remaining
224 /// messages and returns an error indicating the failure count.
225 ///
226 /// # Arguments
227 ///
228 /// * `messages` - Slice of messages to complete
229 ///
230 /// # Returns
231 ///
232 /// `Ok(())` if all messages were completed successfully,
233 /// `Err` if any messages failed to complete
234 ///
235 /// # Errors
236 ///
237 /// Returns an error if the receiver has been disposed or if any message completion fails
238 pub async fn complete_messages(
239 &mut self,
240 messages: &[azservicebus::ServiceBusReceivedMessage],
241 ) -> Result<(), Box<dyn std::error::Error>> {
242 let mut guard = self.receiver.lock().await;
243 if let Some(receiver) = guard.as_mut() {
244 // Complete messages one by one since batch completion may not be available
245 let mut completed_count = 0;
246 let mut failed_count = 0;
247
248 for message in messages {
249 let message_id = message
250 .message_id()
251 .map(|s| s.to_string())
252 .unwrap_or_else(|| "unknown".to_string());
253 let sequence = message.sequence_number();
254
255 match receiver.complete_message(message).await {
256 Ok(()) => {
257 completed_count += 1;
258 log::debug!(
259 "Successfully completed message {message_id} (sequence: {sequence})"
260 );
261 }
262 Err(e) => {
263 failed_count += 1;
264 log::error!(
265 "Failed to complete message {message_id} (sequence: {sequence}): {e}"
266 );
267 // Don't return early - try to complete as many as possible
268 }
269 }
270 }
271
272 log::info!(
273 "Batch completion result: {} successful, {} failed out of {} messages",
274 completed_count,
275 failed_count,
276 messages.len()
277 );
278
279 if failed_count > 0 {
280 return Err(format!(
281 "Failed to complete {} out of {} messages",
282 failed_count,
283 messages.len()
284 )
285 .into());
286 }
287
288 Ok(())
289 } else {
290 Err("Receiver already disposed".into())
291 }
292 }
293
294 /// Abandons multiple messages in a batch.
295 ///
296 /// All provided messages are returned to the queue for redelivery.
297 ///
298 /// # Arguments
299 ///
300 /// * `messages` - Slice of messages to abandon
301 ///
302 /// # Errors
303 ///
304 /// Returns an error if the receiver has been disposed or if any abandon operation fails
305 pub async fn abandon_messages(
306 &mut self,
307 messages: &[azservicebus::ServiceBusReceivedMessage],
308 ) -> Result<(), Box<dyn std::error::Error>> {
309 let mut guard = self.receiver.lock().await;
310 if let Some(receiver) = guard.as_mut() {
311 for message in messages {
312 receiver.abandon_message(message, None).await?;
313 }
314 Ok(())
315 } else {
316 Err("Receiver already disposed".into())
317 }
318 }
319
320 /// Renews the lock on a received message to extend processing time.
321 ///
322 /// This prevents the message from becoming available for redelivery
323 /// while it's still being processed.
324 ///
325 /// # Arguments
326 ///
327 /// * `message` - The message whose lock should be renewed
328 ///
329 /// # Errors
330 ///
331 /// Returns an error if the receiver has been disposed or if the lock renewal fails
332 pub async fn renew_message_lock(
333 &mut self,
334 message: &mut azservicebus::ServiceBusReceivedMessage,
335 ) -> Result<(), Box<dyn std::error::Error>> {
336 let mut guard = self.receiver.lock().await;
337 if let Some(receiver) = guard.as_mut() {
338 receiver.renew_message_lock(message).await?;
339 Ok(())
340 } else {
341 Err("Receiver already disposed".into())
342 }
343 }
344
345 /// Renews locks on multiple messages.
346 ///
347 /// Attempts to renew locks for all provided messages, logging results.
348 /// Continues processing all messages even if some renewals fail.
349 ///
350 /// # Arguments
351 ///
352 /// * `messages` - Mutable slice of messages whose locks should be renewed
353 ///
354 /// # Errors
355 ///
356 /// Returns an error if the receiver has been disposed. Lock renewal failures
357 /// are logged but do not cause the method to return an error.
358 pub async fn renew_message_locks(
359 &mut self,
360 messages: &mut [azservicebus::ServiceBusReceivedMessage],
361 ) -> Result<(), Box<dyn std::error::Error>> {
362 let mut guard = self.receiver.lock().await;
363 if let Some(receiver) = guard.as_mut() {
364 let mut renewed_count = 0;
365 let mut failed_count = 0;
366
367 for message in messages.iter_mut() {
368 let message_id = message
369 .message_id()
370 .map(|s| s.to_string())
371 .unwrap_or_else(|| "unknown".to_string());
372 let sequence = message.sequence_number();
373
374 match receiver.renew_message_lock(message).await {
375 Ok(()) => {
376 renewed_count += 1;
377 log::debug!(
378 "Successfully renewed lock for message {message_id} (sequence: {sequence})"
379 );
380 }
381 Err(e) => {
382 failed_count += 1;
383 log::warn!(
384 "Failed to renew lock for message {message_id} (sequence: {sequence}): {e}"
385 );
386 // Continue trying to renew other locks
387 }
388 }
389 }
390
391 log::debug!(
392 "Lock renewal result: {} successful, {} failed out of {} messages",
393 renewed_count,
394 failed_count,
395 messages.len()
396 );
397
398 if failed_count > 0 {
399 log::warn!(
400 "Failed to renew locks for {} out of {} messages - some may expire during processing",
401 failed_count,
402 messages.len()
403 );
404 }
405
406 Ok(())
407 } else {
408 Err("Receiver already disposed".into())
409 }
410 }
411
412 /// Disposes the underlying Service Bus receiver, releasing all resources.
413 ///
414 /// After disposal, all other operations on this Consumer will fail.
415 ///
416 /// # Errors
417 ///
418 /// Returns an error if the disposal operation fails
419 pub async fn dispose(&self) -> Result<(), Box<dyn std::error::Error>> {
420 let mut guard = self.receiver.lock().await;
421 if let Some(receiver) = guard.take() {
422 receiver.dispose().await?;
423 }
424 Ok(())
425 }
426
427 /// Receives deferred messages by their sequence numbers.
428 ///
429 /// This allows operations (delete/complete) on messages that were previously
430 /// deferred without re-activating them in the main queue first.
431 ///
432 /// # Arguments
433 ///
434 /// * `sequence_numbers` - Array of sequence numbers for the deferred messages
435 ///
436 /// # Returns
437 ///
438 /// Vector of received deferred messages that are locked for processing
439 ///
440 /// # Errors
441 ///
442 /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
443 pub async fn receive_deferred_messages(
444 &mut self,
445 sequence_numbers: &[i64],
446 ) -> Result<
447 Vec<azservicebus::ServiceBusReceivedMessage>,
448 Box<dyn std::error::Error + Send + Sync>,
449 > {
450 let mut guard = self.receiver.lock().await;
451 if let Some(receiver) = guard.as_mut() {
452 let messages = receiver
453 .receive_deferred_messages(sequence_numbers.to_vec())
454 .await?;
455 Ok(messages)
456 } else {
457 Err("Receiver already disposed".into())
458 }
459 }
460}
461
462/// Extension trait for ServiceBusClient to create Consumer instances.
463///
464/// This trait provides a convenient method to create a Consumer directly
465/// from a ServiceBusClient without manually creating the receiver first.
466pub trait ServiceBusClientExt {
467 /// Creates a Consumer for the specified queue.
468 ///
469 /// # Arguments
470 ///
471 /// * `queue_name` - Name of the queue to create a consumer for
472 /// * `options` - Configuration options for the receiver
473 ///
474 /// # Returns
475 ///
476 /// A Consumer instance configured for the specified queue
477 ///
478 /// # Errors
479 ///
480 /// Returns an error if the receiver creation fails
481 fn create_consumer_for_queue(
482 &mut self,
483 queue_name: impl Into<String> + Send,
484 options: ServiceBusReceiverOptions,
485 ) -> impl Future<Output = Result<Consumer, azure_core::Error>>;
486}
487
488impl<RP> ServiceBusClientExt for ServiceBusClient<RP>
489where
490 RP: azservicebus::ServiceBusRetryPolicy
491 + From<azservicebus::ServiceBusRetryOptions>
492 + Send
493 + Sync
494 + 'static,
495{
496 /// Creates a Consumer for the specified queue using this ServiceBusClient.
497 ///
498 /// This method handles the creation of the underlying receiver and wraps it
499 /// in a Consumer instance for easier usage.
500 async fn create_consumer_for_queue(
501 &mut self,
502 queue_name: impl Into<String> + Send,
503 options: ServiceBusReceiverOptions,
504 ) -> Result<Consumer, azure_core::Error> {
505 let receiver = self
506 .create_receiver_for_queue(queue_name, options)
507 .await
508 .map_err(|e| {
509 azure_core::Error::message(
510 azure_core::error::ErrorKind::Other,
511 format!("Receiver error: {e}"),
512 )
513 })?;
514
515 Ok(Consumer::new(receiver))
516 }
517}