async_nats/jetstream/consumer/
pull.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16    future::{BoxFuture, Either},
17    FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_11")]
21use time::{serde::rfc3339, OffsetDateTime};
22
23#[cfg(feature = "server_2_10")]
24use std::collections::HashMap;
25use std::{future, pin::Pin, task::Poll, time::Duration};
26use tokio::{task::JoinHandle, time::Sleep};
27
28use serde::{Deserialize, Serialize};
29use tracing::{debug, trace};
30
31use crate::{
32    connection::State,
33    error::Error,
34    jetstream::{self, Context},
35    StatusCode, SubscribeError, Subscriber,
36};
37
38use crate::subject::Subject;
39
40#[cfg(feature = "server_2_11")]
41use super::PriorityPolicy;
42
43use super::{
44    backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
45    StreamError, StreamErrorKind,
46};
47use jetstream::consumer;
48
49impl Consumer<Config> {
50    /// Returns a stream of messages for Pull Consumer.
51    ///
52    /// # Example
53    ///
54    /// ```no_run
55    /// # #[tokio::main]
56    /// # async fn mains() -> Result<(), async_nats::Error> {
57    /// use futures::StreamExt;
58    /// use futures::TryStreamExt;
59    ///
60    /// let client = async_nats::connect("localhost:4222").await?;
61    /// let jetstream = async_nats::jetstream::new(client);
62    ///
63    /// let stream = jetstream
64    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
65    ///         name: "events".to_string(),
66    ///         max_messages: 10_000,
67    ///         ..Default::default()
68    ///     })
69    ///     .await?;
70    ///
71    /// jetstream.publish("events", "data".into()).await?;
72    ///
73    /// let consumer = stream
74    ///     .get_or_create_consumer(
75    ///         "consumer",
76    ///         async_nats::jetstream::consumer::pull::Config {
77    ///             durable_name: Some("consumer".to_string()),
78    ///             ..Default::default()
79    ///         },
80    ///     )
81    ///     .await?;
82    ///
83    /// let mut messages = consumer.messages().await?.take(100);
84    /// while let Some(Ok(message)) = messages.next().await {
85    ///     println!("got message {:?}", message);
86    ///     message.ack().await?;
87    /// }
88    /// Ok(())
89    /// # }
90    /// ```
91    pub async fn messages(&self) -> Result<Stream, StreamError> {
92        Stream::stream(
93            BatchConfig {
94                batch: 200,
95                expires: Some(Duration::from_secs(30)),
96                no_wait: false,
97                max_bytes: 0,
98                idle_heartbeat: Duration::from_secs(15),
99                min_pending: None,
100                min_ack_pending: None,
101                group: None,
102            },
103            self,
104        )
105        .await
106    }
107
108    /// Enables customization of [Stream] by setting timeouts, heartbeats, maximum number of
109    /// messages or bytes buffered.
110    ///
111    /// # Examples
112    ///
113    /// ```no_run
114    /// # #[tokio::main]
115    /// # async fn main() -> Result<(), async_nats::Error>  {
116    /// use async_nats::jetstream::consumer::PullConsumer;
117    /// use futures::StreamExt;
118    /// let client = async_nats::connect("localhost:4222").await?;
119    /// let jetstream = async_nats::jetstream::new(client);
120    ///
121    /// let consumer: PullConsumer = jetstream
122    ///     .get_stream("events")
123    ///     .await?
124    ///     .get_consumer("pull")
125    ///     .await?;
126    ///
127    /// let mut messages = consumer
128    ///     .stream()
129    ///     .max_messages_per_batch(100)
130    ///     .max_bytes_per_batch(1024)
131    ///     .messages()
132    ///     .await?;
133    ///
134    /// while let Some(message) = messages.next().await {
135    ///     let message = message?;
136    ///     println!("message: {:?}", message);
137    ///     message.ack().await?;
138    /// }
139    /// # Ok(())
140    /// # }
141    /// ```
    pub fn stream(&self) -> StreamBuilder<'_> {
        // The returned builder borrows this consumer; per-batch limits are set on the
        // builder before `messages()` is called on it.
        StreamBuilder::new(self)
    }
145
146    pub async fn request_batch<I: Into<BatchConfig>>(
147        &self,
148        batch: I,
149        inbox: Subject,
150    ) -> Result<(), BatchRequestError> {
151        debug!("sending batch");
152        let subject = format!(
153            "{}.CONSUMER.MSG.NEXT.{}.{}",
154            self.context.prefix, self.info.stream_name, self.info.name
155        );
156
157        let payload = serde_json::to_vec(&batch.into())
158            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
159
160        self.context
161            .client
162            .publish_with_reply(subject, inbox, payload.into())
163            .await
164            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
165        debug!("batch request sent");
166        Ok(())
167    }
168
    /// Returns a batch of the specified number of messages, or, if there are fewer messages on
    /// the [Stream] than requested, returns all available messages.
171    ///
172    /// # Example
173    ///
174    /// ```no_run
175    /// # #[tokio::main]
176    /// # async fn mains() -> Result<(), async_nats::Error> {
177    /// use futures::StreamExt;
178    /// use futures::TryStreamExt;
179    ///
180    /// let client = async_nats::connect("localhost:4222").await?;
181    /// let jetstream = async_nats::jetstream::new(client);
182    ///
183    /// let stream = jetstream
184    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
185    ///         name: "events".to_string(),
186    ///         max_messages: 10_000,
187    ///         ..Default::default()
188    ///     })
189    ///     .await?;
190    ///
191    /// jetstream.publish("events", "data".into()).await?;
192    ///
193    /// let consumer = stream
194    ///     .get_or_create_consumer(
195    ///         "consumer",
196    ///         async_nats::jetstream::consumer::pull::Config {
197    ///             durable_name: Some("consumer".to_string()),
198    ///             ..Default::default()
199    ///         },
200    ///     )
201    ///     .await?;
202    ///
203    /// for _ in 0..100 {
204    ///     jetstream.publish("events", "data".into()).await?;
205    /// }
206    ///
207    /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
208    /// // will finish after 100 messages, as that is the number of messages available on the
209    /// // stream.
210    /// while let Some(Ok(message)) = messages.next().await {
211    ///     println!("got message {:?}", message);
212    ///     message.ack().await?;
213    /// }
214    /// Ok(())
215    /// # }
216    /// ```
    pub fn fetch(&self) -> FetchBuilder {
        // Request options such as `max_messages` are chosen on the returned builder.
        FetchBuilder::new(self)
    }
220
221    /// Returns a batch of specified number of messages unless timeout happens first.
222    ///
223    /// # Example
224    ///
225    /// ```no_run
226    /// # #[tokio::main]
227    /// # async fn mains() -> Result<(), async_nats::Error> {
228    /// use futures::StreamExt;
229    /// use futures::TryStreamExt;
230    ///
231    /// let client = async_nats::connect("localhost:4222").await?;
232    /// let jetstream = async_nats::jetstream::new(client);
233    ///
234    /// let stream = jetstream
235    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
236    ///         name: "events".to_string(),
237    ///         max_messages: 10_000,
238    ///         ..Default::default()
239    ///     })
240    ///     .await?;
241    ///
242    /// jetstream.publish("events", "data".into()).await?;
243    ///
244    /// let consumer = stream
245    ///     .get_or_create_consumer(
246    ///         "consumer",
247    ///         async_nats::jetstream::consumer::pull::Config {
248    ///             durable_name: Some("consumer".to_string()),
249    ///             ..Default::default()
250    ///         },
251    ///     )
252    ///     .await?;
253    ///
254    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
255    /// while let Some(Ok(message)) = messages.next().await {
256    ///     println!("got message {:?}", message);
257    ///     message.ack().await?;
258    /// }
259    /// Ok(())
260    /// # }
261    /// ```
    pub fn batch(&self) -> BatchBuilder {
        // Request options such as `max_messages` are chosen on the returned builder.
        BatchBuilder::new(self)
    }
