pub struct PullConsumer<T, C: CodecType> { /* private fields */ }
Expand description
A typed pull consumer with configurable codec.
§Type Parameters
- T - The message type
- C - The codec type used for deserialization
Implementations§
Source§impl<T, C: CodecType> PullConsumer<T, C>
impl<T, C: CodecType> PullConsumer<T, C>
Source§impl<T: DeserializeOwned, C: CodecType> PullConsumer<T, C>
impl<T: DeserializeOwned, C: CodecType> PullConsumer<T, C>
Source
pub async fn fetch(&self, batch_size: usize) -> Result<PullBatch<T, C>>
pub async fn fetch(&self, batch_size: usize) -> Result<PullBatch<T, C>>
Fetch a batch of messages.
§Example
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64 }
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
let jetstream = client.jetstream();
let stream = jetstream.get_stream("events").await?;
let consumer = stream.get_pull_consumer::<Event>("my-consumer").await?;
let mut messages = consumer.fetch(10).await?;
while let Some(result) = messages.next().await {
    let msg = result?;
    println!("Got: {:?}", msg.payload);
    msg.ack().await?;
}
Source
pub fn fetch_builder(&self) -> FetchBuilder<T, C>
pub fn fetch_builder(&self) -> FetchBuilder<T, C>
Fetch messages with a builder for advanced options.
Source
pub async fn messages(&self) -> Result<PullMessages<T, C>>
pub async fn messages(&self) -> Result<PullMessages<T, C>>
Create a continuous message stream.
Returns a Stream that continuously delivers messages.
§Example
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64 }
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
let jetstream = client.jetstream();
let stream = jetstream.get_stream("events").await?;
let consumer = stream.get_pull_consumer::<Event>("my-consumer").await?;
let mut messages = consumer.messages().await?;
while let Some(result) = messages.next().await {
    let msg = result?;
    println!("Got: {:?}", msg.payload);
    msg.ack().await?;
}
Auto Trait Implementations§
impl<T, C> Freeze for PullConsumer<T, C>
impl<T, C> !RefUnwindSafe for PullConsumer<T, C>
impl<T, C> Send for PullConsumer<T, C>
where
    T: Send,
impl<T, C> Sync for PullConsumer<T, C>
where
    T: Sync,
impl<T, C> Unpin for PullConsumer<T, C>
impl<T, C> !UnwindSafe for PullConsumer<T, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more