Consumer

Struct Consumer 

Source
pub struct Consumer { /* private fields */ }
Expand description

A wrapper around Azure Service Bus receiver for consuming messages from queues.

The Consumer provides a high-level interface for receiving, processing, and managing messages from Azure Service Bus queues. It supports both peek operations (non-destructive) and receive operations (which lock messages for processing).

§Thread Safety

The Consumer is thread-safe and can be shared across async tasks. The underlying receiver is protected by a mutex to ensure safe concurrent access.

§Examples

use quetty_server::consumer::Consumer;
use azservicebus::{ServiceBusReceiver, ServiceBusReceiverOptions};

async fn example(receiver: ServiceBusReceiver) {
    let mut consumer = Consumer::new(receiver);

    // Peek at messages without consuming them
    let messages = consumer.peek_messages(10, None).await?;

    // Receive messages for processing
    let received = consumer.receive_messages_with_timeout(5, std::time::Duration::from_secs(10)).await?;

    // Process and complete messages
    for message in &received {
        consumer.complete_message(message).await?;
    }
}

Implementations§

Source§

impl Consumer

Source

pub fn new(receiver: ServiceBusReceiver) -> Self

Creates a new Consumer wrapping the provided Service Bus receiver.

§Arguments
  • receiver - The Azure Service Bus receiver to wrap
Source

pub async fn peek_messages( &mut self, max_count: u32, from_sequence_number: Option<i64>, ) -> Result<Vec<MessageModel>, Box<dyn Error>>

Peeks at messages in the queue without consuming them.

This operation allows you to inspect messages without locking them or affecting their delivery count. Useful for browsing queue contents.

§Arguments
  • max_count - Maximum number of messages to peek at
  • from_sequence_number - Optional starting sequence number
§Returns

Vector of MessageModel instances representing the peeked messages

§Errors

Returns an error if the receiver has been disposed or if the Service Bus operation fails

Source

pub async fn receive_messages_with_timeout( &mut self, max_count: u32, timeout: Duration, ) -> Result<Vec<ServiceBusReceivedMessage>, Box<dyn Error>>

Receives messages from the queue with a timeout.

This operation locks the received messages for processing. The messages must be completed, abandoned, or dead-lettered to release the lock.

§Arguments
  • max_count - Maximum number of messages to receive
  • timeout - Maximum time to wait for messages
§Returns

Vector of received messages that are locked for processing. Returns an empty vector if the timeout expires before messages are available.

§Errors

Returns an error if the receiver has been disposed or if the Service Bus operation fails

Source

pub async fn abandon_message( &mut self, message: &ServiceBusReceivedMessage, ) -> Result<(), Box<dyn Error>>

Abandons a received message, returning it to the queue.

The message becomes available for redelivery and its delivery count is incremented.

§Arguments
  • message - The message to abandon
§Errors

Returns an error if the receiver has been disposed or if the Service Bus operation fails

Source

pub async fn dead_letter_message( &mut self, message: &ServiceBusReceivedMessage, reason: Option<String>, error_description: Option<String>, ) -> Result<(), Box<dyn Error>>

Moves a message to the dead letter queue.

Dead lettered messages are permanently removed from the main queue and can be inspected in the dead letter queue for debugging.

§Arguments
  • message - The message to dead letter
  • reason - Optional reason for dead lettering
  • error_description - Optional error description
§Errors

Returns an error if the receiver has been disposed or if the Service Bus operation fails

Source

pub async fn complete_message( &mut self, message: &ServiceBusReceivedMessage, ) -> Result<(), Box<dyn Error>>

Completes a message, removing it from the queue.

This indicates successful processing of the message.

§Arguments
  • message - The message to complete
§Errors

Returns an error if the receiver has been disposed or if the Service Bus operation fails

Source

pub async fn complete_messages( &mut self, messages: &[ServiceBusReceivedMessage], ) -> Result<(), Box<dyn Error>>

Completes multiple messages in a batch for better performance.

Attempts to complete all provided messages, logging results for each. If any message fails to complete, the operation continues with remaining messages and returns an error indicating the failure count.

§Arguments
  • messages - Slice of messages to complete
§Returns

Ok(()) if all messages were completed successfully, Err if any messages failed to complete

§Errors

Returns an error if the receiver has been disposed or if any message completion fails

Source

pub async fn abandon_messages( &mut self, messages: &[ServiceBusReceivedMessage], ) -> Result<(), Box<dyn Error>>

Abandons multiple messages in a batch.

All provided messages are returned to the queue for redelivery.

§Arguments
  • messages - Slice of messages to abandon
§Errors

Returns an error if the receiver has been disposed or if any abandon operation fails

Source

pub async fn renew_message_lock( &mut self, message: &mut ServiceBusReceivedMessage, ) -> Result<(), Box<dyn Error>>

Renews the lock on a received message to extend processing time.

This prevents the message from becoming available for redelivery while it’s still being processed.

§Arguments
  • message - The message whose lock should be renewed
§Errors

Returns an error if the receiver has been disposed or if the lock renewal fails

Source

pub async fn renew_message_locks( &mut self, messages: &mut [ServiceBusReceivedMessage], ) -> Result<(), Box<dyn Error>>

Renews locks on multiple messages.

Attempts to renew locks for all provided messages, logging results. Continues processing all messages even if some renewals fail.

§Arguments
  • messages - Mutable slice of messages whose locks should be renewed
§Errors

Returns an error if the receiver has been disposed. Lock renewal failures are logged but do not cause the method to return an error.

Source

pub async fn dispose(&self) -> Result<(), Box<dyn Error>>

Disposes the underlying Service Bus receiver, releasing all resources.

After disposal, all other operations on this Consumer will fail.

§Errors

Returns an error if the disposal operation fails

Source

pub async fn receive_deferred_messages( &mut self, sequence_numbers: &[i64], ) -> Result<Vec<ServiceBusReceivedMessage>, Box<dyn Error + Send + Sync>>

Receives deferred messages by their sequence numbers.

This allows operations (delete/complete) on messages that were previously deferred without re-activating them in the main queue first.

§Arguments
  • sequence_numbers - Array of sequence numbers for the deferred messages
§Returns

Vector of received deferred messages that are locked for processing

§Errors

Returns an error if the receiver has been disposed or if the Service Bus operation fails

Trait Implementations§

Source§

impl Debug for Consumer

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl PartialEq for Consumer

Source§

fn eq(&self, other: &Self) -> bool

Tests for self and other values to be equal, and is used by ==.
1.0.0 · Source§

fn ne(&self, other: &Rhs) -> bool

Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<T> SendBound for T
where T: Send,