265
266    /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
267    /// messages in those batches.
268    ///
269    /// # Example
270    ///
271    /// ```no_run
272    /// # #[tokio::main]
273    /// # async fn mains() -> Result<(), async_nats::Error> {
274    /// use futures::StreamExt;
275    /// use futures::TryStreamExt;
276    ///
277    /// let client = async_nats::connect("localhost:4222").await?;
278    /// let jetstream = async_nats::jetstream::new(client);
279    ///
280    /// let stream = jetstream
281    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
282    ///         name: "events".to_string(),
283    ///         max_messages: 10_000,
284    ///         ..Default::default()
285    ///     })
286    ///     .await?;
287    ///
288    /// jetstream.publish("events", "data".into()).await?;
289    ///
290    /// let consumer = stream
291    ///     .get_or_create_consumer(
292    ///         "consumer",
293    ///         async_nats::jetstream::consumer::pull::Config {
294    ///             durable_name: Some("consumer".to_string()),
295    ///             ..Default::default()
296    ///         },
297    ///     )
298    ///     .await?;
299    ///
300    /// let mut iter = consumer.sequence(50).unwrap().take(10);
301    /// while let Ok(Some(mut batch)) = iter.try_next().await {
302    ///     while let Ok(Some(message)) = batch.try_next().await {
303    ///         println!("message received: {:?}", message);
304    ///     }
305    /// }
306    /// Ok(())
307    /// # }
308    /// ```
309    pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
310        let context = self.context.clone();
311        let subject = format!(
312            "{}.CONSUMER.MSG.NEXT.{}.{}",
313            self.context.prefix, self.info.stream_name, self.info.name
314        );
315
316        let request = serde_json::to_vec(&BatchConfig {
317            batch,
318            expires: Some(Duration::from_secs(60)),
319            ..Default::default()
320        })
321        .map(Bytes::from)
322        .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
323
324        Ok(Sequence {
325            context,
326            subject,
327            request,
328            pending_messages: batch,
329            next: None,
330        })
331    }
332}
333
/// A finite stream of messages delivered in response to a single pull request.
///
/// The stream ends when the expected number of messages has been yielded, when the server
/// reports a timeout or that no messages are available, or when the local timeout timer
/// fires.
pub struct Batch {
    /// Number of messages still expected. Decremented for every delivered message; the
    /// stream ends once it reaches zero.
    pending_messages: usize,
    /// Subscription on which the replies to the pull request arrive.
    subscriber: Subscriber,
    /// JetStream context cloned into every yielded message.
    context: Context,
    /// Optional local timer; when it completes the stream is terminated.
    timeout: Option<Pin<Box<Sleep>>>,
    /// Set once the stream has finished, so later polls return `None` immediately.
    terminated: bool,
}
341
342impl Batch {
343    async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
344        let inbox = Subject::from(consumer.context.client.new_inbox());
345        let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
346        consumer.request_batch(batch.clone(), inbox.clone()).await?;
347
348        let sleep = batch.expires.map(|expires| {
349            Box::pin(tokio::time::sleep(
350                expires.saturating_add(Duration::from_secs(5)),
351            ))
352        });
353
354        Ok(Batch {
355            pending_messages: batch.batch,
356            subscriber: subscription,
357            context: consumer.context.clone(),
358            terminated: false,
359            timeout: sleep,
360        })
361    }
362}
363
364impl futures::Stream for Batch {
365    type Item = Result<jetstream::Message, crate::Error>;
366
367    fn poll_next(
368        mut self: std::pin::Pin<&mut Self>,
369        cx: &mut std::task::Context<'_>,
370    ) -> std::task::Poll<Option<Self::Item>> {
371        if self.terminated {
372            return Poll::Ready(None);
373        }
374        if self.pending_messages == 0 {
375            self.terminated = true;
376            return Poll::Ready(None);
377        }
378        if let Some(sleep) = self.timeout.as_mut() {
379            match sleep.poll_unpin(cx) {
380                Poll::Ready(_) => {
381                    debug!("batch timeout timer triggered");
382                    // TODO(tp): Maybe we can be smarter here and before timing out, check if
383                    // we consumed all the messages from the subscription buffer in case of user
384                    // slowly consuming messages. Keep in mind that we time out here only if
385                    // for some reason we missed timeout from the server and few seconds have
386                    // passed since expected timeout message.
387                    self.terminated = true;
388                    return Poll::Ready(None);
389                }
390                Poll::Pending => (),
391            }
392        }
393        match self.subscriber.receiver.poll_recv(cx) {
394            Poll::Ready(maybe_message) => match maybe_message {
395                Some(message) => match message.status.unwrap_or(StatusCode::OK) {
396                    StatusCode::TIMEOUT => {
397                        debug!("received timeout. Iterator done");
398                        self.terminated = true;
399                        Poll::Ready(None)
400                    }
401                    StatusCode::IDLE_HEARTBEAT => {
402                        debug!("received heartbeat");
403                        Poll::Pending
404                    }
405                    // If this is fetch variant, terminate on no more messages.
406                    // We do not need to check if this is a fetch, not batch,
407                    // as only fetch will send back `NO_MESSAGES` status.
408                    StatusCode::NOT_FOUND => {
409                        debug!("received `NO_MESSAGES`. Iterator done");
410                        self.terminated = true;
411                        Poll::Ready(None)
412                    }
413                    StatusCode::OK => {
414                        debug!("received message");
415                        self.pending_messages -= 1;
416                        Poll::Ready(Some(Ok(jetstream::Message {
417                            context: self.context.clone(),
418                            message,
419                        })))
420                    }
421                    status => {
422                        debug!("received error");
423                        self.terminated = true;
424                        Poll::Ready(Some(Err(Box::new(std::io::Error::other(format!(
425                            "error while processing messages from the stream: {}, {:?}",
426                            status, message.description
427                        ))))))
428                    }
429                },
430                None => Poll::Ready(None),
431            },
432            std::task::Poll::Pending => std::task::Poll::Pending,
433        }
434    }
435}
436
/// A stream of [Batches][Batch]: every item is a new [Batch] obtained by sending the same
/// pull request again.
pub struct Sequence {
    /// JetStream context used to subscribe and publish, and handed to every [Batch].
    context: Context,
    /// Subject the pull requests are published to.
    subject: String,
    /// Serialized pull request, reused for every batch.
    request: Bytes,
    /// Number of messages each created [Batch] expects.
    pending_messages: usize,
    /// In-flight future that is setting up the next [Batch], if any.
    next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
}
444
445impl futures::Stream for Sequence {
446    type Item = Result<Batch, MessagesError>;
447
448    fn poll_next(
449        mut self: std::pin::Pin<&mut Self>,
450        cx: &mut std::task::Context<'_>,
451    ) -> std::task::Poll<Option<Self::Item>> {
452        match self.next.as_mut() {
453            None => {
454                let context = self.context.clone();
455                let subject = self.subject.clone();
456                let request = self.request.clone();
457                let pending_messages = self.pending_messages;
458
459                let next = self.next.insert(Box::pin(async move {
460                    let inbox = context.client.new_inbox();
461                    let subscriber = context
462                        .client
463                        .subscribe(inbox.clone())
464                        .await
465                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
466
467                    context
468                        .client
469                        .publish_with_reply(subject, inbox, request)
470                        .await
471                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
472
473                    // TODO(tp): Add timeout config and defaults.
474                    Ok(Batch {
475                        pending_messages,
476                        subscriber,
477                        context,
478                        terminated: false,
479                        timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
480                    })
481                }));
482
483                match next.as_mut().poll(cx) {
484                    Poll::Ready(result) => {
485                        self.next = None;
486                        Poll::Ready(Some(result.map_err(|err| {
487                            MessagesError::with_source(MessagesErrorKind::Pull, err)
488                        })))
489                    }
490                    Poll::Pending => Poll::Pending,
491                }
492            }
493
494            Some(next) => match next.as_mut().poll(cx) {
495                Poll::Ready(result) => {
496                    self.next = None;
497                    Poll::Ready(Some(result.map_err(|err| {
498                        MessagesError::with_source(MessagesErrorKind::Pull, err)
499                    })))
500                }
501                Poll::Pending => Poll::Pending,
502            },
503        }
504    }
505}
506
507impl Consumer<OrderedConfig> {
508    /// Returns a stream of messages for Ordered Pull Consumer.
509    ///
    /// An ordered consumer uses a single-replica ephemeral consumer, no matter the replication
    /// factor of the Stream. It does not use acks; instead it tracks sequences and recreates
    /// itself whenever it sees a mismatch.
513    ///
514    /// # Example
515    ///
516    /// ```no_run
517    /// # #[tokio::main]
518    /// # async fn mains() -> Result<(), async_nats::Error> {
519    /// use futures::StreamExt;
520    /// use futures::TryStreamExt;
521    ///
522    /// let client = async_nats::connect("localhost:4222").await?;
523    /// let jetstream = async_nats::jetstream::new(client);
524    ///
525    /// let stream = jetstream
526    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
527    ///         name: "events".to_string(),
528    ///         max_messages: 10_000,
529    ///         ..Default::default()
530    ///     })
531    ///     .await?;
532    ///
533    /// jetstream.publish("events", "data".into()).await?;
534    ///
535    /// let consumer = stream
536    ///     .get_or_create_consumer(
537    ///         "consumer",
538    ///         async_nats::jetstream::consumer::pull::OrderedConfig {
539    ///             name: Some("consumer".to_string()),
540    ///             ..Default::default()
541    ///         },
542    ///     )
543    ///     .await?;
544    ///
545    /// let mut messages = consumer.messages().await?.take(100);
546    /// while let Some(Ok(message)) = messages.next().await {
547    ///     println!("got message {:?}", message);
548    /// }
549    /// Ok(())
550    /// # }
551    /// ```
552    pub async fn messages(self) -> Result<Ordered, StreamError> {
553        let config = Consumer {
554            config: self.config.clone().into(),
555            context: self.context.clone(),
556            info: self.info.clone(),
557        };
558        let stream = Stream::stream(
559            BatchConfig {
560                batch: 500,
561                expires: Some(Duration::from_secs(30)),
562                no_wait: false,
563                max_bytes: 0,
564                idle_heartbeat: Duration::from_secs(15),
565                min_pending: None,
566                min_ack_pending: None,
567                group: None,
568            },
569            &config,
570        )
571        .await?;
572
573        Ok(Ordered {
574            consumer_sequence: 0,
575            stream_sequence: 0,
576            missed_heartbeats: false,
577            create_stream: None,
578            context: self.context.clone(),
579            consumer_name: self
580                .config
581                .name
582                .clone()
583                .unwrap_or_else(|| self.context.client.new_inbox()),
584            consumer: self.config,
585            stream: Some(stream),
586            stream_name: self.info.stream_name.clone(),
587        })
588    }
589}
590
/// Configuration for ordered pull consumers.
///
/// Only the options listed here are chosen by the caller. When this is converted into a
/// pull `Config`, the remaining options are fixed: no durable name, `AckPolicy::None`,
/// `max_deliver` of 1, one replica, memory storage and a 30 second inactive threshold.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct OrderedConfig {
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Subject used to filter the messages delivered to this consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    // NOTE(review): `IntoConsumerConfig::into_consumer_config` sets `max_batch`, `max_bytes`
    // and `max_expires` to zero regardless of the values below, while `From<OrderedConfig>
    // for Config` copies them. Confirm which behavior is intended.
    /// Maximum number of messages that can be requested in single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    pub max_batch: i64,
    /// Maximum number of bytes that can be requested in single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    pub max_bytes: i64,
    /// Maximum expiry that can be set for a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    pub max_expires: Duration,
}
648
649impl From<OrderedConfig> for Config {
650    fn from(config: OrderedConfig) -> Self {
651        Config {
652            durable_name: None,
653            name: config.name,
654            description: config.description,
655            deliver_policy: config.deliver_policy,
656            ack_policy: AckPolicy::None,
657            ack_wait: Duration::default(),
658            max_deliver: 1,
659            filter_subject: config.filter_subject,
660            #[cfg(feature = "server_2_10")]
661            filter_subjects: config.filter_subjects,
662            replay_policy: config.replay_policy,
663            rate_limit: config.rate_limit,
664            sample_frequency: config.sample_frequency,
665            max_waiting: config.max_waiting,
666            max_ack_pending: 0,
667            headers_only: config.headers_only,
668            max_batch: config.max_batch,
669            max_bytes: config.max_bytes,
670            max_expires: config.max_expires,
671            inactive_threshold: Duration::from_secs(30),
672            num_replicas: 1,
673            memory_storage: true,
674            #[cfg(feature = "server_2_10")]
675            metadata: config.metadata,
676            backoff: Vec::new(),
677            #[cfg(feature = "server_2_11")]
678            priority_policy: PriorityPolicy::None,
679            #[cfg(feature = "server_2_11")]
680            priority_groups: Vec::new(),
681            #[cfg(feature = "server_2_11")]
682            pause_until: None,
683        }
684    }
685}
686
687impl FromConsumer for OrderedConfig {
688    fn try_from_consumer_config(
689        config: crate::jetstream::consumer::Config,
690    ) -> Result<Self, crate::Error>
691    where
692        Self: Sized,
693    {
694        Ok(OrderedConfig {
695            name: config.name,
696            description: config.description,
697            filter_subject: config.filter_subject,
698            #[cfg(feature = "server_2_10")]
699            filter_subjects: config.filter_subjects,
700            replay_policy: config.replay_policy,
701            rate_limit: config.rate_limit,
702            sample_frequency: config.sample_frequency,
703            headers_only: config.headers_only,
704            deliver_policy: config.deliver_policy,
705            max_waiting: config.max_waiting,
706            #[cfg(feature = "server_2_10")]
707            metadata: config.metadata,
708            max_batch: config.max_batch,
709            max_bytes: config.max_bytes,
710            max_expires: config.max_expires,
711        })
712    }
713}
714
715impl IntoConsumerConfig for OrderedConfig {
716    fn into_consumer_config(self) -> super::Config {
717        jetstream::consumer::Config {
718            deliver_subject: None,
719            durable_name: None,
720            name: self.name,
721            description: self.description,
722            deliver_group: None,
723            deliver_policy: self.deliver_policy,
724            ack_policy: AckPolicy::None,
725            ack_wait: Duration::default(),
726            max_deliver: 1,
727            filter_subject: self.filter_subject,
728            #[cfg(feature = "server_2_10")]
729            filter_subjects: self.filter_subjects,
730            replay_policy: self.replay_policy,
731            rate_limit: self.rate_limit,
732            sample_frequency: self.sample_frequency,
733            max_waiting: self.max_waiting,
734            max_ack_pending: 0,
735            headers_only: self.headers_only,
736            flow_control: false,
737            idle_heartbeat: Duration::default(),
738            max_batch: 0,
739            max_bytes: 0,
740            max_expires: Duration::default(),
741            inactive_threshold: Duration::from_secs(30),
742            num_replicas: 1,
743            memory_storage: true,
744            #[cfg(feature = "server_2_10")]
745            metadata: self.metadata,
746            backoff: Vec::new(),
747            #[cfg(feature = "server_2_11")]
748            priority_policy: PriorityPolicy::None,
749            #[cfg(feature = "server_2_11")]
750            priority_groups: Vec::new(),
751            #[cfg(feature = "server_2_11")]
752            pause_until: None,
753        }
754    }
755}
756
/// Stream of messages for an ordered pull consumer.
///
/// Tracks consumer sequences of incoming messages and recreates the underlying consumer
/// stream when a sequence mismatch, repeated missed heartbeats, a deleted consumer or a
/// missing responder is detected.
pub struct Ordered {
    /// JetStream context used when recreating the consumer stream.
    context: Context,
    /// Name of the stream the consumer is bound to.
    stream_name: String,
    /// Configuration used when recreating the consumer.
    consumer: OrderedConfig,
    /// Name used for the consumer when it is recreated.
    consumer_name: String,
    /// Currently active message stream; `None` while the consumer is being recreated.
    stream: Option<Stream>,
    /// In-flight future that recreates the consumer stream, if a recreation is underway.
    create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
    /// Consumer sequence of the last accepted message; reset to 0 when a recreation is
    /// triggered.
    consumer_sequence: u64,
    /// Stream sequence of the last accepted message; passed along when recreating.
    stream_sequence: u64,
    /// Set after the first missed heartbeat and cleared when a message arrives; a second
    /// consecutive miss triggers recreation.
    missed_heartbeats: bool,
}
768
769impl futures::Stream for Ordered {
770    type Item = Result<jetstream::Message, OrderedError>;
771
772    fn poll_next(
773        mut self: Pin<&mut Self>,
774        cx: &mut std::task::Context<'_>,
775    ) -> Poll<Option<Self::Item>> {
776        let mut recreate = false;
777        // Poll messages
778        if let Some(stream) = self.stream.as_mut() {
779            match stream.poll_next_unpin(cx) {
780                Poll::Ready(message) => match message {
781                    Some(message) => match message {
782                        Ok(message) => {
783                            self.missed_heartbeats = false;
784                            let info = message.info().map_err(|err| {
785                                OrderedError::with_source(OrderedErrorKind::Other, err)
786                            })?;
787                            trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
788                                           self.consumer_sequence,
789                                           self.stream_sequence,
790                                           info.consumer_sequence,
791                                           info.stream_sequence);
792                            if info.consumer_sequence != self.consumer_sequence + 1 {
793                                debug!(
794                                    "ordered consumer mismatch. current {}, info: {}",
795                                    self.consumer_sequence, info.consumer_sequence
796                                );
797                                recreate = true;
798                                self.consumer_sequence = 0;
799                            } else {
800                                self.stream_sequence = info.stream_sequence;
801                                self.consumer_sequence = info.consumer_sequence;
802                                return Poll::Ready(Some(Ok(message)));
803                            }
804                        }
805                        Err(err) => match err.kind() {
806                            MessagesErrorKind::MissingHeartbeat => {
807                                // If we have missed heartbeats set, it means this is a second
808                                // missed heartbeat, so we need to recreate consumer.
809                                if self.missed_heartbeats {
810                                    self.consumer_sequence = 0;
811                                    recreate = true;
812                                } else {
813                                    self.missed_heartbeats = true;
814                                }
815                            }
816                            MessagesErrorKind::ConsumerDeleted
817                            | MessagesErrorKind::NoResponders => {
818                                recreate = true;
819                                self.consumer_sequence = 0;
820                            }
821                            MessagesErrorKind::Pull
822                            | MessagesErrorKind::PushBasedConsumer
823                            | MessagesErrorKind::Other => {
824                                return Poll::Ready(Some(Err(err.into())));
825                            }
826                        },
827                    },
828                    None => return Poll::Ready(None),
829                },
830                Poll::Pending => (),
831            }
832        }
833        // Recreate consumer if needed
834        if recreate {
835            self.stream = None;
836            self.create_stream = Some(Box::pin({
837                let context = self.context.clone();
838                let config = self.consumer.clone();
839                let stream_name = self.stream_name.clone();
840                let consumer_name = self.consumer_name.clone();
841                let sequence = self.stream_sequence;
842                async move {
843                    tryhard::retry_fn(|| {
844                        recreate_consumer_stream(
845                            &context,
846                            &config,
847                            &stream_name,
848                            &consumer_name,
849                            sequence,
850                        )
851                    })
852                    .retries(u32::MAX)
853                    .custom_backoff(backoff)
854                    .await
855                }
856            }))
857        }
858        // check for recreation future
859        if let Some(result) = self.create_stream.as_mut() {
860            match result.poll_unpin(cx) {
861                Poll::Ready(result) => match result {
862                    Ok(stream) => {
863                        self.create_stream = None;
864                        self.stream = Some(stream);
865                        return self.poll_next(cx);
866                    }
867                    Err(err) => {
868                        return Poll::Ready(Some(Err(OrderedError::with_source(
869                            OrderedErrorKind::Recreate,
870                            err,
871                        ))))
872                    }
873                },
874                Poll::Pending => (),
875            }
876        }
877        Poll::Pending
878    }
879}
880
/// Continuous stream of messages for a pull consumer.
///
/// Messages arrive on an inbox subscription while a background task publishes pull
/// requests on demand; the stream keeps track of how many messages/bytes are still
/// expected from the requests sent so far.
pub struct Stream {
    // Number of messages still expected from the pull requests sent so far.
    pending_messages: usize,
    // Number of bytes still expected from the pull requests sent so far.
    pending_bytes: usize,
    // Receives the outcome of every pull request published by the background task.
    // `Ok(true)` means the pending counters should be reset instead of incremented.
    request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
    // Signals the background task that a new pull request should be published.
    request_tx: tokio::sync::watch::Sender<()>,
    // Inbox subscription on which messages and status messages are received.
    subscriber: Subscriber,
    // Batch settings (batch size, max bytes, idle heartbeat, ...) used for every request.
    batch_config: BatchConfig,
    // JetStream context cloned into every yielded message.
    context: Context,
    // `true` after a request was signalled and before its result was received.
    pending_request: bool,
    // Handle of the background task; aborted when the stream is dropped.
    task_handle: JoinHandle<()>,
    // Once set, every subsequent poll yields `None`.
    terminated: bool,
    // Timer that fires when nothing was received for twice the idle heartbeat interval.
    heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
    // Fired on the first poll to let the background task start working.
    started: Option<tokio::sync::oneshot::Sender<()>>,
}
895
impl Drop for Stream {
    /// Aborts the background pull-request task so it does not outlive the stream.
    fn drop(&mut self) {
        self.task_handle.abort();
    }
}
901
902impl Stream {
903    async fn stream(
904        batch_config: BatchConfig,
905        consumer: &Consumer<Config>,
906    ) -> Result<Stream, StreamError> {
907        let inbox = consumer.context.client.new_inbox();
908        let subscription = consumer
909            .context
910            .client
911            .subscribe(inbox.clone())
912            .await
913            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
914        let subject = format!(
915            "{}.CONSUMER.MSG.NEXT.{}.{}",
916            consumer.context.prefix, consumer.info.stream_name, consumer.info.name
917        );
918
919        let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
920        let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
921        let (started_tx, started_rx) = tokio::sync::oneshot::channel();
922        let task_handle = tokio::task::spawn({
923            let batch = batch_config.clone();
924            let consumer = consumer.clone();
925            let mut context = consumer.context.clone();
926            let inbox = inbox.clone();
927            async move {
928                started_rx.await.ok();
929                loop {
930                    // this is just in edge case of missing response for some reason.
931                    let expires = batch_config
932                        .expires
933                        .map(|expires| {
934                            if expires.is_zero() {
935                                Either::Left(future::pending())
936                            } else {
937                                Either::Right(tokio::time::sleep(
938                                    expires.saturating_add(Duration::from_secs(5)),
939                                ))
940                            }
941                        })
942                        .unwrap_or_else(|| Either::Left(future::pending()));
943                    // Need to check previous state, as `changed` will always fire on first
944                    // call.
945                    let prev_state = context.client.state.borrow().to_owned();
946                    let mut pending_reset = false;
947
948                    tokio::select! {
949                       _ = context.client.state.changed() => {
950                            let state = context.client.state.borrow().to_owned();
951                            if !(state == crate::connection::State::Connected
952                                && prev_state != State::Connected) {
953                                    continue;
954                                }
955                            debug!("detected !Connected -> Connected state change");
956
957                            match tryhard::retry_fn(|| consumer.fetch_info())
958                                .retries(5).custom_backoff(backoff).await
959                                .map_err(|err| crate::RequestError::with_source(crate::RequestErrorKind::Other, err).into()) {
960                                    Ok(info) => {
961                                        if info.num_waiting == 0 {
962                                            pending_reset = true;
963                                        }
964                                    }
965                                    Err(err) => {
966                                         if let Err(err) = request_result_tx.send(Err(err)).await {
967                                            debug!("failed to sent request result: {}", err);
968                                        }
969                                    },
970                            }
971                        },
972                        _ = request_rx.changed() => debug!("task received request request"),
973                        _ = expires => {
974                            pending_reset = true;
975                            debug!("expired pull request")},
976                    }
977
978                    let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
979                    let result = context
980                        .client
981                        .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
982                        .await
983                        .map(|_| pending_reset);
984                    // TODO: add tracing instead of ignoring this.
985                    request_result_tx
986                        .send(result.map(|_| pending_reset).map_err(|err| {
987                            crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
988                                .into()
989                        }))
990                        .await
991                        .ok();
992                    trace!("result send over tx");
993                }
994            }
995        });
996
997        Ok(Stream {
998            task_handle,
999            request_result_rx,
1000            request_tx,
1001            batch_config,
1002            pending_messages: 0,
1003            pending_bytes: 0,
1004            subscriber: subscription,
1005            context: consumer.context.clone(),
1006            pending_request: false,
1007            terminated: false,
1008            heartbeat_timeout: None,
1009            started: Some(started_tx),
1010        })
1011    }
1012}
1013
/// Kinds of errors that an ordered stream can yield.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum OrderedErrorKind {
    /// An expected idle heartbeat was not received in time.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// A pull request failed.
    Pull,
    /// The consumer is push based and cannot be used here.
    PushBasedConsumer,
    /// Recreating the consumer failed.
    Recreate,
    /// A "no responders" status was received.
    NoResponders,
    /// Any other error.
    Other,
}

