server/
consumer.rs

1use azservicebus::receiver::DeadLetterOptions;
2use azservicebus::{ServiceBusClient, ServiceBusReceiver, ServiceBusReceiverOptions};
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::Mutex;
6
7use crate::model::MessageModel;
8
9/// A wrapper around Azure Service Bus receiver for consuming messages from queues.
10///
11/// The Consumer provides a high-level interface for receiving, processing, and managing
12/// messages from Azure Service Bus queues. It supports both peek operations (non-destructive)
13/// and receive operations (which lock messages for processing).
14///
15/// # Thread Safety
16///
17/// The Consumer is thread-safe and can be shared across async tasks. The underlying
18/// receiver is protected by a mutex to ensure safe concurrent access.
19///
20/// # Examples
21///
22/// ```no_run
23/// use quetty_server::consumer::Consumer;
24/// use azservicebus::{ServiceBusReceiver, ServiceBusReceiverOptions};
25///
26/// async fn example(receiver: ServiceBusReceiver) {
27///     let mut consumer = Consumer::new(receiver);
28///
29///     // Peek at messages without consuming them
30///     let messages = consumer.peek_messages(10, None).await?;
31///
32///     // Receive messages for processing
33///     let received = consumer.receive_messages_with_timeout(5, std::time::Duration::from_secs(10)).await?;
34///
35///     // Process and complete messages
36///     for message in &received {
37///         consumer.complete_message(message).await?;
38///     }
39/// }
40/// ```
41#[derive(Debug)]
42pub struct Consumer {
43    receiver: Arc<Mutex<Option<ServiceBusReceiver>>>,
44}
45
46impl PartialEq for Consumer {
47    fn eq(&self, other: &Self) -> bool {
48        Arc::ptr_eq(&self.receiver, &other.receiver)
49    }
50}
51
52impl Consumer {
53    /// Creates a new Consumer wrapping the provided Service Bus receiver.
54    ///
55    /// # Arguments
56    ///
57    /// * `receiver` - The Azure Service Bus receiver to wrap
58    pub fn new(receiver: ServiceBusReceiver) -> Self {
59        Self {
60            receiver: Arc::new(Mutex::new(Some(receiver))),
61        }
62    }
63
64    /// Peeks at messages in the queue without consuming them.
65    ///
66    /// This operation allows you to inspect messages without locking them
67    /// or affecting their delivery count. Useful for browsing queue contents.
68    ///
69    /// # Arguments
70    ///
71    /// * `max_count` - Maximum number of messages to peek at
72    /// * `from_sequence_number` - Optional starting sequence number
73    ///
74    /// # Returns
75    ///
76    /// Vector of MessageModel instances representing the peeked messages
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
81    pub async fn peek_messages(
82        &mut self,
83        max_count: u32,
84        from_sequence_number: Option<i64>,
85    ) -> Result<Vec<MessageModel>, Box<dyn std::error::Error>> {
86        let mut guard = self.receiver.lock().await;
87        if let Some(receiver) = guard.as_mut() {
88            let messages = receiver
89                .peek_messages(max_count, from_sequence_number)
90                .await?;
91            let result = MessageModel::try_convert_messages_collect(messages);
92            Ok(result)
93        } else {
94            Err("Receiver already disposed".into())
95        }
96    }
97
98    /// Receives messages from the queue with a timeout.
99    ///
100    /// This operation locks the received messages for processing. The messages
101    /// must be completed, abandoned, or dead-lettered to release the lock.
102    ///
103    /// # Arguments
104    ///
105    /// * `max_count` - Maximum number of messages to receive
106    /// * `timeout` - Maximum time to wait for messages
107    ///
108    /// # Returns
109    ///
110    /// Vector of received messages that are locked for processing.
111    /// Returns an empty vector if the timeout expires before messages are available.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
116    pub async fn receive_messages_with_timeout(
117        &mut self,
118        max_count: u32,
119        timeout: Duration,
120    ) -> Result<Vec<azservicebus::ServiceBusReceivedMessage>, Box<dyn std::error::Error>> {
121        let mut guard = self.receiver.lock().await;
122        if let Some(receiver) = guard.as_mut() {
123            match tokio::time::timeout(timeout, receiver.receive_messages(max_count)).await {
124                Ok(result) => result.map_err(|e| e.into()),
125                Err(_) => {
126                    // Timeout occurred - return empty vector instead of error
127                    log::debug!(
128                        "receive_messages timed out after {timeout:?}, returning empty result"
129                    );
130                    Ok(Vec::new())
131                }
132            }
133        } else {
134            Err("Receiver already disposed".into())
135        }
136    }
137
138    /// Abandons a received message, returning it to the queue.
139    ///
140    /// The message becomes available for redelivery and its delivery count is incremented.
141    ///
142    /// # Arguments
143    ///
144    /// * `message` - The message to abandon
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
149    pub async fn abandon_message(
150        &mut self,
151        message: &azservicebus::ServiceBusReceivedMessage,
152    ) -> Result<(), Box<dyn std::error::Error>> {
153        let mut guard = self.receiver.lock().await;
154        if let Some(receiver) = guard.as_mut() {
155            receiver.abandon_message(message, None).await?;
156            Ok(())
157        } else {
158            Err("Receiver already disposed".into())
159        }
160    }
161
162    /// Moves a message to the dead letter queue.
163    ///
164    /// Dead lettered messages are permanently removed from the main queue
165    /// and can be inspected in the dead letter queue for debugging.
166    ///
167    /// # Arguments
168    ///
169    /// * `message` - The message to dead letter
170    /// * `reason` - Optional reason for dead lettering
171    /// * `error_description` - Optional error description
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
176    pub async fn dead_letter_message(
177        &mut self,
178        message: &azservicebus::ServiceBusReceivedMessage,
179        reason: Option<String>,
180        error_description: Option<String>,
181    ) -> Result<(), Box<dyn std::error::Error>> {
182        let mut guard = self.receiver.lock().await;
183        if let Some(receiver) = guard.as_mut() {
184            let options = DeadLetterOptions {
185                dead_letter_reason: reason,
186                dead_letter_error_description: error_description,
187                properties_to_modify: None,
188            };
189            receiver.dead_letter_message(message, options).await?;
190            Ok(())
191        } else {
192            Err("Receiver already disposed".into())
193        }
194    }
195
196    /// Completes a message, removing it from the queue.
197    ///
198    /// This indicates successful processing of the message.
199    ///
200    /// # Arguments
201    ///
202    /// * `message` - The message to complete
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
207    pub async fn complete_message(
208        &mut self,
209        message: &azservicebus::ServiceBusReceivedMessage,
210    ) -> Result<(), Box<dyn std::error::Error>> {
211        let mut guard = self.receiver.lock().await;
212        if let Some(receiver) = guard.as_mut() {
213            receiver.complete_message(message).await?;
214            Ok(())
215        } else {
216            Err("Receiver already disposed".into())
217        }
218    }
219
220    /// Completes multiple messages in a batch for better performance.
221    ///
222    /// Attempts to complete all provided messages, logging results for each.
223    /// If any message fails to complete, the operation continues with remaining
224    /// messages and returns an error indicating the failure count.
225    ///
226    /// # Arguments
227    ///
228    /// * `messages` - Slice of messages to complete
229    ///
230    /// # Returns
231    ///
232    /// `Ok(())` if all messages were completed successfully,
233    /// `Err` if any messages failed to complete
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the receiver has been disposed or if any message completion fails
238    pub async fn complete_messages(
239        &mut self,
240        messages: &[azservicebus::ServiceBusReceivedMessage],
241    ) -> Result<(), Box<dyn std::error::Error>> {
242        let mut guard = self.receiver.lock().await;
243        if let Some(receiver) = guard.as_mut() {
244            // Complete messages one by one since batch completion may not be available
245            let mut completed_count = 0;
246            let mut failed_count = 0;
247
248            for message in messages {
249                let message_id = message
250                    .message_id()
251                    .map(|s| s.to_string())
252                    .unwrap_or_else(|| "unknown".to_string());
253                let sequence = message.sequence_number();
254
255                match receiver.complete_message(message).await {
256                    Ok(()) => {
257                        completed_count += 1;
258                        log::debug!(
259                            "Successfully completed message {message_id} (sequence: {sequence})"
260                        );
261                    }
262                    Err(e) => {
263                        failed_count += 1;
264                        log::error!(
265                            "Failed to complete message {message_id} (sequence: {sequence}): {e}"
266                        );
267                        // Don't return early - try to complete as many as possible
268                    }
269                }
270            }
271
272            log::info!(
273                "Batch completion result: {} successful, {} failed out of {} messages",
274                completed_count,
275                failed_count,
276                messages.len()
277            );
278
279            if failed_count > 0 {
280                return Err(format!(
281                    "Failed to complete {} out of {} messages",
282                    failed_count,
283                    messages.len()
284                )
285                .into());
286            }
287
288            Ok(())
289        } else {
290            Err("Receiver already disposed".into())
291        }
292    }
293
294    /// Abandons multiple messages in a batch.
295    ///
296    /// All provided messages are returned to the queue for redelivery.
297    ///
298    /// # Arguments
299    ///
300    /// * `messages` - Slice of messages to abandon
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the receiver has been disposed or if any abandon operation fails
305    pub async fn abandon_messages(
306        &mut self,
307        messages: &[azservicebus::ServiceBusReceivedMessage],
308    ) -> Result<(), Box<dyn std::error::Error>> {
309        let mut guard = self.receiver.lock().await;
310        if let Some(receiver) = guard.as_mut() {
311            for message in messages {
312                receiver.abandon_message(message, None).await?;
313            }
314            Ok(())
315        } else {
316            Err("Receiver already disposed".into())
317        }
318    }
319
320    /// Renews the lock on a received message to extend processing time.
321    ///
322    /// This prevents the message from becoming available for redelivery
323    /// while it's still being processed.
324    ///
325    /// # Arguments
326    ///
327    /// * `message` - The message whose lock should be renewed
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the receiver has been disposed or if the lock renewal fails
332    pub async fn renew_message_lock(
333        &mut self,
334        message: &mut azservicebus::ServiceBusReceivedMessage,
335    ) -> Result<(), Box<dyn std::error::Error>> {
336        let mut guard = self.receiver.lock().await;
337        if let Some(receiver) = guard.as_mut() {
338            receiver.renew_message_lock(message).await?;
339            Ok(())
340        } else {
341            Err("Receiver already disposed".into())
342        }
343    }
344
345    /// Renews locks on multiple messages.
346    ///
347    /// Attempts to renew locks for all provided messages, logging results.
348    /// Continues processing all messages even if some renewals fail.
349    ///
350    /// # Arguments
351    ///
352    /// * `messages` - Mutable slice of messages whose locks should be renewed
353    ///
354    /// # Errors
355    ///
356    /// Returns an error if the receiver has been disposed. Lock renewal failures
357    /// are logged but do not cause the method to return an error.
358    pub async fn renew_message_locks(
359        &mut self,
360        messages: &mut [azservicebus::ServiceBusReceivedMessage],
361    ) -> Result<(), Box<dyn std::error::Error>> {
362        let mut guard = self.receiver.lock().await;
363        if let Some(receiver) = guard.as_mut() {
364            let mut renewed_count = 0;
365            let mut failed_count = 0;
366
367            for message in messages.iter_mut() {
368                let message_id = message
369                    .message_id()
370                    .map(|s| s.to_string())
371                    .unwrap_or_else(|| "unknown".to_string());
372                let sequence = message.sequence_number();
373
374                match receiver.renew_message_lock(message).await {
375                    Ok(()) => {
376                        renewed_count += 1;
377                        log::debug!(
378                            "Successfully renewed lock for message {message_id} (sequence: {sequence})"
379                        );
380                    }
381                    Err(e) => {
382                        failed_count += 1;
383                        log::warn!(
384                            "Failed to renew lock for message {message_id} (sequence: {sequence}): {e}"
385                        );
386                        // Continue trying to renew other locks
387                    }
388                }
389            }
390
391            log::debug!(
392                "Lock renewal result: {} successful, {} failed out of {} messages",
393                renewed_count,
394                failed_count,
395                messages.len()
396            );
397
398            if failed_count > 0 {
399                log::warn!(
400                    "Failed to renew locks for {} out of {} messages - some may expire during processing",
401                    failed_count,
402                    messages.len()
403                );
404            }
405
406            Ok(())
407        } else {
408            Err("Receiver already disposed".into())
409        }
410    }
411
412    /// Disposes the underlying Service Bus receiver, releasing all resources.
413    ///
414    /// After disposal, all other operations on this Consumer will fail.
415    ///
416    /// # Errors
417    ///
418    /// Returns an error if the disposal operation fails
419    pub async fn dispose(&self) -> Result<(), Box<dyn std::error::Error>> {
420        let mut guard = self.receiver.lock().await;
421        if let Some(receiver) = guard.take() {
422            receiver.dispose().await?;
423        }
424        Ok(())
425    }
426
427    /// Receives deferred messages by their sequence numbers.
428    ///
429    /// This allows operations (delete/complete) on messages that were previously
430    /// deferred without re-activating them in the main queue first.
431    ///
432    /// # Arguments
433    ///
434    /// * `sequence_numbers` - Array of sequence numbers for the deferred messages
435    ///
436    /// # Returns
437    ///
438    /// Vector of received deferred messages that are locked for processing
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if the receiver has been disposed or if the Service Bus operation fails
443    pub async fn receive_deferred_messages(
444        &mut self,
445        sequence_numbers: &[i64],
446    ) -> Result<
447        Vec<azservicebus::ServiceBusReceivedMessage>,
448        Box<dyn std::error::Error + Send + Sync>,
449    > {
450        let mut guard = self.receiver.lock().await;
451        if let Some(receiver) = guard.as_mut() {
452            let messages = receiver
453                .receive_deferred_messages(sequence_numbers.to_vec())
454                .await?;
455            Ok(messages)
456        } else {
457            Err("Receiver already disposed".into())
458        }
459    }
460}
461
462/// Extension trait for ServiceBusClient to create Consumer instances.
463///
464/// This trait provides a convenient method to create a Consumer directly
465/// from a ServiceBusClient without manually creating the receiver first.
466pub trait ServiceBusClientExt {
467    /// Creates a Consumer for the specified queue.
468    ///
469    /// # Arguments
470    ///
471    /// * `queue_name` - Name of the queue to create a consumer for
472    /// * `options` - Configuration options for the receiver
473    ///
474    /// # Returns
475    ///
476    /// A Consumer instance configured for the specified queue
477    ///
478    /// # Errors
479    ///
480    /// Returns an error if the receiver creation fails
481    fn create_consumer_for_queue(
482        &mut self,
483        queue_name: impl Into<String> + Send,
484        options: ServiceBusReceiverOptions,
485    ) -> impl Future<Output = Result<Consumer, azure_core::Error>>;
486}
487
488impl<RP> ServiceBusClientExt for ServiceBusClient<RP>
489where
490    RP: azservicebus::ServiceBusRetryPolicy
491        + From<azservicebus::ServiceBusRetryOptions>
492        + Send
493        + Sync
494        + 'static,
495{
496    /// Creates a Consumer for the specified queue using this ServiceBusClient.
497    ///
498    /// This method handles the creation of the underlying receiver and wraps it
499    /// in a Consumer instance for easier usage.
500    async fn create_consumer_for_queue(
501        &mut self,
502        queue_name: impl Into<String> + Send,
503        options: ServiceBusReceiverOptions,
504    ) -> Result<Consumer, azure_core::Error> {
505        let receiver = self
506            .create_receiver_for_queue(queue_name, options)
507            .await
508            .map_err(|e| {
509                azure_core::Error::message(
510                    azure_core::error::ErrorKind::Other,
511                    format!("Receiver error: {e}"),
512                )
513            })?;
514
515        Ok(Consumer::new(receiver))
516    }
517}