Struct nats::jetstream::pull_subscription::PullSubscription
pub struct PullSubscription(_);
A PullSubscription pulls messages from the server in response to client actions.
A pull subscription does nothing on its own. The client has to request messages explicitly
using one of the methods listed below.
Implementations
impl PullSubscription
pub fn fetch<I: Into<BatchOptions>>(&self, batch: I) -> Result<BatchIter<'_>>
Fetches the given number of messages for this PullSubscription and returns an Iterator
over them. The returned iterator is blocking: it waits until every message in the batch
has been processed.
The batch argument can be either a usize giving the size of the batch, or a BatchOptions
value that additionally sets expires and no_wait.
If no_wait is set, the iterator also returns when there are no more messages
in the Consumer.
Example
let consumer = context.pull_subscribe("next")?;

// pass just the number of messages to be fetched
for message in consumer.fetch(10)? {
    println!("received message: {:?}", message);
}

// pass a whole `BatchOptions` to fetch
let messages = consumer.fetch(BatchOptions {
    expires: None,
    no_wait: false,
    batch: 10,
})?;
for message in messages {
    println!("received message {:?}", message);
}
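The reason fetch accepts both a bare number and a full options value is its I: Into<BatchOptions> bound. The following is a minimal, self-contained sketch of that pattern and not the crate's source: the BatchOptions struct is a local stand-in whose fields mirror the ones used in the example above, the field types and the From<usize> defaults (no expiry, no_wait off) are assumptions, and resolve_options is a hypothetical helper.

```rust
/// Local stand-in for the crate's `BatchOptions`; the field types are assumed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct BatchOptions {
    pub batch: usize,
    pub expires: Option<usize>,
    pub no_wait: bool,
}

/// Assumed conversion: a bare number becomes a batch of that size,
/// with no expiry and `no_wait` switched off.
impl From<usize> for BatchOptions {
    fn from(batch: usize) -> Self {
        BatchOptions {
            batch,
            expires: None,
            no_wait: false,
        }
    }
}

/// Hypothetical helper using the same generic bound as `fetch`:
/// anything convertible into `BatchOptions` is accepted.
pub fn resolve_options<I: Into<BatchOptions>>(batch: I) -> BatchOptions {
    batch.into()
}
```

With this bound, a caller can pass a plain usize for the common case and a full BatchOptions value only when expires or no_wait matter, without the method needing two separate signatures.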
pub fn timeout_fetch<I: Into<BatchOptions>>(
    &self,
    batch: I,
    timeout: Duration
) -> Result<TimeoutBatchIter<'_>>
Fetches the given number of messages for this PullSubscription and returns an Iterator
over them. The returned iterator retrieves messages, waiting for new ones for up to the
given timeout.
It stops when the batch is complete: either there are no more messages in the stream,
or the iterator has processed the number of messages specified in batch.
The batch argument can be either a usize giving the size of the batch, or a BatchOptions
value that additionally sets expires and no_wait.
If no_wait is set, the iterator also returns when there are no more messages
in the Consumer.
Example
let consumer = context.pull_subscribe("timeout_fetch")?;

// pass just the number of messages to be fetched
for message in consumer.timeout_fetch(10, Duration::from_millis(100))? {
    println!("received message: {:?}", message);
}

// pass a whole `BatchOptions` to fetch
let messages = consumer.timeout_fetch(BatchOptions {
    expires: None,
    no_wait: false,
    batch: 10,
}, Duration::from_millis(100))?;
for message in messages {
    println!("received message {:?}", message);
}
pub fn fetch_with_handler<F, I>(&self, batch: I, handler: F) -> Result<()>
where
    F: FnMut(&Message) -> Result<()>,
    I: Into<BatchOptions> + Copy,
High-level method that fetches the given set of messages, processes them in a user-provided
closure, and acks them automatically according to the Consumer's AckPolicy.
Example
let consumer = context.pull_subscribe("fetch_with_handler")?;

consumer.fetch_with_handler(10, |message| {
    println!("received message: {:?}", message);
    Ok(())
})?;
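Because the handler is bound by FnMut rather than Fn, the closure may update state it captures, such as a running total. The sketch below illustrates only that aspect of the signature. Message and for_each_message are hypothetical local stand-ins, no acking is modelled, and stopping at the first handler error is an assumption of this sketch rather than documented behaviour of fetch_with_handler.

```rust
use std::io;

/// Hypothetical stand-in for a fetched message.
#[derive(Debug)]
pub struct Message {
    pub data: Vec<u8>,
}

/// Hypothetical driver with the same handler bound as `fetch_with_handler`.
/// It calls the handler once per message and stops at the first error
/// (an assumption made for this sketch).
pub fn for_each_message<F>(messages: &[Message], mut handler: F) -> io::Result<()>
where
    F: FnMut(&Message) -> io::Result<()>,
{
    for message in messages {
        handler(message)?;
    }
    Ok(())
}

/// Sums the payload sizes of a batch by mutating a captured counter
/// from inside the handler closure.
pub fn total_bytes(messages: &[Message]) -> io::Result<usize> {
    let mut total = 0;
    for_each_message(messages, |message| {
        total += message.data.len();
        Ok(())
    })?;
    Ok(total)
}
```

An Fn bound would reject the closure in total_bytes, since it mutates total; FnMut permits it while still allowing stateless handlers like the one in the example above.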
pub fn next(&self) -> Option<Message>
A low-level method that should be used only in specific cases.
Pulls the next message available for this PullSubscription.
This operation is blocking and waits indefinitely for new messages.
Keep in mind that the user must request messages first.
Example
let consumer = context.pull_subscribe("next")?;
consumer.request_batch(1)?;
let message = consumer.next();
println!("Received message: {:?}", message);
pub fn try_next(&self) -> Option<Message>
A low-level method that should be used only in specific cases.
Pulls the next message available for this PullSubscription.
This operation is non-blocking: each call yields None if no message is available.
Keep in mind that the user must request messages first.
Example
context.publish("try_next", "hello")?;
let consumer = context.pull_subscribe("try_next")?;
consumer.request_batch(1)?;
let message = consumer.try_next();
println!("Received message: {:?}", message);
assert!(consumer.try_next().is_none());
pub fn next_timeout(&self, timeout: Duration) -> Result<Message>
A low-level method that should be used only in specific cases.
Pulls the next message available for this PullSubscription.
In contrast to its siblings next and try_next, this operation returns the Message wrapped
in io::Result, because it may fail with a timeout error, either while waiting for the next
message or on the network.
Keep in mind that the user must request messages first.
Example
let consumer = context.pull_subscribe("next_timeout")?;
consumer.request_batch(1)?;
let message = consumer.next_timeout(Duration::from_millis(1000))?;
println!("Received message: {:?}", message);
// timeout on second, as there are no messages.
let message = consumer.next_timeout(Duration::from_millis(100));
assert!(message.is_err());
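The three low-level receive styles (next blocks, try_next returns immediately, next_timeout waits for a bounded time) follow a pattern that also appears in the standard library's channels. The sketch below is only an analogy built on std::sync::mpsc, not the nats API: recv, try_recv and recv_timeout stand in for next, try_next and next_timeout, and receive_styles is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Returns, in order: whether a blocking receive got the queued item,
/// whether a non-blocking receive on the now-empty channel got nothing,
/// and whether a timed receive on the empty channel ended in a timeout.
pub fn receive_styles() -> (bool, bool, bool) {
    let (sender, receiver) = mpsc::channel::<&'static str>();
    sender.send("hello").expect("receiver is still alive");

    // Analogous to `next`: blocks until an item is available.
    let blocking = receiver.recv().ok() == Some("hello");

    // Analogous to `try_next`: returns at once when nothing is queued.
    let non_blocking = receiver.try_recv().is_err();

    // Analogous to `next_timeout`: waits up to the timeout, then reports an error.
    let timed = matches!(
        receiver.recv_timeout(Duration::from_millis(50)),
        Err(mpsc::RecvTimeoutError::Timeout)
    );

    (blocking, non_blocking, timed)
}
```

The bounded wait is the only variant that needs an error type to distinguish a timeout from a received value, which matches next_timeout returning a Result where its siblings return an Option.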
pub fn request_batch<I: Into<BatchOptions>>(&self, batch: I) -> Result<()>
Sends a request for another set of messages to the Pull Consumer. This method does not return any messages. It can be used for finer control over how many requests are sent and when.
Example
let consumer = context.pull_subscribe("request_batch")?;
// request a specific number of messages.
consumer.request_batch(10)?;

// request messages specifying the whole config.
consumer.request_batch(BatchOptions {
    expires: None,
    no_wait: false,
    batch: 10,
})?;
pub fn iter(&self) -> Iter<'_>
Low-level API that should be used with care.
For standard use cases, consider using PullSubscription::fetch
or PullSubscription::fetch_with_handler.
Returns an iterator of Messages for the current subscription.
Because Pull Consumers require the client to fetch messages, this iterator yields nothing
unless an explicit PullSubscription::request_batch has been sent.
Example
let consumer = context.pull_subscribe("iter")?;
// request a specific number of messages.
consumer.request_batch(10)?;

// request messages specifying the whole config.
consumer.request_batch(BatchOptions {
    expires: Some(10000),
    no_wait: true,
    batch: 10,
})?;

for (i, message) in consumer.iter().enumerate() {
    println!("received message {}: {:?}", i, message);
    message.ack()?;
}
Trait Implementations
impl Clone for PullSubscription
fn clone(&self) -> PullSubscription
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.