impl std::fmt::Display for OrderedErrorKind {
    /// Writes a short, human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MissingHeartbeat => "missed idle heartbeat",
            Self::ConsumerDeleted => "consumer deleted",
            Self::Pull => "pull request failed",
            Self::PushBasedConsumer => "cannot use with push consumer",
            Self::Recreate => "consumer recreation failed",
            Self::NoResponders => "no responders",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1038
/// Error yielded by the ordered stream: an [OrderedErrorKind] together with the underlying
/// source error, if any.
pub type OrderedError = Error<OrderedErrorKind>;
1040
1041impl From<MessagesError> for OrderedError {
1042    fn from(err: MessagesError) -> Self {
1043        match err.kind() {
1044            MessagesErrorKind::MissingHeartbeat => {
1045                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1046            }
1047            MessagesErrorKind::ConsumerDeleted => {
1048                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1049            }
1050            MessagesErrorKind::Pull => OrderedError {
1051                kind: OrderedErrorKind::Pull,
1052                source: err.source,
1053            },
1054            MessagesErrorKind::PushBasedConsumer => {
1055                OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1056            }
1057            MessagesErrorKind::Other => OrderedError {
1058                kind: OrderedErrorKind::Other,
1059                source: err.source,
1060            },
1061            MessagesErrorKind::NoResponders => OrderedError::new(OrderedErrorKind::NoResponders),
1062        }
1063    }
1064}
1065
/// Kinds of errors that a pull consumer [Stream] can yield.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MessagesErrorKind {
    /// An expected idle heartbeat was not received in time.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// A pull request failed.
    Pull,
    /// The consumer is push based and cannot be used here.
    PushBasedConsumer,
    /// A "no responders" status was received.
    NoResponders,
    /// Any other error.
    Other,
}

