async_nats/jetstream/consumer/pull.rs
1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16 future::{BoxFuture, Either},
17 FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_11")]
21use time::{serde::rfc3339, OffsetDateTime};
22
23#[cfg(feature = "server_2_10")]
24use std::collections::HashMap;
25use std::{future, pin::Pin, task::Poll, time::Duration};
26use tokio::{task::JoinHandle, time::Sleep};
27
28use serde::{Deserialize, Serialize};
29use tracing::{debug, trace};
30
31use crate::{
32 connection::State,
33 error::Error,
34 jetstream::{self, Context},
35 StatusCode, SubscribeError, Subscriber,
36};
37
38use crate::subject::Subject;
39
40#[cfg(feature = "server_2_11")]
41use super::PriorityPolicy;
42
43use super::{
44 backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
45 StreamError, StreamErrorKind,
46};
47use jetstream::consumer;
48
49impl Consumer<Config> {
50 /// Returns a stream of messages for Pull Consumer.
51 ///
52 /// # Example
53 ///
54 /// ```no_run
55 /// # #[tokio::main]
56 /// # async fn main() -> Result<(), async_nats::Error> {
57 /// use futures::StreamExt;
58 /// use futures::TryStreamExt;
59 ///
60 /// let client = async_nats::connect("localhost:4222").await?;
61 /// let jetstream = async_nats::jetstream::new(client);
62 ///
63 /// let stream = jetstream
64 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
65 /// name: "events".to_string(),
66 /// max_messages: 10_000,
67 /// ..Default::default()
68 /// })
69 /// .await?;
70 ///
71 /// jetstream.publish("events", "data".into()).await?;
72 ///
73 /// let consumer = stream
74 /// .get_or_create_consumer(
75 /// "consumer",
76 /// async_nats::jetstream::consumer::pull::Config {
77 /// durable_name: Some("consumer".to_string()),
78 /// ..Default::default()
79 /// },
80 /// )
81 /// .await?;
82 ///
83 /// let mut messages = consumer.messages().await?.take(100);
84 /// while let Some(Ok(message)) = messages.next().await {
85 /// println!("got message {:?}", message);
86 /// message.ack().await?;
87 /// }
88 /// Ok(())
89 /// # }
90 /// ```
91 pub async fn messages(&self) -> Result<Stream, StreamError> {
92 Stream::stream(
93 BatchConfig {
94 batch: 200,
95 expires: Some(Duration::from_secs(30)),
96 no_wait: false,
97 max_bytes: 0,
98 idle_heartbeat: Duration::from_secs(15),
99 min_pending: None,
100 min_ack_pending: None,
101 group: None,
102 },
103 self,
104 )
105 .await
106 }
107
108 /// Enables customization of the [Stream] by setting timeouts, heartbeats, and the maximum
109 /// number of messages or bytes buffered.
110 ///
111 /// # Examples
112 ///
113 /// ```no_run
114 /// # #[tokio::main]
115 /// # async fn main() -> Result<(), async_nats::Error> {
116 /// use async_nats::jetstream::consumer::PullConsumer;
117 /// use futures::StreamExt;
118 /// let client = async_nats::connect("localhost:4222").await?;
119 /// let jetstream = async_nats::jetstream::new(client);
120 ///
121 /// let consumer: PullConsumer = jetstream
122 /// .get_stream("events")
123 /// .await?
124 /// .get_consumer("pull")
125 /// .await?;
126 ///
127 /// let mut messages = consumer
128 /// .stream()
129 /// .max_messages_per_batch(100)
130 /// .max_bytes_per_batch(1024)
131 /// .messages()
132 /// .await?;
133 ///
134 /// while let Some(message) = messages.next().await {
135 /// let message = message?;
136 /// println!("message: {:?}", message);
137 /// message.ack().await?;
138 /// }
139 /// # Ok(())
140 /// # }
141 /// ```
142 pub fn stream(&self) -> StreamBuilder<'_> {
143 StreamBuilder::new(self)
144 }
145
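/// Sends a single pull request to the server, asking it to deliver messages for this
/// consumer (as configured by [BatchConfig]) to the given `inbox` subject.
///
/// This is a low-level API: the caller is responsible for creating and subscribing to
/// `inbox` before issuing the request. The higher-level [Stream] and [Batch] APIs handle
/// this for you. A minimal sketch of direct usage; the stream, consumer, and subject names
/// below are illustrative:
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer::{pull::BatchConfig, PullConsumer};
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client.clone());
///
/// let consumer: PullConsumer = jetstream
///     .get_stream("events")
///     .await?
///     .get_consumer("pull")
///     .await?;
///
/// // Subscribe to the inbox first, as requested messages will be delivered there.
/// let inbox = client.new_inbox();
/// let _responses = client.subscribe(inbox.clone()).await?;
///
/// consumer
///     .request_batch(
///         BatchConfig {
///             batch: 10,
///             ..Default::default()
///         },
///         inbox.into(),
///     )
///     .await?;
/// # Ok(())
/// # }
/// ```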
146 pub async fn request_batch<I: Into<BatchConfig>>(
147 &self,
148 batch: I,
149 inbox: Subject,
150 ) -> Result<(), BatchRequestError> {
151 debug!("sending batch");
152 let subject = format!(
153 "{}.CONSUMER.MSG.NEXT.{}.{}",
154 self.context.prefix, self.info.stream_name, self.info.name
155 );
156
157 let payload = serde_json::to_vec(&batch.into())
158 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
159
160 self.context
161 .client
162 .publish_with_reply(subject, inbox, payload.into())
163 .await
164 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
165 debug!("batch request sent");
166 Ok(())
167 }
168
169 /// Returns a batch of a specified number of messages; if there are fewer messages on the
170 /// [Stream] than requested, returns all available messages.
171 ///
172 /// # Example
173 ///
174 /// ```no_run
175 /// # #[tokio::main]
176 /// # async fn main() -> Result<(), async_nats::Error> {
177 /// use futures::StreamExt;
178 /// use futures::TryStreamExt;
179 ///
180 /// let client = async_nats::connect("localhost:4222").await?;
181 /// let jetstream = async_nats::jetstream::new(client);
182 ///
183 /// let stream = jetstream
184 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
185 /// name: "events".to_string(),
186 /// max_messages: 10_000,
187 /// ..Default::default()
188 /// })
189 /// .await?;
190 ///
191 /// jetstream.publish("events", "data".into()).await?;
192 ///
193 /// let consumer = stream
194 /// .get_or_create_consumer(
195 /// "consumer",
196 /// async_nats::jetstream::consumer::pull::Config {
197 /// durable_name: Some("consumer".to_string()),
198 /// ..Default::default()
199 /// },
200 /// )
201 /// .await?;
202 ///
203 /// for _ in 0..100 {
204 /// jetstream.publish("events", "data".into()).await?;
205 /// }
206 ///
207 /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
208 /// // will finish after 100 messages, as that is the number of messages available on the
209 /// // stream.
210 /// while let Some(Ok(message)) = messages.next().await {
211 /// println!("got message {:?}", message);
212 /// message.ack().await?;
213 /// }
214 /// Ok(())
215 /// # }
216 /// ```
217 pub fn fetch(&self) -> FetchBuilder {
218 FetchBuilder::new(self)
219 }
220
221 /// Returns a batch of a specified number of messages, unless the timeout happens first.
222 ///
223 /// # Example
224 ///
225 /// ```no_run
226 /// # #[tokio::main]
227 /// # async fn main() -> Result<(), async_nats::Error> {
228 /// use futures::StreamExt;
229 /// use futures::TryStreamExt;
230 ///
231 /// let client = async_nats::connect("localhost:4222").await?;
232 /// let jetstream = async_nats::jetstream::new(client);
233 ///
234 /// let stream = jetstream
235 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
236 /// name: "events".to_string(),
237 /// max_messages: 10_000,
238 /// ..Default::default()
239 /// })
240 /// .await?;
241 ///
242 /// jetstream.publish("events", "data".into()).await?;
243 ///
244 /// let consumer = stream
245 /// .get_or_create_consumer(
246 /// "consumer",
247 /// async_nats::jetstream::consumer::pull::Config {
248 /// durable_name: Some("consumer".to_string()),
249 /// ..Default::default()
250 /// },
251 /// )
252 /// .await?;
253 ///
254 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
255 /// while let Some(Ok(message)) = messages.next().await {
256 /// println!("got message {:?}", message);
257 /// message.ack().await?;
258 /// }
259 /// Ok(())
260 /// # }
261 /// ```
262 pub fn batch(&self) -> BatchBuilder {
263 BatchBuilder::new(self)
264 }
265
266 /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
267 /// messages in those batches.
268 ///
269 /// # Example
270 ///
271 /// ```no_run
272 /// # #[tokio::main]
273 /// # async fn main() -> Result<(), async_nats::Error> {
274 /// use futures::StreamExt;
275 /// use futures::TryStreamExt;
276 ///
277 /// let client = async_nats::connect("localhost:4222").await?;
278 /// let jetstream = async_nats::jetstream::new(client);
279 ///
280 /// let stream = jetstream
281 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
282 /// name: "events".to_string(),
283 /// max_messages: 10_000,
284 /// ..Default::default()
285 /// })
286 /// .await?;
287 ///
288 /// jetstream.publish("events", "data".into()).await?;
289 ///
290 /// let consumer = stream
291 /// .get_or_create_consumer(
292 /// "consumer",
293 /// async_nats::jetstream::consumer::pull::Config {
294 /// durable_name: Some("consumer".to_string()),
295 /// ..Default::default()
296 /// },
297 /// )
298 /// .await?;
299 ///
300 /// let mut iter = consumer.sequence(50).unwrap().take(10);
301 /// while let Ok(Some(mut batch)) = iter.try_next().await {
302 /// while let Ok(Some(message)) = batch.try_next().await {
303 /// println!("message received: {:?}", message);
304 /// }
305 /// }
306 /// Ok(())
307 /// # }
308 /// ```
309 pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
310 let context = self.context.clone();
311 let subject = format!(
312 "{}.CONSUMER.MSG.NEXT.{}.{}",
313 self.context.prefix, self.info.stream_name, self.info.name
314 );
315
316 let request = serde_json::to_vec(&BatchConfig {
317 batch,
318 expires: Some(Duration::from_secs(60)),
319 ..Default::default()
320 })
321 .map(Bytes::from)
322 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
323
324 Ok(Sequence {
325 context,
326 subject,
327 request,
328 pending_messages: batch,
329 next: None,
330 })
331 }
332}
333
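/// A set of messages returned for a single pull request, produced by the `batch()` and
/// `fetch()` builders on a [Consumer]. The stream yields at most the requested number of
/// messages and terminates once the server signals that the request has expired or that no
/// more messages are available.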
334pub struct Batch {
335 pending_messages: usize,
336 subscriber: Subscriber,
337 context: Context,
338 timeout: Option<Pin<Box<Sleep>>>,
339 terminated: bool,
340}
341
342impl Batch {
343 async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
344 let inbox = Subject::from(consumer.context.client.new_inbox());
345 let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
346 consumer.request_batch(batch.clone(), inbox.clone()).await?;
347
348 let sleep = batch.expires.map(|expires| {
349 Box::pin(tokio::time::sleep(
350 expires.saturating_add(Duration::from_secs(5)),
351 ))
352 });
353
354 Ok(Batch {
355 pending_messages: batch.batch,
356 subscriber: subscription,
357 context: consumer.context.clone(),
358 terminated: false,
359 timeout: sleep,
360 })
361 }
362}
363
364impl futures::Stream for Batch {
365 type Item = Result<jetstream::Message, crate::Error>;
366
367 fn poll_next(
368 mut self: std::pin::Pin<&mut Self>,
369 cx: &mut std::task::Context<'_>,
370 ) -> std::task::Poll<Option<Self::Item>> {
371 if self.terminated {
372 return Poll::Ready(None);
373 }
374 if self.pending_messages == 0 {
375 self.terminated = true;
376 return Poll::Ready(None);
377 }
378 if let Some(sleep) = self.timeout.as_mut() {
379 match sleep.poll_unpin(cx) {
380 Poll::Ready(_) => {
381 debug!("batch timeout timer triggered");
382 // TODO(tp): Maybe we can be smarter here and, before timing out, check whether
383 // we consumed all the messages from the subscription buffer, in case the user is
384 // consuming messages slowly. Keep in mind that we time out here only if, for some
385 // reason, we missed the timeout from the server and a few seconds have passed
386 // since the expected timeout message.
387 self.terminated = true;
388 return Poll::Ready(None);
389 }
390 Poll::Pending => (),
391 }
392 }
393 match self.subscriber.receiver.poll_recv(cx) {
394 Poll::Ready(maybe_message) => match maybe_message {
395 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
396 StatusCode::TIMEOUT => {
397 debug!("received timeout. Iterator done");
398 self.terminated = true;
399 Poll::Ready(None)
400 }
401 StatusCode::IDLE_HEARTBEAT => {
402 debug!("received heartbeat");
403 Poll::Pending
404 }
405 // If this is the fetch variant, terminate when there are no more messages.
406 // We do not need to check whether this is a fetch rather than a batch,
407 // as only fetch will send back the `NO_MESSAGES` status.
408 StatusCode::NOT_FOUND => {
409 debug!("received `NO_MESSAGES`. Iterator done");
410 self.terminated = true;
411 Poll::Ready(None)
412 }
413 StatusCode::OK => {
414 debug!("received message");
415 self.pending_messages -= 1;
416 Poll::Ready(Some(Ok(jetstream::Message {
417 context: self.context.clone(),
418 message,
419 })))
420 }
421 status => {
422 debug!("received error");
423 self.terminated = true;
424 Poll::Ready(Some(Err(Box::new(std::io::Error::new(
425 std::io::ErrorKind::Other,
426 format!(
427 "error while processing messages from the stream: {}, {:?}",
428 status, message.description
429 ),
430 )))))
431 }
432 },
433 None => Poll::Ready(None),
434 },
435 std::task::Poll::Pending => std::task::Poll::Pending,
436 }
437 }
438}
439
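/// A stream of [Batch]es created by `Consumer::sequence`. Each item issues a new pull request
/// for the configured number of messages; iterate over the yielded [Batch] to receive the
/// messages themselves.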
440pub struct Sequence {
441 context: Context,
442 subject: String,
443 request: Bytes,
444 pending_messages: usize,
445 next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
446}
447
448impl futures::Stream for Sequence {
449 type Item = Result<Batch, MessagesError>;
450
451 fn poll_next(
452 mut self: std::pin::Pin<&mut Self>,
453 cx: &mut std::task::Context<'_>,
454 ) -> std::task::Poll<Option<Self::Item>> {
455 match self.next.as_mut() {
456 None => {
457 let context = self.context.clone();
458 let subject = self.subject.clone();
459 let request = self.request.clone();
460 let pending_messages = self.pending_messages;
461
462 let next = self.next.insert(Box::pin(async move {
463 let inbox = context.client.new_inbox();
464 let subscriber = context
465 .client
466 .subscribe(inbox.clone())
467 .await
468 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
469
470 context
471 .client
472 .publish_with_reply(subject, inbox, request)
473 .await
474 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
475
476 // TODO(tp): Add timeout config and defaults.
477 Ok(Batch {
478 pending_messages,
479 subscriber,
480 context,
481 terminated: false,
482 timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
483 })
484 }));
485
486 match next.as_mut().poll(cx) {
487 Poll::Ready(result) => {
488 self.next = None;
489 Poll::Ready(Some(result.map_err(|err| {
490 MessagesError::with_source(MessagesErrorKind::Pull, err)
491 })))
492 }
493 Poll::Pending => Poll::Pending,
494 }
495 }
496
497 Some(next) => match next.as_mut().poll(cx) {
498 Poll::Ready(result) => {
499 self.next = None;
500 Poll::Ready(Some(result.map_err(|err| {
501 MessagesError::with_source(MessagesErrorKind::Pull, err)
502 })))
503 }
504 Poll::Pending => Poll::Pending,
505 },
506 }
507 }
508}
509
510impl Consumer<OrderedConfig> {
511 /// Returns a stream of messages for Ordered Pull Consumer.
512 ///
513 /// Ordered consumers use a single-replica ephemeral consumer, regardless of the replication factor
514 /// of the Stream. They do not use acks; instead, they track sequences and recreate themselves
515 /// whenever they detect a mismatch.
516 ///
517 /// # Example
518 ///
519 /// ```no_run
520 /// # #[tokio::main]
521 /// # async fn main() -> Result<(), async_nats::Error> {
522 /// use futures::StreamExt;
523 /// use futures::TryStreamExt;
524 ///
525 /// let client = async_nats::connect("localhost:4222").await?;
526 /// let jetstream = async_nats::jetstream::new(client);
527 ///
528 /// let stream = jetstream
529 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
530 /// name: "events".to_string(),
531 /// max_messages: 10_000,
532 /// ..Default::default()
533 /// })
534 /// .await?;
535 ///
536 /// jetstream.publish("events", "data".into()).await?;
537 ///
538 /// let consumer = stream
539 /// .get_or_create_consumer(
540 /// "consumer",
541 /// async_nats::jetstream::consumer::pull::OrderedConfig {
542 /// name: Some("consumer".to_string()),
543 /// ..Default::default()
544 /// },
545 /// )
546 /// .await?;
547 ///
548 /// let mut messages = consumer.messages().await?.take(100);
549 /// while let Some(Ok(message)) = messages.next().await {
550 /// println!("got message {:?}", message);
551 /// }
552 /// Ok(())
553 /// # }
554 /// ```
555 pub async fn messages(self) -> Result<Ordered, StreamError> {
556 let config = Consumer {
557 config: self.config.clone().into(),
558 context: self.context.clone(),
559 info: self.info.clone(),
560 };
561 let stream = Stream::stream(
562 BatchConfig {
563 batch: 500,
564 expires: Some(Duration::from_secs(30)),
565 no_wait: false,
566 max_bytes: 0,
567 idle_heartbeat: Duration::from_secs(15),
568 min_pending: None,
569 min_ack_pending: None,
570 group: None,
571 },
572 &config,
573 )
574 .await?;
575
576 Ok(Ordered {
577 consumer_sequence: 0,
578 stream_sequence: 0,
579 missed_heartbeats: false,
580 create_stream: None,
581 context: self.context.clone(),
582 consumer_name: self
583 .config
584 .name
585 .clone()
586 .unwrap_or_else(|| self.context.client.new_inbox()),
587 consumer: self.config,
588 stream: Some(stream),
589 stream_name: self.info.stream_name.clone(),
590 })
591 }
592}
593
594/// Configuration for the ordered pull consumer. Ordered consumers are ephemeral,
595/// single-replica, and do not use acknowledgments, so fields like `durable_name`
596/// and `deliver_subject` from the regular consumer config do not apply here.
597#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
598pub struct OrderedConfig {
599 /// A name of the consumer. Can be specified for both durable and ephemeral
600 /// consumers.
601 #[serde(default, skip_serializing_if = "Option::is_none")]
602 pub name: Option<String>,
603 /// A short description of the purpose of this consumer.
604 #[serde(default, skip_serializing_if = "Option::is_none")]
605 pub description: Option<String>,
606 #[serde(default, skip_serializing_if = "is_default")]
607 pub filter_subject: String,
608 #[cfg(feature = "server_2_10")]
609 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
610 #[serde(default, skip_serializing_if = "is_default")]
611 pub filter_subjects: Vec<String>,
612 /// Whether messages are sent as quickly as possible or at the rate of receipt
613 pub replay_policy: ReplayPolicy,
614 /// The rate of message delivery in bits per second
615 #[serde(default, skip_serializing_if = "is_default")]
616 pub rate_limit: u64,
617 /// What percentage of acknowledgments should be sampled for observability, 0-100.
618 #[serde(
619 rename = "sample_freq",
620 with = "super::sample_freq_deser",
621 default,
622 skip_serializing_if = "is_default"
623 )]
624 pub sample_frequency: u8,
625 /// Only deliver headers without payloads.
626 #[serde(default, skip_serializing_if = "is_default")]
627 pub headers_only: bool,
628 /// Allows for a variety of options that determine how this consumer will receive messages
629 #[serde(flatten)]
630 pub deliver_policy: DeliverPolicy,
631 /// The maximum number of waiting consumers.
632 #[serde(default, skip_serializing_if = "is_default")]
633 pub max_waiting: i64,
634 #[cfg(feature = "server_2_10")]
635 /// Additional consumer metadata.
636 #[serde(default, skip_serializing_if = "is_default")]
637 pub metadata: HashMap<String, String>,
638 /// Maximum number of messages that can be requested in a single Pull Request.
639 /// This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages]
640 /// and [stream].
641 pub max_batch: i64,
642 /// Maximum number of bytes that can be requested in a single Pull Request.
643 /// This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages]
644 /// and [stream].
645 pub max_bytes: i64,
646 /// Maximum expiry that can be set for a single Pull Request.
647 /// This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages]
648 /// and [stream].
649 pub max_expires: Duration,
650}
651
652impl From<OrderedConfig> for Config {
653 fn from(config: OrderedConfig) -> Self {
654 Config {
655 durable_name: None,
656 name: config.name,
657 description: config.description,
658 deliver_policy: config.deliver_policy,
659 ack_policy: AckPolicy::None,
660 ack_wait: Duration::default(),
661 max_deliver: 1,
662 filter_subject: config.filter_subject,
663 #[cfg(feature = "server_2_10")]
664 filter_subjects: config.filter_subjects,
665 replay_policy: config.replay_policy,
666 rate_limit: config.rate_limit,
667 sample_frequency: config.sample_frequency,
668 max_waiting: config.max_waiting,
669 max_ack_pending: 0,
670 headers_only: config.headers_only,
671 max_batch: config.max_batch,
672 max_bytes: config.max_bytes,
673 max_expires: config.max_expires,
674 inactive_threshold: Duration::from_secs(30),
675 num_replicas: 1,
676 memory_storage: true,
677 #[cfg(feature = "server_2_10")]
678 metadata: config.metadata,
679 backoff: Vec::new(),
680 #[cfg(feature = "server_2_11")]
681 priority_policy: PriorityPolicy::None,
682 #[cfg(feature = "server_2_11")]
683 priority_groups: Vec::new(),
684 #[cfg(feature = "server_2_11")]
685 pause_until: None,
686 }
687 }
688}
689
690impl FromConsumer for OrderedConfig {
691 fn try_from_consumer_config(
692 config: crate::jetstream::consumer::Config,
693 ) -> Result<Self, crate::Error>
694 where
695 Self: Sized,
696 {
697 Ok(OrderedConfig {
698 name: config.name,
699 description: config.description,
700 filter_subject: config.filter_subject,
701 #[cfg(feature = "server_2_10")]
702 filter_subjects: config.filter_subjects,
703 replay_policy: config.replay_policy,
704 rate_limit: config.rate_limit,
705 sample_frequency: config.sample_frequency,
706 headers_only: config.headers_only,
707 deliver_policy: config.deliver_policy,
708 max_waiting: config.max_waiting,
709 #[cfg(feature = "server_2_10")]
710 metadata: config.metadata,
711 max_batch: config.max_batch,
712 max_bytes: config.max_bytes,
713 max_expires: config.max_expires,
714 })
715 }
716}
717
718impl IntoConsumerConfig for OrderedConfig {
719 fn into_consumer_config(self) -> super::Config {
720 jetstream::consumer::Config {
721 deliver_subject: None,
722 durable_name: None,
723 name: self.name,
724 description: self.description,
725 deliver_group: None,
726 deliver_policy: self.deliver_policy,
727 ack_policy: AckPolicy::None,
728 ack_wait: Duration::default(),
729 max_deliver: 1,
730 filter_subject: self.filter_subject,
731 #[cfg(feature = "server_2_10")]
732 filter_subjects: self.filter_subjects,
733 replay_policy: self.replay_policy,
734 rate_limit: self.rate_limit,
735 sample_frequency: self.sample_frequency,
736 max_waiting: self.max_waiting,
737 max_ack_pending: 0,
738 headers_only: self.headers_only,
739 flow_control: false,
740 idle_heartbeat: Duration::default(),
741 max_batch: 0,
742 max_bytes: 0,
743 max_expires: Duration::default(),
744 inactive_threshold: Duration::from_secs(30),
745 num_replicas: 1,
746 memory_storage: true,
747 #[cfg(feature = "server_2_10")]
748 metadata: self.metadata,
749 backoff: Vec::new(),
750 #[cfg(feature = "server_2_11")]
751 priority_policy: PriorityPolicy::None,
752 #[cfg(feature = "server_2_11")]
753 priority_groups: Vec::new(),
754 #[cfg(feature = "server_2_11")]
755 pause_until: None,
756 }
757 }
758}
759
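/// The message stream of an ordered pull consumer, created by calling `messages()` on a
/// `Consumer<OrderedConfig>`. It tracks stream and consumer sequences and, on a sequence
/// mismatch or two missed idle heartbeats, transparently recreates the underlying ephemeral
/// consumer, resuming from the last delivered stream sequence.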
760pub struct Ordered {
761 context: Context,
762 stream_name: String,
763 consumer: OrderedConfig,
764 consumer_name: String,
765 stream: Option<Stream>,
766 create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
767 consumer_sequence: u64,
768 stream_sequence: u64,
769 missed_heartbeats: bool,
770}
771
772impl futures::Stream for Ordered {
773 type Item = Result<jetstream::Message, OrderedError>;
774
775 fn poll_next(
776 mut self: Pin<&mut Self>,
777 cx: &mut std::task::Context<'_>,
778 ) -> Poll<Option<Self::Item>> {
779 let mut recreate = false;
780 // Poll messages
781 if let Some(stream) = self.stream.as_mut() {
782 match stream.poll_next_unpin(cx) {
783 Poll::Ready(message) => match message {
784 Some(message) => match message {
785 Ok(message) => {
786 self.missed_heartbeats = false;
787 let info = message.info().map_err(|err| {
788 OrderedError::with_source(OrderedErrorKind::Other, err)
789 })?;
790 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
791 self.consumer_sequence,
792 self.stream_sequence,
793 info.consumer_sequence,
794 info.stream_sequence);
795 if info.consumer_sequence != self.consumer_sequence + 1 {
796 debug!(
797 "ordered consumer mismatch. current {}, info: {}",
798 self.consumer_sequence, info.consumer_sequence
799 );
800 recreate = true;
801 self.consumer_sequence = 0;
802 } else {
803 self.stream_sequence = info.stream_sequence;
804 self.consumer_sequence = info.consumer_sequence;
805 return Poll::Ready(Some(Ok(message)));
806 }
807 }
808 Err(err) => match err.kind() {
809 MessagesErrorKind::MissingHeartbeat => {
810 // If we have missed heartbeats set, it means this is a second
811 // missed heartbeat, so we need to recreate consumer.
812 if self.missed_heartbeats {
813 self.consumer_sequence = 0;
814 recreate = true;
815 } else {
816 self.missed_heartbeats = true;
817 }
818 }
819 MessagesErrorKind::ConsumerDeleted
820 | MessagesErrorKind::NoResponders => {
821 recreate = true;
822 self.consumer_sequence = 0;
823 }
824 MessagesErrorKind::Pull
825 | MessagesErrorKind::PushBasedConsumer
826 | MessagesErrorKind::Other => {
827 return Poll::Ready(Some(Err(err.into())));
828 }
829 },
830 },
831 None => return Poll::Ready(None),
832 },
833 Poll::Pending => (),
834 }
835 }
836 // Recreate consumer if needed
837 if recreate {
838 self.stream = None;
839 self.create_stream = Some(Box::pin({
840 let context = self.context.clone();
841 let config = self.consumer.clone();
842 let stream_name = self.stream_name.clone();
843 let consumer_name = self.consumer_name.clone();
844 let sequence = self.stream_sequence;
845 async move {
846 tryhard::retry_fn(|| {
847 recreate_consumer_stream(
848 &context,
849 &config,
850 &stream_name,
851 &consumer_name,
852 sequence,
853 )
854 })
855 .retries(u32::MAX)
856 .custom_backoff(backoff)
857 .await
858 }
859 }))
860 }
861 // check for recreation future
862 if let Some(result) = self.create_stream.as_mut() {
863 match result.poll_unpin(cx) {
864 Poll::Ready(result) => match result {
865 Ok(stream) => {
866 self.create_stream = None;
867 self.stream = Some(stream);
868 return self.poll_next(cx);
869 }
870 Err(err) => {
871 return Poll::Ready(Some(Err(OrderedError::with_source(
872 OrderedErrorKind::Recreate,
873 err,
874 ))))
875 }
876 },
877 Poll::Pending => (),
878 }
879 }
880 Poll::Pending
881 }
882}
883
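/// A continuous stream of messages for a pull [Consumer], created by `Consumer::messages` or
/// through the [StreamBuilder]. A background task keeps the buffer filled by issuing a new
/// pull request whenever fewer than half of the requested messages (or bytes) remain pending,
/// and idle heartbeats are used to detect a stalled consumer.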
884pub struct Stream {
885 pending_messages: usize,
886 pending_bytes: usize,
887 request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
888 request_tx: tokio::sync::watch::Sender<()>,
889 subscriber: Subscriber,
890 batch_config: BatchConfig,
891 context: Context,
892 pending_request: bool,
893 task_handle: JoinHandle<()>,
894 terminated: bool,
895 heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
896}
897
898impl Drop for Stream {
899 fn drop(&mut self) {
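// Abort the background task that issues pull requests so it does not outlive the stream.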
900 self.task_handle.abort();
901 }
902}
903
904impl Stream {
905 async fn stream(
906 batch_config: BatchConfig,
907 consumer: &Consumer<Config>,
908 ) -> Result<Stream, StreamError> {
909 let inbox = consumer.context.client.new_inbox();
910 let subscription = consumer
911 .context
912 .client
913 .subscribe(inbox.clone())
914 .await
915 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
916 let subject = format!(
917 "{}.CONSUMER.MSG.NEXT.{}.{}",
918 consumer.context.prefix, consumer.info.stream_name, consumer.info.name
919 );
920
921 let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
922 let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
923 let task_handle = tokio::task::spawn({
924 let batch = batch_config.clone();
925 let consumer = consumer.clone();
926 let mut context = consumer.context.clone();
927 let inbox = inbox.clone();
928 async move {
929 loop {
930 // This is just for the edge case of a missing response for some reason.
931 let expires = batch_config
932 .expires
933 .map(|expires| {
934 if expires.is_zero() {
935 Either::Left(future::pending())
936 } else {
937 Either::Right(tokio::time::sleep(
938 expires.saturating_add(Duration::from_secs(5)),
939 ))
940 }
941 })
942 .unwrap_or_else(|| Either::Left(future::pending()));
943 // Need to check previous state, as `changed` will always fire on first
944 // call.
945 let prev_state = context.client.state.borrow().to_owned();
946 let mut pending_reset = false;
947
948 tokio::select! {
949 _ = context.client.state.changed() => {
950 let state = context.client.state.borrow().to_owned();
951 if !(state == crate::connection::State::Connected
952 && prev_state != State::Connected) {
953 continue;
954 }
955 debug!("detected !Connected -> Connected state change");
956
957 match tryhard::retry_fn(|| consumer.fetch_info())
958 .retries(5).custom_backoff(backoff).await
959 .map_err(|err| crate::RequestError::with_source(crate::RequestErrorKind::Other, err).into()) {
960 Ok(info) => {
961 if info.num_waiting == 0 {
962 pending_reset = true;
963 }
964 }
965 Err(err) => {
966 if let Err(err) = request_result_tx.send(Err(err)).await {
967 debug!("failed to sent request result: {}", err);
968 }
969 },
970 }
971 },
972 _ = request_rx.changed() => debug!("task received pull request"),
973 _ = expires => {
974 pending_reset = true;
975 debug!("expired pull request")},
976 }
977
978 let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
979 let result = context
980 .client
981 .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
982 .await
983 .map(|_| pending_reset);
984 // TODO: add tracing instead of ignoring this.
985 request_result_tx
986 .send(result.map(|_| pending_reset).map_err(|err| {
987 crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
988 .into()
989 }))
990 .await
991 .ok();
992 trace!("result send over tx");
993 }
994 }
995 });
996
997 Ok(Stream {
998 task_handle,
999 request_result_rx,
1000 request_tx,
1001 batch_config,
1002 pending_messages: 0,
1003 pending_bytes: 0,
1004 subscriber: subscription,
1005 context: consumer.context.clone(),
1006 pending_request: false,
1007 terminated: false,
1008 heartbeat_timeout: None,
1009 })
1010 }
1011}
1012
1013#[derive(Clone, Copy, Debug, PartialEq)]
1014pub enum OrderedErrorKind {
1015 MissingHeartbeat,
1016 ConsumerDeleted,
1017 Pull,
1018 PushBasedConsumer,
1019 Recreate,
1020 NoResponders,
1021 Other,
1022}
1023
1024impl std::fmt::Display for OrderedErrorKind {
1025 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1026 match self {
1027 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1028 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1029 Self::Pull => write!(f, "pull request failed"),
1030 Self::Other => write!(f, "error"),
1031 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1032 Self::Recreate => write!(f, "consumer recreation failed"),
1033 Self::NoResponders => write!(f, "no responders"),
1034 }
1035 }
1036}
1037
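/// Error returned when iterating over messages from an [Ordered] consumer stream.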
1038pub type OrderedError = Error<OrderedErrorKind>;
1039
1040impl From<MessagesError> for OrderedError {
1041 fn from(err: MessagesError) -> Self {
1042 match err.kind() {
1043 MessagesErrorKind::MissingHeartbeat => {
1044 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1045 }
1046 MessagesErrorKind::ConsumerDeleted => {
1047 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1048 }
1049 MessagesErrorKind::Pull => OrderedError {
1050 kind: OrderedErrorKind::Pull,
1051 source: err.source,
1052 },
1053 MessagesErrorKind::PushBasedConsumer => {
1054 OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1055 }
1056 MessagesErrorKind::Other => OrderedError {
1057 kind: OrderedErrorKind::Other,
1058 source: err.source,
1059 },
1060 MessagesErrorKind::NoResponders => OrderedError::new(OrderedErrorKind::NoResponders),
1061 }
1062 }
1063}
1064
1065#[derive(Clone, Copy, Debug, PartialEq)]
1066pub enum MessagesErrorKind {
1067 MissingHeartbeat,
1068 ConsumerDeleted,
1069 Pull,
1070 PushBasedConsumer,
1071 NoResponders,
1072 Other,
1073}
1074
1075impl std::fmt::Display for MessagesErrorKind {
1076 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1077 match self {
1078 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1079 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1080 Self::Pull => write!(f, "pull request failed"),
1081 Self::Other => write!(f, "error"),
1082 Self::NoResponders => write!(f, "no responders"),
1083 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1084 }
1085 }
1086}
1087
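/// Error returned when iterating over messages from a [Stream].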
1088pub type MessagesError = Error<MessagesErrorKind>;
1089
1090impl futures::Stream for Stream {
1091 type Item = Result<jetstream::Message, MessagesError>;
1092
1093 fn poll_next(
1094 mut self: std::pin::Pin<&mut Self>,
1095 cx: &mut std::task::Context<'_>,
1096 ) -> std::task::Poll<Option<Self::Item>> {
1097 if self.terminated {
1098 return Poll::Ready(None);
1099 }
1100
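// If idle heartbeats are enabled, surface an error when neither a message nor a
// heartbeat has arrived within twice the heartbeat interval; the timer is reset
// whenever the subscriber yields something.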
1101 if !self.batch_config.idle_heartbeat.is_zero() {
1102 trace!("checking idle hearbeats");
1103 let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1104 match self
1105 .heartbeat_timeout
1106 .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1107 .poll_unpin(cx)
1108 {
1109 Poll::Ready(_) => {
1110 self.heartbeat_timeout = None;
1111 return Poll::Ready(Some(Err(MessagesError::new(
1112 MessagesErrorKind::MissingHeartbeat,
1113 ))));
1114 }
1115 Poll::Pending => (),
1116 }
1117 }
1118
1119 loop {
1120 trace!("pending messages: {}", self.pending_messages);
1121 if (self.pending_messages <= self.batch_config.batch / 2
1122 || (self.batch_config.max_bytes > 0
1123 && self.pending_bytes <= self.batch_config.max_bytes / 2))
1124 && !self.pending_request
1125 {
1126 debug!("pending messages reached threshold to send new fetch request");
1127 self.request_tx.send(()).ok();
1128 self.pending_request = true;
1129 }
1130
1131 match self.request_result_rx.poll_recv(cx) {
1132 Poll::Ready(resp) => match resp {
1133 Some(resp) => match resp {
1134 Ok(reset) => {
1135 trace!("request response: {:?}", reset);
1136 debug!("request sent, setting pending messages");
1137 if reset {
1138 self.pending_messages = self.batch_config.batch;
1139 self.pending_bytes = self.batch_config.max_bytes;
1140 } else {
1141 self.pending_messages += self.batch_config.batch;
1142 self.pending_bytes += self.batch_config.max_bytes;
1143 }
1144 self.pending_request = false;
1145 continue;
1146 }
1147 Err(err) => {
1148 return Poll::Ready(Some(Err(MessagesError::with_source(
1149 MessagesErrorKind::Pull,
1150 err,
1151 ))))
1152 }
1153 },
1154 None => return Poll::Ready(None),
1155 },
1156 Poll::Pending => {
1157 trace!("pending result");
1158 }
1159 }
1160
1161 trace!("polling subscriber");
1162 match self.subscriber.receiver.poll_recv(cx) {
1163 Poll::Ready(maybe_message) => {
1164 self.heartbeat_timeout = None;
1165 match maybe_message {
1166 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1167 StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1168 debug!("received status message: {:?}", message);
1169 // If the consumer has been deleted, error and shut down the iterator.
1170 if message.description.as_deref() == Some("Consumer Deleted") {
1171 self.terminated = true;
1172 return Poll::Ready(Some(Err(MessagesError::new(
1173 MessagesErrorKind::ConsumerDeleted,
1174 ))));
1175 }
1176 // If the consumer is not pull based, error and shut down the iterator.
1177 if message.description.as_deref() == Some("Consumer is push based")
1178 {
1179 self.terminated = true;
1180 return Poll::Ready(Some(Err(MessagesError::new(
1181 MessagesErrorKind::PushBasedConsumer,
1182 ))));
1183 }
1184
1185 // Do accounting for messages left after terminated/completed pull request.
1186 let pending_messages = message
1187 .headers
1188 .as_ref()
1189 .and_then(|headers| headers.get("Nats-Pending-Messages"))
1190 .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1191 .map_err(|err| {
1192 MessagesError::with_source(MessagesErrorKind::Other, err)
1193 })?;
1194
1195 let pending_bytes = message
1196 .headers
1197 .as_ref()
1198 .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1199 .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1200 .map_err(|err| {
1201 MessagesError::with_source(MessagesErrorKind::Other, err)
1202 })?;
1203
1204 debug!(
1205 "timeout reached. remaining messages: {}, bytes {}",
1206 pending_messages, pending_bytes
1207 );
1208 self.pending_messages =
1209 self.pending_messages.saturating_sub(pending_messages);
1210 trace!("message bytes len: {}", pending_bytes);
1211 self.pending_bytes =
1212 self.pending_bytes.saturating_sub(pending_bytes);
1213 continue;
1214 }
1215 // Idle Heartbeat means we have no messages, but the consumer is fine.
1216 StatusCode::IDLE_HEARTBEAT => {
1217 debug!("received idle heartbeat");
1218 continue;
1219 }
1220 // We got a message from the stream.
1221 StatusCode::OK => {
1222 trace!("message received");
1223 self.pending_messages = self.pending_messages.saturating_sub(1);
1224 self.pending_bytes =
1225 self.pending_bytes.saturating_sub(message.length);
1226 return Poll::Ready(Some(Ok(jetstream::Message {
1227 context: self.context.clone(),
1228 message,
1229 })));
1230 }
1231 StatusCode::NO_RESPONDERS => {
1232 debug!("received no responders");
1233 return Poll::Ready(Some(Err(MessagesError::new(
1234 MessagesErrorKind::NoResponders,
1235 ))));
1236 }
1237 status => {
1238 debug!("received unknown message: {:?}", message);
1239 return Poll::Ready(Some(Err(MessagesError::with_source(
1240 MessagesErrorKind::Other,
1241 format!(
1242 "error while processing messages from the stream: {}, {:?}",
1243 status, message.description
1244 ),
1245 ))));
1246 }
1247 },
1248 None => return Poll::Ready(None),
1249 }
1250 }
1251 Poll::Pending => {
1252 debug!("subscriber still pending");
1253 return std::task::Poll::Pending;
1254 }
1255 }
1256 }
1257 }
1258}
1259
1260/// Used for building configuration for a [Stream]. Created by calling [Consumer::stream] on a [Consumer].
1261///
1262/// # Examples
1263///
1264/// ```no_run
1265/// # #[tokio::main]
1266/// # async fn main() -> Result<(), async_nats::Error> {
1267/// use futures::StreamExt;
1268/// use async_nats::jetstream::consumer::PullConsumer;
1269/// let client = async_nats::connect("localhost:4222").await?;
1270/// let jetstream = async_nats::jetstream::new(client);
1271///
1272/// let consumer: PullConsumer = jetstream
1273/// .get_stream("events").await?
1274/// .get_consumer("pull").await?;
1275///
1276/// let mut messages = consumer.stream()
1277/// .max_messages_per_batch(100)
1278/// .max_bytes_per_batch(1024)
1279/// .messages().await?;
1280///
1281/// while let Some(message) = messages.next().await {
1282/// let message = message?;
1283/// println!("message: {:?}", message);
1284/// message.ack().await?;
1285/// }
1286/// # Ok(())
1287/// # }
/// ```
1288pub struct StreamBuilder<'a> {
1289 batch: usize,
1290 max_bytes: usize,
1291 heartbeat: Duration,
1292 expires: Duration,
1293 group: Option<String>,
1294 min_pending: Option<usize>,
1295 min_ack_pending: Option<usize>,
1296 consumer: &'a Consumer<Config>,
1297}
1298
1299impl<'a> StreamBuilder<'a> {
1300 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1301 StreamBuilder {
1302 consumer,
1303 batch: 200,
1304 max_bytes: 0,
1305 expires: Duration::from_secs(30),
1306 heartbeat: Duration::default(),
1307 group: None,
1308 min_pending: None,
1309 min_ack_pending: None,
1310 }
1311 }
1312
1313 /// Sets the maximum number of bytes that can be buffered on the Client while processing
1314 /// already received messages.
1315 /// Higher values will yield better performance, but can also increase memory usage if the
1316 /// application acknowledges messages much more slowly than they arrive.
1317 ///
1318 /// Default values should provide reasonable balance between performance and memory usage.
1319 ///
1320 /// # Examples
1321 ///
1322 /// ```no_run
1323 /// # #[tokio::main]
1324 /// # async fn main() -> Result<(), async_nats::Error> {
1325 /// use async_nats::jetstream::consumer::PullConsumer;
1326 /// use futures::StreamExt;
1327 /// let client = async_nats::connect("localhost:4222").await?;
1328 /// let jetstream = async_nats::jetstream::new(client);
1329 ///
1330 /// let consumer: PullConsumer = jetstream
1331 /// .get_stream("events")
1332 /// .await?
1333 /// .get_consumer("pull")
1334 /// .await?;
1335 ///
1336 /// let mut messages = consumer
1337 /// .stream()
1338 /// .max_bytes_per_batch(1024)
1339 /// .messages()
1340 /// .await?;
1341 ///
1342 /// while let Some(message) = messages.next().await {
1343 /// let message = message?;
1344 /// println!("message: {:?}", message);
1345 /// message.ack().await?;
1346 /// }
1347 /// # Ok(())
1348 /// # }
1349 /// ```
1350 pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1351 self.max_bytes = max_bytes;
1352 self
1353 }
1354
1355 /// Sets the maximum number of messages that can be buffered on the Client while processing
1356 /// already received messages.
1357 /// Higher values will yield better performance, but can also increase memory usage if the
1358 /// application acknowledges messages much more slowly than they arrive.
1359 ///
1360 /// Default values should provide reasonable balance between performance and memory usage.
1361 ///
1362 /// # Examples
1363 ///
1364 /// ```no_run
1365 /// # #[tokio::main]
1366 /// # async fn main() -> Result<(), async_nats::Error> {
1367 /// use async_nats::jetstream::consumer::PullConsumer;
1368 /// use futures::StreamExt;
1369 /// let client = async_nats::connect("localhost:4222").await?;
1370 /// let jetstream = async_nats::jetstream::new(client);
1371 ///
1372 /// let consumer: PullConsumer = jetstream
1373 /// .get_stream("events")
1374 /// .await?
1375 /// .get_consumer("pull")
1376 /// .await?;
1377 ///
1378 /// let mut messages = consumer
1379 /// .stream()
1380 /// .max_messages_per_batch(100)
1381 /// .messages()
1382 /// .await?;
1383 ///
1384 /// while let Some(message) = messages.next().await {
1385 /// let message = message?;
1386 /// println!("message: {:?}", message);
1387 /// message.ack().await?;
1388 /// }
1389 /// # Ok(())
1390 /// # }
1391 /// ```
1392 pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1393 self.batch = batch;
1394 self
1395 }
1396
1397 /// Sets the idle heartbeat, which will be sent by the server if there are no messages
1398 /// pending for a given [Consumer].
1399 ///
1400 /// # Examples
1401 ///
1402 /// ```no_run
1403 /// # #[tokio::main]
1404 /// # async fn main() -> Result<(), async_nats::Error> {
1405 /// use async_nats::jetstream::consumer::PullConsumer;
1406 /// use futures::StreamExt;
1407 /// let client = async_nats::connect("localhost:4222").await?;
1408 /// let jetstream = async_nats::jetstream::new(client);
1409 ///
1410 /// let consumer: PullConsumer = jetstream
1411 /// .get_stream("events")
1412 /// .await?
1413 /// .get_consumer("pull")
1414 /// .await?;
1415 ///
1416 /// let mut messages = consumer
1417 /// .stream()
1418 /// .heartbeat(std::time::Duration::from_secs(10))
1419 /// .messages()
1420 /// .await?;
1421 ///
1422 /// while let Some(message) = messages.next().await {
1423 /// let message = message?;
1424 /// println!("message: {:?}", message);
1425 /// message.ack().await?;
1426 /// }
1427 /// # Ok(())
1428 /// # }
1429 /// ```
1430 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1431 self.heartbeat = heartbeat;
1432 self
1433 }
1434
1435 /// Low level API that does not need tweaking for most use cases.
1436 /// Sets how long each batch request waits for the whole batch of messages before timing
1437 /// out.
1438 ///
1439 /// # Examples
1440 ///
1441 /// ```no_run
1442 /// # #[tokio::main]
1443 /// # async fn main() -> Result<(), async_nats::Error> {
1444 /// use async_nats::jetstream::consumer::PullConsumer;
1445 /// use futures::StreamExt;
1446 /// let client = async_nats::connect("localhost:4222").await?;
1447 /// let jetstream = async_nats::jetstream::new(client);
1448 ///
1449 /// let consumer: PullConsumer = jetstream
1450 /// .get_stream("events")
1451 /// .await?
1452 /// .get_consumer("pull")
1453 /// .await?;
1454 ///
1455 /// let mut messages = consumer
1456 /// .stream()
1457 /// .expires(std::time::Duration::from_secs(30))
1458 /// .messages()
1459 /// .await?;
1460 ///
1461 /// while let Some(message) = messages.next().await {
1462 /// let message = message?;
1463 /// println!("message: {:?}", message);
1464 /// message.ack().await?;
1465 /// }
1466 /// # Ok(())
1467 /// # }
1468 /// ```
1469 pub fn expires(mut self, expires: Duration) -> Self {
1470 self.expires = expires;
1471 self
1472 }
1473
1474 /// Sets the overflow threshold: the minimum number of pending messages on the [Consumer]
1475 /// before this stream will start getting messages.
1476 /// To use overflow, the [Consumer] needs to have [Config::priority_groups] set and [PriorityPolicy::Overflow] enabled.
1477 ///
1478 /// # Examples
1479 ///
1480 /// ```no_run
1481 /// # #[tokio::main]
1482 /// # async fn main() -> Result<(), async_nats::Error> {
1483 /// use async_nats::jetstream::consumer::PullConsumer;
1484 /// use futures::StreamExt;
1485 /// let client = async_nats::connect("localhost:4222").await?;
1486 /// let jetstream = async_nats::jetstream::new(client);
1487 ///
1488 /// let consumer: PullConsumer = jetstream
1489 /// .get_stream("events")
1490 /// .await?
1491 /// .get_consumer("pull")
1492 /// .await?;
1493 ///
1494 /// let mut messages = consumer
1495 /// .stream()
1496 /// .expires(std::time::Duration::from_secs(30))
1497 /// .group("A")
1498 /// .min_pending(100)
1499 /// .messages()
1500 /// .await?;
1501 ///
1502 /// while let Some(message) = messages.next().await {
1503 /// let message = message?;
1504 /// println!("message: {:?}", message);
1505 /// message.ack().await?;
1506 /// }
1507 /// # Ok(())
1508 /// # }
1509 /// ```
1510 pub fn min_pending(mut self, min_pending: usize) -> Self {
1511 self.min_pending = Some(min_pending);
1512 self
1513 }
1514
1515 /// Sets the overflow threshold: the minimum number of pending acknowledgements on the [Consumer]
1516 /// before this stream will start getting messages.
1517 /// To use overflow, the [Consumer] needs to have [Config::priority_groups] set and [PriorityPolicy::Overflow] enabled.
1518 ///
1519 /// # Examples
1520 ///
1521 /// ```no_run
1522 /// # #[tokio::main]
1523 /// # async fn main() -> Result<(), async_nats::Error> {
1524 /// use async_nats::jetstream::consumer::PullConsumer;
1525 /// use futures::StreamExt;
1526 /// let client = async_nats::connect("localhost:4222").await?;
1527 /// let jetstream = async_nats::jetstream::new(client);
1528 ///
1529 /// let consumer: PullConsumer = jetstream
1530 /// .get_stream("events")
1531 /// .await?
1532 /// .get_consumer("pull")
1533 /// .await?;
1534 ///
1535 /// let mut messages = consumer
1536 /// .stream()
1537 /// .expires(std::time::Duration::from_secs(30))
1538 /// .group("A")
1539 /// .min_ack_pending(100)
1540 /// .messages()
1541 /// .await?;
1542 ///
1543 /// while let Some(message) = messages.next().await {
1544 /// let message = message?;
1545 /// println!("message: {:?}", message);
1546 /// message.ack().await?;
1547 /// }
1548 /// # Ok(())
1549 /// # }
1550 /// ```
1551 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1552 self.min_ack_pending = Some(min_ack_pending);
1553 self
1554 }
1555
1556 /// Sets the priority group to use when the [Consumer] is configured with [Config::priority_groups].
1557 ///
1558 /// # Examples
1559 ///
1560 /// ```no_run
1561 /// # #[tokio::main]
1562 /// # async fn main() -> Result<(), async_nats::Error> {
1563 /// use async_nats::jetstream::consumer::PullConsumer;
1564 /// use futures::StreamExt;
1565 /// let client = async_nats::connect("localhost:4222").await?;
1566 /// let jetstream = async_nats::jetstream::new(client);
1567 ///
1568 /// let consumer: PullConsumer = jetstream
1569 /// .get_stream("events")
1570 /// .await?
1571 /// .get_consumer("pull")
1572 /// .await?;
1573 ///
1574 /// let mut messages = consumer
1575 /// .stream()
1576 /// .expires(std::time::Duration::from_secs(30))
1577 /// .group("A")
1578 /// .min_ack_pending(100)
1579 /// .messages()
1580 /// .await?;
1581 ///
1582 /// while let Some(message) = messages.next().await {
1583 /// let message = message?;
1584 /// println!("message: {:?}", message);
1585 /// message.ack().await?;
1586 /// }
1587 /// # Ok(())
1588 /// # }
1589 /// ```
1590 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1591 self.group = Some(group.into());
1592 self
1593 }
1594
1595 /// Creates the actual [Stream] with the provided configuration.
1596 ///
1597 /// # Examples
1598 ///
1599 /// ```no_run
1600 /// # #[tokio::main]
1601 /// # async fn main() -> Result<(), async_nats::Error> {
1602 /// use async_nats::jetstream::consumer::PullConsumer;
1603 /// use futures::StreamExt;
1604 /// let client = async_nats::connect("localhost:4222").await?;
1605 /// let jetstream = async_nats::jetstream::new(client);
1606 ///
1607 /// let consumer: PullConsumer = jetstream
1608 /// .get_stream("events")
1609 /// .await?
1610 /// .get_consumer("pull")
1611 /// .await?;
1612 ///
1613 /// let mut messages = consumer
1614 /// .stream()
1615 /// .max_messages_per_batch(100)
1616 /// .messages()
1617 /// .await?;
1618 ///
1619 /// while let Some(message) = messages.next().await {
1620 /// let message = message?;
1621 /// println!("message: {:?}", message);
1622 /// message.ack().await?;
1623 /// }
1624 /// # Ok(())
1625 /// # }
1626 /// ```
1627 pub async fn messages(self) -> Result<Stream, StreamError> {
1628 Stream::stream(
1629 BatchConfig {
1630 batch: self.batch,
1631 expires: Some(self.expires),
1632 no_wait: false,
1633 max_bytes: self.max_bytes,
1634 idle_heartbeat: self.heartbeat,
1635 min_pending: self.min_pending,
1636 group: self.group,
1637 min_ack_pending: self.min_ack_pending,
1638 },
1639 self.consumer,
1640 )
1641 .await
1642 }
1643}
1644
1645/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by calling [Consumer::fetch] on a [Consumer].
1646///
1647/// # Examples
1648///
1649/// ```no_run
1650/// # #[tokio::main]
1651/// # async fn main() -> Result<(), async_nats::Error> {
1652/// use async_nats::jetstream::consumer::PullConsumer;
1653/// use futures::StreamExt;
1654/// let client = async_nats::connect("localhost:4222").await?;
1655/// let jetstream = async_nats::jetstream::new(client);
1656///
1657/// let consumer: PullConsumer = jetstream
1658/// .get_stream("events")
1659/// .await?
1660/// .get_consumer("pull")
1661/// .await?;
1662///
1663/// let mut messages = consumer
1664/// .fetch()
1665/// .max_messages(100)
1666/// .max_bytes(1024)
1667/// .messages()
1668/// .await?;
1669///
1670/// while let Some(message) = messages.next().await {
1671/// let message = message?;
1672/// println!("message: {:?}", message);
1673/// message.ack().await?;
1674/// }
1675/// # Ok(())
1676/// # }
1677/// ```
1678pub struct FetchBuilder<'a> {
1679 batch: usize,
1680 max_bytes: usize,
1681 heartbeat: Duration,
1682 expires: Option<Duration>,
1683 min_pending: Option<usize>,
1684 min_ack_pending: Option<usize>,
1685 group: Option<String>,
1686 consumer: &'a Consumer<Config>,
1687}
1688
1689impl<'a> FetchBuilder<'a> {
1690 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1691 FetchBuilder {
1692 consumer,
1693 batch: 200,
1694 max_bytes: 0,
1695 expires: None,
1696 min_pending: None,
1697 min_ack_pending: None,
1698 group: None,
1699 heartbeat: Duration::default(),
1700 }
1701 }
1702
1703 /// Sets the maximum number of bytes that can be buffered on the Client while processing
1704 /// already received messages.
1705 /// Higher values will yield better performance, but can also increase memory usage if the
1706 /// application acknowledges messages much more slowly than they arrive.
1707 ///
1708 /// Default values should provide reasonable balance between performance and memory usage.
1709 ///
1710 /// # Examples
1711 ///
1712 /// ```no_run
1713 /// # #[tokio::main]
1714 /// # async fn main() -> Result<(), async_nats::Error> {
1715 /// use futures::StreamExt;
1716 /// let client = async_nats::connect("localhost:4222").await?;
1717 /// let jetstream = async_nats::jetstream::new(client);
1718 ///
1719 /// let consumer = jetstream
1720 /// .get_stream("events")
1721 /// .await?
1722 /// .get_consumer("pull")
1723 /// .await?;
1724 ///
1725 /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1726 ///
1727 /// while let Some(message) = messages.next().await {
1728 /// let message = message?;
1729 /// println!("message: {:?}", message);
1730 /// message.ack().await?;
1731 /// }
1732 /// # Ok(())
1733 /// # }
1734 /// ```
1735 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1736 self.max_bytes = max_bytes;
1737 self
1738 }
1739
1740 /// Sets the maximum number of messages that can be buffered on the Client while processing
1741 /// already received messages.
1742 /// Higher values will yield better performance, but can also increase memory usage if the
1743 /// application acknowledges messages much more slowly than they arrive.
1744 ///
1745 /// Default values should provide reasonable balance between performance and memory usage.
1746 ///
1747 /// # Examples
1748 ///
1749 /// ```no_run
1750 /// # #[tokio::main]
1751 /// # async fn main() -> Result<(), async_nats::Error> {
1752 /// use futures::StreamExt;
1753 /// let client = async_nats::connect("localhost:4222").await?;
1754 /// let jetstream = async_nats::jetstream::new(client);
1755 ///
1756 /// let consumer = jetstream
1757 /// .get_stream("events")
1758 /// .await?
1759 /// .get_consumer("pull")
1760 /// .await?;
1761 ///
1762 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1763 ///
1764 /// while let Some(message) = messages.next().await {
1765 /// let message = message?;
1766 /// println!("message: {:?}", message);
1767 /// message.ack().await?;
1768 /// }
1769 /// # Ok(())
1770 /// # }
1771 /// ```
1772 pub fn max_messages(mut self, batch: usize) -> Self {
1773 self.batch = batch;
1774 self
1775 }
1776
1777 /// Sets the idle heartbeat, which will be sent by the server if there are no messages
1778 /// pending for a given [Consumer].
1779 ///
1780 /// # Examples
1781 ///
1782 /// ```no_run
1783 /// # #[tokio::main]
1784 /// # async fn main() -> Result<(), async_nats::Error> {
1785 /// use async_nats::jetstream::consumer::PullConsumer;
1786 /// use futures::StreamExt;
1787 /// let client = async_nats::connect("localhost:4222").await?;
1788 /// let jetstream = async_nats::jetstream::new(client);
1789 ///
1790 /// let consumer = jetstream
1791 /// .get_stream("events")
1792 /// .await?
1793 /// .get_consumer("pull")
1794 /// .await?;
1795 ///
1796 /// let mut messages = consumer
1797 /// .fetch()
1798 /// .heartbeat(std::time::Duration::from_secs(10))
1799 /// .messages()
1800 /// .await?;
1801 ///
1802 /// while let Some(message) = messages.next().await {
1803 /// let message = message?;
1804 /// println!("message: {:?}", message);
1805 /// message.ack().await?;
1806 /// }
1807 /// # Ok(())
1808 /// # }
1809 /// ```
1810 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1811 self.heartbeat = heartbeat;
1812 self
1813 }
1814
1815 /// Low level API that does not need tweaking for most use cases.
1816 /// Sets how long each batch request waits for the whole batch of messages before timing
1817 /// out.
1818 ///
1819 /// # Examples
1820 ///
1821 /// ```no_run
1822 /// # #[tokio::main]
1823 /// # async fn main() -> Result<(), async_nats::Error> {
1824 /// use async_nats::jetstream::consumer::PullConsumer;
1825 /// use futures::StreamExt;
1826 ///
1827 /// let client = async_nats::connect("localhost:4222").await?;
1828 /// let jetstream = async_nats::jetstream::new(client);
1829 ///
1830 /// let consumer: PullConsumer = jetstream
1831 /// .get_stream("events")
1832 /// .await?
1833 /// .get_consumer("pull")
1834 /// .await?;
1835 ///
1836 /// let mut messages = consumer
1837 /// .fetch()
1838 /// .expires(std::time::Duration::from_secs(30))
1839 /// .messages()
1840 /// .await?;
1841 ///
1842 /// while let Some(message) = messages.next().await {
1843 /// let message = message?;
1844 /// println!("message: {:?}", message);
1845 /// message.ack().await?;
1846 /// }
1847 /// # Ok(())
1848 /// # }
1849 /// ```
1850 pub fn expires(mut self, expires: Duration) -> Self {
1851 self.expires = Some(expires);
1852 self
1853 }
1854
1855 /// Sets the overflow threshold: the minimum number of pending messages on the [Consumer]
1856 /// before this fetch will start getting messages.
1857 /// To use overflow, the [Consumer] needs to have [Config::priority_groups] set and
1858 /// [PriorityPolicy::Overflow] enabled.
1859 ///
1860 /// # Examples
1861 ///
1862 /// ```no_run
1863 /// # #[tokio::main]
1864 /// # async fn main() -> Result<(), async_nats::Error> {
1865 /// use async_nats::jetstream::consumer::PullConsumer;
1866 /// use futures::StreamExt;
1867 ///
1868 /// let client = async_nats::connect("localhost:4222").await?;
1869 /// let jetstream = async_nats::jetstream::new(client);
1870 ///
1871 /// let consumer: PullConsumer = jetstream
1872 /// .get_stream("events")
1873 /// .await?
1874 /// .get_consumer("pull")
1875 /// .await?;
1876 ///
1877 /// let mut messages = consumer
1878 /// .fetch()
1879 /// .expires(std::time::Duration::from_secs(30))
1880 /// .group("A")
1881 /// .min_pending(100)
1882 /// .messages()
1883 /// .await?;
1884 ///
1885 /// while let Some(message) = messages.next().await {
1886 /// let message = message?;
1887 /// println!("message: {:?}", message);
1888 /// message.ack().await?;
1889 /// }
1890 /// # Ok(())
1891 /// # }
1892 /// ```
1893 pub fn min_pending(mut self, min_pending: usize) -> Self {
1894 self.min_pending = Some(min_pending);
1895 self
1896 }
1897
1898 /// Sets the overflow threshold: the minimum number of pending acknowledgments on the
1899 /// [Consumer] before this stream will start receiving messages.
1900 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
1901 /// [PriorityPolicy::Overflow] set.
1902 ///
1903 /// # Examples
1904 ///
1905 /// ```no_run
1906 /// # #[tokio::main]
1907 /// # async fn main() -> Result<(), async_nats::Error> {
1908 /// use async_nats::jetstream::consumer::PullConsumer;
1909 /// use futures::StreamExt;
1910 ///
1911 /// let client = async_nats::connect("localhost:4222").await?;
1912 /// let jetstream = async_nats::jetstream::new(client);
1913 ///
1914 /// let consumer: PullConsumer = jetstream
1915 /// .get_stream("events")
1916 /// .await?
1917 /// .get_consumer("pull")
1918 /// .await?;
1919 ///
1920 /// let mut messages = consumer
1921 /// .fetch()
1922 /// .expires(std::time::Duration::from_secs(30))
1923 /// .group("A")
1924 /// .min_ack_pending(100)
1925 /// .messages()
1926 /// .await?;
1927 ///
1928 /// while let Some(message) = messages.next().await {
1929 /// let message = message?;
1930 /// println!("message: {:?}", message);
1931 /// message.ack().await?;
1932 /// }
1933 /// # Ok(())
1934 /// # }
1935 /// ```
1936 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1937 self.min_ack_pending = Some(min_ack_pending);
1938 self
1939 }
1940
1941 /// Sets the priority group to use when the [Consumer] is configured with a [PriorityPolicy].
1942 ///
1943 /// # Examples
1944 ///
1945 /// ```no_run
1946 /// # #[tokio::main]
1947 /// # async fn main() -> Result<(), async_nats::Error> {
1948 /// use async_nats::jetstream::consumer::PullConsumer;
1949 /// use futures::StreamExt;
1950 ///
1951 /// let client = async_nats::connect("localhost:4222").await?;
1952 /// let jetstream = async_nats::jetstream::new(client);
1953 ///
1954 /// let consumer: PullConsumer = jetstream
1955 /// .get_stream("events")
1956 /// .await?
1957 /// .get_consumer("pull")
1958 /// .await?;
1959 ///
1960 /// let mut messages = consumer
1961 /// .fetch()
1962 /// .expires(std::time::Duration::from_secs(30))
1963 /// .group("A")
1964 /// .min_ack_pending(100)
1965 /// .messages()
1966 /// .await?;
1967 ///
1968 /// while let Some(message) = messages.next().await {
1969 /// let message = message?;
1970 /// println!("message: {:?}", message);
1971 /// message.ack().await?;
1972 /// }
1973 /// # Ok(())
1974 /// # }
1975 /// ```
1976 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1977 self.group = Some(group.into());
1978 self
1979 }
1980
1981 /// Creates actual [Stream] with provided configuration.
1982 ///
1983 /// # Examples
1984 ///
1985 /// ```no_run
1986 /// # #[tokio::main]
1987 /// # async fn main() -> Result<(), async_nats::Error> {
1988 /// use async_nats::jetstream::consumer::PullConsumer;
1989 /// use futures::StreamExt;
1990 /// let client = async_nats::connect("localhost:4222").await?;
1991 /// let jetstream = async_nats::jetstream::new(client);
1992 ///
1993 /// let consumer: PullConsumer = jetstream
1994 /// .get_stream("events")
1995 /// .await?
1996 /// .get_consumer("pull")
1997 /// .await?;
1998 ///
1999 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
2000 ///
2001 /// while let Some(message) = messages.next().await {
2002 /// let message = message?;
2003 /// println!("message: {:?}", message);
2004 /// message.ack().await?;
2005 /// }
2006 /// # Ok(())
2007 /// # }
2008 /// ```
2009 pub async fn messages(self) -> Result<Batch, BatchError> {
2010 Batch::batch(
2011 BatchConfig {
2012 batch: self.batch,
2013 expires: self.expires,
2014 no_wait: true,
2015 max_bytes: self.max_bytes,
2016 idle_heartbeat: self.heartbeat,
2017 min_pending: self.min_pending,
2018 min_ack_pending: self.min_ack_pending,
2019 group: self.group,
2020 },
2021 self.consumer,
2022 )
2023 .await
2024 }
2025}
2026
2027/// Used for building the configuration for a [Batch]. Created by [Consumer::batch] on a [Consumer].
2028///
2029/// # Examples
2030///
2031/// ```no_run
2032/// # #[tokio::main]
2033/// # async fn main() -> Result<(), async_nats::Error> {
2034/// use async_nats::jetstream::consumer::PullConsumer;
2035/// use futures::StreamExt;
2036/// let client = async_nats::connect("localhost:4222").await?;
2037/// let jetstream = async_nats::jetstream::new(client);
2038///
2039/// let consumer: PullConsumer = jetstream
2040/// .get_stream("events")
2041/// .await?
2042/// .get_consumer("pull")
2043/// .await?;
2044///
2045/// let mut messages = consumer
2046/// .batch()
2047/// .max_messages(100)
2048/// .max_bytes(1024)
2049/// .messages()
2050/// .await?;
2051///
2052/// while let Some(message) = messages.next().await {
2053/// let message = message?;
2054/// println!("message: {:?}", message);
2055/// message.ack().await?;
2056/// }
2057/// # Ok(())
2058/// # }
2059/// ```
2060pub struct BatchBuilder<'a> {
2061 batch: usize,
2062 max_bytes: usize,
2063 heartbeat: Duration,
2064 expires: Duration,
2065 min_pending: Option<usize>,
2066 min_ack_pending: Option<usize>,
2067 group: Option<String>,
2068 consumer: &'a Consumer<Config>,
2069}
2070
2071impl<'a> BatchBuilder<'a> {
2072 pub fn new(consumer: &'a Consumer<Config>) -> Self {
2073 BatchBuilder {
2074 consumer,
2075 batch: 200,
2076 max_bytes: 0,
2077 expires: Duration::ZERO,
2078 heartbeat: Duration::default(),
2079 min_pending: None,
2080 min_ack_pending: None,
2081 group: None,
2082 }
2083 }
2084
2085 /// Sets the maximum number of bytes that can be buffered on the client while processing
2086 /// already received messages.
2087 /// Higher values will yield better performance, but may also increase memory usage if the
2088 /// application is acknowledging messages much slower than they arrive.
2089 ///
2090 /// Default values should provide a reasonable balance between performance and memory usage.
2091 ///
2092 /// # Examples
2093 ///
2094 /// ```no_run
2095 /// # #[tokio::main]
2096 /// # async fn main() -> Result<(), async_nats::Error> {
2097 /// use async_nats::jetstream::consumer::PullConsumer;
2098 /// use futures::StreamExt;
2099 /// let client = async_nats::connect("localhost:4222").await?;
2100 /// let jetstream = async_nats::jetstream::new(client);
2101 ///
2102 /// let consumer: PullConsumer = jetstream
2103 /// .get_stream("events")
2104 /// .await?
2105 /// .get_consumer("pull")
2106 /// .await?;
2107 ///
2108 /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
2109 ///
2110 /// while let Some(message) = messages.next().await {
2111 /// let message = message?;
2112 /// println!("message: {:?}", message);
2113 /// message.ack().await?;
2114 /// }
2115 /// # Ok(())
2116 /// # }
2117 /// ```
2118 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
2119 self.max_bytes = max_bytes;
2120 self
2121 }
2122
2123 /// Sets the maximum number of messages that can be buffered on the client while processing
2124 /// already received messages.
2125 /// Higher values will yield better performance, but may also increase memory usage if the
2126 /// application is acknowledging messages much slower than they arrive.
2127 ///
2128 /// Default values should provide a reasonable balance between performance and memory usage.
2129 ///
2130 /// # Examples
2131 ///
2132 /// ```no_run
2133 /// # #[tokio::main]
2134 /// # async fn main() -> Result<(), async_nats::Error> {
2135 /// use async_nats::jetstream::consumer::PullConsumer;
2136 /// use futures::StreamExt;
2137 /// let client = async_nats::connect("localhost:4222").await?;
2138 /// let jetstream = async_nats::jetstream::new(client);
2139 ///
2140 /// let consumer: PullConsumer = jetstream
2141 /// .get_stream("events")
2142 /// .await?
2143 /// .get_consumer("pull")
2144 /// .await?;
2145 ///
2146 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2147 ///
2148 /// while let Some(message) = messages.next().await {
2149 /// let message = message?;
2150 /// println!("message: {:?}", message);
2151 /// message.ack().await?;
2152 /// }
2153 /// # Ok(())
2154 /// # }
2155 /// ```
2156 pub fn max_messages(mut self, batch: usize) -> Self {
2157 self.batch = batch;
2158 self
2159 }
2160
2161 /// Sets the idle heartbeat, which will be sent by the server when there are no new messages
2162 /// pending for a given [Consumer].
2163 ///
2164 /// # Examples
2165 ///
2166 /// ```no_run
2167 /// # #[tokio::main]
2168 /// # async fn main() -> Result<(), async_nats::Error> {
2169 /// use async_nats::jetstream::consumer::PullConsumer;
2170 /// use futures::StreamExt;
2171 /// let client = async_nats::connect("localhost:4222").await?;
2172 /// let jetstream = async_nats::jetstream::new(client);
2173 ///
2174 /// let consumer: PullConsumer = jetstream
2175 /// .get_stream("events")
2176 /// .await?
2177 /// .get_consumer("pull")
2178 /// .await?;
2179 ///
2180 /// let mut messages = consumer
2181 /// .batch()
2182 /// .heartbeat(std::time::Duration::from_secs(10))
2183 /// .messages()
2184 /// .await?;
2185 ///
2186 /// while let Some(message) = messages.next().await {
2187 /// let message = message?;
2188 /// println!("message: {:?}", message);
2189 /// message.ack().await?;
2190 /// }
2191 /// # Ok(())
2192 /// # }
2193 /// ```
2194 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
2195 self.heartbeat = heartbeat;
2196 self
2197 }
2198
2199 /// Sets the overflow threshold: the minimum number of messages pending on the [Consumer]
2200 /// before this stream will start receiving messages.
2201 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
2202 /// [PriorityPolicy::Overflow] set.
2203 ///
2204 /// # Examples
2205 ///
2206 /// ```no_run
2207 /// # #[tokio::main]
2208 /// # async fn main() -> Result<(), async_nats::Error> {
2209 /// use async_nats::jetstream::consumer::PullConsumer;
2210 /// use futures::StreamExt;
2211 ///
2212 /// let client = async_nats::connect("localhost:4222").await?;
2213 /// let jetstream = async_nats::jetstream::new(client);
2214 ///
2215 /// let consumer: PullConsumer = jetstream
2216 /// .get_stream("events")
2217 /// .await?
2218 /// .get_consumer("pull")
2219 /// .await?;
2220 ///
2221 /// let mut messages = consumer
2222 /// .batch()
2223 /// .expires(std::time::Duration::from_secs(30))
2224 /// .group("A")
2225 /// .min_pending(100)
2226 /// .messages()
2227 /// .await?;
2228 ///
2229 /// while let Some(message) = messages.next().await {
2230 /// let message = message?;
2231 /// println!("message: {:?}", message);
2232 /// message.ack().await?;
2233 /// }
2234 /// # Ok(())
2235 /// # }
2236 /// ```
2237 pub fn min_pending(mut self, min_pending: usize) -> Self {
2238 self.min_pending = Some(min_pending);
2239 self
2240 }
2241
2242 /// Sets the overflow threshold: the minimum number of pending acknowledgments on the
2243 /// [Consumer] before this stream will start receiving messages.
2244 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
2245 /// [PriorityPolicy::Overflow] set.
2246 ///
2247 /// # Examples
2248 ///
2249 /// ```no_run
2250 /// # #[tokio::main]
2251 /// # async fn main() -> Result<(), async_nats::Error> {
2252 /// use async_nats::jetstream::consumer::PullConsumer;
2253 /// use futures::StreamExt;
2254 ///
2255 /// let client = async_nats::connect("localhost:4222").await?;
2256 /// let jetstream = async_nats::jetstream::new(client);
2257 ///
2258 /// let consumer: PullConsumer = jetstream
2259 /// .get_stream("events")
2260 /// .await?
2261 /// .get_consumer("pull")
2262 /// .await?;
2263 ///
2264 /// let mut messages = consumer
2265 /// .batch()
2266 /// .expires(std::time::Duration::from_secs(30))
2267 /// .group("A")
2268 /// .min_ack_pending(100)
2269 /// .messages()
2270 /// .await?;
2271 ///
2272 /// while let Some(message) = messages.next().await {
2273 /// let message = message?;
2274 /// println!("message: {:?}", message);
2275 /// message.ack().await?;
2276 /// }
2277 /// # Ok(())
2278 /// # }
2279 /// ```
2280 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
2281 self.min_ack_pending = Some(min_ack_pending);
2282 self
2283 }
2284
2285 /// Sets the priority group to use when the [Consumer] is configured with a [PriorityPolicy].
2286 ///
2287 /// # Examples
2288 ///
2289 /// ```no_run
2290 /// # #[tokio::main]
2291 /// # async fn main() -> Result<(), async_nats::Error> {
2292 /// use async_nats::jetstream::consumer::PullConsumer;
2293 /// use futures::StreamExt;
2294 ///
2295 /// let client = async_nats::connect("localhost:4222").await?;
2296 /// let jetstream = async_nats::jetstream::new(client);
2297 ///
2298 /// let consumer: PullConsumer = jetstream
2299 /// .get_stream("events")
2300 /// .await?
2301 /// .get_consumer("pull")
2302 /// .await?;
2303 ///
2304 /// let mut messages = consumer
2305 /// .batch()
2306 /// .expires(std::time::Duration::from_secs(30))
2307 /// .group("A")
2308 /// .min_ack_pending(100)
2309 /// .messages()
2310 /// .await?;
2311 ///
2312 /// while let Some(message) = messages.next().await {
2313 /// let message = message?;
2314 /// println!("message: {:?}", message);
2315 /// message.ack().await?;
2316 /// }
2317 /// # Ok(())
2318 /// # }
2319 /// ```
2320 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2321 self.group = Some(group.into());
2322 self
2323 }
2324
2325 /// Low level API that does not need tweaking for most use cases.
2326 /// Sets how long each batch request waits for the whole batch of messages before timing
2327 /// out.
2328 ///
2329 /// # Examples
2330 ///
2331 /// ```no_run
2332 /// # #[tokio::main]
2333 /// # async fn main() -> Result<(), async_nats::Error> {
2334 /// use async_nats::jetstream::consumer::PullConsumer;
2335 /// use futures::StreamExt;
2336 /// let client = async_nats::connect("localhost:4222").await?;
2337 /// let jetstream = async_nats::jetstream::new(client);
2338 ///
2339 /// let consumer: PullConsumer = jetstream
2340 /// .get_stream("events")
2341 /// .await?
2342 /// .get_consumer("pull")
2343 /// .await?;
2344 ///
2345 /// let mut messages = consumer
2346 /// .batch()
2347 /// .expires(std::time::Duration::from_secs(30))
2348 /// .messages()
2349 /// .await?;
2350 ///
2351 /// while let Some(message) = messages.next().await {
2352 /// let message = message?;
2353 /// println!("message: {:?}", message);
2354 /// message.ack().await?;
2355 /// }
2356 /// # Ok(())
2357 /// # }
2358 /// ```
2359 pub fn expires(mut self, expires: Duration) -> Self {
2360 self.expires = expires;
2361 self
2362 }
2363
2364 /// Creates actual [Stream] with provided configuration.
2365 ///
2366 /// # Examples
2367 ///
2368 /// ```no_run
2369 /// # #[tokio::main]
2370 /// # async fn main() -> Result<(), async_nats::Error> {
2371 /// use async_nats::jetstream::consumer::PullConsumer;
2372 /// use futures::StreamExt;
2373 /// let client = async_nats::connect("localhost:4222").await?;
2374 /// let jetstream = async_nats::jetstream::new(client);
2375 ///
2376 /// let consumer: PullConsumer = jetstream
2377 /// .get_stream("events")
2378 /// .await?
2379 /// .get_consumer("pull")
2380 /// .await?;
2381 ///
2382 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2383 ///
2384 /// while let Some(message) = messages.next().await {
2385 /// let message = message?;
2386 /// println!("message: {:?}", message);
2387 /// message.ack().await?;
2388 /// }
2389 /// # Ok(())
2390 /// # }
2391 /// ```
2392 pub async fn messages(self) -> Result<Batch, BatchError> {
2393 let config = BatchConfig {
2394 batch: self.batch,
2395 expires: Some(self.expires),
2396 no_wait: false,
2397 max_bytes: self.max_bytes,
2398 idle_heartbeat: self.heartbeat,
2399 min_pending: self.min_pending,
2400 min_ack_pending: self.min_ack_pending,
2401 group: self.group,
2402 };
2403 Batch::batch(config, self.consumer).await
2404 }
2405}
2406
2407/// Configuration for the next pull request sent for a Pull Consumer.
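///
/// # Examples
///
/// A minimal sketch of constructing the raw request by hand; most applications should use the
/// higher level builders in this module instead, so treat the values below as purely
/// illustrative.
///
/// ```no_run
/// use std::time::Duration;
///
/// // Hypothetical values; tune `batch` and `expires` to your workload.
/// let _request = async_nats::jetstream::consumer::pull::BatchConfig {
///     batch: 100,
///     expires: Some(Duration::from_secs(30)),
///     ..Default::default()
/// };
/// ```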
2408#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
2409pub struct BatchConfig {
2410 /// The number of messages that are being requested to be delivered.
2411 pub batch: usize,
2412 /// The optional amount of time that the server will keep this pull request around before
2413 /// expiring it and forgetting about the pending batch size. Serialized as nanoseconds.
2414 #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
2415 pub expires: Option<Duration>,
2416 /// This optionally causes the server not to store this pending request at all. When there
2417 /// are no messages to deliver, the server will instead send an empty message with a 404
2418 /// status header, so you can tell, for example, that you have reached the end of the
2419 /// stream. A 409 is returned if the Consumer has reached MaxAckPending limits.
2420 #[serde(skip_serializing_if = "is_default")]
2421 pub no_wait: bool,
2422
2423 /// Sets the maximum total number of bytes for the batch. This works together with `batch`:
2424 /// whichever limit is reached first completes the batch.
2425 pub max_bytes: usize,
2426
2427 /// Setting this to a non-zero value will cause the server to send a `100 Idle Heartbeat`
2428 /// status message to the client at the given interval when there are no new messages.
2429 #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
2430 pub idle_heartbeat: Duration,
2431
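    /// Overflow threshold: the minimum number of messages pending on the [Consumer] before this
    /// request will start receiving messages. Requires [Config::priority_groups] and
    /// [PriorityPolicy::Overflow].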
2432 pub min_pending: Option<usize>,
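    /// Overflow threshold: the minimum number of pending acknowledgments on the [Consumer]
    /// before this request will start receiving messages. Requires [Config::priority_groups]
    /// and [PriorityPolicy::Overflow].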
2433 pub min_ack_pending: Option<usize>,
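    /// The priority group this pull request belongs to.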
2434 pub group: Option<String>,
2435}
2436
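/// Serde helper used by the configs in this module to skip serializing fields that are equal
/// to their default value.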
2437fn is_default<T: Default + Eq>(t: &T) -> bool {
2438 t == &T::default()
2439}
2440
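/// The configuration for a pull [Consumer].
///
/// # Examples
///
/// A minimal sketch of creating a durable pull consumer from this configuration; the stream
/// name, durable name, and limit below are hypothetical.
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.get_stream("events").await?;
/// let consumer = stream
///     .create_consumer(async_nats::jetstream::consumer::pull::Config {
///         durable_name: Some("processor".to_string()),
///         max_deliver: 5,
///         ..Default::default()
///     })
///     .await?;
/// # let _ = consumer;
/// # Ok(())
/// # }
/// ```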
2441#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2442pub struct Config {
2443 /// Setting `durable_name` to `Some(...)` will cause this consumer
2444 /// to be "durable". This may be a good choice for workloads that
2445 /// benefit from the `JetStream` server or cluster remembering the
2446 /// progress of consumers for fault tolerance purposes. If a consumer
2447 /// crashes, the `JetStream` server or cluster will remember which
2448 /// messages the consumer acknowledged. When the consumer recovers,
2449 /// this information will allow the consumer to resume processing
2450 /// where it left off. If you're unsure, set this to `Some(...)`.
2451 ///
2452 /// Setting `durable_name` to `None` will cause this consumer to
2453 /// be "ephemeral". This may be a good choice for workloads where
2454 /// you don't need the `JetStream` server to remember the consumer's
2455 /// progress in the case of a crash, such as certain "high churn"
2456 /// workloads or workloads where a crashed instance is not required
2457 /// to recover.
2458 #[serde(default, skip_serializing_if = "Option::is_none")]
2459 pub durable_name: Option<String>,
2460 /// A name of the consumer. Can be specified for both durable and ephemeral
2461 /// consumers.
2462 #[serde(default, skip_serializing_if = "Option::is_none")]
2463 pub name: Option<String>,
2464 /// A short description of the purpose of this consumer.
2465 #[serde(default, skip_serializing_if = "Option::is_none")]
2466 pub description: Option<String>,
2467 /// Allows for a variety of options that determine how this consumer will receive messages
2468 #[serde(flatten)]
2469 pub deliver_policy: DeliverPolicy,
2470 /// How messages should be acknowledged
2471 pub ack_policy: AckPolicy,
2472 /// How long to allow messages to remain un-acknowledged before attempting redelivery
2473 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2474 pub ack_wait: Duration,
2475 /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2476 #[serde(default, skip_serializing_if = "is_default")]
2477 pub max_deliver: i64,
2478 /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2479 #[serde(default, skip_serializing_if = "is_default")]
2480 pub filter_subject: String,
2481 #[cfg(feature = "server_2_10")]
2482 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2483 #[serde(default, skip_serializing_if = "is_default")]
2484 pub filter_subjects: Vec<String>,
2485 /// Whether messages are sent as quickly as possible or at the rate of receipt
2486 pub replay_policy: ReplayPolicy,
2487 /// The rate of message delivery in bits per second
2488 #[serde(default, skip_serializing_if = "is_default")]
2489 pub rate_limit: u64,
2490 /// What percentage of acknowledgments should be sampled for observability, 0-100.
2491 #[serde(
2492 rename = "sample_freq",
2493 with = "super::sample_freq_deser",
2494 default,
2495 skip_serializing_if = "is_default"
2496 )]
2497 pub sample_frequency: u8,
2498 /// The maximum number of pull requests that may be waiting on this consumer at one time.
2499 #[serde(default, skip_serializing_if = "is_default")]
2500 pub max_waiting: i64,
2501 /// The maximum number of unacknowledged messages that may be
2502 /// in-flight before pausing sending additional messages to
2503 /// this consumer.
2504 #[serde(default, skip_serializing_if = "is_default")]
2505 pub max_ack_pending: i64,
2506 /// Only deliver headers without payloads.
2507 #[serde(default, skip_serializing_if = "is_default")]
2508 pub headers_only: bool,
2509 /// Maximum size of a request batch
2510 #[serde(default, skip_serializing_if = "is_default")]
2511 pub max_batch: i64,
2512 /// Maximum value of request max_bytes
2513 #[serde(default, skip_serializing_if = "is_default")]
2514 pub max_bytes: i64,
2515 /// Maximum value for request expiration
2516 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2517 pub max_expires: Duration,
2518 /// Threshold for consumer inactivity
2519 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2520 pub inactive_threshold: Duration,
2521 /// Number of consumer replicas
2522 #[serde(default, skip_serializing_if = "is_default")]
2523 pub num_replicas: usize,
2524 /// Force consumer to use memory storage.
2525 #[serde(default, skip_serializing_if = "is_default")]
2526 pub memory_storage: bool,
2527 #[cfg(feature = "server_2_10")]
2528 /// Additional consumer metadata.
2529 #[serde(default, skip_serializing_if = "is_default")]
2530 pub metadata: HashMap<String, String>,
2531 /// Custom backoff for missed acknowledgments.
2532 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2533 pub backoff: Vec<Duration>,
2534
2535 /// Priority policy for this consumer. Requires [Config::priority_groups] to be set.
2536 #[cfg(feature = "server_2_11")]
2537 #[serde(default, skip_serializing_if = "is_default")]
2538 pub priority_policy: PriorityPolicy,
2539 /// Priority groups for this consumer. Currently only one group is supported and is used
2540 /// in conjunction with [Config::priority_policy].
2541 #[cfg(feature = "server_2_11")]
2542 #[serde(default, skip_serializing_if = "is_default")]
2543 pub priority_groups: Vec<String>,
2544 /// For suspending the consumer until the deadline.
2545 #[cfg(feature = "server_2_11")]
2546 #[serde(
2547 default,
2548 with = "rfc3339::option",
2549 skip_serializing_if = "Option::is_none"
2550 )]
2551 pub pause_until: Option<OffsetDateTime>,
2552}
2553
2554impl IntoConsumerConfig for &Config {
2555 fn into_consumer_config(self) -> consumer::Config {
2556 self.clone().into_consumer_config()
2557 }
2558}
2559
2560impl IntoConsumerConfig for Config {
2561 fn into_consumer_config(self) -> consumer::Config {
2562 jetstream::consumer::Config {
2563 deliver_subject: None,
2564 name: self.name,
2565 durable_name: self.durable_name,
2566 description: self.description,
2567 deliver_group: None,
2568 deliver_policy: self.deliver_policy,
2569 ack_policy: self.ack_policy,
2570 ack_wait: self.ack_wait,
2571 max_deliver: self.max_deliver,
2572 filter_subject: self.filter_subject,
2573 #[cfg(feature = "server_2_10")]
2574 filter_subjects: self.filter_subjects,
2575 replay_policy: self.replay_policy,
2576 rate_limit: self.rate_limit,
2577 sample_frequency: self.sample_frequency,
2578 max_waiting: self.max_waiting,
2579 max_ack_pending: self.max_ack_pending,
2580 headers_only: self.headers_only,
2581 flow_control: false,
2582 idle_heartbeat: Duration::default(),
2583 max_batch: self.max_batch,
2584 max_bytes: self.max_bytes,
2585 max_expires: self.max_expires,
2586 inactive_threshold: self.inactive_threshold,
2587 num_replicas: self.num_replicas,
2588 memory_storage: self.memory_storage,
2589 #[cfg(feature = "server_2_10")]
2590 metadata: self.metadata,
2591 backoff: self.backoff,
2592 #[cfg(feature = "server_2_11")]
2593 priority_policy: self.priority_policy,
2594 #[cfg(feature = "server_2_11")]
2595 priority_groups: self.priority_groups,
2596 #[cfg(feature = "server_2_11")]
2597 pause_until: self.pause_until,
2598 }
2599 }
2600}
2601impl FromConsumer for Config {
2602 fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2603 if config.deliver_subject.is_some() {
2604 return Err(Box::new(std::io::Error::new(
2605 std::io::ErrorKind::Other,
2606 "pull consumer cannot have delivery subject",
2607 )));
2608 }
2609 Ok(Config {
2610 durable_name: config.durable_name,
2611 name: config.name,
2612 description: config.description,
2613 deliver_policy: config.deliver_policy,
2614 ack_policy: config.ack_policy,
2615 ack_wait: config.ack_wait,
2616 max_deliver: config.max_deliver,
2617 filter_subject: config.filter_subject,
2618 #[cfg(feature = "server_2_10")]
2619 filter_subjects: config.filter_subjects,
2620 replay_policy: config.replay_policy,
2621 rate_limit: config.rate_limit,
2622 sample_frequency: config.sample_frequency,
2623 max_waiting: config.max_waiting,
2624 max_ack_pending: config.max_ack_pending,
2625 headers_only: config.headers_only,
2626 max_batch: config.max_batch,
2627 max_bytes: config.max_bytes,
2628 max_expires: config.max_expires,
2629 inactive_threshold: config.inactive_threshold,
2630 num_replicas: config.num_replicas,
2631 memory_storage: config.memory_storage,
2632 #[cfg(feature = "server_2_10")]
2633 metadata: config.metadata,
2634 backoff: config.backoff,
2635 #[cfg(feature = "server_2_11")]
2636 priority_policy: config.priority_policy,
2637 #[cfg(feature = "server_2_11")]
2638 priority_groups: config.priority_groups,
2639 #[cfg(feature = "server_2_11")]
2640 pause_until: config.pause_until,
2641 })
2642 }
2643}
2644
2645#[derive(Clone, Copy, Debug, PartialEq)]
2646pub enum BatchRequestErrorKind {
2647 Publish,
2648 Flush,
2649 Serialize,
2650}
2651
2652impl std::fmt::Display for BatchRequestErrorKind {
2653 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2654 match self {
2655 Self::Publish => write!(f, "publish failed"),
2656 Self::Flush => write!(f, "flush failed"),
2657 Self::Serialize => write!(f, "serialize failed"),
2658 }
2659 }
2660}
2661
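/// Error returned when sending a pull request to the server fails.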
2662pub type BatchRequestError = Error<BatchRequestErrorKind>;
2663
2664#[derive(Clone, Copy, Debug, PartialEq)]
2665pub enum BatchErrorKind {
2666 Subscribe,
2667 Pull,
2668 Flush,
2669 Serialize,
2670}
2671
2672impl std::fmt::Display for BatchErrorKind {
2673 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2674 match self {
2675 Self::Pull => write!(f, "pull request failed"),
2676 Self::Flush => write!(f, "flush failed"),
2677 Self::Serialize => write!(f, "serialize failed"),
2678 Self::Subscribe => write!(f, "subscribe failed"),
2679 }
2680 }
2681}
2682
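/// Error returned when creating a [Batch] fails.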
2683pub type BatchError = Error<BatchErrorKind>;
2684
2685impl From<SubscribeError> for BatchError {
2686 fn from(err: SubscribeError) -> Self {
2687 BatchError::with_source(BatchErrorKind::Subscribe, err)
2688 }
2689}
2690
2691impl From<BatchRequestError> for BatchError {
2692 fn from(err: BatchRequestError) -> Self {
2693 BatchError::with_source(BatchErrorKind::Pull, err)
2694 }
2695}
2696
2697#[derive(Clone, Copy, Debug, PartialEq)]
2698pub enum ConsumerRecreateErrorKind {
2699 GetStream,
2700 Recreate,
2701 TimedOut,
2702}
2703
2704impl std::fmt::Display for ConsumerRecreateErrorKind {
2705 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2706 match self {
2707 Self::GetStream => write!(f, "error getting stream"),
2708 Self::Recreate => write!(f, "consumer creation failed"),
2709 Self::TimedOut => write!(f, "timed out"),
2710 }
2711 }
2712}
2713
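/// Error returned when recreating an ordered consumer fails.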
2714pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2715
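/// Recreates an ordered consumer so that delivery resumes after the given `sequence`.
///
/// Best-effort deletes the previous consumer, creates a replacement that delivers from
/// `sequence + 1` (or from the beginning of the stream when `sequence` is 0), and opens a new
/// message [Stream] on it. Each network step is bounded by a 5 second timeout.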
2716async fn recreate_consumer_stream(
2717 context: &Context,
2718 config: &OrderedConfig,
2719 stream_name: &str,
2720 consumer_name: &str,
2721 sequence: u64,
2722) -> Result<Stream, ConsumerRecreateError> {
2723 let span = tracing::span!(
2724 tracing::Level::DEBUG,
2725 "recreate_ordered_consumer",
2726 stream_name = stream_name,
2727 consumer_name = consumer_name,
2728 sequence = sequence
2729 );
2730 let _span_handle = span.enter();
2731 let config = config.to_owned();
2732 trace!("delete old consumer before creating new one");
2733
2734 tokio::time::timeout(
2735 Duration::from_secs(5),
2736 context.delete_consumer_from_stream(consumer_name, stream_name),
2737 )
2738 .await
2739 .ok();
2740
2741 let deliver_policy = {
2742 if sequence == 0 {
2743 DeliverPolicy::All
2744 } else {
2745 DeliverPolicy::ByStartSequence {
2746 start_sequence: sequence + 1,
2747 }
2748 }
2749 };
2750 trace!("create the new ordered consumer for sequence {}", sequence);
2751 let consumer = tokio::time::timeout(
2752 Duration::from_secs(5),
2753 context.create_consumer_on_stream(
2754 jetstream::consumer::pull::OrderedConfig {
2755 deliver_policy,
2756 ..config.clone()
2757 },
2758 stream_name,
2759 ),
2760 )
2761 .await
2762 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2763 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2764
2765 let config = Consumer {
2766 config: config.clone().into(),
2767 context: context.clone(),
2768 info: consumer.info,
2769 };
2770
2771 trace!("create iterator");
2772 let stream = tokio::time::timeout(
2773 Duration::from_secs(5),
2774 Stream::stream(
2775 BatchConfig {
2776 batch: 500,
2777 expires: Some(Duration::from_secs(30)),
2778 no_wait: false,
2779 max_bytes: 0,
2780 idle_heartbeat: Duration::from_secs(15),
2781 min_pending: None,
2782 min_ack_pending: None,
2783 group: None,
2784 },
2785 &config,
2786 ),
2787 )
2788 .await
2789 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2790 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2791 trace!("recreated consumer");
2792 stream
2793}