[][src]Struct lapin::Consumer

pub struct Consumer { /* fields omitted */ }

Continuously consumes message from a Queue.

A consumer represents a stream of messages created from the basic.consume AMQP command. It continuously receives messages from the queue, as opposed to the basic.get command, which retrieves only a single message.

A consumer is obtained by calling Channel::basic_consume with the queue name.

New messages from this consumer can be accessed by obtaining the iterator from the consumer. This iterator returns new messages and the associated channel in the form of a DeliveryResult for as long as the consumer is subscribed to the queue.

It is also possible to set a delegate to be spawned via set_delegate.

Message acknowledgment

There are two ways for acknowledging a message:

Also see the RabbitMQ documentation about Acknowledgement Modes.

Consumer Prefetch

To limit the maximum number of unacknowledged messages arriving, you can call Channel::basic_qos before creating the consumer.

Also see the RabbitMQ documentation about Consumer Prefetch.

Cancel subscription

To stop receiving messages, call Channel::basic_cancel with the consumer tag of this consumer.

Example

use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, Result};
use futures_executor::LocalPool;
use futures_util::stream::StreamExt;
use std::future::Future;

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

let res: Result<()> = LocalPool::new().run_until(async {
    let conn = Connection::connect(
        &addr,
        ConnectionProperties::default().with_default_executor(8),
    )
    .await?;
    let channel = conn.create_channel().await?;
    let mut consumer = channel
        .basic_consume(
            "hello",
            "my_consumer",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    while let Some(delivery) = consumer.next().await {
        let (channel, delivery) = delivery.expect("error in consumer");
        channel
            .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
            .await?;
    }
    Ok(())
});

Implementations

impl Consumer[src]

pub fn tag(&self) -> ShortString[src]

Gets the consumer tag.

If no consumer tag was specified when obtaining the consumer from the channel, this contains the server generated consumer tag.

pub fn set_delegate<D: ConsumerDelegate + 'static>(
    &self,
    delegate: D
) -> Result<()>
[src]

Automatically spawns the delegate on the executor for each message.

Enables parallel handling of the messages.

Trait Implementations

impl Clone for Consumer[src]

impl Debug for Consumer[src]

impl IntoIterator for Consumer[src]

type Item = Result<(Channel, Delivery)>

The type of the elements being iterated over.

type IntoIter = ConsumerIterator

Which kind of iterator are we turning this into?

impl Stream for Consumer[src]

type Item = Result<(Channel, Delivery)>

Values yielded by the stream.

Auto Trait Implementations

impl !RefUnwindSafe for Consumer

impl Send for Consumer

impl Sync for Consumer

impl Unpin for Consumer

impl !UnwindSafe for Consumer

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<T> From<T> for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<I> IntoIterator for I where
    I: Iterator
[src]

type Item = <I as Iterator>::Item

The type of the elements being iterated over.

type IntoIter = I

Which kind of iterator are we turning this into?

impl<T> ToOwned for T where
    T: Clone
[src]

type Owned = T

The resulting type after obtaining ownership.

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

impl<S, T, E> TryStream for S where
    S: Stream<Item = Result<T, E>> + ?Sized
[src]

type Ok = T

The type of successful values yielded by this future

type Error = E

The type of failures yielded by this future