impl std::fmt::Display for MessagesErrorKind {
    /// Writes a short, human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MissingHeartbeat => "missed idle heartbeat",
            Self::ConsumerDeleted => "consumer deleted",
            Self::Pull => "pull request failed",
            Self::PushBasedConsumer => "cannot use with push consumer",
            Self::NoResponders => "no responders",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1088
/// Error yielded by the pull consumer [Stream]: a [MessagesErrorKind] together with the
/// underlying source error, if any.
pub type MessagesError = Error<MessagesErrorKind>;
1090
impl futures::Stream for Stream {
    type Item = Result<jetstream::Message, MessagesError>;

    /// Polls for the next message of the pull consumer.
    ///
    /// Each call:
    /// 1. fires the `started` signal on the first poll,
    /// 2. checks the idle heartbeat timer (if heartbeats are enabled),
    /// 3. asks the background task for a new pull request when the number of pending
    ///    messages (or bytes) drops to half of the configured batch,
    /// 4. processes the outcome of previously published pull requests,
    /// 5. polls the inbox subscription and handles status messages and user messages.
    ///
    /// Yields `None` once the stream is terminated, the request result channel is closed or
    /// the subscription ends.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // The first poll fires the one-shot `started` signal.
        if let Some(started) = self.started.take() {
            trace!("stream started, sending started signal");
            if started.send(()).is_err() {
                debug!("failed to send started signal");
            }
        }
        // A terminal error was already yielded on a previous poll.
        if self.terminated {
            return Poll::Ready(None);
        }

        if !self.batch_config.idle_heartbeat.is_zero() {
            trace!("checking idle hearbeats");
            // Allow twice the heartbeat interval before reporting a missed heartbeat.
            let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
            match self
                .heartbeat_timeout
                .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
                .poll_unpin(cx)
            {
                Poll::Ready(_) => {
                    // Drop the elapsed timer so that the next poll arms a fresh one.
                    self.heartbeat_timeout = None;
                    return Poll::Ready(Some(Err(MessagesError::new(
                        MessagesErrorKind::MissingHeartbeat,
                    ))));
                }
                Poll::Pending => (),
            }
        }

        loop {
            trace!("pending messages: {}", self.pending_messages);
            // Signal a new pull request when at most half of the batch (by message count,
            // or by bytes if a byte limit is set) is still pending and no request is
            // already in flight.
            if (self.pending_messages <= self.batch_config.batch / 2
                || (self.batch_config.max_bytes > 0
                    && self.pending_bytes <= self.batch_config.max_bytes / 2))
                && !self.pending_request
            {
                debug!("pending messages reached threshold to send new fetch request");
                self.request_tx.send(()).ok();
                self.pending_request = true;
            }

            // Check whether the background task reported the outcome of a pull request.
            match self.request_result_rx.poll_recv(cx) {
                Poll::Ready(resp) => match resp {
                    Some(resp) => match resp {
                        Ok(reset) => {
                            trace!("request response: {:?}", reset);
                            debug!("request sent, setting pending messages");
                            // On reset only the request just sent counts; otherwise it is
                            // added on top of what is still pending.
                            if reset {
                                self.pending_messages = self.batch_config.batch;
                                self.pending_bytes = self.batch_config.max_bytes;
                            } else {
                                self.pending_messages += self.batch_config.batch;
                                self.pending_bytes += self.batch_config.max_bytes;
                            }
                            self.pending_request = false;
                            continue;
                        }
                        Err(err) => {
                            return Poll::Ready(Some(Err(MessagesError::with_source(
                                MessagesErrorKind::Pull,
                                err,
                            ))))
                        }
                    },
                    // The sending side of the result channel is gone: end the stream.
                    None => return Poll::Ready(None),
                },
                Poll::Pending => {
                    trace!("pending result");
                }
            }

            trace!("polling subscriber");
            match self.subscriber.receiver.poll_recv(cx) {
                Poll::Ready(maybe_message) => {
                    // Anything received on the subscription counts as a sign of life.
                    // NOTE(review): the timer is only re-armed at the top of the next
                    // `poll_next` call; if this call ends up returning `Pending` after a
                    // `continue`, no heartbeat timer is registered — confirm this is intended.
                    self.heartbeat_timeout = None;
                    match maybe_message {
                        // A message without a status is treated as a regular (OK) message.
                        Some(message) => match message.status.unwrap_or(StatusCode::OK) {
                            StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
                                debug!("received status message: {:?}", message);
                                // If consumer has been deleted, error and shutdown the iterator.
                                if message.description.as_deref() == Some("Consumer Deleted") {
                                    self.terminated = true;
                                    return Poll::Ready(Some(Err(MessagesError::new(
                                        MessagesErrorKind::ConsumerDeleted,
                                    ))));
                                }
                                // If consumer is not pull based, error and shutdown the iterator.
                                if message.description.as_deref() == Some("Consumer is push based")
                                {
                                    self.terminated = true;
                                    return Poll::Ready(Some(Err(MessagesError::new(
                                        MessagesErrorKind::PushBasedConsumer,
                                    ))));
                                }

                                // Do accounting for messages left after terminated/completed pull request.
                                // A missing header is treated as a whole batch left undelivered;
                                // an unparsable header is yielded as an `Other` error.
                                let pending_messages = message
                                    .headers
                                    .as_ref()
                                    .and_then(|headers| headers.get("Nats-Pending-Messages"))
                                    .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
                                    .map_err(|err| {
                                        MessagesError::with_source(MessagesErrorKind::Other, err)
                                    })?;

                                let pending_bytes = message
                                    .headers
                                    .as_ref()
                                    .and_then(|headers| headers.get("Nats-Pending-Bytes"))
                                    .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
                                    .map_err(|err| {
                                        MessagesError::with_source(MessagesErrorKind::Other, err)
                                    })?;

                                debug!(
                                    "timeout reached. remaining messages: {}, bytes {}",
                                    pending_messages, pending_bytes
                                );
                                // The undelivered remainder of that request is no longer expected.
                                self.pending_messages =
                                    self.pending_messages.saturating_sub(pending_messages);
                                trace!("message bytes len: {}", pending_bytes);
                                self.pending_bytes =
                                    self.pending_bytes.saturating_sub(pending_bytes);
                                continue;
                            }
                            // Idle Heartbeat means we have no messages, but consumer is fine.
                            StatusCode::IDLE_HEARTBEAT => {
                                debug!("received idle heartbeat");
                                continue;
                            }
                            // We got a message from a stream.
                            StatusCode::OK => {
                                trace!("message received");
                                self.pending_messages = self.pending_messages.saturating_sub(1);
                                self.pending_bytes =
                                    self.pending_bytes.saturating_sub(message.length);
                                return Poll::Ready(Some(Ok(jetstream::Message {
                                    context: self.context.clone(),
                                    message,
                                })));
                            }
                            // Yielded as an error without terminating the stream.
                            StatusCode::NO_RESPONDERS => {
                                debug!("received no responders");
                                return Poll::Ready(Some(Err(MessagesError::new(
                                    MessagesErrorKind::NoResponders,
                                ))));
                            }
                            // Any other status is surfaced as an `Other` error carrying the
                            // status code and description.
                            status => {
                                debug!("received unknown message: {:?}", message);
                                return Poll::Ready(Some(Err(MessagesError::with_source(
                                    MessagesErrorKind::Other,
                                    format!(
                                        "error while processing messages from the stream: {}, {:?}",
                                        status, message.description
                                    ),
                                ))));
                            }
                        },
                        // The subscription ended: end the stream.
                        None => return Poll::Ready(None),
                    }
                }
                Poll::Pending => {
                    debug!("subscriber still pending");
                    return std::task::Poll::Pending;
                }
            }
        }
    }
}
1266
/// Used for building configuration for a [Stream]. Created by a [Consumer::stream] on a [Consumer].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error>  {
/// use futures::StreamExt;
/// use async_nats::jetstream::consumer::PullConsumer;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer: PullConsumer = jetstream
///     .get_stream("events").await?
///     .get_consumer("pull").await?;
///
/// let mut messages = consumer.stream()
///     .max_messages_per_batch(100)
///     .max_bytes_per_batch(1024)
///     .messages().await?;
///
/// while let Some(message) = messages.next().await {
///     let message = message?;
///     println!("message: {:?}", message);
///     message.ack().await?;
/// }
/// # Ok(())
/// # }
/// ```
pub struct StreamBuilder<'a> {
    // Max number of messages per batch; set with `max_messages_per_batch`.
    batch: usize,
    // Max number of bytes per batch; set with `max_bytes_per_batch`.
    max_bytes: usize,
    // Idle heartbeat interval; set with `heartbeat`.
    heartbeat: Duration,
    // How long each batch request waits before timing out; set with `expires`.
    expires: Duration,
    // Priority group name; set with `group`.
    group: Option<String>,
    // Overflow threshold of pending messages; set with `min_pending`.
    min_pending: Option<usize>,
    // Overflow threshold of pending acknowledgements; set with `min_ack_pending`.
    min_ack_pending: Option<usize>,
    // Consumer for which the stream is being built.
    consumer: &'a Consumer<Config>,
}
1305
1306impl<'a> StreamBuilder<'a> {
1307    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1308        StreamBuilder {
1309            consumer,
1310            batch: 200,
1311            max_bytes: 0,
1312            expires: Duration::from_secs(30),
1313            heartbeat: Duration::default(),
1314            group: None,
1315            min_pending: None,
1316            min_ack_pending: None,
1317        }
1318    }
1319
1320    /// Sets max bytes that can be buffered on the Client while processing already received
1321    /// messages.
1322    /// Higher values will yield better performance, but also potentially increase memory usage if
1323    /// application is acknowledging messages much slower than they arrive.
1324    ///
1325    /// Default values should provide reasonable balance between performance and memory usage.
1326    ///
1327    /// # Examples
1328    ///
1329    /// ```no_run
1330    /// # #[tokio::main]
1331    /// # async fn main() -> Result<(), async_nats::Error>  {
1332    /// use async_nats::jetstream::consumer::PullConsumer;
1333    /// use futures::StreamExt;
1334    /// let client = async_nats::connect("localhost:4222").await?;
1335    /// let jetstream = async_nats::jetstream::new(client);
1336    ///
1337    /// let consumer: PullConsumer = jetstream
1338    ///     .get_stream("events")
1339    ///     .await?
1340    ///     .get_consumer("pull")
1341    ///     .await?;
1342    ///
1343    /// let mut messages = consumer
1344    ///     .stream()
1345    ///     .max_bytes_per_batch(1024)
1346    ///     .messages()
1347    ///     .await?;
1348    ///
1349    /// while let Some(message) = messages.next().await {
1350    ///     let message = message?;
1351    ///     println!("message: {:?}", message);
1352    ///     message.ack().await?;
1353    /// }
1354    /// # Ok(())
1355    /// # }
1356    /// ```
1357    pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1358        self.max_bytes = max_bytes;
1359        self
1360    }
1361
1362    /// Sets max number of messages that can be buffered on the Client while processing already received
1363    /// messages.
1364    /// Higher values will yield better performance, but also potentially increase memory usage if
1365    /// application is acknowledging messages much slower than they arrive.
1366    ///
1367    /// Default values should provide reasonable balance between performance and memory usage.
1368    ///
1369    /// # Examples
1370    ///
1371    /// ```no_run
1372    /// # #[tokio::main]
1373    /// # async fn main() -> Result<(), async_nats::Error>  {
1374    /// use async_nats::jetstream::consumer::PullConsumer;
1375    /// use futures::StreamExt;
1376    /// let client = async_nats::connect("localhost:4222").await?;
1377    /// let jetstream = async_nats::jetstream::new(client);
1378    ///
1379    /// let consumer: PullConsumer = jetstream
1380    ///     .get_stream("events")
1381    ///     .await?
1382    ///     .get_consumer("pull")
1383    ///     .await?;
1384    ///
1385    /// let mut messages = consumer
1386    ///     .stream()
1387    ///     .max_messages_per_batch(100)
1388    ///     .messages()
1389    ///     .await?;
1390    ///
1391    /// while let Some(message) = messages.next().await {
1392    ///     let message = message?;
1393    ///     println!("message: {:?}", message);
1394    ///     message.ack().await?;
1395    /// }
1396    /// # Ok(())
1397    /// # }
1398    /// ```
1399    pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1400        self.batch = batch;
1401        self
1402    }
1403
    /// Sets heartbeat which will be sent by the server if there are no messages pending for a
    /// given [Consumer].
