[][src]Struct pulsar::consumer::Consumer

pub struct Consumer<T: DeserializeMessage, Exe: Executor> { /* fields omitted */ }

The consumer is used to subscribe to a topic.

use pulsar::{Consumer, SubType};
use futures::StreamExt;

// Configure and build a consumer through the client's builder: the topic,
// consumer name, subscription type and subscription name are all set before
// `build()` is awaited.
// NOTE(review): `pulsar` and `TestData` are defined outside this snippet.
let mut consumer: Consumer<TestData, _> = pulsar
    .consumer()
    .with_topic("non-persistent://public/default/test")
    .with_consumer_name("test_consumer")
    .with_subscription_type(SubType::Exclusive)
    .with_subscription("test_subscription")
    .build()
    .await?;

// Number of messages that were received and deserialized successfully.
let mut counter = 0usize;
// The pattern only matches `Some(Ok(_))`, so the loop ends both when the
// stream is exhausted (`None`) and when it yields an `Err` item; in the
// latter case the error value is dropped without being reported.
while let Some(Ok(msg)) = consumer.next().await {
    // The message is acknowledged before deserialization is attempted, so a
    // message that fails to deserialize below has already been acked.
    consumer.ack(&msg).await?;
    let data = match msg.deserialize() {
        Ok(data) => data,
        Err(e) => {
            log::error!("could not deserialize message: {:?}", e);
            // Stop consuming on the first deserialization failure.
            break;
        }
    };

    counter += 1;
    log::info!("got {} messages", counter);
}

Implementations

impl<T: DeserializeMessage, Exe: Executor> Consumer<T, Exe>[src]

pub fn builder(pulsar: &Pulsar<Exe>) -> ConsumerBuilder<Exe>[src]

pub async fn check_connection<'_>(&'_ self) -> Result<(), Error>[src]

pub async fn ack<'_, '_>(
    &'_ mut self,
    msg: &'_ Message<T>
) -> Result<(), ConsumerError>
[src]

pub async fn cumulative_ack<'_, '_>(
    &'_ mut self,
    msg: &'_ Message<T>
) -> Result<(), ConsumerError>
[src]

pub async fn nack<'_, '_>(
    &'_ mut self,
    msg: &'_ Message<T>
) -> Result<(), ConsumerError>
[src]

pub fn topics(&self) -> Vec<String>[src]

pub fn connections(&self) -> Vec<&Url>[src]

pub fn options(&self) -> &ConsumerOptions[src]

pub fn dead_letter_policy(&self) -> Option<&DeadLetterPolicy>[src]

pub fn subscription(&self) -> &str[src]

pub fn sub_type(&self) -> SubType[src]

pub fn batch_size(&self) -> Option<u32>[src]

pub fn consumer_name(&self) -> Option<&str>[src]

pub fn consumer_id(&self) -> Vec<u64>[src]

pub fn unacked_message_redelivery_delay(&self) -> Option<Duration>[src]

pub fn last_message_received(&self) -> Option<DateTime<Utc>>[src]

pub fn messages_received(&self) -> u64[src]

Trait Implementations

impl<T: DeserializeMessage + 'static, Exe: Executor> Stream for Consumer<T, Exe>[src]

type Item = Result<Message<T>, Error>

Values yielded by the stream.

Auto Trait Implementations

impl<T, Exe> !RefUnwindSafe for Consumer<T, Exe>

impl<T, Exe> Send for Consumer<T, Exe>

impl<T, Exe> !Sync for Consumer<T, Exe>

impl<T, Exe> Unpin for Consumer<T, Exe>

impl<T, Exe> !UnwindSafe for Consumer<T, Exe>

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<T> From<T> for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<I> IntoStream for I where
    I: Stream
[src]

type Item = <I as Stream>::Item

The type of the elements being iterated over.

type IntoStream = I

Which kind of stream are we turning this into?

impl<T> StreamExt for T where
    T: Stream + ?Sized
[src]

impl<T> StreamExt for T where
    T: Stream + ?Sized
[src]

impl<St> StreamExt for St where
    St: Stream + ?Sized
[src]

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

impl<S, T, E> TryStream for S where
    S: Stream<Item = Result<T, E>> + ?Sized
[src]

type Ok = T

The type of successful values yielded by this stream

type Error = E

The type of failures yielded by this stream

impl<S> TryStreamExt for S where
    S: TryStream + ?Sized
[src]

impl<V, T> VZip<V> for T where
    V: MultiLane<T>,