pub struct Consumer { /* private fields */ }
Expand description
Continuously consumes message from a Queue.
A consumer represents a stream of messages created from the basic.consume AMQP command. It continuously receives messages from the queue, as opposed to the basic.get command, which retrieves only a single message.
A consumer is obtained by calling Channel::basic_consume
with the queue name.
New messages from this consumer can be accessed by obtaining the iterator from the consumer.
This iterator returns new messages and the associated channel in the form of a
DeliveryResult
for as long as the consumer is subscribed to the queue.
It is also possible to set a delegate to be spawned via set_delegate
.
§Message acknowledgment
There are two ways of acknowledging a message:
- If the flag
BasicConsumeOptions::no_ack
is set totrue
while obtaining the consumer fromChannel::basic_consume
, the server implicitely acknowledges each message after it has been sent. - If the flag
BasicConsumeOptions::no_ack
is set tofalse
, a message has to be explicitly acknowledged or rejected withAcker::ack
,Acker::nack
orAcker::reject
. See the documentation atDelivery
for further information.
Also see the RabbitMQ documentation about Acknowledgement Modes.
§Consumer Prefetch
To limit the maximum number of unacknowledged messages arriving, you can call Channel::basic_qos
before creating the consumer.
Also see the RabbitMQ documentation about Consumer Prefetch.
§Cancel subscription
To stop receiving messages, call Channel::basic_cancel
with the consumer tag of this
consumer.
§Example
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, Result};
use futures_lite::stream::StreamExt;
use std::future::Future;
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let res: Result<()> = async_global_executor::block_on(async {
let conn = Connection::connect(
&addr,
ConnectionProperties::default(),
)
.await?;
let channel = conn.create_channel().await?;
let mut consumer = channel
.basic_consume(
"hello",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer");
delivery
.ack(BasicAckOptions::default())
.await?;
}
Ok(())
});
Implementations§
source§impl Consumer
impl Consumer
sourcepub fn tag(&self) -> ShortString
pub fn tag(&self) -> ShortString
Gets the consumer tag.
If no consumer tag was specified when obtaining the consumer from the channel, this contains the server generated consumer tag.
sourcepub fn state(&self) -> ConsumerState
pub fn state(&self) -> ConsumerState
Gets the current state of the Consumer.
sourcepub fn queue(&self) -> ShortString
pub fn queue(&self) -> ShortString
Get the name of the queue we’re consuming
sourcepub fn set_delegate<D: ConsumerDelegate + 'static>(&self, delegate: D)
pub fn set_delegate<D: ConsumerDelegate + 'static>(&self, delegate: D)
Automatically spawns the delegate on the executor for each message.
Enables parallel handling of the messages.
Trait Implementations§
source§impl Stream for Consumer
impl Stream for Consumer
Auto Trait Implementations§
impl Freeze for Consumer
impl !RefUnwindSafe for Consumer
impl Send for Consumer
impl Sync for Consumer
impl Unpin for Consumer
impl !UnwindSafe for Consumer
Blanket Implementations§
source§impl<'a, T, E> AsTaggedExplicit<'a, E> for Twhere
T: 'a,
impl<'a, T, E> AsTaggedExplicit<'a, E> for Twhere
T: 'a,
source§impl<'a, T, E> AsTaggedImplicit<'a, E> for Twhere
T: 'a,
impl<'a, T, E> AsTaggedImplicit<'a, E> for Twhere
T: 'a,
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<S> StreamExt for S
impl<S> StreamExt for S
source§fn next(&mut self) -> NextFuture<'_, Self>where
Self: Unpin,
fn next(&mut self) -> NextFuture<'_, Self>where
Self: Unpin,
source§fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
source§fn count(self) -> CountFuture<Self>where
Self: Sized,
fn count(self) -> CountFuture<Self>where
Self: Sized,
source§fn map<T, F>(self, f: F) -> Map<Self, F>
fn map<T, F>(self, f: F) -> Map<Self, F>
source§fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
source§fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
source§fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
source§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where
Self: Sized,
n
items of the stream. Read moresource§fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
source§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
n
items of the stream. Read moresource§fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
source§fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
step
th item. Read moresource§fn chain<U>(self, other: U) -> Chain<Self, U>
fn chain<U>(self, other: U) -> Chain<Self, U>
source§fn collect<C>(self) -> CollectFuture<Self, C>
fn collect<C>(self) -> CollectFuture<Self, C>
source§fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
source§fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
predicate
is true
and those for which it is
false
, and then collects them into two collections. Read moresource§fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
source§fn try_fold<T, E, F, B>(
&mut self,
init: B,
f: F
) -> TryFoldFuture<'_, Self, F, B>
fn try_fold<T, E, F, B>( &mut self, init: B, f: F ) -> TryFoldFuture<'_, Self, F, B>
source§fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
source§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
(index, item)
. Read moresource§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
source§fn nth(&mut self, n: usize) -> NthFuture<'_, Self>where
Self: Unpin,
fn nth(&mut self, n: usize) -> NthFuture<'_, Self>where
Self: Unpin,
n
th item of the stream. Read moresource§fn last(self) -> LastFuture<Self>where
Self: Sized,
fn last(self) -> LastFuture<Self>where
Self: Sized,
source§fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
source§fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
source§fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
source§fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
source§fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
source§fn zip<U>(self, other: U) -> Zip<Self, U>
fn zip<U>(self, other: U) -> Zip<Self, U>
source§fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
source§fn race<S>(self, other: S) -> Race<Self, S>
fn race<S>(self, other: S) -> Race<Self, S>
other
stream, with no preference for either stream when both are ready. Read moresource§fn drain(&mut self) -> Drain<'_, Self>
fn drain(&mut self) -> Drain<'_, Self>
source§impl<S> StreamExt for S
impl<S> StreamExt for S
source§fn next(&mut self) -> NextFuture<'_, Self>where
Self: Unpin,
fn next(&mut self) -> NextFuture<'_, Self>where
Self: Unpin,
source§fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
source§fn count(self) -> CountFuture<Self>where
Self: Sized,
fn count(self) -> CountFuture<Self>where
Self: Sized,
source§fn map<T, F>(self, f: F) -> Map<Self, F>
fn map<T, F>(self, f: F) -> Map<Self, F>
source§fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
source§fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
source§fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
source§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where
Self: Sized,
n
items of the stream. Read moresource§fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
source§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
n
items of the stream. Read moresource§fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
source§fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
step
th item. Read moresource§fn chain<U>(self, other: U) -> Chain<Self, U>
fn chain<U>(self, other: U) -> Chain<Self, U>
source§fn collect<C>(self) -> CollectFuture<Self, C>
fn collect<C>(self) -> CollectFuture<Self, C>
source§fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
source§fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
predicate
is true
and those for which it is
false
, and then collects them into two collections. Read moresource§fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
source§fn try_fold<T, E, F, B>(
&mut self,
init: B,
f: F
) -> TryFoldFuture<'_, Self, F, B>
fn try_fold<T, E, F, B>( &mut self, init: B, f: F ) -> TryFoldFuture<'_, Self, F, B>
source§fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
source§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
(index, item)
. Read moresource§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
source§fn nth(&mut self, n: usize) -> NthFuture<'_, Self>where
Self: Unpin,
fn nth(&mut self, n: usize) -> NthFuture<'_, Self>where
Self: Unpin,
n
th item of the stream. Read moresource§fn last(self) -> LastFuture<Self>where
Self: Sized,
fn last(self) -> LastFuture<Self>where
Self: Sized,
source§fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
source§fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
source§fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
source§fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
source§fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
source§fn zip<U>(self, other: U) -> Zip<Self, U>
fn zip<U>(self, other: U) -> Zip<Self, U>
source§fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
source§fn race<S>(self, other: S) -> Race<Self, S>
fn race<S>(self, other: S) -> Race<Self, S>
other
stream, with no preference for either stream when both are ready. Read more