1406    ///
1407    /// # Examples
1408    ///
1409    /// ```no_run
1410    /// # #[tokio::main]
1411    /// # async fn main() -> Result<(), async_nats::Error>  {
1412    /// use async_nats::jetstream::consumer::PullConsumer;
1413    /// use futures::StreamExt;
1414    /// let client = async_nats::connect("localhost:4222").await?;
1415    /// let jetstream = async_nats::jetstream::new(client);
1416    ///
1417    /// let consumer: PullConsumer = jetstream
1418    ///     .get_stream("events")
1419    ///     .await?
1420    ///     .get_consumer("pull")
1421    ///     .await?;
1422    ///
1423    /// let mut messages = consumer
1424    ///     .stream()
1425    ///     .heartbeat(std::time::Duration::from_secs(10))
1426    ///     .messages()
1427    ///     .await?;
1428    ///
1429    /// while let Some(message) = messages.next().await {
1430    ///     let message = message?;
1431    ///     println!("message: {:?}", message);
1432    ///     message.ack().await?;
1433    /// }
1434    /// # Ok(())
1435    /// # }
1436    /// ```
1437    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1438        self.heartbeat = heartbeat;
1439        self
1440    }
1441
1442    /// Low level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1445    ///
1446    /// # Examples
1447    ///
1448    /// ```no_run
1449    /// # #[tokio::main]
1450    /// # async fn main() -> Result<(), async_nats::Error>  {
1451    /// use async_nats::jetstream::consumer::PullConsumer;
1452    /// use futures::StreamExt;
1453    /// let client = async_nats::connect("localhost:4222").await?;
1454    /// let jetstream = async_nats::jetstream::new(client);
1455    ///
1456    /// let consumer: PullConsumer = jetstream
1457    ///     .get_stream("events")
1458    ///     .await?
1459    ///     .get_consumer("pull")
1460    ///     .await?;
1461    ///
1462    /// let mut messages = consumer
1463    ///     .stream()
1464    ///     .expires(std::time::Duration::from_secs(30))
1465    ///     .messages()
1466    ///     .await?;
1467    ///
1468    /// while let Some(message) = messages.next().await {
1469    ///     let message = message?;
1470    ///     println!("message: {:?}", message);
1471    ///     message.ack().await?;
1472    /// }
1473    /// # Ok(())
1474    /// # }
1475    /// ```
1476    pub fn expires(mut self, expires: Duration) -> Self {
1477        self.expires = expires;
1478        self
1479    }
1480
1481    /// Sets overflow threshold for minimum pending messages before this stream will start getting
1482    /// messages for a [Consumer].
1483    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and [PriorityPolicy::Overflow] set.
1484    ///
1485    /// # Examples
1486    ///
1487    /// ```no_run
1488    /// # #[tokio::main]
1489    /// # async fn main() -> Result<(), async_nats::Error>  {
1490    /// use async_nats::jetstream::consumer::PullConsumer;
1491    /// use futures::StreamExt;
1492    /// let client = async_nats::connect("localhost:4222").await?;
1493    /// let jetstream = async_nats::jetstream::new(client);
1494    ///
1495    /// let consumer: PullConsumer = jetstream
1496    ///     .get_stream("events")
1497    ///     .await?
1498    ///     .get_consumer("pull")
1499    ///     .await?;
1500    ///
1501    /// let mut messages = consumer
1502    ///     .stream()
1503    ///     .expires(std::time::Duration::from_secs(30))
1504    ///     .group("A")
1505    ///     .min_pending(100)
1506    ///     .messages()
1507    ///     .await?;
1508    ///
1509    /// while let Some(message) = messages.next().await {
1510    ///     let message = message?;
1511    ///     println!("message: {:?}", message);
1512    ///     message.ack().await?;
1513    /// }
1514    /// # Ok(())
1515    /// # }
1516    /// ```
1517    pub fn min_pending(mut self, min_pending: usize) -> Self {
1518        self.min_pending = Some(min_pending);
1519        self
1520    }
1521
1522    /// Sets overflow threshold for minimum pending acknowledgements before this stream will start getting
1523    /// messages for a [Consumer].
1524    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and [PriorityPolicy::Overflow] set.
1525    ///
1526    /// # Examples
1527    ///
1528    /// ```no_run
1529    /// # #[tokio::main]
1530    /// # async fn main() -> Result<(), async_nats::Error>  {
1531    /// use async_nats::jetstream::consumer::PullConsumer;
1532    /// use futures::StreamExt;
1533    /// let client = async_nats::connect("localhost:4222").await?;
1534    /// let jetstream = async_nats::jetstream::new(client);
1535    ///
1536    /// let consumer: PullConsumer = jetstream
1537    ///     .get_stream("events")
1538    ///     .await?
1539    ///     .get_consumer("pull")
1540    ///     .await?;
1541    ///
1542    /// let mut messages = consumer
1543    ///     .stream()
1544    ///     .expires(std::time::Duration::from_secs(30))
1545    ///     .group("A")
1546    ///     .min_ack_pending(100)
1547    ///     .messages()
1548    ///     .await?;
1549    ///
1550    /// while let Some(message) = messages.next().await {
1551    ///     let message = message?;
1552    ///     println!("message: {:?}", message);
1553    ///     message.ack().await?;
1554    /// }
1555    /// # Ok(())
1556    /// # }
1557    /// ```
1558    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1559        self.min_ack_pending = Some(min_ack_pending);
1560        self
1561    }
1562
1563    /// Setting group when using [Consumer] with [Config::priority_groups].
1564    ///
1565    /// # Examples
1566    ///
1567    /// ```no_run
1568    /// # #[tokio::main]
1569    /// # async fn main() -> Result<(), async_nats::Error>  {
1570    /// use async_nats::jetstream::consumer::PullConsumer;
1571    /// use futures::StreamExt;
1572    /// let client = async_nats::connect("localhost:4222").await?;
1573    /// let jetstream = async_nats::jetstream::new(client);
1574    ///
1575    /// let consumer: PullConsumer = jetstream
1576    ///     .get_stream("events")
1577    ///     .await?
1578    ///     .get_consumer("pull")
1579    ///     .await?;
1580    ///
1581    /// let mut messages = consumer
1582    ///     .stream()
1583    ///     .expires(std::time::Duration::from_secs(30))
1584    ///     .group("A")
1585    ///     .min_ack_pending(100)
1586    ///     .messages()
1587    ///     .await?;
1588    ///
1589    /// while let Some(message) = messages.next().await {
1590    ///     let message = message?;
1591    ///     println!("message: {:?}", message);
1592    ///     message.ack().await?;
1593    /// }
1594    /// # Ok(())
1595    /// # }
1596    /// ```
1597    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1598        self.group = Some(group.into());
1599        self
1600    }
1601
1602    /// Creates actual [Stream] with provided configuration.
1603    ///
1604    /// # Examples
1605    ///
1606    /// ```no_run
1607    /// # #[tokio::main]
1608    /// # async fn main() -> Result<(), async_nats::Error>  {
1609    /// use async_nats::jetstream::consumer::PullConsumer;
1610    /// use futures::StreamExt;
1611    /// let client = async_nats::connect("localhost:4222").await?;
1612    /// let jetstream = async_nats::jetstream::new(client);
1613    ///
1614    /// let consumer: PullConsumer = jetstream
1615    ///     .get_stream("events")
1616    ///     .await?
1617    ///     .get_consumer("pull")
1618    ///     .await?;
1619    ///
1620    /// let mut messages = consumer
1621    ///     .stream()
1622    ///     .max_messages_per_batch(100)
1623    ///     .messages()
1624    ///     .await?;
1625    ///
1626    /// while let Some(message) = messages.next().await {
1627    ///     let message = message?;
1628    ///     println!("message: {:?}", message);
1629    ///     message.ack().await?;
1630    /// }
1631    /// # Ok(())
1632    /// # }
1633    /// ```
1634    pub async fn messages(self) -> Result<Stream, StreamError> {
1635        Stream::stream(
1636            BatchConfig {
1637                batch: self.batch,
1638                expires: Some(self.expires),
1639                no_wait: false,
1640                max_bytes: self.max_bytes,
1641                idle_heartbeat: self.heartbeat,
1642                min_pending: self.min_pending,
1643                group: self.group,
1644                min_ack_pending: self.min_ack_pending,
1645            },
1646            self.consumer,
1647        )
1648        .await
1649    }
1650}
1651
/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by [Consumer::fetch] on a [Consumer].
1653///
1654/// # Examples
1655///
1656/// ```no_run
1657/// # #[tokio::main]
1658/// # async fn main() -> Result<(), async_nats::Error>  {
1659/// use async_nats::jetstream::consumer::PullConsumer;
1660/// use futures::StreamExt;
1661/// let client = async_nats::connect("localhost:4222").await?;
1662/// let jetstream = async_nats::jetstream::new(client);
1663///
1664/// let consumer: PullConsumer = jetstream
1665///     .get_stream("events")
1666///     .await?
1667///     .get_consumer("pull")
1668///     .await?;
1669///
1670/// let mut messages = consumer
1671///     .fetch()
1672///     .max_messages(100)
1673///     .max_bytes(1024)
1674///     .messages()
1675///     .await?;
1676///
1677/// while let Some(message) = messages.next().await {
1678///     let message = message?;
1679///     println!("message: {:?}", message);
1680///     message.ack().await?;
1681/// }
1682/// # Ok(())
1683/// # }
1684/// ```
1685pub struct FetchBuilder<'a> {
1686    batch: usize,
1687    max_bytes: usize,
1688    heartbeat: Duration,
1689    expires: Option<Duration>,
1690    min_pending: Option<usize>,
1691    min_ack_pending: Option<usize>,
1692    group: Option<String>,
1693    consumer: &'a Consumer<Config>,
1694}
1695
1696impl<'a> FetchBuilder<'a> {
1697    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1698        FetchBuilder {
1699            consumer,
1700            batch: 200,
1701            max_bytes: 0,
1702            expires: None,
1703            min_pending: None,
1704            min_ack_pending: None,
1705            group: None,
1706            heartbeat: Duration::default(),
1707        }
1708    }
1709
1710    /// Sets max bytes that can be buffered on the Client while processing already received
1711    /// messages.
1712    /// Higher values will yield better performance, but also potentially increase memory usage if
1713    /// application is acknowledging messages much slower than they arrive.
1714    ///
1715    /// Default values should provide reasonable balance between performance and memory usage.
1716    ///
1717    /// # Examples
1718    ///
1719    /// ```no_run
1720    /// # #[tokio::main]
1721    /// # async fn main() -> Result<(), async_nats::Error>  {
1722    /// use futures::StreamExt;
1723    /// let client = async_nats::connect("localhost:4222").await?;
1724    /// let jetstream = async_nats::jetstream::new(client);
1725    ///
1726    /// let consumer = jetstream
1727    ///     .get_stream("events")
1728    ///     .await?
1729    ///     .get_consumer("pull")
1730    ///     .await?;
1731    ///
1732    /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1733    ///
1734    /// while let Some(message) = messages.next().await {
1735    ///     let message = message?;
1736    ///     println!("message: {:?}", message);
1737    ///     message.ack().await?;
1738    /// }
1739    /// # Ok(())
1740    /// # }
1741    /// ```
1742    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1743        self.max_bytes = max_bytes;
1744        self
1745    }
1746
1747    /// Sets max number of messages that can be buffered on the Client while processing already received
1748    /// messages.
1749    /// Higher values will yield better performance, but also potentially increase memory usage if
1750    /// application is acknowledging messages much slower than they arrive.
1751    ///
1752    /// Default values should provide reasonable balance between performance and memory usage.
1753    ///
1754    /// # Examples
1755    ///
1756    /// ```no_run
1757    /// # #[tokio::main]
1758    /// # async fn main() -> Result<(), async_nats::Error>  {
1759    /// use futures::StreamExt;
1760    /// let client = async_nats::connect("localhost:4222").await?;
1761    /// let jetstream = async_nats::jetstream::new(client);
1762    ///
1763    /// let consumer = jetstream
1764    ///     .get_stream("events")
1765    ///     .await?
1766    ///     .get_consumer("pull")
1767    ///     .await?;
1768    ///
1769    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1770    ///
1771    /// while let Some(message) = messages.next().await {
1772    ///     let message = message?;
1773    ///     println!("message: {:?}", message);
1774    ///     message.ack().await?;
1775    /// }
1776    /// # Ok(())
1777    /// # }
1778    /// ```
1779    pub fn max_messages(mut self, batch: usize) -> Self {
1780        self.batch = batch;
1781        self
1782    }
1783
    /// Sets heartbeat which will be sent by the server if there are no messages for a given
