[−][src]Struct lapin::Consumer
Continuously consumes message from a Queue.
A consumer represents a stream of messages created from the basic.consume AMQP command. It continuously receives messages from the queue, as opposed to the basic.get command, which retrieves only a single message.
A consumer is obtained by calling Channel::basic_consume
with the queue name.
New messages from this consumer can be accessed by obtaining the iterator from the consumer.
This iterator returns new messages and the associated channel in the form of a
DeliveryResult
for as long as the consumer is subscribed to the queue.
It is also possible to set a delegate to be spawned via set_delegate
.
Message acknowledgment
There are two ways for acknowledging a message:
- If the flag
BasicConsumeOptions::no_ack
is set totrue
while obtaining the consumer fromChannel::basic_consume
, the server implicitely acknowledges each message after it has been sent. - If the flag
BasicConsumeOptions::no_ack
is set tofalse
, a message has to be explicitely acknowledged or rejected withChannel::basic_ack
,Channel::basic_reject
orChannel::basic_nack
. See the documentation atDelivery
for further information.
Also see the RabbitMQ documentation about Acknowledgement Modes.
Consumer Prefetch
To limit the maximum number of unacknowledged messages arriving, you can call Channel::basic_qos
before creating the consumer.
Also see the RabbitMQ documentation about Consumer Prefetch.
Cancel subscription
To stop receiving messages, call Channel::basic_cancel
with the consumer tag of this
consumer.
Example
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, Result}; use futures_executor::LocalPool; use futures_util::stream::StreamExt; use std::future::Future; let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); let res: Result<()> = LocalPool::new().run_until(async { let conn = Connection::connect( &addr, ConnectionProperties::default().with_default_executor(8), ) .await?; let channel = conn.create_channel().await?; let mut consumer = channel .basic_consume( "hello", "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), ) .await?; while let Some(delivery) = consumer.next().await { let (channel, delivery) = delivery.expect("error in consumer"); channel .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) .await?; } Ok(()) });
Implementations
impl Consumer
[src]
pub fn tag(&self) -> ShortString
[src]
Gets the consumer tag.
If no consumer tag was specified when obtaining the consumer from the channel, this contains the server generated consumer tag.
pub fn set_delegate<D: ConsumerDelegate + 'static>(
&self,
delegate: D
) -> Result<()>
[src]
&self,
delegate: D
) -> Result<()>
Automatically spawns the delegate on the executor for each message.
Enables parallel handling of the messages.
Trait Implementations
impl Clone for Consumer
[src]
impl Debug for Consumer
[src]
impl IntoIterator for Consumer
[src]
type Item = Result<(Channel, Delivery)>
The type of the elements being iterated over.
type IntoIter = ConsumerIterator
Which kind of iterator are we turning this into?
fn into_iter(self) -> Self::IntoIter
[src]
impl Stream for Consumer
[src]
Auto Trait Implementations
impl !RefUnwindSafe for Consumer
impl Send for Consumer
impl Sync for Consumer
impl Unpin for Consumer
impl !UnwindSafe for Consumer
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
[src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
[src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
[src]
impl<T> From<T> for T
[src]
impl<T> Instrument for T
[src]
fn instrument(self, span: Span) -> Instrumented<Self>
[src]
fn in_current_span(self) -> Instrumented<Self>
[src]
impl<T, U> Into<U> for T where
U: From<T>,
[src]
U: From<T>,
impl<I> IntoIterator for I where
I: Iterator,
[src]
I: Iterator,
type Item = <I as Iterator>::Item
The type of the elements being iterated over.
type IntoIter = I
Which kind of iterator are we turning this into?
fn into_iter(self) -> I
[src]
impl<T> ToOwned for T where
T: Clone,
[src]
T: Clone,
type Owned = T
The resulting type after obtaining ownership.
fn to_owned(&self) -> T
[src]
fn clone_into(&self, target: &mut T)
[src]
impl<T, U> TryFrom<U> for T where
U: Into<T>,
[src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
[src]
U: TryFrom<T>,
type Error = <U as TryFrom<T>>::Error
The type returned in the event of a conversion error.
fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>
[src]
impl<S, T, E> TryStream for S where
S: Stream<Item = Result<T, E>> + ?Sized,
[src]
S: Stream<Item = Result<T, E>> + ?Sized,