async_nats/jetstream/consumer/pull.rs
1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16 future::{BoxFuture, Either},
17 FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_11")]
21use time::{serde::rfc3339, OffsetDateTime};
22
23#[cfg(feature = "server_2_10")]
24use std::collections::HashMap;
25use std::{future, pin::Pin, task::Poll, time::Duration};
26use tokio::{task::JoinHandle, time::Sleep};
27
28use serde::{Deserialize, Serialize};
29use tracing::{debug, trace};
30
31use crate::{
32 connection::State,
33 error::Error,
34 jetstream::{self, Context},
35 StatusCode, SubscribeError, Subscriber,
36};
37
38use crate::subject::Subject;
39
40#[cfg(feature = "server_2_11")]
41use super::PriorityPolicy;
42
43use super::{
44 AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
45 StreamError, StreamErrorKind,
46};
47use jetstream::consumer;
48
49impl Consumer<Config> {
50 /// Returns a stream of messages for a Pull Consumer.
51 ///
52 /// # Example
53 ///
54 /// ```no_run
55 /// # #[tokio::main]
56 /// # async fn main() -> Result<(), async_nats::Error> {
57 /// use futures::StreamExt;
58 /// use futures::TryStreamExt;
59 ///
60 /// let client = async_nats::connect("localhost:4222").await?;
61 /// let jetstream = async_nats::jetstream::new(client);
62 ///
63 /// let stream = jetstream
64 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
65 /// name: "events".to_string(),
66 /// max_messages: 10_000,
67 /// ..Default::default()
68 /// })
69 /// .await?;
70 ///
71 /// jetstream.publish("events", "data".into()).await?;
72 ///
73 /// let consumer = stream
74 /// .get_or_create_consumer(
75 /// "consumer",
76 /// async_nats::jetstream::consumer::pull::Config {
77 /// durable_name: Some("consumer".to_string()),
78 /// ..Default::default()
79 /// },
80 /// )
81 /// .await?;
82 ///
83 /// let mut messages = consumer.messages().await?.take(100);
84 /// while let Some(Ok(message)) = messages.next().await {
85 /// println!("got message {:?}", message);
86 /// message.ack().await?;
87 /// }
88 /// Ok(())
89 /// # }
90 /// ```
91 pub async fn messages(&self) -> Result<Stream, StreamError> {
92 Stream::stream(
93 BatchConfig {
94 batch: 200,
95 expires: Some(Duration::from_secs(30)),
96 no_wait: false,
97 max_bytes: 0,
98 idle_heartbeat: Duration::from_secs(15),
99 min_pending: None,
100 min_ack_pending: None,
101 group: None,
102 },
103 self,
104 )
105 .await
106 }
107
108 /// Enables customization of the [Stream] by setting timeouts, heartbeats, and the maximum
109 /// number of messages or bytes buffered.
110 ///
111 /// # Examples
112 ///
113 /// ```no_run
114 /// # #[tokio::main]
115 /// # async fn main() -> Result<(), async_nats::Error> {
116 /// use async_nats::jetstream::consumer::PullConsumer;
117 /// use futures::StreamExt;
118 /// let client = async_nats::connect("localhost:4222").await?;
119 /// let jetstream = async_nats::jetstream::new(client);
120 ///
121 /// let consumer: PullConsumer = jetstream
122 /// .get_stream("events")
123 /// .await?
124 /// .get_consumer("pull")
125 /// .await?;
126 ///
127 /// let mut messages = consumer
128 /// .stream()
129 /// .max_messages_per_batch(100)
130 /// .max_bytes_per_batch(1024)
131 /// .messages()
132 /// .await?;
133 ///
134 /// while let Some(message) = messages.next().await {
135 /// let message = message?;
136 /// println!("message: {:?}", message);
137 /// message.ack().await?;
138 /// }
139 /// # Ok(())
140 /// # }
141 /// ```
142 pub fn stream(&self) -> StreamBuilder<'_> {
143 StreamBuilder::new(self)
144 }
145
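    /// Sends a single pull request for a batch of messages, with the server's replies
    /// delivered to the provided `inbox` subject. This is a low-level building block;
    /// in most cases [Consumer::messages], [Consumer::fetch] or [Consumer::batch] are
    /// more convenient.
    ///
    /// # Example
    ///
    /// A minimal sketch of issuing a raw pull request and reading the replies (messages
    /// and status notifications) from the inbox subscription.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer::{pull::BatchConfig, PullConsumer};
    /// use futures::StreamExt;
    ///
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client.clone());
    ///
    /// let consumer: PullConsumer = jetstream
    ///     .get_stream("events")
    ///     .await?
    ///     .get_consumer("pull")
    ///     .await?;
    ///
    /// // Replies to the pull request are published to this inbox.
    /// let inbox = client.new_inbox();
    /// let mut replies = client.subscribe(inbox.clone()).await?;
    ///
    /// consumer
    ///     .request_batch(
    ///         BatchConfig {
    ///             batch: 10,
    ///             ..Default::default()
    ///         },
    ///         inbox.into(),
    ///     )
    ///     .await?;
    ///
    /// while let Some(message) = replies.next().await {
    ///     println!("received: {:?}", message);
    /// }
    /// # Ok(())
    /// # }
    /// ```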
146 pub async fn request_batch<I: Into<BatchConfig>>(
147 &self,
148 batch: I,
149 inbox: Subject,
150 ) -> Result<(), BatchRequestError> {
151 debug!("sending batch");
152 let subject = format!(
153 "{}.CONSUMER.MSG.NEXT.{}.{}",
154 self.context.prefix, self.info.stream_name, self.info.name
155 );
156
157 let payload = serde_json::to_vec(&batch.into())
158 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
159
160 self.context
161 .client
162 .publish_with_reply(subject, inbox, payload.into())
163 .await
164 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
165 debug!("batch request sent");
166 Ok(())
167 }
168
169 /// Returns a batch of the specified number of messages, or, if there are fewer messages on
170 /// the [Stream] than requested, all available messages.
171 ///
172 /// # Example
173 ///
174 /// ```no_run
175 /// # #[tokio::main]
176 /// # async fn main() -> Result<(), async_nats::Error> {
177 /// use futures::StreamExt;
178 /// use futures::TryStreamExt;
179 ///
180 /// let client = async_nats::connect("localhost:4222").await?;
181 /// let jetstream = async_nats::jetstream::new(client);
182 ///
183 /// let stream = jetstream
184 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
185 /// name: "events".to_string(),
186 /// max_messages: 10_000,
187 /// ..Default::default()
188 /// })
189 /// .await?;
190 ///
191 /// jetstream.publish("events", "data".into()).await?;
192 ///
193 /// let consumer = stream
194 /// .get_or_create_consumer(
195 /// "consumer",
196 /// async_nats::jetstream::consumer::pull::Config {
197 /// durable_name: Some("consumer".to_string()),
198 /// ..Default::default()
199 /// },
200 /// )
201 /// .await?;
202 ///
203 /// for _ in 0..100 {
204 /// jetstream.publish("events", "data".into()).await?;
205 /// }
206 ///
207 /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
208 /// // will finish after 100 messages, as that is the number of messages available on the
209 /// // stream.
210 /// while let Some(Ok(message)) = messages.next().await {
211 /// println!("got message {:?}", message);
212 /// message.ack().await?;
213 /// }
214 /// Ok(())
215 /// # }
216 /// ```
217 pub fn fetch(&self) -> FetchBuilder {
218 FetchBuilder::new(self)
219 }
220
221 /// Returns a batch of the specified number of messages, unless the timeout expires first.
222 ///
223 /// # Example
224 ///
225 /// ```no_run
226 /// # #[tokio::main]
227 /// # async fn main() -> Result<(), async_nats::Error> {
228 /// use futures::StreamExt;
229 /// use futures::TryStreamExt;
230 ///
231 /// let client = async_nats::connect("localhost:4222").await?;
232 /// let jetstream = async_nats::jetstream::new(client);
233 ///
234 /// let stream = jetstream
235 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
236 /// name: "events".to_string(),
237 /// max_messages: 10_000,
238 /// ..Default::default()
239 /// })
240 /// .await?;
241 ///
242 /// jetstream.publish("events", "data".into()).await?;
243 ///
244 /// let consumer = stream
245 /// .get_or_create_consumer(
246 /// "consumer",
247 /// async_nats::jetstream::consumer::pull::Config {
248 /// durable_name: Some("consumer".to_string()),
249 /// ..Default::default()
250 /// },
251 /// )
252 /// .await?;
253 ///
254 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
255 /// while let Some(Ok(message)) = messages.next().await {
256 /// println!("got message {:?}", message);
257 /// message.ack().await?;
258 /// }
259 /// Ok(())
260 /// # }
261 /// ```
262 pub fn batch(&self) -> BatchBuilder {
263 BatchBuilder::new(self)
264 }
265
266 /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
267 /// messages in those batches.
268 ///
269 /// # Example
270 ///
271 /// ```no_run
272 /// # #[tokio::main]
273 /// # async fn main() -> Result<(), async_nats::Error> {
274 /// use futures::StreamExt;
275 /// use futures::TryStreamExt;
276 ///
277 /// let client = async_nats::connect("localhost:4222").await?;
278 /// let jetstream = async_nats::jetstream::new(client);
279 ///
280 /// let stream = jetstream
281 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
282 /// name: "events".to_string(),
283 /// max_messages: 10_000,
284 /// ..Default::default()
285 /// })
286 /// .await?;
287 ///
288 /// jetstream.publish("events", "data".into()).await?;
289 ///
290 /// let consumer = stream
291 /// .get_or_create_consumer(
292 /// "consumer",
293 /// async_nats::jetstream::consumer::pull::Config {
294 /// durable_name: Some("consumer".to_string()),
295 /// ..Default::default()
296 /// },
297 /// )
298 /// .await?;
299 ///
300 /// let mut iter = consumer.sequence(50).unwrap().take(10);
301 /// while let Ok(Some(mut batch)) = iter.try_next().await {
302 /// while let Ok(Some(message)) = batch.try_next().await {
303 /// println!("message received: {:?}", message);
304 /// }
305 /// }
306 /// Ok(())
307 /// # }
308 /// ```
309 pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
310 let context = self.context.clone();
311 let subject = format!(
312 "{}.CONSUMER.MSG.NEXT.{}.{}",
313 self.context.prefix, self.info.stream_name, self.info.name
314 );
315
316 let request = serde_json::to_vec(&BatchConfig {
317 batch,
318 expires: Some(Duration::from_secs(60)),
319 ..Default::default()
320 })
321 .map(Bytes::from)
322 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
323
324 Ok(Sequence {
325 context,
326 subject,
327 request,
328 pending_messages: batch,
329 next: None,
330 })
331 }
332}
333
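/// A stream over a single batch of messages produced by one pull request (see
/// [Consumer::batch], [Consumer::fetch] and [Consumer::sequence]). It ends once the
/// requested number of messages has been received, the server terminates the request,
/// or the request expires.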
334pub struct Batch {
335 pending_messages: usize,
336 subscriber: Subscriber,
337 context: Context,
338 timeout: Option<Pin<Box<Sleep>>>,
339 terminated: bool,
340}
341
342impl Batch {
343 async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
344 let inbox = Subject::from(consumer.context.client.new_inbox());
345 let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
346 consumer.request_batch(batch.clone(), inbox.clone()).await?;
347
348 let sleep = batch.expires.map(|expires| {
349 Box::pin(tokio::time::sleep(
350 expires.saturating_add(Duration::from_secs(5)),
351 ))
352 });
353
354 Ok(Batch {
355 pending_messages: batch.batch,
356 subscriber: subscription,
357 context: consumer.context.clone(),
358 terminated: false,
359 timeout: sleep,
360 })
361 }
362}
363
364impl futures::Stream for Batch {
365 type Item = Result<jetstream::Message, crate::Error>;
366
367 fn poll_next(
368 mut self: std::pin::Pin<&mut Self>,
369 cx: &mut std::task::Context<'_>,
370 ) -> std::task::Poll<Option<Self::Item>> {
371 if self.terminated {
372 return Poll::Ready(None);
373 }
374 if self.pending_messages == 0 {
375 self.terminated = true;
376 return Poll::Ready(None);
377 }
378 if let Some(sleep) = self.timeout.as_mut() {
379 match sleep.poll_unpin(cx) {
380 Poll::Ready(_) => {
381 debug!("batch timeout timer triggered");
382 // TODO(tp): Maybe we can be smarter here and, before timing out, check if
383 // we consumed all the messages from the subscription buffer in case the user
384 // is consuming messages slowly. Keep in mind that we time out here only if,
385 // for some reason, we missed the timeout from the server and a few seconds
386 // have passed since the expected timeout message.
387 self.terminated = true;
388 return Poll::Ready(None);
389 }
390 Poll::Pending => (),
391 }
392 }
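        // Drain replies from the inbox subscription; server status codes distinguish
        // heartbeats and request termination from actual messages.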
393 match self.subscriber.receiver.poll_recv(cx) {
394 Poll::Ready(maybe_message) => match maybe_message {
395 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
396 StatusCode::TIMEOUT => {
397 debug!("received timeout. Iterator done");
398 self.terminated = true;
399 Poll::Ready(None)
400 }
401 StatusCode::IDLE_HEARTBEAT => {
402 debug!("received heartbeat");
403 Poll::Pending
404 }
405 // If this is the fetch variant, terminate when there are no more messages.
406 // We do not need to check whether this is a fetch rather than a batch,
407 // as only fetch will send back a `NO_MESSAGES` status.
408 StatusCode::NOT_FOUND => {
409 debug!("received `NO_MESSAGES`. Iterator done");
410 self.terminated = true;
411 Poll::Ready(None)
412 }
413 StatusCode::OK => {
414 debug!("received message");
415 self.pending_messages -= 1;
416 Poll::Ready(Some(Ok(jetstream::Message {
417 context: self.context.clone(),
418 message,
419 })))
420 }
421 status => {
422 debug!("received error");
423 self.terminated = true;
424 Poll::Ready(Some(Err(Box::new(std::io::Error::new(
425 std::io::ErrorKind::Other,
426 format!(
427 "error while processing messages from the stream: {}, {:?}",
428 status, message.description
429 ),
430 )))))
431 }
432 },
433 None => Poll::Ready(None),
434 },
435 std::task::Poll::Pending => std::task::Poll::Pending,
436 }
437 }
438}
439
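/// A stream of [Batch]es created by [Consumer::sequence]. Polling it issues a new pull
/// request and yields the resulting [Batch]; the messages themselves are then read by
/// iterating over each [Batch].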
440pub struct Sequence {
441 context: Context,
442 subject: String,
443 request: Bytes,
444 pending_messages: usize,
445 next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
446}
447
448impl futures::Stream for Sequence {
449 type Item = Result<Batch, MessagesError>;
450
451 fn poll_next(
452 mut self: std::pin::Pin<&mut Self>,
453 cx: &mut std::task::Context<'_>,
454 ) -> std::task::Poll<Option<Self::Item>> {
455 match self.next.as_mut() {
456 None => {
457 let context = self.context.clone();
458 let subject = self.subject.clone();
459 let request = self.request.clone();
460 let pending_messages = self.pending_messages;
461
462 let next = self.next.insert(Box::pin(async move {
463 let inbox = context.client.new_inbox();
464 let subscriber = context
465 .client
466 .subscribe(inbox.clone())
467 .await
468 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
469
470 context
471 .client
472 .publish_with_reply(subject, inbox, request)
473 .await
474 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
475
476 // TODO(tp): Add timeout config and defaults.
477 Ok(Batch {
478 pending_messages,
479 subscriber,
480 context,
481 terminated: false,
482 timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
483 })
484 }));
485
486 match next.as_mut().poll(cx) {
487 Poll::Ready(result) => {
488 self.next = None;
489 Poll::Ready(Some(result.map_err(|err| {
490 MessagesError::with_source(MessagesErrorKind::Pull, err)
491 })))
492 }
493 Poll::Pending => Poll::Pending,
494 }
495 }
496
497 Some(next) => match next.as_mut().poll(cx) {
498 Poll::Ready(result) => {
499 self.next = None;
500 Poll::Ready(Some(result.map_err(|err| {
501 MessagesError::with_source(MessagesErrorKind::Pull, err)
502 })))
503 }
504 Poll::Pending => Poll::Pending,
505 },
506 }
507 }
508}
509
510impl Consumer<OrderedConfig> {
511 /// Returns a stream of messages for an Ordered Pull Consumer.
512 ///
513 /// An ordered consumer uses a single-replica, ephemeral consumer regardless of the replication
514 /// factor of the Stream. It does not use acks; instead it tracks sequences and recreates itself
515 /// whenever it detects a mismatch.
516 ///
517 /// # Example
518 ///
519 /// ```no_run
520 /// # #[tokio::main]
521 /// # async fn main() -> Result<(), async_nats::Error> {
522 /// use futures::StreamExt;
523 /// use futures::TryStreamExt;
524 ///
525 /// let client = async_nats::connect("localhost:4222").await?;
526 /// let jetstream = async_nats::jetstream::new(client);
527 ///
528 /// let stream = jetstream
529 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
530 /// name: "events".to_string(),
531 /// max_messages: 10_000,
532 /// ..Default::default()
533 /// })
534 /// .await?;
535 ///
536 /// jetstream.publish("events", "data".into()).await?;
537 ///
538 /// let consumer = stream
539 /// .get_or_create_consumer(
540 /// "consumer",
541 /// async_nats::jetstream::consumer::pull::OrderedConfig {
542 /// name: Some("consumer".to_string()),
543 /// ..Default::default()
544 /// },
545 /// )
546 /// .await?;
547 ///
548 /// let mut messages = consumer.messages().await?.take(100);
549 /// while let Some(Ok(message)) = messages.next().await {
550 /// println!("got message {:?}", message);
551 /// }
552 /// Ok(())
553 /// # }
554 /// ```
555 pub async fn messages(self) -> Result<Ordered, StreamError> {
556 let config = Consumer {
557 config: self.config.clone().into(),
558 context: self.context.clone(),
559 info: self.info.clone(),
560 };
561 let stream = Stream::stream(
562 BatchConfig {
563 batch: 500,
564 expires: Some(Duration::from_secs(30)),
565 no_wait: false,
566 max_bytes: 0,
567 idle_heartbeat: Duration::from_secs(15),
568 min_pending: None,
569 min_ack_pending: None,
570 group: None,
571 },
572 &config,
573 )
574 .await?;
575
576 Ok(Ordered {
577 consumer_sequence: 0,
578 stream_sequence: 0,
579 missed_heartbeats: false,
580 create_stream: None,
581 context: self.context.clone(),
582 consumer_name: self
583 .config
584 .name
585 .clone()
586 .unwrap_or_else(|| self.context.client.new_inbox()),
587 consumer: self.config,
588 stream: Some(stream),
589 stream_name: self.info.stream_name.clone(),
590 })
591 }
592}
593
594/// Configuration for an ordered pull consumer. Ordered consumers are ephemeral,
595/// single-replica, and do not use acknowledgments; the remaining fields mainly
596/// control filtering and delivery behavior.
597#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
598pub struct OrderedConfig {
599 /// A name of the consumer. Can be specified for both durable and ephemeral
600 /// consumers.
601 #[serde(default, skip_serializing_if = "Option::is_none")]
602 pub name: Option<String>,
603 /// A short description of the purpose of this consumer.
604 #[serde(default, skip_serializing_if = "Option::is_none")]
605 pub description: Option<String>,
606 #[serde(default, skip_serializing_if = "is_default")]
607 pub filter_subject: String,
608 #[cfg(feature = "server_2_10")]
609 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
610 #[serde(default, skip_serializing_if = "is_default")]
611 pub filter_subjects: Vec<String>,
612 /// Whether messages are sent as quickly as possible or at the rate of receipt
613 pub replay_policy: ReplayPolicy,
614 /// The rate of message delivery in bits per second
615 #[serde(default, skip_serializing_if = "is_default")]
616 pub rate_limit: u64,
617 /// What percentage of acknowledgments should be sampled for observability, 0-100
618 #[serde(
619 rename = "sample_freq",
620 with = "super::sample_freq_deser",
621 default,
622 skip_serializing_if = "is_default"
623 )]
624 pub sample_frequency: u8,
625 /// Only deliver headers without payloads.
626 #[serde(default, skip_serializing_if = "is_default")]
627 pub headers_only: bool,
628 /// Allows for a variety of options that determine how this consumer will receive messages
629 #[serde(flatten)]
630 pub deliver_policy: DeliverPolicy,
631 /// The maximum number of waiting consumers.
632 #[serde(default, skip_serializing_if = "is_default")]
633 pub max_waiting: i64,
634 #[cfg(feature = "server_2_10")]
635 // Additional consumer metadata.
636 #[serde(default, skip_serializing_if = "is_default")]
637 pub metadata: HashMap<String, String>,
638 // Maximum number of messages that can be requested in a single Pull Request.
639 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
640 // [stream].
641 pub max_batch: i64,
642 // Maximum number of bytes that can be requested in a single Pull Request.
643 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
644 // [stream].
645 pub max_bytes: i64,
646 // Maximum expiry that can be set for a single Pull Request.
647 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
648 // [stream].
649 pub max_expires: Duration,
650}
651
652impl From<OrderedConfig> for Config {
653 fn from(config: OrderedConfig) -> Self {
654 Config {
655 durable_name: None,
656 name: config.name,
657 description: config.description,
658 deliver_policy: config.deliver_policy,
659 ack_policy: AckPolicy::None,
660 ack_wait: Duration::default(),
661 max_deliver: 1,
662 filter_subject: config.filter_subject,
663 #[cfg(feature = "server_2_10")]
664 filter_subjects: config.filter_subjects,
665 replay_policy: config.replay_policy,
666 rate_limit: config.rate_limit,
667 sample_frequency: config.sample_frequency,
668 max_waiting: config.max_waiting,
669 max_ack_pending: 0,
670 headers_only: config.headers_only,
671 max_batch: config.max_batch,
672 max_bytes: config.max_bytes,
673 max_expires: config.max_expires,
674 inactive_threshold: Duration::from_secs(30),
675 num_replicas: 1,
676 memory_storage: true,
677 #[cfg(feature = "server_2_10")]
678 metadata: config.metadata,
679 backoff: Vec::new(),
680 #[cfg(feature = "server_2_11")]
681 priority_policy: PriorityPolicy::None,
682 #[cfg(feature = "server_2_11")]
683 priority_groups: Vec::new(),
684 #[cfg(feature = "server_2_11")]
685 pause_until: None,
686 }
687 }
688}
689
690impl FromConsumer for OrderedConfig {
691 fn try_from_consumer_config(
692 config: crate::jetstream::consumer::Config,
693 ) -> Result<Self, crate::Error>
694 where
695 Self: Sized,
696 {
697 Ok(OrderedConfig {
698 name: config.name,
699 description: config.description,
700 filter_subject: config.filter_subject,
701 #[cfg(feature = "server_2_10")]
702 filter_subjects: config.filter_subjects,
703 replay_policy: config.replay_policy,
704 rate_limit: config.rate_limit,
705 sample_frequency: config.sample_frequency,
706 headers_only: config.headers_only,
707 deliver_policy: config.deliver_policy,
708 max_waiting: config.max_waiting,
709 #[cfg(feature = "server_2_10")]
710 metadata: config.metadata,
711 max_batch: config.max_batch,
712 max_bytes: config.max_bytes,
713 max_expires: config.max_expires,
714 })
715 }
716}
717
718impl IntoConsumerConfig for OrderedConfig {
719 fn into_consumer_config(self) -> super::Config {
720 jetstream::consumer::Config {
721 deliver_subject: None,
722 durable_name: None,
723 name: self.name,
724 description: self.description,
725 deliver_group: None,
726 deliver_policy: self.deliver_policy,
727 ack_policy: AckPolicy::None,
728 ack_wait: Duration::default(),
729 max_deliver: 1,
730 filter_subject: self.filter_subject,
731 #[cfg(feature = "server_2_10")]
732 filter_subjects: self.filter_subjects,
733 replay_policy: self.replay_policy,
734 rate_limit: self.rate_limit,
735 sample_frequency: self.sample_frequency,
736 max_waiting: self.max_waiting,
737 max_ack_pending: 0,
738 headers_only: self.headers_only,
739 flow_control: false,
740 idle_heartbeat: Duration::default(),
741 max_batch: 0,
742 max_bytes: 0,
743 max_expires: Duration::default(),
744 inactive_threshold: Duration::from_secs(30),
745 num_replicas: 1,
746 memory_storage: true,
747 #[cfg(feature = "server_2_10")]
748 metadata: self.metadata,
749 backoff: Vec::new(),
750 #[cfg(feature = "server_2_11")]
751 priority_policy: PriorityPolicy::None,
752 #[cfg(feature = "server_2_11")]
753 priority_groups: Vec::new(),
754 #[cfg(feature = "server_2_11")]
755 pause_until: None,
756 }
757 }
758}
759
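/// A stream of messages from an ordered pull consumer, created by calling `messages()`
/// on a `Consumer<OrderedConfig>`. It does not ack messages; instead it tracks stream and
/// consumer sequences and transparently recreates the underlying ephemeral consumer when
/// it detects a sequence mismatch or two consecutive missed idle heartbeats.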
760pub struct Ordered {
761 context: Context,
762 stream_name: String,
763 consumer: OrderedConfig,
764 consumer_name: String,
765 stream: Option<Stream>,
766 create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
767 consumer_sequence: u64,
768 stream_sequence: u64,
769 missed_heartbeats: bool,
770}
771
772impl futures::Stream for Ordered {
773 type Item = Result<jetstream::Message, OrderedError>;
774
775 fn poll_next(
776 mut self: Pin<&mut Self>,
777 cx: &mut std::task::Context<'_>,
778 ) -> Poll<Option<Self::Item>> {
779 let mut recreate = false;
780 // Poll messages
781 if let Some(stream) = self.stream.as_mut() {
782 match stream.poll_next_unpin(cx) {
783 Poll::Ready(message) => match message {
784 Some(message) => match message {
785 Ok(message) => {
786 self.missed_heartbeats = false;
787 let info = message.info().map_err(|err| {
788 OrderedError::with_source(OrderedErrorKind::Other, err)
789 })?;
790 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
791 self.consumer_sequence,
792 self.stream_sequence,
793 info.consumer_sequence,
794 info.stream_sequence);
795 if info.consumer_sequence != self.consumer_sequence + 1 {
796 debug!(
797 "ordered consumer mismatch. current {}, info: {}",
798 self.consumer_sequence, info.consumer_sequence
799 );
800 recreate = true;
801 self.consumer_sequence = 0;
802 } else {
803 self.stream_sequence = info.stream_sequence;
804 self.consumer_sequence = info.consumer_sequence;
805 return Poll::Ready(Some(Ok(message)));
806 }
807 }
808 Err(err) => match err.kind() {
809 MessagesErrorKind::MissingHeartbeat => {
810 // If we have missed heartbeats set, it means this is a second
811 // missed heartbeat, so we need to recreate consumer.
812 if self.missed_heartbeats {
813 self.consumer_sequence = 0;
814 recreate = true;
815 } else {
816 self.missed_heartbeats = true;
817 }
818 }
819 MessagesErrorKind::ConsumerDeleted => {
820 recreate = true;
821 self.consumer_sequence = 0;
822 }
823 MessagesErrorKind::Pull
824 | MessagesErrorKind::PushBasedConsumer
825 | MessagesErrorKind::Other => {
826 return Poll::Ready(Some(Err(err.into())));
827 }
828 },
829 },
830 None => return Poll::Ready(None),
831 },
832 Poll::Pending => (),
833 }
834 }
835 // Recreate consumer if needed
836 if recreate {
837 self.stream = None;
838 self.create_stream = Some(Box::pin({
839 let context = self.context.clone();
840 let config = self.consumer.clone();
841 let stream_name = self.stream_name.clone();
842 let consumer_name = self.consumer_name.clone();
843 let sequence = self.stream_sequence;
844 async move {
845 tryhard::retry_fn(|| {
846 recreate_consumer_stream(
847 &context,
848 &config,
849 &stream_name,
850 &consumer_name,
851 sequence,
852 )
853 })
854 .retries(5)
855 .exponential_backoff(Duration::from_millis(500))
856 .await
857 }
858 }))
859 }
860 // check for recreation future
861 if let Some(result) = self.create_stream.as_mut() {
862 match result.poll_unpin(cx) {
863 Poll::Ready(result) => match result {
864 Ok(stream) => {
865 self.create_stream = None;
866 self.stream = Some(stream);
867 return self.poll_next(cx);
868 }
869 Err(err) => {
870 return Poll::Ready(Some(Err(OrderedError::with_source(
871 OrderedErrorKind::Recreate,
872 err,
873 ))))
874 }
875 },
876 Poll::Pending => (),
877 }
878 }
879 Poll::Pending
880 }
881}
882
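/// A continuous, self-replenishing stream of messages for a pull consumer, created by
/// calling `messages()` on a [Consumer] or through a [StreamBuilder]. A background task
/// issues new pull requests whenever roughly half of the current batch (in messages or
/// bytes) has been consumed.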
883pub struct Stream {
884 pending_messages: usize,
885 pending_bytes: usize,
886 request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
887 request_tx: tokio::sync::watch::Sender<()>,
888 subscriber: Subscriber,
889 batch_config: BatchConfig,
890 context: Context,
891 pending_request: bool,
892 task_handle: JoinHandle<()>,
893 terminated: bool,
894 heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
895}
896
897impl Drop for Stream {
898 fn drop(&mut self) {
899 self.task_handle.abort();
900 }
901}
902
903impl Stream {
904 async fn stream(
905 batch_config: BatchConfig,
906 consumer: &Consumer<Config>,
907 ) -> Result<Stream, StreamError> {
908 let inbox = consumer.context.client.new_inbox();
909 let subscription = consumer
910 .context
911 .client
912 .subscribe(inbox.clone())
913 .await
914 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
915 let subject = format!(
916 "{}.CONSUMER.MSG.NEXT.{}.{}",
917 consumer.context.prefix, consumer.info.stream_name, consumer.info.name
918 );
919
920 let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
921 let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
922 let task_handle = tokio::task::spawn({
923 let batch = batch_config.clone();
924 let consumer = consumer.clone();
925 let mut context = consumer.context.clone();
926 let inbox = inbox.clone();
927 async move {
928 loop {
929 // This guards against the edge case of a missing response for some reason.
930 let expires = batch_config
931 .expires
932 .map(|expires| {
933 if expires.is_zero() {
934 Either::Left(future::pending())
935 } else {
936 Either::Right(tokio::time::sleep(
937 expires.saturating_add(Duration::from_secs(5)),
938 ))
939 }
940 })
941 .unwrap_or_else(|| Either::Left(future::pending()));
942 // Need to check previous state, as `changed` will always fire on first
943 // call.
944 let prev_state = context.client.state.borrow().to_owned();
945 let mut pending_reset = false;
946
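                    // Wait for whichever happens first: a reconnect (which may require
                    // re-sending the pull request), an explicit request from the poller for
                    // a new batch, or the previous pull request expiring.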
947 tokio::select! {
948 _ = context.client.state.changed() => {
949 let state = context.client.state.borrow().to_owned();
950 if !(state == crate::connection::State::Connected
951 && prev_state != State::Connected) {
952 continue;
953 }
954 debug!("detected !Connected -> Connected state change");
955
956 match tryhard::retry_fn(|| consumer.fetch_info()).retries(5).exponential_backoff(Duration::from_millis(500)).await {
957 Ok(info) => {
958 if info.num_waiting == 0 {
959 pending_reset = true;
960 }
961 }
962 Err(err) => {
963 if let Err(err) = request_result_tx.send(Err(err)).await {
964 debug!("failed to send request result: {}", err);
965 }
966 },
967 }
968 },
969 _ = request_rx.changed() => debug!("task received request to send a new pull request"),
970 _ = expires => {
971 pending_reset = true;
972 debug!("expired pull request")},
973 }
974
975 let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
976 let result = context
977 .client
978 .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
979 .await
980 .map(|_| pending_reset);
981 // TODO: add tracing instead of ignoring this.
982 request_result_tx
983 .send(result.map(|_| pending_reset).map_err(|err| {
984 crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
985 .into()
986 }))
987 .await
988 .ok();
989 trace!("result send over tx");
990 }
991 }
992 });
993
994 Ok(Stream {
995 task_handle,
996 request_result_rx,
997 request_tx,
998 batch_config,
999 pending_messages: 0,
1000 pending_bytes: 0,
1001 subscriber: subscription,
1002 context: consumer.context.clone(),
1003 pending_request: false,
1004 terminated: false,
1005 heartbeat_timeout: None,
1006 })
1007 }
1008}
1009
1010#[derive(Clone, Copy, Debug, PartialEq)]
1011pub enum OrderedErrorKind {
1012 MissingHeartbeat,
1013 ConsumerDeleted,
1014 Pull,
1015 PushBasedConsumer,
1016 Recreate,
1017 Other,
1018}
1019
1020impl std::fmt::Display for OrderedErrorKind {
1021 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1022 match self {
1023 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1024 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1025 Self::Pull => write!(f, "pull request failed"),
1026 Self::Other => write!(f, "error"),
1027 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1028 Self::Recreate => write!(f, "consumer recreation failed"),
1029 }
1030 }
1031}
1032
1033pub type OrderedError = Error<OrderedErrorKind>;
1034
1035impl From<MessagesError> for OrderedError {
1036 fn from(err: MessagesError) -> Self {
1037 match err.kind() {
1038 MessagesErrorKind::MissingHeartbeat => {
1039 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1040 }
1041 MessagesErrorKind::ConsumerDeleted => {
1042 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1043 }
1044 MessagesErrorKind::Pull => OrderedError {
1045 kind: OrderedErrorKind::Pull,
1046 source: err.source,
1047 },
1048 MessagesErrorKind::PushBasedConsumer => {
1049 OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1050 }
1051 MessagesErrorKind::Other => OrderedError {
1052 kind: OrderedErrorKind::Other,
1053 source: err.source,
1054 },
1055 }
1056 }
1057}
1058
1059#[derive(Clone, Copy, Debug, PartialEq)]
1060pub enum MessagesErrorKind {
1061 MissingHeartbeat,
1062 ConsumerDeleted,
1063 Pull,
1064 PushBasedConsumer,
1065 Other,
1066}
1067
1068impl std::fmt::Display for MessagesErrorKind {
1069 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1070 match self {
1071 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1072 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1073 Self::Pull => write!(f, "pull request failed"),
1074 Self::Other => write!(f, "error"),
1075 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1076 }
1077 }
1078}
1079
1080pub type MessagesError = Error<MessagesErrorKind>;
1081
1082impl futures::Stream for Stream {
1083 type Item = Result<jetstream::Message, MessagesError>;
1084
1085 fn poll_next(
1086 mut self: std::pin::Pin<&mut Self>,
1087 cx: &mut std::task::Context<'_>,
1088 ) -> std::task::Poll<Option<Self::Item>> {
1089 if self.terminated {
1090 return Poll::Ready(None);
1091 }
1092
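        // If idle heartbeats are enabled, arm (or keep polling) a timer set to twice the
        // heartbeat interval; if it fires before any message or heartbeat arrives, report
        // a missing heartbeat to the caller.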
1093 if !self.batch_config.idle_heartbeat.is_zero() {
1094 trace!("checking idle heartbeats");
1095 let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1096 match self
1097 .heartbeat_timeout
1098 .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1099 .poll_unpin(cx)
1100 {
1101 Poll::Ready(_) => {
1102 self.heartbeat_timeout = None;
1103 return Poll::Ready(Some(Err(MessagesError::new(
1104 MessagesErrorKind::MissingHeartbeat,
1105 ))));
1106 }
1107 Poll::Pending => (),
1108 }
1109 }
1110
1111 loop {
1112 trace!("pending messages: {}", self.pending_messages);
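            // Ask the background task for another pull request once fewer than half of the
            // requested messages (or bytes, when a byte limit is set) remain in flight and
            // no request is already outstanding.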
1113 if (self.pending_messages <= self.batch_config.batch / 2
1114 || (self.batch_config.max_bytes > 0
1115 && self.pending_bytes <= self.batch_config.max_bytes / 2))
1116 && !self.pending_request
1117 {
1118 debug!("pending messages reached threshold to send new fetch request");
1119 self.request_tx.send(()).ok();
1120 self.pending_request = true;
1121 }
1122
1123 match self.request_result_rx.poll_recv(cx) {
1124 Poll::Ready(resp) => match resp {
1125 Some(resp) => match resp {
1126 Ok(reset) => {
1127 trace!("request response: {:?}", reset);
1128 debug!("request sent, setting pending messages");
1129 if reset {
1130 self.pending_messages = self.batch_config.batch;
1131 self.pending_bytes = self.batch_config.max_bytes;
1132 } else {
1133 self.pending_messages += self.batch_config.batch;
1134 self.pending_bytes += self.batch_config.max_bytes;
1135 }
1136 self.pending_request = false;
1137 continue;
1138 }
1139 Err(err) => {
1140 return Poll::Ready(Some(Err(MessagesError::with_source(
1141 MessagesErrorKind::Pull,
1142 err,
1143 ))))
1144 }
1145 },
1146 None => return Poll::Ready(None),
1147 },
1148 Poll::Pending => {
1149 trace!("pending result");
1150 }
1151 }
1152
1153 trace!("polling subscriber");
1154 match self.subscriber.receiver.poll_recv(cx) {
1155 Poll::Ready(maybe_message) => {
1156 self.heartbeat_timeout = None;
1157 match maybe_message {
1158 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1159 StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1160 debug!("received status message: {:?}", message);
1161 // If consumer has been deleted, error and shutdown the iterator.
1162 if message.description.as_deref() == Some("Consumer Deleted") {
1163 self.terminated = true;
1164 return Poll::Ready(Some(Err(MessagesError::new(
1165 MessagesErrorKind::ConsumerDeleted,
1166 ))));
1167 }
1168 // If consumer is not pull based, error and shutdown the iterator.
1169 if message.description.as_deref() == Some("Consumer is push based")
1170 {
1171 self.terminated = true;
1172 return Poll::Ready(Some(Err(MessagesError::new(
1173 MessagesErrorKind::PushBasedConsumer,
1174 ))));
1175 }
1176
1177 // Do accounting for messages left after terminated/completed pull request.
1178 let pending_messages = message
1179 .headers
1180 .as_ref()
1181 .and_then(|headers| headers.get("Nats-Pending-Messages"))
1182 .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1183 .map_err(|err| {
1184 MessagesError::with_source(MessagesErrorKind::Other, err)
1185 })?;
1186
1187 let pending_bytes = message
1188 .headers
1189 .as_ref()
1190 .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1191 .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1192 .map_err(|err| {
1193 MessagesError::with_source(MessagesErrorKind::Other, err)
1194 })?;
1195
1196 debug!(
1197 "timeout reached. remaining messages: {}, bytes {}",
1198 pending_messages, pending_bytes
1199 );
1200 self.pending_messages =
1201 self.pending_messages.saturating_sub(pending_messages);
1202 trace!("message bytes len: {}", pending_bytes);
1203 self.pending_bytes =
1204 self.pending_bytes.saturating_sub(pending_bytes);
1205 continue;
1206 }
1207 // Idle Heartbeat means we have no messages, but the consumer is fine.
1208 StatusCode::IDLE_HEARTBEAT => {
1209 debug!("received idle heartbeat");
1210 continue;
1211 }
1212 // We got a message from the stream.
1213 StatusCode::OK => {
1214 trace!("message received");
1215 self.pending_messages = self.pending_messages.saturating_sub(1);
1216 self.pending_bytes =
1217 self.pending_bytes.saturating_sub(message.length);
1218 return Poll::Ready(Some(Ok(jetstream::Message {
1219 context: self.context.clone(),
1220 message,
1221 })));
1222 }
1223 status => {
1224 debug!("received unknown message: {:?}", message);
1225 return Poll::Ready(Some(Err(MessagesError::with_source(
1226 MessagesErrorKind::Other,
1227 format!(
1228 "error while processing messages from the stream: {}, {:?}",
1229 status, message.description
1230 ),
1231 ))));
1232 }
1233 },
1234 None => return Poll::Ready(None),
1235 }
1236 }
1237 Poll::Pending => {
1238 debug!("subscriber still pending");
1239 return std::task::Poll::Pending;
1240 }
1241 }
1242 }
1243 }
1244}
1245
1246/// Used for building configuration for a [Stream]. Created by [Consumer::stream] on a [Consumer].
1247///
1248/// # Examples
1249///
1250/// ```no_run
1251/// # #[tokio::main]
1252/// # async fn main() -> Result<(), async_nats::Error> {
1253/// use futures::StreamExt;
1254/// use async_nats::jetstream::consumer::PullConsumer;
1255/// let client = async_nats::connect("localhost:4222").await?;
1256/// let jetstream = async_nats::jetstream::new(client);
1257///
1258/// let consumer: PullConsumer = jetstream
1259/// .get_stream("events").await?
1260/// .get_consumer("pull").await?;
1261///
1262/// let mut messages = consumer.stream()
1263/// .max_messages_per_batch(100)
1264/// .max_bytes_per_batch(1024)
1265/// .messages().await?;
1266///
1267/// while let Some(message) = messages.next().await {
1268/// let message = message?;
1269/// println!("message: {:?}", message);
1270/// message.ack().await?;
1271/// }
1272/// # Ok(())
1273/// # }
/// ```
1274pub struct StreamBuilder<'a> {
1275 batch: usize,
1276 max_bytes: usize,
1277 heartbeat: Duration,
1278 expires: Duration,
1279 group: Option<String>,
1280 min_pending: Option<usize>,
1281 min_ack_pending: Option<usize>,
1282 consumer: &'a Consumer<Config>,
1283}
1284
1285impl<'a> StreamBuilder<'a> {
1286 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1287 StreamBuilder {
1288 consumer,
1289 batch: 200,
1290 max_bytes: 0,
1291 expires: Duration::from_secs(30),
1292 heartbeat: Duration::default(),
1293 group: None,
1294 min_pending: None,
1295 min_ack_pending: None,
1296 }
1297 }
1298
1299 /// Sets max bytes that can be buffered on the Client while processing already received
1300 /// messages.
1301 /// Higher values will yield better performance, but also potentially increase memory usage if
1302 /// application is acknowledging messages much slower than they arrive.
1303 ///
1304 /// Default values should provide reasonable balance between performance and memory usage.
1305 ///
1306 /// # Examples
1307 ///
1308 /// ```no_run
1309 /// # #[tokio::main]
1310 /// # async fn main() -> Result<(), async_nats::Error> {
1311 /// use async_nats::jetstream::consumer::PullConsumer;
1312 /// use futures::StreamExt;
1313 /// let client = async_nats::connect("localhost:4222").await?;
1314 /// let jetstream = async_nats::jetstream::new(client);
1315 ///
1316 /// let consumer: PullConsumer = jetstream
1317 /// .get_stream("events")
1318 /// .await?
1319 /// .get_consumer("pull")
1320 /// .await?;
1321 ///
1322 /// let mut messages = consumer
1323 /// .stream()
1324 /// .max_bytes_per_batch(1024)
1325 /// .messages()
1326 /// .await?;
1327 ///
1328 /// while let Some(message) = messages.next().await {
1329 /// let message = message?;
1330 /// println!("message: {:?}", message);
1331 /// message.ack().await?;
1332 /// }
1333 /// # Ok(())
1334 /// # }
1335 /// ```
1336 pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1337 self.max_bytes = max_bytes;
1338 self
1339 }
1340
1341 /// Sets max number of messages that can be buffered on the Client while processing already received
1342 /// messages.
1343 /// Higher values will yield better performance, but also potentially increase memory usage if
1344 /// application is acknowledging messages much slower than they arrive.
1345 ///
1346 /// Default values should provide reasonable balance between performance and memory usage.
1347 ///
1348 /// # Examples
1349 ///
1350 /// ```no_run
1351 /// # #[tokio::main]
1352 /// # async fn main() -> Result<(), async_nats::Error> {
1353 /// use async_nats::jetstream::consumer::PullConsumer;
1354 /// use futures::StreamExt;
1355 /// let client = async_nats::connect("localhost:4222").await?;
1356 /// let jetstream = async_nats::jetstream::new(client);
1357 ///
1358 /// let consumer: PullConsumer = jetstream
1359 /// .get_stream("events")
1360 /// .await?
1361 /// .get_consumer("pull")
1362 /// .await?;
1363 ///
1364 /// let mut messages = consumer
1365 /// .stream()
1366 /// .max_messages_per_batch(100)
1367 /// .messages()
1368 /// .await?;
1369 ///
1370 /// while let Some(message) = messages.next().await {
1371 /// let message = message?;
1372 /// println!("message: {:?}", message);
1373 /// message.ack().await?;
1374 /// }
1375 /// # Ok(())
1376 /// # }
1377 /// ```
1378 pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1379 self.batch = batch;
1380 self
1381 }
1382
1383 /// Sets the idle heartbeat, which will be sent by the server if there are no messages
1384 /// pending for a given [Consumer].
1385 ///
1386 /// # Examples
1387 ///
1388 /// ```no_run
1389 /// # #[tokio::main]
1390 /// # async fn main() -> Result<(), async_nats::Error> {
1391 /// use async_nats::jetstream::consumer::PullConsumer;
1392 /// use futures::StreamExt;
1393 /// let client = async_nats::connect("localhost:4222").await?;
1394 /// let jetstream = async_nats::jetstream::new(client);
1395 ///
1396 /// let consumer: PullConsumer = jetstream
1397 /// .get_stream("events")
1398 /// .await?
1399 /// .get_consumer("pull")
1400 /// .await?;
1401 ///
1402 /// let mut messages = consumer
1403 /// .stream()
1404 /// .heartbeat(std::time::Duration::from_secs(10))
1405 /// .messages()
1406 /// .await?;
1407 ///
1408 /// while let Some(message) = messages.next().await {
1409 /// let message = message?;
1410 /// println!("message: {:?}", message);
1411 /// message.ack().await?;
1412 /// }
1413 /// # Ok(())
1414 /// # }
1415 /// ```
1416 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1417 self.heartbeat = heartbeat;
1418 self
1419 }
1420
1421 /// Low level API that does not need tweaking for most use cases.
1422 /// Sets how long each batch request waits for the whole batch of messages before timing
1423 /// out.
1424 ///
1425 /// # Examples
1426 ///
1427 /// ```no_run
1428 /// # #[tokio::main]
1429 /// # async fn main() -> Result<(), async_nats::Error> {
1430 /// use async_nats::jetstream::consumer::PullConsumer;
1431 /// use futures::StreamExt;
1432 /// let client = async_nats::connect("localhost:4222").await?;
1433 /// let jetstream = async_nats::jetstream::new(client);
1434 ///
1435 /// let consumer: PullConsumer = jetstream
1436 /// .get_stream("events")
1437 /// .await?
1438 /// .get_consumer("pull")
1439 /// .await?;
1440 ///
1441 /// let mut messages = consumer
1442 /// .stream()
1443 /// .expires(std::time::Duration::from_secs(30))
1444 /// .messages()
1445 /// .await?;
1446 ///
1447 /// while let Some(message) = messages.next().await {
1448 /// let message = message?;
1449 /// println!("message: {:?}", message);
1450 /// message.ack().await?;
1451 /// }
1452 /// # Ok(())
1453 /// # }
1454 /// ```
1455 pub fn expires(mut self, expires: Duration) -> Self {
1456 self.expires = expires;
1457 self
1458 }
1459
1460 /// Sets the overflow threshold of minimum pending messages that must be reached before this
1461 /// stream starts getting messages for a [Consumer].
1462 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and [PriorityPolicy::Overflow] set.
1463 ///
1464 /// # Examples
1465 ///
1466 /// ```no_run
1467 /// # #[tokio::main]
1468 /// # async fn main() -> Result<(), async_nats::Error> {
1469 /// use async_nats::jetstream::consumer::PullConsumer;
1470 /// use futures::StreamExt;
1471 /// let client = async_nats::connect("localhost:4222").await?;
1472 /// let jetstream = async_nats::jetstream::new(client);
1473 ///
1474 /// let consumer: PullConsumer = jetstream
1475 /// .get_stream("events")
1476 /// .await?
1477 /// .get_consumer("pull")
1478 /// .await?;
1479 ///
1480 /// let mut messages = consumer
1481 /// .stream()
1482 /// .expires(std::time::Duration::from_secs(30))
1483 /// .group("A")
1484 /// .min_pending(100)
1485 /// .messages()
1486 /// .await?;
1487 ///
1488 /// while let Some(message) = messages.next().await {
1489 /// let message = message?;
1490 /// println!("message: {:?}", message);
1491 /// message.ack().await?;
1492 /// }
1493 /// # Ok(())
1494 /// # }
1495 /// ```
1496 pub fn min_pending(mut self, min_pending: usize) -> Self {
1497 self.min_pending = Some(min_pending);
1498 self
1499 }
1500
1501 /// Sets the overflow threshold of minimum pending acknowledgements that must be reached before
1502 /// this stream starts getting messages for a [Consumer].
1503 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and [PriorityPolicy::Overflow] set.
1504 ///
1505 /// # Examples
1506 ///
1507 /// ```no_run
1508 /// # #[tokio::main]
1509 /// # async fn main() -> Result<(), async_nats::Error> {
1510 /// use async_nats::jetstream::consumer::PullConsumer;
1511 /// use futures::StreamExt;
1512 /// let client = async_nats::connect("localhost:4222").await?;
1513 /// let jetstream = async_nats::jetstream::new(client);
1514 ///
1515 /// let consumer: PullConsumer = jetstream
1516 /// .get_stream("events")
1517 /// .await?
1518 /// .get_consumer("pull")
1519 /// .await?;
1520 ///
1521 /// let mut messages = consumer
1522 /// .stream()
1523 /// .expires(std::time::Duration::from_secs(30))
1524 /// .group("A")
1525 /// .min_ack_pending(100)
1526 /// .messages()
1527 /// .await?;
1528 ///
1529 /// while let Some(message) = messages.next().await {
1530 /// let message = message?;
1531 /// println!("message: {:?}", message);
1532 /// message.ack().await?;
1533 /// }
1534 /// # Ok(())
1535 /// # }
1536 /// ```
1537 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1538 self.min_ack_pending = Some(min_ack_pending);
1539 self
1540 }
1541
1542 /// Sets the priority group used when the [Consumer] has [Config::priority_groups] configured.
1543 ///
1544 /// # Examples
1545 ///
1546 /// ```no_run
1547 /// # #[tokio::main]
1548 /// # async fn main() -> Result<(), async_nats::Error> {
1549 /// use async_nats::jetstream::consumer::PullConsumer;
1550 /// use futures::StreamExt;
1551 /// let client = async_nats::connect("localhost:4222").await?;
1552 /// let jetstream = async_nats::jetstream::new(client);
1553 ///
1554 /// let consumer: PullConsumer = jetstream
1555 /// .get_stream("events")
1556 /// .await?
1557 /// .get_consumer("pull")
1558 /// .await?;
1559 ///
1560 /// let mut messages = consumer
1561 /// .stream()
1562 /// .expires(std::time::Duration::from_secs(30))
1563 /// .group("A")
1564 /// .min_ack_pending(100)
1565 /// .messages()
1566 /// .await?;
1567 ///
1568 /// while let Some(message) = messages.next().await {
1569 /// let message = message?;
1570 /// println!("message: {:?}", message);
1571 /// message.ack().await?;
1572 /// }
1573 /// # Ok(())
1574 /// # }
1575 /// ```
1576 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1577 self.group = Some(group.into());
1578 self
1579 }
1580
1581 /// Creates the actual [Stream] with the provided configuration.
1582 ///
1583 /// # Examples
1584 ///
1585 /// ```no_run
1586 /// # #[tokio::main]
1587 /// # async fn main() -> Result<(), async_nats::Error> {
1588 /// use async_nats::jetstream::consumer::PullConsumer;
1589 /// use futures::StreamExt;
1590 /// let client = async_nats::connect("localhost:4222").await?;
1591 /// let jetstream = async_nats::jetstream::new(client);
1592 ///
1593 /// let consumer: PullConsumer = jetstream
1594 /// .get_stream("events")
1595 /// .await?
1596 /// .get_consumer("pull")
1597 /// .await?;
1598 ///
1599 /// let mut messages = consumer
1600 /// .stream()
1601 /// .max_messages_per_batch(100)
1602 /// .messages()
1603 /// .await?;
1604 ///
1605 /// while let Some(message) = messages.next().await {
1606 /// let message = message?;
1607 /// println!("message: {:?}", message);
1608 /// message.ack().await?;
1609 /// }
1610 /// # Ok(())
1611 /// # }
1612 /// ```
1613 pub async fn messages(self) -> Result<Stream, StreamError> {
1614 Stream::stream(
1615 BatchConfig {
1616 batch: self.batch,
1617 expires: Some(self.expires),
1618 no_wait: false,
1619 max_bytes: self.max_bytes,
1620 idle_heartbeat: self.heartbeat,
1621 min_pending: self.min_pending,
1622 group: self.group,
1623 min_ack_pending: self.min_ack_pending,
1624 },
1625 self.consumer,
1626 )
1627 .await
1628 }
1629}
1630
1631/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by [Consumer::fetch] on a [Consumer].
1632///
1633/// # Examples
1634///
1635/// ```no_run
1636/// # #[tokio::main]
1637/// # async fn main() -> Result<(), async_nats::Error> {
1638/// use async_nats::jetstream::consumer::PullConsumer;
1639/// use futures::StreamExt;
1640/// let client = async_nats::connect("localhost:4222").await?;
1641/// let jetstream = async_nats::jetstream::new(client);
1642///
1643/// let consumer: PullConsumer = jetstream
1644/// .get_stream("events")
1645/// .await?
1646/// .get_consumer("pull")
1647/// .await?;
1648///
1649/// let mut messages = consumer
1650/// .fetch()
1651/// .max_messages(100)
1652/// .max_bytes(1024)
1653/// .messages()
1654/// .await?;
1655///
1656/// while let Some(message) = messages.next().await {
1657/// let message = message?;
1658/// println!("message: {:?}", message);
1659/// message.ack().await?;
1660/// }
1661/// # Ok(())
1662/// # }
1663/// ```
1664pub struct FetchBuilder<'a> {
1665 batch: usize,
1666 max_bytes: usize,
1667 heartbeat: Duration,
1668 expires: Option<Duration>,
1669 min_pending: Option<usize>,
1670 min_ack_pending: Option<usize>,
1671 group: Option<String>,
1672 consumer: &'a Consumer<Config>,
1673}
1674
1675impl<'a> FetchBuilder<'a> {
1676 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1677 FetchBuilder {
1678 consumer,
1679 batch: 200,
1680 max_bytes: 0,
1681 expires: None,
1682 min_pending: None,
1683 min_ack_pending: None,
1684 group: None,
1685 heartbeat: Duration::default(),
1686 }
1687 }
1688
1689 /// Sets max bytes that can be buffered on the Client while processing already received
1690 /// messages.
1691 /// Higher values will yield better performance, but also potentially increase memory usage if
1692 /// application is acknowledging messages much slower than they arrive.
1693 ///
1694 /// Default values should provide reasonable balance between performance and memory usage.
1695 ///
1696 /// # Examples
1697 ///
1698 /// ```no_run
1699 /// # #[tokio::main]
1700 /// # async fn main() -> Result<(), async_nats::Error> {
1701 /// use futures::StreamExt;
1702 /// let client = async_nats::connect("localhost:4222").await?;
1703 /// let jetstream = async_nats::jetstream::new(client);
1704 ///
1705 /// let consumer = jetstream
1706 /// .get_stream("events")
1707 /// .await?
1708 /// .get_consumer("pull")
1709 /// .await?;
1710 ///
1711 /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1712 ///
1713 /// while let Some(message) = messages.next().await {
1714 /// let message = message?;
1715 /// println!("message: {:?}", message);
1716 /// message.ack().await?;
1717 /// }
1718 /// # Ok(())
1719 /// # }
1720 /// ```
1721 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1722 self.max_bytes = max_bytes;
1723 self
1724 }
1725
1726 /// Sets max number of messages that can be buffered on the Client while processing already received
1727 /// messages.
1728 /// Higher values will yield better performance, but also potentially increase memory usage if
1729 /// application is acknowledging messages much slower than they arrive.
1730 ///
1731 /// Default values should provide reasonable balance between performance and memory usage.
1732 ///
1733 /// # Examples
1734 ///
1735 /// ```no_run
1736 /// # #[tokio::main]
1737 /// # async fn main() -> Result<(), async_nats::Error> {
1738 /// use futures::StreamExt;
1739 /// let client = async_nats::connect("localhost:4222").await?;
1740 /// let jetstream = async_nats::jetstream::new(client);
1741 ///
1742 /// let consumer = jetstream
1743 /// .get_stream("events")
1744 /// .await?
1745 /// .get_consumer("pull")
1746 /// .await?;
1747 ///
1748 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1749 ///
1750 /// while let Some(message) = messages.next().await {
1751 /// let message = message?;
1752 /// println!("message: {:?}", message);
1753 /// message.ack().await?;
1754 /// }
1755 /// # Ok(())
1756 /// # }
1757 /// ```
1758 pub fn max_messages(mut self, batch: usize) -> Self {
1759 self.batch = batch;
1760 self
1761 }
1762
1763 /// Sets the idle heartbeat, which will be sent by the server if there are no messages
1764 /// pending for a given [Consumer].
1765 ///
1766 /// # Examples
1767 ///
1768 /// ```no_run
1769 /// # #[tokio::main]
1770 /// # async fn main() -> Result<(), async_nats::Error> {
1771 /// use async_nats::jetstream::consumer::PullConsumer;
1772 /// use futures::StreamExt;
1773 /// let client = async_nats::connect("localhost:4222").await?;
1774 /// let jetstream = async_nats::jetstream::new(client);
1775 ///
1776 /// let consumer = jetstream
1777 /// .get_stream("events")
1778 /// .await?
1779 /// .get_consumer("pull")
1780 /// .await?;
1781 ///
1782 /// let mut messages = consumer
1783 /// .fetch()
1784 /// .heartbeat(std::time::Duration::from_secs(10))
1785 /// .messages()
1786 /// .await?;
1787 ///
1788 /// while let Some(message) = messages.next().await {
1789 /// let message = message?;
1790 /// println!("message: {:?}", message);
1791 /// message.ack().await?;
1792 /// }
1793 /// # Ok(())
1794 /// # }
1795 /// ```
1796 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1797 self.heartbeat = heartbeat;
1798 self
1799 }
1800
1801 /// Low level API that does not need tweaking for most use cases.
1802 /// Sets how long each batch request waits for the whole batch of messages before timing
1803 /// out.
1804 ///
1805 /// # Examples
1806 ///
1807 /// ```no_run
1808 /// # #[tokio::main]
1809 /// # async fn main() -> Result<(), async_nats::Error> {
1810 /// use async_nats::jetstream::consumer::PullConsumer;
1811 /// use futures::StreamExt;
1812 ///
1813 /// let client = async_nats::connect("localhost:4222").await?;
1814 /// let jetstream = async_nats::jetstream::new(client);
1815 ///
1816 /// let consumer: PullConsumer = jetstream
1817 /// .get_stream("events")
1818 /// .await?
1819 /// .get_consumer("pull")
1820 /// .await?;
1821 ///
1822 /// let mut messages = consumer
1823 /// .fetch()
1824 /// .expires(std::time::Duration::from_secs(30))
1825 /// .messages()
1826 /// .await?;
1827 ///
1828 /// while let Some(message) = messages.next().await {
1829 /// let message = message?;
1830 /// println!("message: {:?}", message);
1831 /// message.ack().await?;
1832 /// }
1833 /// # Ok(())
1834 /// # }
1835 /// ```
1836 pub fn expires(mut self, expires: Duration) -> Self {
1837 self.expires = Some(expires);
1838 self
1839 }
1840
1841 /// Sets the overflow threshold of minimum pending messages that must be reached before this
1842 /// stream starts getting messages.
1843 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
1844 /// [PriorityPolicy::Overflow] set.
1845 ///
1846 /// # Examples
1847 ///
1848 /// ```no_run
1849 /// # #[tokio::main]
1850 /// # async fn main() -> Result<(), async_nats::Error> {
1851 /// use async_nats::jetstream::consumer::PullConsumer;
1852 /// use futures::StreamExt;
1853 ///
1854 /// let client = async_nats::connect("localhost:4222").await?;
1855 /// let jetstream = async_nats::jetstream::new(client);
1856 ///
1857 /// let consumer: PullConsumer = jetstream
1858 /// .get_stream("events")
1859 /// .await?
1860 /// .get_consumer("pull")
1861 /// .await?;
1862 ///
1863 /// let mut messages = consumer
1864 /// .fetch()
1865 /// .expires(std::time::Duration::from_secs(30))
1866 /// .group("A")
1867 /// .min_pending(100)
1868 /// .messages()
1869 /// .await?;
1870 ///
1871 /// while let Some(message) = messages.next().await {
1872 /// let message = message?;
1873 /// println!("message: {:?}", message);
1874 /// message.ack().await?;
1875 /// }
1876 /// # Ok(())
1877 /// # }
1878 /// ```
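    ///
    /// The example above shows only the pull side. As a rough sketch of the consumer side
    /// (assuming the `server_2_11` feature is enabled and a stream named `events` exists),
    /// the consumer itself could be created with overflow enabled along these lines:
    ///
    /// ```ignore
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer::{pull, PriorityPolicy};
    ///
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// // Consumer with the overflow priority policy and a single priority group "A".
    /// let consumer = jetstream
    ///     .get_stream("events")
    ///     .await?
    ///     .create_consumer(pull::Config {
    ///         durable_name: Some("pull".to_string()),
    ///         priority_policy: PriorityPolicy::Overflow,
    ///         priority_groups: vec!["A".to_string()],
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```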
1879 pub fn min_pending(mut self, min_pending: usize) -> Self {
1880 self.min_pending = Some(min_pending);
1881 self
1882 }
1883
    /// Sets the overflow threshold: the minimum number of pending acknowledgments a [Consumer]
    /// must have before this stream starts receiving messages.
    /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
    /// [PriorityPolicy::Overflow] set.
1888 ///
1889 /// # Examples
1890 ///
1891 /// ```no_run
1892 /// # #[tokio::main]
1893 /// # async fn main() -> Result<(), async_nats::Error> {
1894 /// use async_nats::jetstream::consumer::PullConsumer;
1895 /// use futures::StreamExt;
1896 ///
1897 /// let client = async_nats::connect("localhost:4222").await?;
1898 /// let jetstream = async_nats::jetstream::new(client);
1899 ///
1900 /// let consumer: PullConsumer = jetstream
1901 /// .get_stream("events")
1902 /// .await?
1903 /// .get_consumer("pull")
1904 /// .await?;
1905 ///
1906 /// let mut messages = consumer
1907 /// .fetch()
1908 /// .expires(std::time::Duration::from_secs(30))
1909 /// .group("A")
1910 /// .min_ack_pending(100)
1911 /// .messages()
1912 /// .await?;
1913 ///
1914 /// while let Some(message) = messages.next().await {
1915 /// let message = message?;
1916 /// println!("message: {:?}", message);
1917 /// message.ack().await?;
1918 /// }
1919 /// # Ok(())
1920 /// # }
1921 /// ```
1922 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1923 self.min_ack_pending = Some(min_ack_pending);
1924 self
1925 }
1926
    /// Sets the priority group to use when consuming from a [Consumer] with a [PriorityPolicy]
    /// configured.
1928 ///
1929 /// # Examples
1930 ///
1931 /// ```no_run
1932 /// # #[tokio::main]
1933 /// # async fn main() -> Result<(), async_nats::Error> {
1934 /// use async_nats::jetstream::consumer::PullConsumer;
1935 /// use futures::StreamExt;
1936 ///
1937 /// let client = async_nats::connect("localhost:4222").await?;
1938 /// let jetstream = async_nats::jetstream::new(client);
1939 ///
1940 /// let consumer: PullConsumer = jetstream
1941 /// .get_stream("events")
1942 /// .await?
1943 /// .get_consumer("pull")
1944 /// .await?;
1945 ///
1946 /// let mut messages = consumer
1947 /// .fetch()
1948 /// .expires(std::time::Duration::from_secs(30))
1949 /// .group("A")
1950 /// .min_ack_pending(100)
1951 /// .messages()
1952 /// .await?;
1953 ///
1954 /// while let Some(message) = messages.next().await {
1955 /// let message = message?;
1956 /// println!("message: {:?}", message);
1957 /// message.ack().await?;
1958 /// }
1959 /// # Ok(())
1960 /// # }
1961 /// ```
1962 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1963 self.group = Some(group.into());
1964 self
1965 }
1966
    /// Creates the actual [Batch] with the provided configuration.
1968 ///
1969 /// # Examples
1970 ///
1971 /// ```no_run
1972 /// # #[tokio::main]
1973 /// # async fn main() -> Result<(), async_nats::Error> {
1974 /// use async_nats::jetstream::consumer::PullConsumer;
1975 /// use futures::StreamExt;
1976 /// let client = async_nats::connect("localhost:4222").await?;
1977 /// let jetstream = async_nats::jetstream::new(client);
1978 ///
1979 /// let consumer: PullConsumer = jetstream
1980 /// .get_stream("events")
1981 /// .await?
1982 /// .get_consumer("pull")
1983 /// .await?;
1984 ///
1985 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1986 ///
1987 /// while let Some(message) = messages.next().await {
1988 /// let message = message?;
1989 /// println!("message: {:?}", message);
1990 /// message.ack().await?;
1991 /// }
1992 /// # Ok(())
1993 /// # }
1994 /// ```
1995 pub async fn messages(self) -> Result<Batch, BatchError> {
1996 Batch::batch(
1997 BatchConfig {
1998 batch: self.batch,
1999 expires: self.expires,
2000 no_wait: true,
2001 max_bytes: self.max_bytes,
2002 idle_heartbeat: self.heartbeat,
2003 min_pending: self.min_pending,
2004 min_ack_pending: self.min_ack_pending,
2005 group: self.group,
2006 },
2007 self.consumer,
2008 )
2009 .await
2010 }
2011}
2012
/// Used for building the configuration of a [Batch]. Created by calling [Consumer::batch] on a
/// [Consumer].
2014///
2015/// # Examples
2016///
2017/// ```no_run
2018/// # #[tokio::main]
2019/// # async fn main() -> Result<(), async_nats::Error> {
2020/// use async_nats::jetstream::consumer::PullConsumer;
2021/// use futures::StreamExt;
2022/// let client = async_nats::connect("localhost:4222").await?;
2023/// let jetstream = async_nats::jetstream::new(client);
2024///
2025/// let consumer: PullConsumer = jetstream
2026/// .get_stream("events")
2027/// .await?
2028/// .get_consumer("pull")
2029/// .await?;
2030///
2031/// let mut messages = consumer
2032/// .batch()
2033/// .max_messages(100)
2034/// .max_bytes(1024)
2035/// .messages()
2036/// .await?;
2037///
2038/// while let Some(message) = messages.next().await {
2039/// let message = message?;
2040/// println!("message: {:?}", message);
2041/// message.ack().await?;
2042/// }
2043/// # Ok(())
2044/// # }
2045/// ```
2046pub struct BatchBuilder<'a> {
2047 batch: usize,
2048 max_bytes: usize,
2049 heartbeat: Duration,
2050 expires: Duration,
2051 min_pending: Option<usize>,
2052 min_ack_pending: Option<usize>,
2053 group: Option<String>,
2054 consumer: &'a Consumer<Config>,
2055}
2056
2057impl<'a> BatchBuilder<'a> {
2058 pub fn new(consumer: &'a Consumer<Config>) -> Self {
2059 BatchBuilder {
2060 consumer,
2061 batch: 200,
2062 max_bytes: 0,
2063 expires: Duration::ZERO,
2064 heartbeat: Duration::default(),
2065 min_pending: None,
2066 min_ack_pending: None,
2067 group: None,
2068 }
2069 }
2070
    /// Sets the maximum number of bytes that can be buffered on the client while processing
    /// already received messages.
    /// Higher values can yield better performance, but may also increase memory usage if the
    /// application acknowledges messages much more slowly than they arrive.
    ///
    /// The default values should provide a reasonable balance between performance and memory
    /// usage.
2077 ///
2078 /// # Examples
2079 ///
2080 /// ```no_run
2081 /// # #[tokio::main]
2082 /// # async fn main() -> Result<(), async_nats::Error> {
2083 /// use async_nats::jetstream::consumer::PullConsumer;
2084 /// use futures::StreamExt;
2085 /// let client = async_nats::connect("localhost:4222").await?;
2086 /// let jetstream = async_nats::jetstream::new(client);
2087 ///
2088 /// let consumer: PullConsumer = jetstream
2089 /// .get_stream("events")
2090 /// .await?
2091 /// .get_consumer("pull")
2092 /// .await?;
2093 ///
2094 /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
2095 ///
2096 /// while let Some(message) = messages.next().await {
2097 /// let message = message?;
2098 /// println!("message: {:?}", message);
2099 /// message.ack().await?;
2100 /// }
2101 /// # Ok(())
2102 /// # }
2103 /// ```
2104 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
2105 self.max_bytes = max_bytes;
2106 self
2107 }
2108
    /// Sets the maximum number of messages that can be buffered on the client while processing
    /// already received messages.
    /// Higher values can yield better performance, but may also increase memory usage if the
    /// application acknowledges messages much more slowly than they arrive.
    ///
    /// The default values should provide a reasonable balance between performance and memory
    /// usage.
2115 ///
2116 /// # Examples
2117 ///
2118 /// ```no_run
2119 /// # #[tokio::main]
2120 /// # async fn main() -> Result<(), async_nats::Error> {
2121 /// use async_nats::jetstream::consumer::PullConsumer;
2122 /// use futures::StreamExt;
2123 /// let client = async_nats::connect("localhost:4222").await?;
2124 /// let jetstream = async_nats::jetstream::new(client);
2125 ///
2126 /// let consumer: PullConsumer = jetstream
2127 /// .get_stream("events")
2128 /// .await?
2129 /// .get_consumer("pull")
2130 /// .await?;
2131 ///
2132 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2133 ///
2134 /// while let Some(message) = messages.next().await {
2135 /// let message = message?;
2136 /// println!("message: {:?}", message);
2137 /// message.ack().await?;
2138 /// }
2139 /// # Ok(())
2140 /// # }
2141 /// ```
2142 pub fn max_messages(mut self, batch: usize) -> Self {
2143 self.batch = batch;
2144 self
2145 }
2146
    /// Sets the heartbeat that the server will send when there are no new messages pending
    /// for a given [Consumer].
2149 ///
2150 /// # Examples
2151 ///
2152 /// ```no_run
2153 /// # #[tokio::main]
2154 /// # async fn main() -> Result<(), async_nats::Error> {
2155 /// use async_nats::jetstream::consumer::PullConsumer;
2156 /// use futures::StreamExt;
2157 /// let client = async_nats::connect("localhost:4222").await?;
2158 /// let jetstream = async_nats::jetstream::new(client);
2159 ///
2160 /// let consumer: PullConsumer = jetstream
2161 /// .get_stream("events")
2162 /// .await?
2163 /// .get_consumer("pull")
2164 /// .await?;
2165 ///
2166 /// let mut messages = consumer
2167 /// .batch()
2168 /// .heartbeat(std::time::Duration::from_secs(10))
2169 /// .messages()
2170 /// .await?;
2171 ///
2172 /// while let Some(message) = messages.next().await {
2173 /// let message = message?;
2174 /// println!("message: {:?}", message);
2175 /// message.ack().await?;
2176 /// }
2177 /// # Ok(())
2178 /// # }
2179 /// ```
2180 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
2181 self.heartbeat = heartbeat;
2182 self
2183 }
2184
    /// Sets the overflow threshold: the minimum number of pending messages a [Consumer] must
    /// have before this stream starts receiving messages.
    /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
    /// [PriorityPolicy::Overflow] set.
2189 ///
2190 /// # Examples
2191 ///
2192 /// ```no_run
2193 /// # #[tokio::main]
2194 /// # async fn main() -> Result<(), async_nats::Error> {
2195 /// use async_nats::jetstream::consumer::PullConsumer;
2196 /// use futures::StreamExt;
2197 ///
2198 /// let client = async_nats::connect("localhost:4222").await?;
2199 /// let jetstream = async_nats::jetstream::new(client);
2200 ///
2201 /// let consumer: PullConsumer = jetstream
2202 /// .get_stream("events")
2203 /// .await?
2204 /// .get_consumer("pull")
2205 /// .await?;
2206 ///
2207 /// let mut messages = consumer
2208 /// .batch()
2209 /// .expires(std::time::Duration::from_secs(30))
2210 /// .group("A")
2211 /// .min_pending(100)
2212 /// .messages()
2213 /// .await?;
2214 ///
2215 /// while let Some(message) = messages.next().await {
2216 /// let message = message?;
2217 /// println!("message: {:?}", message);
2218 /// message.ack().await?;
2219 /// }
2220 /// # Ok(())
2221 /// # }
2222 /// ```
2223 pub fn min_pending(mut self, min_pending: usize) -> Self {
2224 self.min_pending = Some(min_pending);
2225 self
2226 }
2227
    /// Sets the overflow threshold: the minimum number of pending acknowledgments a [Consumer]
    /// must have before this stream starts receiving messages.
    /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
    /// [PriorityPolicy::Overflow] set.
2232 ///
2233 /// # Examples
2234 ///
2235 /// ```no_run
2236 /// # #[tokio::main]
2237 /// # async fn main() -> Result<(), async_nats::Error> {
2238 /// use async_nats::jetstream::consumer::PullConsumer;
2239 /// use futures::StreamExt;
2240 ///
2241 /// let client = async_nats::connect("localhost:4222").await?;
2242 /// let jetstream = async_nats::jetstream::new(client);
2243 ///
2244 /// let consumer: PullConsumer = jetstream
2245 /// .get_stream("events")
2246 /// .await?
2247 /// .get_consumer("pull")
2248 /// .await?;
2249 ///
2250 /// let mut messages = consumer
2251 /// .batch()
2252 /// .expires(std::time::Duration::from_secs(30))
2253 /// .group("A")
2254 /// .min_ack_pending(100)
2255 /// .messages()
2256 /// .await?;
2257 ///
2258 /// while let Some(message) = messages.next().await {
2259 /// let message = message?;
2260 /// println!("message: {:?}", message);
2261 /// message.ack().await?;
2262 /// }
2263 /// # Ok(())
2264 /// # }
2265 /// ```
2266 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
2267 self.min_ack_pending = Some(min_ack_pending);
2268 self
2269 }
2270
    /// Sets the priority group to use when consuming from a [Consumer] with a [PriorityPolicy]
    /// configured.
2272 ///
2273 /// # Examples
2274 ///
2275 /// ```no_run
2276 /// # #[tokio::main]
2277 /// # async fn main() -> Result<(), async_nats::Error> {
2278 /// use async_nats::jetstream::consumer::PullConsumer;
2279 /// use futures::StreamExt;
2280 ///
2281 /// let client = async_nats::connect("localhost:4222").await?;
2282 /// let jetstream = async_nats::jetstream::new(client);
2283 ///
2284 /// let consumer: PullConsumer = jetstream
2285 /// .get_stream("events")
2286 /// .await?
2287 /// .get_consumer("pull")
2288 /// .await?;
2289 ///
2290 /// let mut messages = consumer
2291 /// .batch()
2292 /// .expires(std::time::Duration::from_secs(30))
2293 /// .group("A")
2294 /// .min_ack_pending(100)
2295 /// .messages()
2296 /// .await?;
2297 ///
2298 /// while let Some(message) = messages.next().await {
2299 /// let message = message?;
2300 /// println!("message: {:?}", message);
2301 /// message.ack().await?;
2302 /// }
2303 /// # Ok(())
2304 /// # }
2305 /// ```
2306 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2307 self.group = Some(group.into());
2308 self
2309 }
2310
    /// Low-level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing
    /// out.
2314 ///
2315 /// # Examples
2316 ///
2317 /// ```no_run
2318 /// # #[tokio::main]
2319 /// # async fn main() -> Result<(), async_nats::Error> {
2320 /// use async_nats::jetstream::consumer::PullConsumer;
2321 /// use futures::StreamExt;
2322 /// let client = async_nats::connect("localhost:4222").await?;
2323 /// let jetstream = async_nats::jetstream::new(client);
2324 ///
2325 /// let consumer: PullConsumer = jetstream
2326 /// .get_stream("events")
2327 /// .await?
2328 /// .get_consumer("pull")
2329 /// .await?;
2330 ///
2331 /// let mut messages = consumer
2332 /// .batch()
2333 /// .expires(std::time::Duration::from_secs(30))
2334 /// .messages()
2335 /// .await?;
2336 ///
2337 /// while let Some(message) = messages.next().await {
2338 /// let message = message?;
2339 /// println!("message: {:?}", message);
2340 /// message.ack().await?;
2341 /// }
2342 /// # Ok(())
2343 /// # }
2344 /// ```
2345 pub fn expires(mut self, expires: Duration) -> Self {
2346 self.expires = expires;
2347 self
2348 }
2349
    /// Creates the actual [Batch] with the provided configuration.
2351 ///
2352 /// # Examples
2353 ///
2354 /// ```no_run
2355 /// # #[tokio::main]
2356 /// # async fn main() -> Result<(), async_nats::Error> {
2357 /// use async_nats::jetstream::consumer::PullConsumer;
2358 /// use futures::StreamExt;
2359 /// let client = async_nats::connect("localhost:4222").await?;
2360 /// let jetstream = async_nats::jetstream::new(client);
2361 ///
2362 /// let consumer: PullConsumer = jetstream
2363 /// .get_stream("events")
2364 /// .await?
2365 /// .get_consumer("pull")
2366 /// .await?;
2367 ///
2368 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2369 ///
2370 /// while let Some(message) = messages.next().await {
2371 /// let message = message?;
2372 /// println!("message: {:?}", message);
2373 /// message.ack().await?;
2374 /// }
2375 /// # Ok(())
2376 /// # }
2377 /// ```
2378 pub async fn messages(self) -> Result<Batch, BatchError> {
2379 let config = BatchConfig {
2380 batch: self.batch,
2381 expires: Some(self.expires),
2382 no_wait: false,
2383 max_bytes: self.max_bytes,
2384 idle_heartbeat: self.heartbeat,
2385 min_pending: self.min_pending,
2386 min_ack_pending: self.min_ack_pending,
2387 group: self.group,
2388 };
2389 Batch::batch(config, self.consumer).await
2390 }
2391}
2392
/// Configuration for the next pull request sent by a pull [Consumer].
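///
/// This struct is normally filled in for you by the pull builders (for example [BatchBuilder]),
/// but it can also be constructed directly. A minimal sketch of what a pull request payload
/// looks like on the wire, assuming `serde_json` is used for serialization:
///
/// ```no_run
/// # fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer::pull::BatchConfig;
/// use std::time::Duration;
///
/// // Request up to 100 messages and keep the request pending for at most 30 seconds.
/// let request = BatchConfig {
///     batch: 100,
///     expires: Some(Duration::from_secs(30)),
///     ..Default::default()
/// };
///
/// // Durations are serialized as nanoseconds, matching the JetStream API.
/// println!("{}", serde_json::to_string(&request)?);
/// # Ok(())
/// # }
/// ```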
2394#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
2395pub struct BatchConfig {
2396 /// The number of messages that are being requested to be delivered.
2397 pub batch: usize,
    /// The optional amount of time (sent to the server as nanoseconds) that the server will
    /// keep this request pending before it forgets about the remaining batch size.
2400 #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
2401 pub expires: Option<Duration>,
    /// Optionally causes the server not to store this pending request at all. When there are no
    /// messages to deliver, the server sends an empty message with a 404 status header so you
    /// can tell, for example, that you have reached the end of the stream. A 409 is returned if
    /// the Consumer has reached its MaxAckPending limit.
2406 #[serde(skip_serializing_if = "is_default")]
2407 pub no_wait: bool,
2408
    /// Sets the maximum total number of bytes for a given batch. This works together with
    /// `batch`: whichever limit is reached first completes the batch.
2411 pub max_bytes: usize,
2412
    /// Setting this to a value other than zero will cause the server to send a 100 Idle
    /// Heartbeat status to the client when there are no new messages.
2415 #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
2416 pub idle_heartbeat: Duration,
2417
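    /// Overflow threshold: the minimum number of pending messages the consumer must have before
    /// this request starts receiving messages. Used together with priority groups.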
2418 pub min_pending: Option<usize>,
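    /// Overflow threshold: the minimum number of pending acknowledgments the consumer must have
    /// before this request starts receiving messages. Used together with priority groups.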
2419 pub min_ack_pending: Option<usize>,
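    /// The priority group this pull request belongs to.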
2420 pub group: Option<String>,
2421}
2422
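// Returns true when a value equals its type's default; used by the `skip_serializing_if`
// serde attributes in this module.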
2423fn is_default<T: Default + Eq>(t: &T) -> bool {
2424 t == &T::default()
2425}
2426
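/// Configuration for a pull consumer.
///
/// A minimal sketch of creating a durable pull consumer from this configuration (assuming a
/// NATS server at `localhost:4222` and an existing stream named `events`):
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer::{pull, AckPolicy, PullConsumer};
/// use std::time::Duration;
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// // A durable consumer survives client restarts and lets the server remember
/// // which messages were acknowledged.
/// let consumer: PullConsumer = jetstream
///     .get_stream("events")
///     .await?
///     .create_consumer(pull::Config {
///         durable_name: Some("processor".to_string()),
///         ack_policy: AckPolicy::Explicit,
///         ack_wait: Duration::from_secs(30),
///         ..Default::default()
///     })
///     .await?;
/// # Ok(())
/// # }
/// ```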
2427#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2428pub struct Config {
2429 /// Setting `durable_name` to `Some(...)` will cause this consumer
2430 /// to be "durable". This may be a good choice for workloads that
2431 /// benefit from the `JetStream` server or cluster remembering the
2432 /// progress of consumers for fault tolerance purposes. If a consumer
2433 /// crashes, the `JetStream` server or cluster will remember which
2434 /// messages the consumer acknowledged. When the consumer recovers,
2435 /// this information will allow the consumer to resume processing
2436 /// where it left off. If you're unsure, set this to `Some(...)`.
2437 ///
2438 /// Setting `durable_name` to `None` will cause this consumer to
2439 /// be "ephemeral". This may be a good choice for workloads where
2440 /// you don't need the `JetStream` server to remember the consumer's
2441 /// progress in the case of a crash, such as certain "high churn"
2442 /// workloads or workloads where a crashed instance is not required
2443 /// to recover.
2444 #[serde(default, skip_serializing_if = "Option::is_none")]
2445 pub durable_name: Option<String>,
2446 /// A name of the consumer. Can be specified for both durable and ephemeral
2447 /// consumers.
2448 #[serde(default, skip_serializing_if = "Option::is_none")]
2449 pub name: Option<String>,
2450 /// A short description of the purpose of this consumer.
2451 #[serde(default, skip_serializing_if = "Option::is_none")]
2452 pub description: Option<String>,
2453 /// Allows for a variety of options that determine how this consumer will receive messages
2454 #[serde(flatten)]
2455 pub deliver_policy: DeliverPolicy,
2456 /// How messages should be acknowledged
2457 pub ack_policy: AckPolicy,
2458 /// How long to allow messages to remain un-acknowledged before attempting redelivery
2459 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2460 pub ack_wait: Duration,
2461 /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2462 #[serde(default, skip_serializing_if = "is_default")]
2463 pub max_deliver: i64,
2464 /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2465 #[serde(default, skip_serializing_if = "is_default")]
2466 pub filter_subject: String,
2467 #[cfg(feature = "server_2_10")]
2468 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2469 #[serde(default, skip_serializing_if = "is_default")]
2470 pub filter_subjects: Vec<String>,
2471 /// Whether messages are sent as quickly as possible or at the rate of receipt
2472 pub replay_policy: ReplayPolicy,
2473 /// The rate of message delivery in bits per second
2474 #[serde(default, skip_serializing_if = "is_default")]
2475 pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
2477 #[serde(
2478 rename = "sample_freq",
2479 with = "super::sample_freq_deser",
2480 default,
2481 skip_serializing_if = "is_default"
2482 )]
2483 pub sample_frequency: u8,
    /// The maximum number of waiting pull requests for this consumer.
2485 #[serde(default, skip_serializing_if = "is_default")]
2486 pub max_waiting: i64,
2487 /// The maximum number of unacknowledged messages that may be
2488 /// in-flight before pausing sending additional messages to
2489 /// this consumer.
2490 #[serde(default, skip_serializing_if = "is_default")]
2491 pub max_ack_pending: i64,
2492 /// Only deliver headers without payloads.
2493 #[serde(default, skip_serializing_if = "is_default")]
2494 pub headers_only: bool,
2495 /// Maximum size of a request batch
2496 #[serde(default, skip_serializing_if = "is_default")]
2497 pub max_batch: i64,
2498 /// Maximum value of request max_bytes
2499 #[serde(default, skip_serializing_if = "is_default")]
2500 pub max_bytes: i64,
2501 /// Maximum value for request expiration
2502 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2503 pub max_expires: Duration,
2504 /// Threshold for consumer inactivity
2505 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2506 pub inactive_threshold: Duration,
2507 /// Number of consumer replicas
2508 #[serde(default, skip_serializing_if = "is_default")]
2509 pub num_replicas: usize,
2510 /// Force consumer to use memory storage.
2511 #[serde(default, skip_serializing_if = "is_default")]
2512 pub memory_storage: bool,
2513 #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
2515 #[serde(default, skip_serializing_if = "is_default")]
2516 pub metadata: HashMap<String, String>,
2517 /// Custom backoff for missed acknowledgments.
2518 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2519 pub backoff: Vec<Duration>,
2520
2521 /// Priority policy for this consumer. Requires [Config::priority_groups] to be set.
2522 #[cfg(feature = "server_2_11")]
2523 #[serde(default, skip_serializing_if = "is_default")]
2524 pub priority_policy: PriorityPolicy,
2525 /// Priority groups for this consumer. Currently only one group is supported and is used
2526 /// in conjunction with [Config::priority_policy].
2527 #[cfg(feature = "server_2_11")]
2528 #[serde(default, skip_serializing_if = "is_default")]
2529 pub priority_groups: Vec<String>,
    /// Suspends the consumer until the given deadline.
2531 #[cfg(feature = "server_2_11")]
2532 #[serde(
2533 default,
2534 with = "rfc3339::option",
2535 skip_serializing_if = "Option::is_none"
2536 )]
2537 pub pause_until: Option<OffsetDateTime>,
2538}
2539
2540impl IntoConsumerConfig for &Config {
2541 fn into_consumer_config(self) -> consumer::Config {
2542 self.clone().into_consumer_config()
2543 }
2544}
2545
2546impl IntoConsumerConfig for Config {
2547 fn into_consumer_config(self) -> consumer::Config {
2548 jetstream::consumer::Config {
2549 deliver_subject: None,
2550 name: self.name,
2551 durable_name: self.durable_name,
2552 description: self.description,
2553 deliver_group: None,
2554 deliver_policy: self.deliver_policy,
2555 ack_policy: self.ack_policy,
2556 ack_wait: self.ack_wait,
2557 max_deliver: self.max_deliver,
2558 filter_subject: self.filter_subject,
2559 #[cfg(feature = "server_2_10")]
2560 filter_subjects: self.filter_subjects,
2561 replay_policy: self.replay_policy,
2562 rate_limit: self.rate_limit,
2563 sample_frequency: self.sample_frequency,
2564 max_waiting: self.max_waiting,
2565 max_ack_pending: self.max_ack_pending,
2566 headers_only: self.headers_only,
2567 flow_control: false,
2568 idle_heartbeat: Duration::default(),
2569 max_batch: self.max_batch,
2570 max_bytes: self.max_bytes,
2571 max_expires: self.max_expires,
2572 inactive_threshold: self.inactive_threshold,
2573 num_replicas: self.num_replicas,
2574 memory_storage: self.memory_storage,
2575 #[cfg(feature = "server_2_10")]
2576 metadata: self.metadata,
2577 backoff: self.backoff,
2578 #[cfg(feature = "server_2_11")]
2579 priority_policy: self.priority_policy,
2580 #[cfg(feature = "server_2_11")]
2581 priority_groups: self.priority_groups,
2582 #[cfg(feature = "server_2_11")]
2583 pause_until: self.pause_until,
2584 }
2585 }
2586}
2587impl FromConsumer for Config {
2588 fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2589 if config.deliver_subject.is_some() {
2590 return Err(Box::new(std::io::Error::new(
2591 std::io::ErrorKind::Other,
2592 "pull consumer cannot have delivery subject",
2593 )));
2594 }
2595 Ok(Config {
2596 durable_name: config.durable_name,
2597 name: config.name,
2598 description: config.description,
2599 deliver_policy: config.deliver_policy,
2600 ack_policy: config.ack_policy,
2601 ack_wait: config.ack_wait,
2602 max_deliver: config.max_deliver,
2603 filter_subject: config.filter_subject,
2604 #[cfg(feature = "server_2_10")]
2605 filter_subjects: config.filter_subjects,
2606 replay_policy: config.replay_policy,
2607 rate_limit: config.rate_limit,
2608 sample_frequency: config.sample_frequency,
2609 max_waiting: config.max_waiting,
2610 max_ack_pending: config.max_ack_pending,
2611 headers_only: config.headers_only,
2612 max_batch: config.max_batch,
2613 max_bytes: config.max_bytes,
2614 max_expires: config.max_expires,
2615 inactive_threshold: config.inactive_threshold,
2616 num_replicas: config.num_replicas,
2617 memory_storage: config.memory_storage,
2618 #[cfg(feature = "server_2_10")]
2619 metadata: config.metadata,
2620 backoff: config.backoff,
2621 #[cfg(feature = "server_2_11")]
2622 priority_policy: config.priority_policy,
2623 #[cfg(feature = "server_2_11")]
2624 priority_groups: config.priority_groups,
2625 #[cfg(feature = "server_2_11")]
2626 pause_until: config.pause_until,
2627 })
2628 }
2629}
2630
2631#[derive(Clone, Copy, Debug, PartialEq)]
2632pub enum BatchRequestErrorKind {
2633 Publish,
2634 Flush,
2635 Serialize,
2636}
2637
2638impl std::fmt::Display for BatchRequestErrorKind {
2639 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2640 match self {
2641 Self::Publish => write!(f, "publish failed"),
2642 Self::Flush => write!(f, "flush failed"),
2643 Self::Serialize => write!(f, "serialize failed"),
2644 }
2645 }
2646}
2647
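/// Error returned when publishing a pull request to the server fails.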
2648pub type BatchRequestError = Error<BatchRequestErrorKind>;
2649
2650#[derive(Clone, Copy, Debug, PartialEq)]
2651pub enum BatchErrorKind {
2652 Subscribe,
2653 Pull,
2654 Flush,
2655 Serialize,
2656}
2657
2658impl std::fmt::Display for BatchErrorKind {
2659 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2660 match self {
2661 Self::Pull => write!(f, "pull request failed"),
2662 Self::Flush => write!(f, "flush failed"),
2663 Self::Serialize => write!(f, "serialize failed"),
2664 Self::Subscribe => write!(f, "subscribe failed"),
2665 }
2666 }
2667}
2668
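/// Error returned when creating a [Batch] fails.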
2669pub type BatchError = Error<BatchErrorKind>;
2670
2671impl From<SubscribeError> for BatchError {
2672 fn from(err: SubscribeError) -> Self {
2673 BatchError::with_source(BatchErrorKind::Subscribe, err)
2674 }
2675}
2676
2677impl From<BatchRequestError> for BatchError {
2678 fn from(err: BatchRequestError) -> Self {
2679 BatchError::with_source(BatchErrorKind::Pull, err)
2680 }
2681}
2682
2683#[derive(Clone, Copy, Debug, PartialEq)]
2684pub enum ConsumerRecreateErrorKind {
2685 GetStream,
2686 Recreate,
2687 TimedOut,
2688}
2689
2690impl std::fmt::Display for ConsumerRecreateErrorKind {
2691 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2692 match self {
2693 Self::GetStream => write!(f, "error getting stream"),
2694 Self::Recreate => write!(f, "consumer creation failed"),
2695 Self::TimedOut => write!(f, "timed out"),
2696 }
2697 }
2698}
2699
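/// Error returned when recreating a consumer fails.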
2700pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2701
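// Deletes the existing ordered consumer and creates a new one that resumes delivery from the
// message after `sequence` (or from the start of the stream when `sequence` is 0), then builds
// a fresh message `Stream` on top of it.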
2702async fn recreate_consumer_stream(
2703 context: &Context,
2704 config: &OrderedConfig,
2705 stream_name: &str,
2706 consumer_name: &str,
2707 sequence: u64,
2708) -> Result<Stream, ConsumerRecreateError> {
2709 let span = tracing::span!(
2710 tracing::Level::DEBUG,
2711 "recreate_ordered_consumer",
2712 stream_name = stream_name,
2713 consumer_name = consumer_name,
2714 sequence = sequence
2715 );
2716 let _span_handle = span.enter();
2717 let config = config.to_owned();
2718 trace!("delete old consumer before creating new one");
2719
2720 tokio::time::timeout(
2721 Duration::from_secs(5),
2722 context.delete_consumer_from_stream(consumer_name, stream_name),
2723 )
2724 .await
2725 .ok();
2726
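    // Resume from the message after the last delivered sequence, or from the beginning of the
    // stream if nothing has been delivered yet (sequence == 0).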
2727 let deliver_policy = {
2728 if sequence == 0 {
2729 DeliverPolicy::All
2730 } else {
2731 DeliverPolicy::ByStartSequence {
2732 start_sequence: sequence + 1,
2733 }
2734 }
2735 };
2736 trace!("create the new ordered consumer for sequence {}", sequence);
2737 let consumer = tokio::time::timeout(
2738 Duration::from_secs(5),
2739 context.create_consumer_on_stream(
2740 jetstream::consumer::pull::OrderedConfig {
2741 deliver_policy,
2742 ..config.clone()
2743 },
2744 stream_name,
2745 ),
2746 )
2747 .await
2748 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2749 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2750
2751 let config = Consumer {
2752 config: config.clone().into(),
2753 context: context.clone(),
2754 info: consumer.info,
2755 };
2756
2757 trace!("create iterator");
2758 let stream = tokio::time::timeout(
2759 Duration::from_secs(5),
2760 Stream::stream(
2761 BatchConfig {
2762 batch: 500,
2763 expires: Some(Duration::from_secs(30)),
2764 no_wait: false,
2765 max_bytes: 0,
2766 idle_heartbeat: Duration::from_secs(15),
2767 min_pending: None,
2768 min_ack_pending: None,
2769 group: None,
2770 },
2771 &config,
2772 ),
2773 )
2774 .await
2775 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2776 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2777 trace!("recreated consumer");
2778 stream
2779}