1785    /// [Consumer] pending.
1786    ///
1787    /// # Examples
1788    ///
1789    /// ```no_run
1790    /// # #[tokio::main]
1791    /// # async fn main() -> Result<(), async_nats::Error>  {
1792    /// use async_nats::jetstream::consumer::PullConsumer;
1793    /// use futures::StreamExt;
1794    /// let client = async_nats::connect("localhost:4222").await?;
1795    /// let jetstream = async_nats::jetstream::new(client);
1796    ///
    /// let consumer: PullConsumer = jetstream
1798    ///     .get_stream("events")
1799    ///     .await?
1800    ///     .get_consumer("pull")
1801    ///     .await?;
1802    ///
1803    /// let mut messages = consumer
1804    ///     .fetch()
1805    ///     .heartbeat(std::time::Duration::from_secs(10))
1806    ///     .messages()
1807    ///     .await?;
1808    ///
1809    /// while let Some(message) = messages.next().await {
1810    ///     let message = message?;
1811    ///     println!("message: {:?}", message);
1812    ///     message.ack().await?;
1813    /// }
1814    /// # Ok(())
1815    /// # }
1816    /// ```
1817    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1818        self.heartbeat = heartbeat;
1819        self
1820    }
1821
1822    /// Low level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1825    ///
1826    /// # Examples
1827    ///
1828    /// ```no_run
1829    /// # #[tokio::main]
1830    /// # async fn main() -> Result<(), async_nats::Error>  {
1831    /// use async_nats::jetstream::consumer::PullConsumer;
1832    /// use futures::StreamExt;
1833    ///
1834    /// let client = async_nats::connect("localhost:4222").await?;
1835    /// let jetstream = async_nats::jetstream::new(client);
1836    ///
1837    /// let consumer: PullConsumer = jetstream
1838    ///     .get_stream("events")
1839    ///     .await?
1840    ///     .get_consumer("pull")
1841    ///     .await?;
1842    ///
1843    /// let mut messages = consumer
1844    ///     .fetch()
1845    ///     .expires(std::time::Duration::from_secs(30))
1846    ///     .messages()
1847    ///     .await?;
1848    ///
1849    /// while let Some(message) = messages.next().await {
1850    ///     let message = message?;
1851    ///     println!("message: {:?}", message);
1852    ///     message.ack().await?;
1853    /// }
1854    /// # Ok(())
1855    /// # }
1856    /// ```
1857    pub fn expires(mut self, expires: Duration) -> Self {
1858        self.expires = Some(expires);
1859        self
1860    }
1861
1862    /// Sets overflow threshold for minimum pending messages before this stream will start getting
1863    /// messages.
1864    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
1865    /// [PriorityPolicy::Overflow] set.
1866    ///
1867    /// # Examples
1868    ///
1869    /// ```no_run
1870    /// # #[tokio::main]
1871    /// # async fn main() -> Result<(), async_nats::Error>  {
1872    /// use async_nats::jetstream::consumer::PullConsumer;
1873    /// use futures::StreamExt;
1874    ///
1875    /// let client = async_nats::connect("localhost:4222").await?;
1876    /// let jetstream = async_nats::jetstream::new(client);
1877    ///
1878    /// let consumer: PullConsumer = jetstream
1879    ///     .get_stream("events")
1880    ///     .await?
1881    ///     .get_consumer("pull")
1882    ///     .await?;
1883    ///
1884    /// let mut messages = consumer
1885    ///     .fetch()
1886    ///     .expires(std::time::Duration::from_secs(30))
1887    ///     .group("A")
1888    ///     .min_pending(100)
1889    ///     .messages()
1890    ///     .await?;
1891    ///
1892    /// while let Some(message) = messages.next().await {
1893    ///     let message = message?;
1894    ///     println!("message: {:?}", message);
1895    ///     message.ack().await?;
1896    /// }
1897    /// # Ok(())
1898    /// # }
1899    /// ```
1900    pub fn min_pending(mut self, min_pending: usize) -> Self {
1901        self.min_pending = Some(min_pending);
1902        self
1903    }
1904
1905    /// Sets overflow threshold for minimum pending acknowledgments before this stream will start getting
1906    /// messages.
1907    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
1908    /// [PriorityPolicy::Overflow] set.
1909    ///
1910    /// # Examples
1911    ///
1912    /// ```no_run
1913    /// # #[tokio::main]
1914    /// # async fn main() -> Result<(), async_nats::Error>  {
1915    /// use async_nats::jetstream::consumer::PullConsumer;
1916    /// use futures::StreamExt;
1917    ///
1918    /// let client = async_nats::connect("localhost:4222").await?;
1919    /// let jetstream = async_nats::jetstream::new(client);
1920    ///
1921    /// let consumer: PullConsumer = jetstream
1922    ///     .get_stream("events")
1923    ///     .await?
1924    ///     .get_consumer("pull")
1925    ///     .await?;
1926    ///
1927    /// let mut messages = consumer
1928    ///     .fetch()
1929    ///     .expires(std::time::Duration::from_secs(30))
1930    ///     .group("A")
1931    ///     .min_ack_pending(100)
1932    ///     .messages()
1933    ///     .await?;
1934    ///
1935    /// while let Some(message) = messages.next().await {
1936    ///     let message = message?;
1937    ///     println!("message: {:?}", message);
1938    ///     message.ack().await?;
1939    /// }
1940    /// # Ok(())
1941    /// # }
1942    /// ```
1943    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1944        self.min_ack_pending = Some(min_ack_pending);
1945        self
1946    }
1947
1948    /// Setting group when using [Consumer] with [PriorityPolicy].
1949    ///
1950    /// # Examples
1951    ///
1952    /// ```no_run
1953    /// # #[tokio::main]
1954    /// # async fn main() -> Result<(), async_nats::Error>  {
1955    /// use async_nats::jetstream::consumer::PullConsumer;
1956    /// use futures::StreamExt;
1957    ///
1958    /// let client = async_nats::connect("localhost:4222").await?;
1959    /// let jetstream = async_nats::jetstream::new(client);
1960    ///
1961    /// let consumer: PullConsumer = jetstream
1962    ///     .get_stream("events")
1963    ///     .await?
1964    ///     .get_consumer("pull")
1965    ///     .await?;
1966    ///
1967    /// let mut messages = consumer
1968    ///     .fetch()
1969    ///     .expires(std::time::Duration::from_secs(30))
1970    ///     .group("A")
1971    ///     .min_ack_pending(100)
1972    ///     .messages()
1973    ///     .await?;
1974    ///
1975    /// while let Some(message) = messages.next().await {
1976    ///     let message = message?;
1977    ///     println!("message: {:?}", message);
1978    ///     message.ack().await?;
1979    /// }
1980    /// # Ok(())
1981    /// # }
1982    /// ```
1983    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1984        self.group = Some(group.into());
1985        self
1986    }
1987
    /// Creates actual [Batch] with provided configuration.
1989    ///
1990    /// # Examples
1991    ///
1992    /// ```no_run
1993    /// # #[tokio::main]
1994    /// # async fn main() -> Result<(), async_nats::Error>  {
1995    /// use async_nats::jetstream::consumer::PullConsumer;
1996    /// use futures::StreamExt;
1997    /// let client = async_nats::connect("localhost:4222").await?;
1998    /// let jetstream = async_nats::jetstream::new(client);
1999    ///
2000    /// let consumer: PullConsumer = jetstream
2001    ///     .get_stream("events")
2002    ///     .await?
2003    ///     .get_consumer("pull")
2004    ///     .await?;
2005    ///
2006    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
2007    ///
2008    /// while let Some(message) = messages.next().await {
2009    ///     let message = message?;
2010    ///     println!("message: {:?}", message);
2011    ///     message.ack().await?;
2012    /// }
2013    /// # Ok(())
2014    /// # }
2015    /// ```
2016    pub async fn messages(self) -> Result<Batch, BatchError> {
2017        Batch::batch(
2018            BatchConfig {
2019                batch: self.batch,
2020                expires: self.expires,
2021                no_wait: true,
2022                max_bytes: self.max_bytes,
2023                idle_heartbeat: self.heartbeat,
2024                min_pending: self.min_pending,
2025                min_ack_pending: self.min_ack_pending,
2026                group: self.group,
2027            },
2028            self.consumer,
2029        )
2030        .await
2031    }
2032}
2033
2034/// Used for building configuration for a [Batch]. Created by a [Consumer::batch] on a [Consumer].
2035///
2036/// # Examples
2037///
2038/// ```no_run
2039/// # #[tokio::main]
2040/// # async fn main() -> Result<(), async_nats::Error>  {
2041/// use async_nats::jetstream::consumer::PullConsumer;
2042/// use futures::StreamExt;
2043/// let client = async_nats::connect("localhost:4222").await?;
2044/// let jetstream = async_nats::jetstream::new(client);
2045///
2046/// let consumer: PullConsumer = jetstream
2047///     .get_stream("events")
2048///     .await?
2049///     .get_consumer("pull")
2050///     .await?;
2051///
2052/// let mut messages = consumer
2053///     .batch()
2054///     .max_messages(100)
2055///     .max_bytes(1024)
2056///     .messages()
2057///     .await?;
2058///
2059/// while let Some(message) = messages.next().await {
2060///     let message = message?;
2061///     println!("message: {:?}", message);
2062///     message.ack().await?;
2063/// }
2064/// # Ok(())
2065/// # }
2066/// ```
2067pub struct BatchBuilder<'a> {
2068    batch: usize,
2069    max_bytes: usize,
2070    heartbeat: Duration,
2071    expires: Duration,
2072    min_pending: Option<usize>,
2073    min_ack_pending: Option<usize>,
2074    group: Option<String>,
2075    consumer: &'a Consumer<Config>,
2076}
2077
2078impl<'a> BatchBuilder<'a> {
2079    pub fn new(consumer: &'a Consumer<Config>) -> Self {
2080        BatchBuilder {
2081            consumer,
2082            batch: 200,
2083            max_bytes: 0,
2084            expires: Duration::ZERO,
2085            heartbeat: Duration::default(),
2086            min_pending: None,
2087            min_ack_pending: None,
2088            group: None,
2089        }
2090    }
2091
2092    /// Sets max bytes that can be buffered on the Client while processing already received
2093    /// messages.
2094    /// Higher values will yield better performance, but also potentially increase memory usage if
2095    /// application is acknowledging messages much slower than they arrive.
2096    ///
2097    /// Default values should provide reasonable balance between performance and memory usage.
2098    ///
2099    /// # Examples
2100    ///
2101    /// ```no_run
2102    /// # #[tokio::main]
2103    /// # async fn main() -> Result<(), async_nats::Error>  {
2104    /// use async_nats::jetstream::consumer::PullConsumer;
2105    /// use futures::StreamExt;
2106    /// let client = async_nats::connect("localhost:4222").await?;
2107    /// let jetstream = async_nats::jetstream::new(client);
2108    ///
2109    /// let consumer: PullConsumer = jetstream
2110    ///     .get_stream("events")
2111    ///     .await?
2112    ///     .get_consumer("pull")
2113    ///     .await?;
2114    ///
2115    /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
2116    ///
2117    /// while let Some(message) = messages.next().await {
2118    ///     let message = message?;
2119    ///     println!("message: {:?}", message);
2120    ///     message.ack().await?;
2121    /// }
2122    /// # Ok(())
2123    /// # }
2124    /// ```
2125    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
2126        self.max_bytes = max_bytes;
2127        self
2128    }
2129
2130    /// Sets max number of messages that can be buffered on the Client while processing already received
2131    /// messages.
2132    /// Higher values will yield better performance, but also potentially increase memory usage if
2133    /// application is acknowledging messages much slower than they arrive.
2134    ///
2135    /// Default values should provide reasonable balance between performance and memory usage.
2136    ///
2137    /// # Examples
2138    ///
2139    /// ```no_run
2140    /// # #[tokio::main]
2141    /// # async fn main() -> Result<(), async_nats::Error>  {
2142    /// use async_nats::jetstream::consumer::PullConsumer;
2143    /// use futures::StreamExt;
2144    /// let client = async_nats::connect("localhost:4222").await?;
2145    /// let jetstream = async_nats::jetstream::new(client);
2146    ///
2147    /// let consumer: PullConsumer = jetstream
2148    ///     .get_stream("events")
2149    ///     .await?
2150    ///     .get_consumer("pull")
2151    ///     .await?;
2152    ///
2153    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2154    ///
2155    /// while let Some(message) = messages.next().await {
2156    ///     let message = message?;
2157    ///     println!("message: {:?}", message);
2158    ///     message.ack().await?;
2159    /// }
2160    /// # Ok(())
2161    /// # }
2162    /// ```
2163    pub fn max_messages(mut self, batch: usize) -> Self {
2164        self.batch = batch;
2165        self
2166    }
2167
    /// Sets heartbeat which will be sent by the server if there are no messages for a given
2169    /// [Consumer] pending.
2170    ///
2171    /// # Examples
2172    ///
2173    /// ```no_run
2174    /// # #[tokio::main]
2175    /// # async fn main() -> Result<(), async_nats::Error>  {
2176    /// use async_nats::jetstream::consumer::PullConsumer;
2177    /// use futures::StreamExt;
2178    /// let client = async_nats::connect("localhost:4222").await?;
2179    /// let jetstream = async_nats::jetstream::new(client);
2180    ///
2181    /// let consumer: PullConsumer = jetstream
2182    ///     .get_stream("events")
2183    ///     .await?
2184    ///     .get_consumer("pull")
2185    ///     .await?;
2186    ///
2187    /// let mut messages = consumer
2188    ///     .batch()
2189    ///     .heartbeat(std::time::Duration::from_secs(10))
2190    ///     .messages()
2191    ///     .await?;
2192    ///
2193    /// while let Some(message) = messages.next().await {
2194    ///     let message = message?;
2195    ///     println!("message: {:?}", message);
2196    ///     message.ack().await?;
2197    /// }
2198    /// # Ok(())
2199    /// # }
2200    /// ```
2201    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
2202        self.heartbeat = heartbeat;
2203        self
2204    }
2205
2206    /// Sets overflow threshold for minimum pending messages before this stream will start getting
2207    /// messages.
2208    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
2209    /// [PriorityPolicy::Overflow] set.
2210    ///
2211    /// # Examples
2212    ///
2213    /// ```no_run
2214    /// # #[tokio::main]
2215    /// # async fn main() -> Result<(), async_nats::Error>  {
2216    /// use async_nats::jetstream::consumer::PullConsumer;
2217    /// use futures::StreamExt;
2218    ///
2219    /// let client = async_nats::connect("localhost:4222").await?;
2220    /// let jetstream = async_nats::jetstream::new(client);
2221    ///
2222    /// let consumer: PullConsumer = jetstream
2223    ///     .get_stream("events")
2224    ///     .await?
2225    ///     .get_consumer("pull")
2226    ///     .await?;
2227    ///
2228    /// let mut messages = consumer
2229    ///     .batch()
2230    ///     .expires(std::time::Duration::from_secs(30))
2231    ///     .group("A")
2232    ///     .min_pending(100)
2233    ///     .messages()
2234    ///     .await?;
2235    ///
2236    /// while let Some(message) = messages.next().await {
2237    ///     let message = message?;
2238    ///     println!("message: {:?}", message);
2239    ///     message.ack().await?;
2240    /// }
2241    /// # Ok(())
2242    /// # }
2243    /// ```
2244    pub fn min_pending(mut self, min_pending: usize) -> Self {
2245        self.min_pending = Some(min_pending);
2246        self
2247    }
2248
2249    /// Sets overflow threshold for minimum pending acknowledgments before this stream will start getting
2250    /// messages.
2251    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
2252    /// [PriorityPolicy::Overflow] set.
2253    ///
2254    /// # Examples
2255    ///
2256    /// ```no_run
2257    /// # #[tokio::main]
2258    /// # async fn main() -> Result<(), async_nats::Error>  {
2259    /// use async_nats::jetstream::consumer::PullConsumer;
2260    /// use futures::StreamExt;
2261    ///
2262    /// let client = async_nats::connect("localhost:4222").await?;
2263    /// let jetstream = async_nats::jetstream::new(client);
2264    ///
2265    /// let consumer: PullConsumer = jetstream
2266    ///     .get_stream("events")
2267    ///     .await?
2268    ///     .get_consumer("pull")
2269    ///     .await?;
2270    ///
2271    /// let mut messages = consumer
2272    ///     .batch()
2273    ///     .expires(std::time::Duration::from_secs(30))
2274    ///     .group("A")
2275    ///     .min_ack_pending(100)
2276    ///     .messages()
2277    ///     .await?;
2278    ///
2279    /// while let Some(message) = messages.next().await {
2280    ///     let message = message?;
2281    ///     println!("message: {:?}", message);
2282    ///     message.ack().await?;
2283    /// }
2284    /// # Ok(())
2285    /// # }
2286    /// ```
2287    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
2288        self.min_ack_pending = Some(min_ack_pending);
2289        self
2290    }
2291
2292    /// Setting group when using [Consumer] with [PriorityPolicy].
2293    ///
2294    /// # Examples
2295    ///
2296    /// ```no_run
2297    /// # #[tokio::main]
2298    /// # async fn main() -> Result<(), async_nats::Error>  {
2299    /// use async_nats::jetstream::consumer::PullConsumer;
2300    /// use futures::StreamExt;
2301    ///
2302    /// let client = async_nats::connect("localhost:4222").await?;
2303    /// let jetstream = async_nats::jetstream::new(client);
2304    ///
2305    /// let consumer: PullConsumer = jetstream
2306    ///     .get_stream("events")
2307    ///     .await?
2308    ///     .get_consumer("pull")
2309    ///     .await?;
2310    ///
2311    /// let mut messages = consumer
2312    ///     .batch()
2313    ///     .expires(std::time::Duration::from_secs(30))
2314    ///     .group("A")
2315    ///     .min_ack_pending(100)
2316    ///     .messages()
2317    ///     .await?;
2318    ///
2319    /// while let Some(message) = messages.next().await {
2320    ///     let message = message?;
2321    ///     println!("message: {:?}", message);
2322    ///     message.ack().await?;
2323    /// }
2324    /// # Ok(())
2325    /// # }
2326    /// ```
2327    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2328        self.group = Some(group.into());
2329        self
2330    }
2331
2332    /// Low level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
2335    ///
2336    /// # Examples
2337    ///
2338    /// ```no_run
2339    /// # #[tokio::main]
2340    /// # async fn main() -> Result<(), async_nats::Error>  {
2341    /// use async_nats::jetstream::consumer::PullConsumer;
2342    /// use futures::StreamExt;
2343    /// let client = async_nats::connect("localhost:4222").await?;
2344    /// let jetstream = async_nats::jetstream::new(client);
2345    ///
2346    /// let consumer: PullConsumer = jetstream
2347    ///     .get_stream("events")
2348    ///     .await?
2349    ///     .get_consumer("pull")
2350    ///     .await?;
2351    ///
2352    /// let mut messages = consumer
2353    ///     .batch()
2354    ///     .expires(std::time::Duration::from_secs(30))
2355    ///     .messages()
2356    ///     .await?;
2357    ///
2358    /// while let Some(message) = messages.next().await {
2359    ///     let message = message?;
2360    ///     println!("message: {:?}", message);
2361    ///     message.ack().await?;
2362    /// }
2363    /// # Ok(())
2364    /// # }
2365    /// ```
2366    pub fn expires(mut self, expires: Duration) -> Self {
2367        self.expires = expires;
2368        self
2369    }
2370
    /// Creates actual [Batch] with provided configuration.
2372    ///
2373    /// # Examples
2374    ///
2375    /// ```no_run
2376    /// # #[tokio::main]
2377    /// # async fn main() -> Result<(), async_nats::Error>  {
2378    /// use async_nats::jetstream::consumer::PullConsumer;
2379    /// use futures::StreamExt;
2380    /// let client = async_nats::connect("localhost:4222").await?;
2381    /// let jetstream = async_nats::jetstream::new(client);
2382    ///
2383    /// let consumer: PullConsumer = jetstream
2384    ///     .get_stream("events")
2385    ///     .await?
2386    ///     .get_consumer("pull")
2387    ///     .await?;
2388    ///
2389    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2390    ///
2391    /// while let Some(message) = messages.next().await {
2392    ///     let message = message?;
2393    ///     println!("message: {:?}", message);
2394    ///     message.ack().await?;
2395    /// }
2396    /// # Ok(())
2397    /// # }
2398    /// ```
2399    pub async fn messages(self) -> Result<Batch, BatchError> {
2400        let config = BatchConfig {
2401            batch: self.batch,
2402            expires: Some(self.expires),
2403            no_wait: false,
2404            max_bytes: self.max_bytes,
2405            idle_heartbeat: self.heartbeat,
2406            min_pending: self.min_pending,
2407            min_ack_pending: self.min_ack_pending,
2408            group: self.group,
2409        };
2410        Batch::batch(config, self.consumer).await
2411    }
2412}
2413
/// Used for next Pull Request for Pull Consumer.
///
/// This is the serialized request configuration that the stream, fetch and batch
/// builders in this module fill in and hand to `Stream::stream` / `Batch::batch`.
#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// The number of messages that are being requested to be delivered.
    pub batch: usize,
    /// The optional number of nanoseconds that the server will store this next request for
    /// before forgetting about the pending batch size.
    /// Omitted from the serialized request when `None`.
    #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub expires: Option<Duration>,
    /// This optionally causes the server not to store this pending request at all, but when there are no
    /// messages to deliver will send a nil bytes message with a Status header of 404, this way you
    /// can know when you reached the end of the stream for example. A 409 is returned if the
    /// Consumer has reached MaxAckPending limits.
    /// Omitted from the serialized request when `false`.
    #[serde(skip_serializing_if = "is_default")]
    pub no_wait: bool,

    /// Sets max number of bytes in total in given batch size. This works together with `batch`.
    /// Whichever value is reached first, batch will complete.
    pub max_bytes: usize,

    /// Setting this other than zero will cause the server to send 100 Idle Heartbeat status to the
    /// client.
    /// Serialized as nanoseconds and omitted from the request when zero.
    #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
    pub idle_heartbeat: Duration,

    // NOTE(review): unlike `expires`, the three fields below have no
    // `skip_serializing_if`, so `None` is serialized as an explicit `null`.
    // Confirm this is intended.
    /// Overflow threshold for minimum pending messages, set through the builders'
    /// `min_pending` methods.
    pub min_pending: Option<usize>,
    /// Overflow threshold for minimum pending acknowledgments, set through the builders'
    /// `min_ack_pending` methods.
    pub min_ack_pending: Option<usize>,
    /// Priority group this request belongs to, set through the builders' `group` methods.
    pub group: Option<String>,
}
2443
/// Reports whether `t` is equal to the [`Default`] value of its type.
///
/// Used in this file as the predicate for serde's `skip_serializing_if`, so that
/// fields still holding their default value are left out of the serialized output.
fn is_default<T>(t: &T) -> bool
where
    T: Default + Eq,
{
    *t == T::default()
}
2447
/// Configuration of a pull consumer.
///
/// Converts into the generic `consumer::Config` through [`IntoConsumerConfig`]
/// and back through [`FromConsumer`]. Fields annotated with
/// `skip_serializing_if` are left out of the serialized form when they hold
/// their default (or `None`) value.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Setting `durable_name` to `Some(...)` will cause this consumer
    /// to be "durable". This may be a good choice for workloads that
    /// benefit from the `JetStream` server or cluster remembering the
    /// progress of consumers for fault tolerance purposes. If a consumer
    /// crashes, the `JetStream` server or cluster will remember which
    /// messages the consumer acknowledged. When the consumer recovers,
    /// this information will allow the consumer to resume processing
    /// where it left off. If you're unsure, set this to `Some(...)`.
    ///
    /// Setting `durable_name` to `None` will cause this consumer to
    /// be "ephemeral". This may be a good choice for workloads where
    /// you don't need the `JetStream` server to remember the consumer's
    /// progress in the case of a crash, such as certain "high churn"
    /// workloads or workloads where a crashed instance is not required
    /// to recover.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub durable_name: Option<String>,
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Allows for a variety of options that determine how this consumer will receive messages.
    /// Flattened, so its fields are serialized directly into this structure.
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// How messages should be acknowledged
    pub ack_policy: AckPolicy,
    /// How long to allow messages to remain un-acknowledged before attempting redelivery.
    /// Serialized through `serde_nanos`.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub ack_wait: Duration,
    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_deliver: i64,
    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
    /// Serialized under the name `sample_freq`.
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    /// The maximum number of unacknowledged messages that may be
    /// in-flight before pausing sending additional messages to
    /// this consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_ack_pending: i64,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Maximum size of a request batch
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_batch: i64,
    /// Maximum value of request max_bytes
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_bytes: i64,
    /// Maximum value for request expiration
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub max_expires: Duration,
    /// Threshold for consumer inactivity
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub inactive_threshold: Duration,
    /// Number of consumer replicas
    #[serde(default, skip_serializing_if = "is_default")]
    pub num_replicas: usize,
    /// Force consumer to use memory storage.
    #[serde(default, skip_serializing_if = "is_default")]
    pub memory_storage: bool,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Custom backoff for missed acknowledgments.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub backoff: Vec<Duration>,

    /// Priority policy for this consumer. Requires [Config::priority_groups] to be set.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub priority_policy: PriorityPolicy,
    /// Priority groups for this consumer. Currently only one group is supported and is used
    /// in conjunction with [Config::priority_policy].
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub priority_groups: Vec<String>,
    /// For suspending the consumer until the deadline.
    /// Serialized as an RFC 3339 timestamp and omitted when `None`.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,
}
2560
2561impl IntoConsumerConfig for &Config {
2562    fn into_consumer_config(self) -> consumer::Config {
2563        self.clone().into_consumer_config()
2564    }
2565}
2566
2567impl IntoConsumerConfig for Config {
2568    fn into_consumer_config(self) -> consumer::Config {
2569        jetstream::consumer::Config {
2570            deliver_subject: None,
2571            name: self.name,
2572            durable_name: self.durable_name,
2573            description: self.description,
2574            deliver_group: None,
2575            deliver_policy: self.deliver_policy,
2576            ack_policy: self.ack_policy,
2577            ack_wait: self.ack_wait,
2578            max_deliver: self.max_deliver,
2579            filter_subject: self.filter_subject,
2580            #[cfg(feature = "server_2_10")]
2581            filter_subjects: self.filter_subjects,
2582            replay_policy: self.replay_policy,
2583            rate_limit: self.rate_limit,
2584            sample_frequency: self.sample_frequency,
2585            max_waiting: self.max_waiting,
2586            max_ack_pending: self.max_ack_pending,
2587            headers_only: self.headers_only,
2588            flow_control: false,
2589            idle_heartbeat: Duration::default(),
2590            max_batch: self.max_batch,
2591            max_bytes: self.max_bytes,
2592            max_expires: self.max_expires,
2593            inactive_threshold: self.inactive_threshold,
2594            num_replicas: self.num_replicas,
2595            memory_storage: self.memory_storage,
2596            #[cfg(feature = "server_2_10")]
2597            metadata: self.metadata,
2598            backoff: self.backoff,
2599            #[cfg(feature = "server_2_11")]
2600            priority_policy: self.priority_policy,
2601            #[cfg(feature = "server_2_11")]
2602            priority_groups: self.priority_groups,
2603            #[cfg(feature = "server_2_11")]
2604            pause_until: self.pause_until,
2605        }
2606    }
2607}
2608impl FromConsumer for Config {
2609    fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2610        if config.deliver_subject.is_some() {
2611            return Err(Box::new(std::io::Error::other(
2612                "pull consumer cannot have delivery subject",
2613            )));
2614        }
2615        Ok(Config {
2616            durable_name: config.durable_name,
2617            name: config.name,
2618            description: config.description,
2619            deliver_policy: config.deliver_policy,
2620            ack_policy: config.ack_policy,
2621            ack_wait: config.ack_wait,
2622            max_deliver: config.max_deliver,
2623            filter_subject: config.filter_subject,
2624            #[cfg(feature = "server_2_10")]
2625            filter_subjects: config.filter_subjects,
2626            replay_policy: config.replay_policy,
2627            rate_limit: config.rate_limit,
2628            sample_frequency: config.sample_frequency,
2629            max_waiting: config.max_waiting,
2630            max_ack_pending: config.max_ack_pending,
2631            headers_only: config.headers_only,
2632            max_batch: config.max_batch,
2633            max_bytes: config.max_bytes,
2634            max_expires: config.max_expires,
2635            inactive_threshold: config.inactive_threshold,
2636            num_replicas: config.num_replicas,
2637            memory_storage: config.memory_storage,
2638            #[cfg(feature = "server_2_10")]
2639            metadata: config.metadata,
2640            backoff: config.backoff,
2641            #[cfg(feature = "server_2_11")]
2642            priority_policy: config.priority_policy,
2643            #[cfg(feature = "server_2_11")]
2644            priority_groups: config.priority_groups,
2645            #[cfg(feature = "server_2_11")]
2646            pause_until: config.pause_until,
2647        })
2648    }
2649}
2650
/// Kinds of failure distinguished by [`BatchRequestError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchRequestErrorKind {
    /// Displayed as "publish failed".
    Publish,
    /// Displayed as "flush failed".
    Flush,
    /// Displayed as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchRequestErrorKind {
    /// Writes the fixed, human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Publish => "publish failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(message)
    }
}
2667
/// Error type parameterized by [`BatchRequestErrorKind`].
pub type BatchRequestError = Error<BatchRequestErrorKind>;
2669
/// Kinds of failure distinguished by [`BatchError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchErrorKind {
    /// Displayed as "subscribe failed".
    Subscribe,
    /// Displayed as "pull request failed".
    Pull,
    /// Displayed as "flush failed".
    Flush,
    /// Displayed as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchErrorKind {
    /// Writes the fixed, human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Subscribe => "subscribe failed",
            Self::Pull => "pull request failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(message)
    }
}
2688
/// Error type parameterized by [`BatchErrorKind`].
pub type BatchError = Error<BatchErrorKind>;
2690
2691impl From<SubscribeError> for BatchError {
2692    fn from(err: SubscribeError) -> Self {
2693        BatchError::with_source(BatchErrorKind::Subscribe, err)
2694    }
2695}
2696
2697impl From<BatchRequestError> for BatchError {
2698    fn from(err: BatchRequestError) -> Self {
2699        BatchError::with_source(BatchErrorKind::Pull, err)
2700    }
2701}
2702
/// Kinds of failure distinguished by [`ConsumerRecreateError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConsumerRecreateErrorKind {
    /// Displayed as "error getting stream".
    GetStream,
    /// Displayed as "consumer creation failed".
    Recreate,
    /// Displayed as "timed out".
    TimedOut,
}

impl std::fmt::Display for ConsumerRecreateErrorKind {
    /// Writes the fixed, human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::GetStream => "error getting stream",
            Self::Recreate => "consumer creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(message)
    }
}
2719
/// Error type parameterized by [`ConsumerRecreateErrorKind`].
pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2721
2722async fn recreate_consumer_stream(
2723    context: &Context,
2724    config: &OrderedConfig,
2725    stream_name: &str,
2726    consumer_name: &str,
2727    sequence: u64,
2728) -> Result<Stream, ConsumerRecreateError> {
2729    let span = tracing::span!(
2730        tracing::Level::DEBUG,
2731        "recreate_ordered_consumer",
2732        stream_name = stream_name,
2733        consumer_name = consumer_name,
2734        sequence = sequence
2735    );
2736    let _span_handle = span.enter();
2737    let config = config.to_owned();
2738    trace!("delete old consumer before creating new one");
2739
2740    tokio::time::timeout(
2741        Duration::from_secs(5),
2742        context.delete_consumer_from_stream(consumer_name, stream_name),
2743    )
2744    .await
2745    .ok();
2746
2747    let deliver_policy = {
2748        if sequence == 0 {
2749            DeliverPolicy::All
2750        } else {
2751            DeliverPolicy::ByStartSequence {
2752                start_sequence: sequence + 1,
2753            }
2754        }
2755    };
2756    trace!("create the new ordered consumer for sequence {}", sequence);
2757    let consumer = tokio::time::timeout(
2758        Duration::from_secs(5),
2759        context.create_consumer_on_stream(
2760            jetstream::consumer::pull::OrderedConfig {
2761                deliver_policy,
2762                ..config.clone()
2763            },
2764            stream_name,
2765        ),
2766    )
2767    .await
2768    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2769    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2770
2771    let config = Consumer {
2772        config: config.clone().into(),
2773        context: context.clone(),
2774        info: consumer.info,
2775    };
2776
2777    trace!("create iterator");
2778    let stream = tokio::time::timeout(
2779        Duration::from_secs(5),
2780        Stream::stream(
2781            BatchConfig {
2782                batch: 500,
2783                expires: Some(Duration::from_secs(30)),
2784                no_wait: false,
2785                max_bytes: 0,
2786                idle_heartbeat: Duration::from_secs(15),
2787                min_pending: None,
2788                min_ack_pending: None,
2789                group: None,
2790            },
2791            &config,
2792        ),
2793    )
2794    .await
2795    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2796    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2797    trace!("recreated consumer");
2798    stream
2799}