async_nats/jetstream/consumer/
pull.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16    future::{BoxFuture, Either},
17    FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_11")]
21use time::{serde::rfc3339, OffsetDateTime};
22
23#[cfg(feature = "server_2_10")]
24use std::collections::HashMap;
25use std::{future, pin::Pin, task::Poll, time::Duration};
26use tokio::{task::JoinHandle, time::Sleep};
27
28use serde::{Deserialize, Serialize};
29use tracing::{debug, trace};
30
31use crate::{
32    connection::State,
33    error::Error,
34    jetstream::{self, Context},
35    StatusCode, SubscribeError, Subscriber,
36};
37
38use crate::subject::Subject;
39
40#[cfg(feature = "server_2_11")]
41use super::PriorityPolicy;
42
43use super::{
44    backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
45    StreamError, StreamErrorKind,
46};
47use jetstream::consumer;
48
49impl Consumer<Config> {
50    /// Returns a stream of messages for Pull Consumer.
51    ///
52    /// # Example
53    ///
54    /// ```no_run
55    /// # #[tokio::main]
56    /// # async fn mains() -> Result<(), async_nats::Error> {
57    /// use futures::StreamExt;
58    /// use futures::TryStreamExt;
59    ///
60    /// let client = async_nats::connect("localhost:4222").await?;
61    /// let jetstream = async_nats::jetstream::new(client);
62    ///
63    /// let stream = jetstream
64    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
65    ///         name: "events".to_string(),
66    ///         max_messages: 10_000,
67    ///         ..Default::default()
68    ///     })
69    ///     .await?;
70    ///
71    /// jetstream.publish("events", "data".into()).await?;
72    ///
73    /// let consumer = stream
74    ///     .get_or_create_consumer(
75    ///         "consumer",
76    ///         async_nats::jetstream::consumer::pull::Config {
77    ///             durable_name: Some("consumer".to_string()),
78    ///             ..Default::default()
79    ///         },
80    ///     )
81    ///     .await?;
82    ///
83    /// let mut messages = consumer.messages().await?.take(100);
84    /// while let Some(Ok(message)) = messages.next().await {
85    ///     println!("got message {:?}", message);
86    ///     message.ack().await?;
87    /// }
88    /// Ok(())
89    /// # }
90    /// ```
91    pub async fn messages(&self) -> Result<Stream, StreamError> {
92        Stream::stream(
93            BatchConfig {
94                batch: 200,
95                expires: Some(Duration::from_secs(30)),
96                no_wait: false,
97                max_bytes: 0,
98                idle_heartbeat: Duration::from_secs(15),
99                min_pending: None,
100                min_ack_pending: None,
101                group: None,
102            },
103            self,
104        )
105        .await
106    }
107
108    /// Enables customization of [Stream] by setting timeouts, heartbeats, maximum number of
109    /// messages or bytes buffered.
110    ///
111    /// # Examples
112    ///
113    /// ```no_run
114    /// # #[tokio::main]
115    /// # async fn main() -> Result<(), async_nats::Error>  {
116    /// use async_nats::jetstream::consumer::PullConsumer;
117    /// use futures::StreamExt;
118    /// let client = async_nats::connect("localhost:4222").await?;
119    /// let jetstream = async_nats::jetstream::new(client);
120    ///
121    /// let consumer: PullConsumer = jetstream
122    ///     .get_stream("events")
123    ///     .await?
124    ///     .get_consumer("pull")
125    ///     .await?;
126    ///
127    /// let mut messages = consumer
128    ///     .stream()
129    ///     .max_messages_per_batch(100)
130    ///     .max_bytes_per_batch(1024)
131    ///     .messages()
132    ///     .await?;
133    ///
134    /// while let Some(message) = messages.next().await {
135    ///     let message = message?;
136    ///     println!("message: {:?}", message);
137    ///     message.ack().await?;
138    /// }
139    /// # Ok(())
140    /// # }
141    /// ```
142    pub fn stream(&self) -> StreamBuilder<'_> {
143        StreamBuilder::new(self)
144    }
145
146    pub async fn request_batch<I: Into<BatchConfig>>(
147        &self,
148        batch: I,
149        inbox: Subject,
150    ) -> Result<(), BatchRequestError> {
151        debug!("sending batch");
152        let subject = format!(
153            "{}.CONSUMER.MSG.NEXT.{}.{}",
154            self.context.prefix, self.info.stream_name, self.info.name
155        );
156
157        let payload = serde_json::to_vec(&batch.into())
158            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
159
160        self.context
161            .client
162            .publish_with_reply(subject, inbox, payload.into())
163            .await
164            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
165        debug!("batch request sent");
166        Ok(())
167    }
168
169    /// Returns a batch of specified number of messages, or if there are less messages on the
170    /// [Stream] than requested, returns all available messages.
171    ///
172    /// # Example
173    ///
174    /// ```no_run
175    /// # #[tokio::main]
176    /// # async fn mains() -> Result<(), async_nats::Error> {
177    /// use futures::StreamExt;
178    /// use futures::TryStreamExt;
179    ///
180    /// let client = async_nats::connect("localhost:4222").await?;
181    /// let jetstream = async_nats::jetstream::new(client);
182    ///
183    /// let stream = jetstream
184    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
185    ///         name: "events".to_string(),
186    ///         max_messages: 10_000,
187    ///         ..Default::default()
188    ///     })
189    ///     .await?;
190    ///
191    /// jetstream.publish("events", "data".into()).await?;
192    ///
193    /// let consumer = stream
194    ///     .get_or_create_consumer(
195    ///         "consumer",
196    ///         async_nats::jetstream::consumer::pull::Config {
197    ///             durable_name: Some("consumer".to_string()),
198    ///             ..Default::default()
199    ///         },
200    ///     )
201    ///     .await?;
202    ///
203    /// for _ in 0..100 {
204    ///     jetstream.publish("events", "data".into()).await?;
205    /// }
206    ///
207    /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
208    /// // will finish after 100 messages, as that is the number of messages available on the
209    /// // stream.
210    /// while let Some(Ok(message)) = messages.next().await {
211    ///     println!("got message {:?}", message);
212    ///     message.ack().await?;
213    /// }
214    /// Ok(())
215    /// # }
216    /// ```
217    pub fn fetch(&self) -> FetchBuilder {
218        FetchBuilder::new(self)
219    }
220
221    /// Returns a batch of specified number of messages unless timeout happens first.
222    ///
223    /// # Example
224    ///
225    /// ```no_run
226    /// # #[tokio::main]
227    /// # async fn mains() -> Result<(), async_nats::Error> {
228    /// use futures::StreamExt;
229    /// use futures::TryStreamExt;
230    ///
231    /// let client = async_nats::connect("localhost:4222").await?;
232    /// let jetstream = async_nats::jetstream::new(client);
233    ///
234    /// let stream = jetstream
235    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
236    ///         name: "events".to_string(),
237    ///         max_messages: 10_000,
238    ///         ..Default::default()
239    ///     })
240    ///     .await?;
241    ///
242    /// jetstream.publish("events", "data".into()).await?;
243    ///
244    /// let consumer = stream
245    ///     .get_or_create_consumer(
246    ///         "consumer",
247    ///         async_nats::jetstream::consumer::pull::Config {
248    ///             durable_name: Some("consumer".to_string()),
249    ///             ..Default::default()
250    ///         },
251    ///     )
252    ///     .await?;
253    ///
254    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
255    /// while let Some(Ok(message)) = messages.next().await {
256    ///     println!("got message {:?}", message);
257    ///     message.ack().await?;
258    /// }
259    /// Ok(())
260    /// # }
261    /// ```
262    pub fn batch(&self) -> BatchBuilder {
263        BatchBuilder::new(self)
264    }
265
266    /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
267    /// messages in those batches.
268    ///
269    /// # Example
270    ///
271    /// ```no_run
272    /// # #[tokio::main]
273    /// # async fn mains() -> Result<(), async_nats::Error> {
274    /// use futures::StreamExt;
275    /// use futures::TryStreamExt;
276    ///
277    /// let client = async_nats::connect("localhost:4222").await?;
278    /// let jetstream = async_nats::jetstream::new(client);
279    ///
280    /// let stream = jetstream
281    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
282    ///         name: "events".to_string(),
283    ///         max_messages: 10_000,
284    ///         ..Default::default()
285    ///     })
286    ///     .await?;
287    ///
288    /// jetstream.publish("events", "data".into()).await?;
289    ///
290    /// let consumer = stream
291    ///     .get_or_create_consumer(
292    ///         "consumer",
293    ///         async_nats::jetstream::consumer::pull::Config {
294    ///             durable_name: Some("consumer".to_string()),
295    ///             ..Default::default()
296    ///         },
297    ///     )
298    ///     .await?;
299    ///
300    /// let mut iter = consumer.sequence(50).unwrap().take(10);
301    /// while let Ok(Some(mut batch)) = iter.try_next().await {
302    ///     while let Ok(Some(message)) = batch.try_next().await {
303    ///         println!("message received: {:?}", message);
304    ///     }
305    /// }
306    /// Ok(())
307    /// # }
308    /// ```
309    pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
310        let context = self.context.clone();
311        let subject = format!(
312            "{}.CONSUMER.MSG.NEXT.{}.{}",
313            self.context.prefix, self.info.stream_name, self.info.name
314        );
315
316        let request = serde_json::to_vec(&BatchConfig {
317            batch,
318            expires: Some(Duration::from_secs(60)),
319            ..Default::default()
320        })
321        .map(Bytes::from)
322        .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
323
324        Ok(Sequence {
325            context,
326            subject,
327            request,
328            pending_messages: batch,
329            next: None,
330        })
331    }
332}
333
334pub struct Batch {
335    pending_messages: usize,
336    subscriber: Subscriber,
337    context: Context,
338    timeout: Option<Pin<Box<Sleep>>>,
339    terminated: bool,
340}
341
342impl Batch {
343    async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
344        let inbox = Subject::from(consumer.context.client.new_inbox());
345        let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
346        consumer.request_batch(batch.clone(), inbox.clone()).await?;
347
348        let sleep = batch.expires.map(|expires| {
349            Box::pin(tokio::time::sleep(
350                expires.saturating_add(Duration::from_secs(5)),
351            ))
352        });
353
354        Ok(Batch {
355            pending_messages: batch.batch,
356            subscriber: subscription,
357            context: consumer.context.clone(),
358            terminated: false,
359            timeout: sleep,
360        })
361    }
362}
363
364impl futures::Stream for Batch {
365    type Item = Result<jetstream::Message, crate::Error>;
366
367    fn poll_next(
368        mut self: std::pin::Pin<&mut Self>,
369        cx: &mut std::task::Context<'_>,
370    ) -> std::task::Poll<Option<Self::Item>> {
371        if self.terminated {
372            return Poll::Ready(None);
373        }
374        if self.pending_messages == 0 {
375            self.terminated = true;
376            return Poll::Ready(None);
377        }
378        if let Some(sleep) = self.timeout.as_mut() {
379            match sleep.poll_unpin(cx) {
380                Poll::Ready(_) => {
381                    debug!("batch timeout timer triggered");
382                    // TODO(tp): Maybe we can be smarter here and before timing out, check if
383                    // we consumed all the messages from the subscription buffer in case of user
384                    // slowly consuming messages. Keep in mind that we time out here only if
385                    // for some reason we missed timeout from the server and few seconds have
386                    // passed since expected timeout message.
387                    self.terminated = true;
388                    return Poll::Ready(None);
389                }
390                Poll::Pending => (),
391            }
392        }
393        match self.subscriber.receiver.poll_recv(cx) {
394            Poll::Ready(maybe_message) => match maybe_message {
395                Some(message) => match message.status.unwrap_or(StatusCode::OK) {
396                    StatusCode::TIMEOUT => {
397                        debug!("received timeout. Iterator done");
398                        self.terminated = true;
399                        Poll::Ready(None)
400                    }
401                    StatusCode::IDLE_HEARTBEAT => {
402                        debug!("received heartbeat");
403                        Poll::Pending
404                    }
405                    // If this is fetch variant, terminate on no more messages.
406                    // We do not need to check if this is a fetch, not batch,
407                    // as only fetch will send back `NO_MESSAGES` status.
408                    StatusCode::NOT_FOUND => {
409                        debug!("received `NO_MESSAGES`. Iterator done");
410                        self.terminated = true;
411                        Poll::Ready(None)
412                    }
413                    StatusCode::OK => {
414                        debug!("received message");
415                        self.pending_messages -= 1;
416                        Poll::Ready(Some(Ok(jetstream::Message {
417                            context: self.context.clone(),
418                            message,
419                        })))
420                    }
421                    status => {
422                        debug!("received error");
423                        self.terminated = true;
424                        Poll::Ready(Some(Err(Box::new(std::io::Error::new(
425                            std::io::ErrorKind::Other,
426                            format!(
427                                "error while processing messages from the stream: {}, {:?}",
428                                status, message.description
429                            ),
430                        )))))
431                    }
432                },
433                None => Poll::Ready(None),
434            },
435            std::task::Poll::Pending => std::task::Poll::Pending,
436        }
437    }
438}
439
440pub struct Sequence {
441    context: Context,
442    subject: String,
443    request: Bytes,
444    pending_messages: usize,
445    next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
446}
447
448impl futures::Stream for Sequence {
449    type Item = Result<Batch, MessagesError>;
450
451    fn poll_next(
452        mut self: std::pin::Pin<&mut Self>,
453        cx: &mut std::task::Context<'_>,
454    ) -> std::task::Poll<Option<Self::Item>> {
455        match self.next.as_mut() {
456            None => {
457                let context = self.context.clone();
458                let subject = self.subject.clone();
459                let request = self.request.clone();
460                let pending_messages = self.pending_messages;
461
462                let next = self.next.insert(Box::pin(async move {
463                    let inbox = context.client.new_inbox();
464                    let subscriber = context
465                        .client
466                        .subscribe(inbox.clone())
467                        .await
468                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
469
470                    context
471                        .client
472                        .publish_with_reply(subject, inbox, request)
473                        .await
474                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
475
476                    // TODO(tp): Add timeout config and defaults.
477                    Ok(Batch {
478                        pending_messages,
479                        subscriber,
480                        context,
481                        terminated: false,
482                        timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
483                    })
484                }));
485
486                match next.as_mut().poll(cx) {
487                    Poll::Ready(result) => {
488                        self.next = None;
489                        Poll::Ready(Some(result.map_err(|err| {
490                            MessagesError::with_source(MessagesErrorKind::Pull, err)
491                        })))
492                    }
493                    Poll::Pending => Poll::Pending,
494                }
495            }
496
497            Some(next) => match next.as_mut().poll(cx) {
498                Poll::Ready(result) => {
499                    self.next = None;
500                    Poll::Ready(Some(result.map_err(|err| {
501                        MessagesError::with_source(MessagesErrorKind::Pull, err)
502                    })))
503                }
504                Poll::Pending => Poll::Pending,
505            },
506        }
507    }
508}
509
510impl Consumer<OrderedConfig> {
511    /// Returns a stream of messages for Ordered Pull Consumer.
512    ///
513    /// Ordered consumers uses single replica ephemeral consumer, no matter the replication factor of the
514    /// Stream. It does not use acks, instead it tracks sequences and recreate itself whenever it
515    /// sees mismatch.
516    ///
517    /// # Example
518    ///
519    /// ```no_run
520    /// # #[tokio::main]
521    /// # async fn mains() -> Result<(), async_nats::Error> {
522    /// use futures::StreamExt;
523    /// use futures::TryStreamExt;
524    ///
525    /// let client = async_nats::connect("localhost:4222").await?;
526    /// let jetstream = async_nats::jetstream::new(client);
527    ///
528    /// let stream = jetstream
529    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
530    ///         name: "events".to_string(),
531    ///         max_messages: 10_000,
532    ///         ..Default::default()
533    ///     })
534    ///     .await?;
535    ///
536    /// jetstream.publish("events", "data".into()).await?;
537    ///
538    /// let consumer = stream
539    ///     .get_or_create_consumer(
540    ///         "consumer",
541    ///         async_nats::jetstream::consumer::pull::OrderedConfig {
542    ///             name: Some("consumer".to_string()),
543    ///             ..Default::default()
544    ///         },
545    ///     )
546    ///     .await?;
547    ///
548    /// let mut messages = consumer.messages().await?.take(100);
549    /// while let Some(Ok(message)) = messages.next().await {
550    ///     println!("got message {:?}", message);
551    /// }
552    /// Ok(())
553    /// # }
554    /// ```
555    pub async fn messages(self) -> Result<Ordered, StreamError> {
556        let config = Consumer {
557            config: self.config.clone().into(),
558            context: self.context.clone(),
559            info: self.info.clone(),
560        };
561        let stream = Stream::stream(
562            BatchConfig {
563                batch: 500,
564                expires: Some(Duration::from_secs(30)),
565                no_wait: false,
566                max_bytes: 0,
567                idle_heartbeat: Duration::from_secs(15),
568                min_pending: None,
569                min_ack_pending: None,
570                group: None,
571            },
572            &config,
573        )
574        .await?;
575
576        Ok(Ordered {
577            consumer_sequence: 0,
578            stream_sequence: 0,
579            missed_heartbeats: false,
580            create_stream: None,
581            context: self.context.clone(),
582            consumer_name: self
583                .config
584                .name
585                .clone()
586                .unwrap_or_else(|| self.context.client.new_inbox()),
587            consumer: self.config,
588            stream: Some(stream),
589            stream_name: self.info.stream_name.clone(),
590        })
591    }
592}
593
594/// Configuration for consumers. From a high level, the
595/// `durable_name` and `deliver_subject` fields have a particularly
596/// strong influence on the consumer's overall behavior.
597#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
598pub struct OrderedConfig {
599    /// A name of the consumer. Can be specified for both durable and ephemeral
600    /// consumers.
601    #[serde(default, skip_serializing_if = "Option::is_none")]
602    pub name: Option<String>,
603    /// A short description of the purpose of this consumer.
604    #[serde(default, skip_serializing_if = "Option::is_none")]
605    pub description: Option<String>,
606    #[serde(default, skip_serializing_if = "is_default")]
607    pub filter_subject: String,
608    #[cfg(feature = "server_2_10")]
609    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
610    #[serde(default, skip_serializing_if = "is_default")]
611    pub filter_subjects: Vec<String>,
612    /// Whether messages are sent as quickly as possible or at the rate of receipt
613    pub replay_policy: ReplayPolicy,
614    /// The rate of message delivery in bits per second
615    #[serde(default, skip_serializing_if = "is_default")]
616    pub rate_limit: u64,
617    /// What percentage of acknowledgments should be samples for observability, 0-100
618    #[serde(
619        rename = "sample_freq",
620        with = "super::sample_freq_deser",
621        default,
622        skip_serializing_if = "is_default"
623    )]
624    pub sample_frequency: u8,
625    /// Only deliver headers without payloads.
626    #[serde(default, skip_serializing_if = "is_default")]
627    pub headers_only: bool,
628    /// Allows for a variety of options that determine how this consumer will receive messages
629    #[serde(flatten)]
630    pub deliver_policy: DeliverPolicy,
631    /// The maximum number of waiting consumers.
632    #[serde(default, skip_serializing_if = "is_default")]
633    pub max_waiting: i64,
634    #[cfg(feature = "server_2_10")]
635    // Additional consumer metadata.
636    #[serde(default, skip_serializing_if = "is_default")]
637    pub metadata: HashMap<String, String>,
638    // Maximum number of messages that can be requested in single Pull Request.
639    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
640    // [stream]
641    pub max_batch: i64,
642    // Maximum number of bytes that can be requested in single Pull Request.
643    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
644    // [stream]
645    pub max_bytes: i64,
646    // Maximum expiry that can be set for a single Pull Request.
647    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
648    // [stream]
649    pub max_expires: Duration,
650}
651
652impl From<OrderedConfig> for Config {
653    fn from(config: OrderedConfig) -> Self {
654        Config {
655            durable_name: None,
656            name: config.name,
657            description: config.description,
658            deliver_policy: config.deliver_policy,
659            ack_policy: AckPolicy::None,
660            ack_wait: Duration::default(),
661            max_deliver: 1,
662            filter_subject: config.filter_subject,
663            #[cfg(feature = "server_2_10")]
664            filter_subjects: config.filter_subjects,
665            replay_policy: config.replay_policy,
666            rate_limit: config.rate_limit,
667            sample_frequency: config.sample_frequency,
668            max_waiting: config.max_waiting,
669            max_ack_pending: 0,
670            headers_only: config.headers_only,
671            max_batch: config.max_batch,
672            max_bytes: config.max_bytes,
673            max_expires: config.max_expires,
674            inactive_threshold: Duration::from_secs(30),
675            num_replicas: 1,
676            memory_storage: true,
677            #[cfg(feature = "server_2_10")]
678            metadata: config.metadata,
679            backoff: Vec::new(),
680            #[cfg(feature = "server_2_11")]
681            priority_policy: PriorityPolicy::None,
682            #[cfg(feature = "server_2_11")]
683            priority_groups: Vec::new(),
684            #[cfg(feature = "server_2_11")]
685            pause_until: None,
686        }
687    }
688}
689
690impl FromConsumer for OrderedConfig {
691    fn try_from_consumer_config(
692        config: crate::jetstream::consumer::Config,
693    ) -> Result<Self, crate::Error>
694    where
695        Self: Sized,
696    {
697        Ok(OrderedConfig {
698            name: config.name,
699            description: config.description,
700            filter_subject: config.filter_subject,
701            #[cfg(feature = "server_2_10")]
702            filter_subjects: config.filter_subjects,
703            replay_policy: config.replay_policy,
704            rate_limit: config.rate_limit,
705            sample_frequency: config.sample_frequency,
706            headers_only: config.headers_only,
707            deliver_policy: config.deliver_policy,
708            max_waiting: config.max_waiting,
709            #[cfg(feature = "server_2_10")]
710            metadata: config.metadata,
711            max_batch: config.max_batch,
712            max_bytes: config.max_bytes,
713            max_expires: config.max_expires,
714        })
715    }
716}
717
718impl IntoConsumerConfig for OrderedConfig {
719    fn into_consumer_config(self) -> super::Config {
720        jetstream::consumer::Config {
721            deliver_subject: None,
722            durable_name: None,
723            name: self.name,
724            description: self.description,
725            deliver_group: None,
726            deliver_policy: self.deliver_policy,
727            ack_policy: AckPolicy::None,
728            ack_wait: Duration::default(),
729            max_deliver: 1,
730            filter_subject: self.filter_subject,
731            #[cfg(feature = "server_2_10")]
732            filter_subjects: self.filter_subjects,
733            replay_policy: self.replay_policy,
734            rate_limit: self.rate_limit,
735            sample_frequency: self.sample_frequency,
736            max_waiting: self.max_waiting,
737            max_ack_pending: 0,
738            headers_only: self.headers_only,
739            flow_control: false,
740            idle_heartbeat: Duration::default(),
741            max_batch: 0,
742            max_bytes: 0,
743            max_expires: Duration::default(),
744            inactive_threshold: Duration::from_secs(30),
745            num_replicas: 1,
746            memory_storage: true,
747            #[cfg(feature = "server_2_10")]
748            metadata: self.metadata,
749            backoff: Vec::new(),
750            #[cfg(feature = "server_2_11")]
751            priority_policy: PriorityPolicy::None,
752            #[cfg(feature = "server_2_11")]
753            priority_groups: Vec::new(),
754            #[cfg(feature = "server_2_11")]
755            pause_until: None,
756        }
757    }
758}
759
760pub struct Ordered {
761    context: Context,
762    stream_name: String,
763    consumer: OrderedConfig,
764    consumer_name: String,
765    stream: Option<Stream>,
766    create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
767    consumer_sequence: u64,
768    stream_sequence: u64,
769    missed_heartbeats: bool,
770}
771
772impl futures::Stream for Ordered {
773    type Item = Result<jetstream::Message, OrderedError>;
774
775    fn poll_next(
776        mut self: Pin<&mut Self>,
777        cx: &mut std::task::Context<'_>,
778    ) -> Poll<Option<Self::Item>> {
779        let mut recreate = false;
780        // Poll messages
781        if let Some(stream) = self.stream.as_mut() {
782            match stream.poll_next_unpin(cx) {
783                Poll::Ready(message) => match message {
784                    Some(message) => match message {
785                        Ok(message) => {
786                            self.missed_heartbeats = false;
787                            let info = message.info().map_err(|err| {
788                                OrderedError::with_source(OrderedErrorKind::Other, err)
789                            })?;
790                            trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
791                                           self.consumer_sequence,
792                                           self.stream_sequence,
793                                           info.consumer_sequence,
794                                           info.stream_sequence);
795                            if info.consumer_sequence != self.consumer_sequence + 1 {
796                                debug!(
797                                    "ordered consumer mismatch. current {}, info: {}",
798                                    self.consumer_sequence, info.consumer_sequence
799                                );
800                                recreate = true;
801                                self.consumer_sequence = 0;
802                            } else {
803                                self.stream_sequence = info.stream_sequence;
804                                self.consumer_sequence = info.consumer_sequence;
805                                return Poll::Ready(Some(Ok(message)));
806                            }
807                        }
808                        Err(err) => match err.kind() {
809                            MessagesErrorKind::MissingHeartbeat => {
810                                // If we have missed heartbeats set, it means this is a second
811                                // missed heartbeat, so we need to recreate consumer.
812                                if self.missed_heartbeats {
813                                    self.consumer_sequence = 0;
814                                    recreate = true;
815                                } else {
816                                    self.missed_heartbeats = true;
817                                }
818                            }
819                            MessagesErrorKind::ConsumerDeleted
820                            | MessagesErrorKind::NoResponders => {
821                                recreate = true;
822                                self.consumer_sequence = 0;
823                            }
824                            MessagesErrorKind::Pull
825                            | MessagesErrorKind::PushBasedConsumer
826                            | MessagesErrorKind::Other => {
827                                return Poll::Ready(Some(Err(err.into())));
828                            }
829                        },
830                    },
831                    None => return Poll::Ready(None),
832                },
833                Poll::Pending => (),
834            }
835        }
836        // Recreate consumer if needed
837        if recreate {
838            self.stream = None;
839            self.create_stream = Some(Box::pin({
840                let context = self.context.clone();
841                let config = self.consumer.clone();
842                let stream_name = self.stream_name.clone();
843                let consumer_name = self.consumer_name.clone();
844                let sequence = self.stream_sequence;
845                async move {
846                    tryhard::retry_fn(|| {
847                        recreate_consumer_stream(
848                            &context,
849                            &config,
850                            &stream_name,
851                            &consumer_name,
852                            sequence,
853                        )
854                    })
855                    .retries(u32::MAX)
856                    .custom_backoff(backoff)
857                    .await
858                }
859            }))
860        }
861        // check for recreation future
862        if let Some(result) = self.create_stream.as_mut() {
863            match result.poll_unpin(cx) {
864                Poll::Ready(result) => match result {
865                    Ok(stream) => {
866                        self.create_stream = None;
867                        self.stream = Some(stream);
868                        return self.poll_next(cx);
869                    }
870                    Err(err) => {
871                        return Poll::Ready(Some(Err(OrderedError::with_source(
872                            OrderedErrorKind::Recreate,
873                            err,
874                        ))))
875                    }
876                },
877                Poll::Pending => (),
878            }
879        }
880        Poll::Pending
881    }
882}
883
884pub struct Stream {
885    pending_messages: usize,
886    pending_bytes: usize,
887    request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
888    request_tx: tokio::sync::watch::Sender<()>,
889    subscriber: Subscriber,
890    batch_config: BatchConfig,
891    context: Context,
892    pending_request: bool,
893    task_handle: JoinHandle<()>,
894    terminated: bool,
895    heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
896}
897
898impl Drop for Stream {
899    fn drop(&mut self) {
900        self.task_handle.abort();
901    }
902}
903
904impl Stream {
905    async fn stream(
906        batch_config: BatchConfig,
907        consumer: &Consumer<Config>,
908    ) -> Result<Stream, StreamError> {
909        let inbox = consumer.context.client.new_inbox();
910        let subscription = consumer
911            .context
912            .client
913            .subscribe(inbox.clone())
914            .await
915            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
916        let subject = format!(
917            "{}.CONSUMER.MSG.NEXT.{}.{}",
918            consumer.context.prefix, consumer.info.stream_name, consumer.info.name
919        );
920
921        let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
922        let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
923        let task_handle = tokio::task::spawn({
924            let batch = batch_config.clone();
925            let consumer = consumer.clone();
926            let mut context = consumer.context.clone();
927            let inbox = inbox.clone();
928            async move {
929                loop {
930                    // this is just in edge case of missing response for some reason.
931                    let expires = batch_config
932                        .expires
933                        .map(|expires| {
934                            if expires.is_zero() {
935                                Either::Left(future::pending())
936                            } else {
937                                Either::Right(tokio::time::sleep(
938                                    expires.saturating_add(Duration::from_secs(5)),
939                                ))
940                            }
941                        })
942                        .unwrap_or_else(|| Either::Left(future::pending()));
943                    // Need to check previous state, as `changed` will always fire on first
944                    // call.
945                    let prev_state = context.client.state.borrow().to_owned();
946                    let mut pending_reset = false;
947
948                    tokio::select! {
949                       _ = context.client.state.changed() => {
950                            let state = context.client.state.borrow().to_owned();
951                            if !(state == crate::connection::State::Connected
952                                && prev_state != State::Connected) {
953                                    continue;
954                                }
955                            debug!("detected !Connected -> Connected state change");
956
957                            match tryhard::retry_fn(|| consumer.fetch_info())
958                                .retries(5).custom_backoff(backoff).await
959                                .map_err(|err| crate::RequestError::with_source(crate::RequestErrorKind::Other, err).into()) {
960                                    Ok(info) => {
961                                        if info.num_waiting == 0 {
962                                            pending_reset = true;
963                                        }
964                                    }
965                                    Err(err) => {
966                                         if let Err(err) = request_result_tx.send(Err(err)).await {
967                                            debug!("failed to sent request result: {}", err);
968                                        }
969                                    },
970                            }
971                        },
972                        _ = request_rx.changed() => debug!("task received request request"),
973                        _ = expires => {
974                            pending_reset = true;
975                            debug!("expired pull request")},
976                    }
977
978                    let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
979                    let result = context
980                        .client
981                        .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
982                        .await
983                        .map(|_| pending_reset);
984                    // TODO: add tracing instead of ignoring this.
985                    request_result_tx
986                        .send(result.map(|_| pending_reset).map_err(|err| {
987                            crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
988                                .into()
989                        }))
990                        .await
991                        .ok();
992                    trace!("result send over tx");
993                }
994            }
995        });
996
997        Ok(Stream {
998            task_handle,
999            request_result_rx,
1000            request_tx,
1001            batch_config,
1002            pending_messages: 0,
1003            pending_bytes: 0,
1004            subscriber: subscription,
1005            context: consumer.context.clone(),
1006            pending_request: false,
1007            terminated: false,
1008            heartbeat_timeout: None,
1009        })
1010    }
1011}
1012
1013#[derive(Clone, Copy, Debug, PartialEq)]
1014pub enum OrderedErrorKind {
1015    MissingHeartbeat,
1016    ConsumerDeleted,
1017    Pull,
1018    PushBasedConsumer,
1019    Recreate,
1020    NoResponders,
1021    Other,
1022}
1023
1024impl std::fmt::Display for OrderedErrorKind {
1025    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1026        match self {
1027            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1028            Self::ConsumerDeleted => write!(f, "consumer deleted"),
1029            Self::Pull => write!(f, "pull request failed"),
1030            Self::Other => write!(f, "error"),
1031            Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1032            Self::Recreate => write!(f, "consumer recreation failed"),
1033            Self::NoResponders => write!(f, "no responders"),
1034        }
1035    }
1036}
1037
1038pub type OrderedError = Error<OrderedErrorKind>;
1039
1040impl From<MessagesError> for OrderedError {
1041    fn from(err: MessagesError) -> Self {
1042        match err.kind() {
1043            MessagesErrorKind::MissingHeartbeat => {
1044                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1045            }
1046            MessagesErrorKind::ConsumerDeleted => {
1047                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1048            }
1049            MessagesErrorKind::Pull => OrderedError {
1050                kind: OrderedErrorKind::Pull,
1051                source: err.source,
1052            },
1053            MessagesErrorKind::PushBasedConsumer => {
1054                OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1055            }
1056            MessagesErrorKind::Other => OrderedError {
1057                kind: OrderedErrorKind::Other,
1058                source: err.source,
1059            },
1060            MessagesErrorKind::NoResponders => OrderedError::new(OrderedErrorKind::NoResponders),
1061        }
1062    }
1063}
1064
1065#[derive(Clone, Copy, Debug, PartialEq)]
1066pub enum MessagesErrorKind {
1067    MissingHeartbeat,
1068    ConsumerDeleted,
1069    Pull,
1070    PushBasedConsumer,
1071    NoResponders,
1072    Other,
1073}
1074
1075impl std::fmt::Display for MessagesErrorKind {
1076    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1077        match self {
1078            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1079            Self::ConsumerDeleted => write!(f, "consumer deleted"),
1080            Self::Pull => write!(f, "pull request failed"),
1081            Self::Other => write!(f, "error"),
1082            Self::NoResponders => write!(f, "no responders"),
1083            Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1084        }
1085    }
1086}
1087
1088pub type MessagesError = Error<MessagesErrorKind>;
1089
1090impl futures::Stream for Stream {
1091    type Item = Result<jetstream::Message, MessagesError>;
1092
1093    fn poll_next(
1094        mut self: std::pin::Pin<&mut Self>,
1095        cx: &mut std::task::Context<'_>,
1096    ) -> std::task::Poll<Option<Self::Item>> {
1097        if self.terminated {
1098            return Poll::Ready(None);
1099        }
1100
1101        if !self.batch_config.idle_heartbeat.is_zero() {
1102            trace!("checking idle hearbeats");
1103            let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1104            match self
1105                .heartbeat_timeout
1106                .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1107                .poll_unpin(cx)
1108            {
1109                Poll::Ready(_) => {
1110                    self.heartbeat_timeout = None;
1111                    return Poll::Ready(Some(Err(MessagesError::new(
1112                        MessagesErrorKind::MissingHeartbeat,
1113                    ))));
1114                }
1115                Poll::Pending => (),
1116            }
1117        }
1118
1119        loop {
1120            trace!("pending messages: {}", self.pending_messages);
1121            if (self.pending_messages <= self.batch_config.batch / 2
1122                || (self.batch_config.max_bytes > 0
1123                    && self.pending_bytes <= self.batch_config.max_bytes / 2))
1124                && !self.pending_request
1125            {
1126                debug!("pending messages reached threshold to send new fetch request");
1127                self.request_tx.send(()).ok();
1128                self.pending_request = true;
1129            }
1130
1131            match self.request_result_rx.poll_recv(cx) {
1132                Poll::Ready(resp) => match resp {
1133                    Some(resp) => match resp {
1134                        Ok(reset) => {
1135                            trace!("request response: {:?}", reset);
1136                            debug!("request sent, setting pending messages");
1137                            if reset {
1138                                self.pending_messages = self.batch_config.batch;
1139                                self.pending_bytes = self.batch_config.max_bytes;
1140                            } else {
1141                                self.pending_messages += self.batch_config.batch;
1142                                self.pending_bytes += self.batch_config.max_bytes;
1143                            }
1144                            self.pending_request = false;
1145                            continue;
1146                        }
1147                        Err(err) => {
1148                            return Poll::Ready(Some(Err(MessagesError::with_source(
1149                                MessagesErrorKind::Pull,
1150                                err,
1151                            ))))
1152                        }
1153                    },
1154                    None => return Poll::Ready(None),
1155                },
1156                Poll::Pending => {
1157                    trace!("pending result");
1158                }
1159            }
1160
1161            trace!("polling subscriber");
1162            match self.subscriber.receiver.poll_recv(cx) {
1163                Poll::Ready(maybe_message) => {
1164                    self.heartbeat_timeout = None;
1165                    match maybe_message {
1166                        Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1167                            StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1168                                debug!("received status message: {:?}", message);
1169                                // If consumer has been deleted, error and shutdown the iterator.
1170                                if message.description.as_deref() == Some("Consumer Deleted") {
1171                                    self.terminated = true;
1172                                    return Poll::Ready(Some(Err(MessagesError::new(
1173                                        MessagesErrorKind::ConsumerDeleted,
1174                                    ))));
1175                                }
1176                                // If consumer is not pull based, error and shutdown the iterator.
1177                                if message.description.as_deref() == Some("Consumer is push based")
1178                                {
1179                                    self.terminated = true;
1180                                    return Poll::Ready(Some(Err(MessagesError::new(
1181                                        MessagesErrorKind::PushBasedConsumer,
1182                                    ))));
1183                                }
1184
1185                                // Do accounting for messages left after terminated/completed pull request.
1186                                let pending_messages = message
1187                                    .headers
1188                                    .as_ref()
1189                                    .and_then(|headers| headers.get("Nats-Pending-Messages"))
1190                                    .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1191                                    .map_err(|err| {
1192                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1193                                    })?;
1194
1195                                let pending_bytes = message
1196                                    .headers
1197                                    .as_ref()
1198                                    .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1199                                    .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1200                                    .map_err(|err| {
1201                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1202                                    })?;
1203
1204                                debug!(
1205                                    "timeout reached. remaining messages: {}, bytes {}",
1206                                    pending_messages, pending_bytes
1207                                );
1208                                self.pending_messages =
1209                                    self.pending_messages.saturating_sub(pending_messages);
1210                                trace!("message bytes len: {}", pending_bytes);
1211                                self.pending_bytes =
1212                                    self.pending_bytes.saturating_sub(pending_bytes);
1213                                continue;
1214                            }
1215                            // Idle Hearbeat means we have no messages, but consumer is fine.
1216                            StatusCode::IDLE_HEARTBEAT => {
1217                                debug!("received idle heartbeat");
1218                                continue;
1219                            }
1220                            // We got an message from a stream.
1221                            StatusCode::OK => {
1222                                trace!("message received");
1223                                self.pending_messages = self.pending_messages.saturating_sub(1);
1224                                self.pending_bytes =
1225                                    self.pending_bytes.saturating_sub(message.length);
1226                                return Poll::Ready(Some(Ok(jetstream::Message {
1227                                    context: self.context.clone(),
1228                                    message,
1229                                })));
1230                            }
1231                            StatusCode::NO_RESPONDERS => {
1232                                debug!("received no responders");
1233                                return Poll::Ready(Some(Err(MessagesError::new(
1234                                    MessagesErrorKind::NoResponders,
1235                                ))));
1236                            }
1237                            status => {
1238                                debug!("received unknown message: {:?}", message);
1239                                return Poll::Ready(Some(Err(MessagesError::with_source(
1240                                    MessagesErrorKind::Other,
1241                                    format!(
1242                                        "error while processing messages from the stream: {}, {:?}",
1243                                        status, message.description
1244                                    ),
1245                                ))));
1246                            }
1247                        },
1248                        None => return Poll::Ready(None),
1249                    }
1250                }
1251                Poll::Pending => {
1252                    debug!("subscriber still pending");
1253                    return std::task::Poll::Pending;
1254                }
1255            }
1256        }
1257    }
1258}
1259
1260/// Used for building configuration for a [Stream]. Created by a [Consumer::stream] on a [Consumer].
1261///
1262/// # Examples
1263///
1264/// ```no_run
1265/// # #[tokio::main]
1266/// # async fn main() -> Result<(), async_nats::Error>  {
1267/// use futures::StreamExt;
1268/// use async_nats::jetstream::consumer::PullConsumer;
1269/// let client = async_nats::connect("localhost:4222").await?;
1270/// let jetstream = async_nats::jetstream::new(client);
1271///
1272/// let consumer: PullConsumer = jetstream
1273///     .get_stream("events").await?
1274///     .get_consumer("pull").await?;
1275///
1276/// let mut messages = consumer.stream()
1277///     .max_messages_per_batch(100)
1278///     .max_bytes_per_batch(1024)
1279///     .messages().await?;
1280///
1281/// while let Some(message) = messages.next().await {
1282///     let message = message?;
1283///     println!("message: {:?}", message);
1284///     message.ack().await?;
1285/// }
1286/// # Ok(())
1287/// # }
1288pub struct StreamBuilder<'a> {
1289    batch: usize,
1290    max_bytes: usize,
1291    heartbeat: Duration,
1292    expires: Duration,
1293    group: Option<String>,
1294    min_pending: Option<usize>,
1295    min_ack_pending: Option<usize>,
1296    consumer: &'a Consumer<Config>,
1297}
1298
1299impl<'a> StreamBuilder<'a> {
1300    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1301        StreamBuilder {
1302            consumer,
1303            batch: 200,
1304            max_bytes: 0,
1305            expires: Duration::from_secs(30),
1306            heartbeat: Duration::default(),
1307            group: None,
1308            min_pending: None,
1309            min_ack_pending: None,
1310        }
1311    }
1312
1313    /// Sets max bytes that can be buffered on the Client while processing already received
1314    /// messages.
1315    /// Higher values will yield better performance, but also potentially increase memory usage if
1316    /// application is acknowledging messages much slower than they arrive.
1317    ///
1318    /// Default values should provide reasonable balance between performance and memory usage.
1319    ///
1320    /// # Examples
1321    ///
1322    /// ```no_run
1323    /// # #[tokio::main]
1324    /// # async fn main() -> Result<(), async_nats::Error>  {
1325    /// use async_nats::jetstream::consumer::PullConsumer;
1326    /// use futures::StreamExt;
1327    /// let client = async_nats::connect("localhost:4222").await?;
1328    /// let jetstream = async_nats::jetstream::new(client);
1329    ///
1330    /// let consumer: PullConsumer = jetstream
1331    ///     .get_stream("events")
1332    ///     .await?
1333    ///     .get_consumer("pull")
1334    ///     .await?;
1335    ///
1336    /// let mut messages = consumer
1337    ///     .stream()
1338    ///     .max_bytes_per_batch(1024)
1339    ///     .messages()
1340    ///     .await?;
1341    ///
1342    /// while let Some(message) = messages.next().await {
1343    ///     let message = message?;
1344    ///     println!("message: {:?}", message);
1345    ///     message.ack().await?;
1346    /// }
1347    /// # Ok(())
1348    /// # }
1349    /// ```
1350    pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1351        self.max_bytes = max_bytes;
1352        self
1353    }
1354
1355    /// Sets max number of messages that can be buffered on the Client while processing already received
1356    /// messages.
1357    /// Higher values will yield better performance, but also potentially increase memory usage if
1358    /// application is acknowledging messages much slower than they arrive.
1359    ///
1360    /// Default values should provide reasonable balance between performance and memory usage.
1361    ///
1362    /// # Examples
1363    ///
1364    /// ```no_run
1365    /// # #[tokio::main]
1366    /// # async fn main() -> Result<(), async_nats::Error>  {
1367    /// use async_nats::jetstream::consumer::PullConsumer;
1368    /// use futures::StreamExt;
1369    /// let client = async_nats::connect("localhost:4222").await?;
1370    /// let jetstream = async_nats::jetstream::new(client);
1371    ///
1372    /// let consumer: PullConsumer = jetstream
1373    ///     .get_stream("events")
1374    ///     .await?
1375    ///     .get_consumer("pull")
1376    ///     .await?;
1377    ///
1378    /// let mut messages = consumer
1379    ///     .stream()
1380    ///     .max_messages_per_batch(100)
1381    ///     .messages()
1382    ///     .await?;
1383    ///
1384    /// while let Some(message) = messages.next().await {
1385    ///     let message = message?;
1386    ///     println!("message: {:?}", message);
1387    ///     message.ack().await?;
1388    /// }
1389    /// # Ok(())
1390    /// # }
1391    /// ```
1392    pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1393        self.batch = batch;
1394        self
1395    }
1396
1397    /// Sets heartbeat which will be send by the server if there are no messages for a given
1398    /// [Consumer] pending.
1399    ///
1400    /// # Examples
1401    ///
1402    /// ```no_run
1403    /// # #[tokio::main]
1404    /// # async fn main() -> Result<(), async_nats::Error>  {
1405    /// use async_nats::jetstream::consumer::PullConsumer;
1406    /// use futures::StreamExt;
1407    /// let client = async_nats::connect("localhost:4222").await?;
1408    /// let jetstream = async_nats::jetstream::new(client);
1409    ///
1410    /// let consumer: PullConsumer = jetstream
1411    ///     .get_stream("events")
1412    ///     .await?
1413    ///     .get_consumer("pull")
1414    ///     .await?;
1415    ///
1416    /// let mut messages = consumer
1417    ///     .stream()
1418    ///     .heartbeat(std::time::Duration::from_secs(10))
1419    ///     .messages()
1420    ///     .await?;
1421    ///
1422    /// while let Some(message) = messages.next().await {
1423    ///     let message = message?;
1424    ///     println!("message: {:?}", message);
1425    ///     message.ack().await?;
1426    /// }
1427    /// # Ok(())
1428    /// # }
1429    /// ```
1430    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1431        self.heartbeat = heartbeat;
1432        self
1433    }
1434
1435    /// Low level API that does not need tweaking for most use cases.
1436    /// Sets how long each batch request waits for whole batch of messages before timing out.
1437    /// [Consumer] pending.
1438    ///
1439    /// # Examples
1440    ///
1441    /// ```no_run
1442    /// # #[tokio::main]
1443    /// # async fn main() -> Result<(), async_nats::Error>  {
1444    /// use async_nats::jetstream::consumer::PullConsumer;
1445    /// use futures::StreamExt;
1446    /// let client = async_nats::connect("localhost:4222").await?;
1447    /// let jetstream = async_nats::jetstream::new(client);
1448    ///
1449    /// let consumer: PullConsumer = jetstream
1450    ///     .get_stream("events")
1451    ///     .await?
1452    ///     .get_consumer("pull")
1453    ///     .await?;
1454    ///
1455    /// let mut messages = consumer
1456    ///     .stream()
1457    ///     .expires(std::time::Duration::from_secs(30))
1458    ///     .messages()
1459    ///     .await?;
1460    ///
1461    /// while let Some(message) = messages.next().await {
1462    ///     let message = message?;
1463    ///     println!("message: {:?}", message);
1464    ///     message.ack().await?;
1465    /// }
1466    /// # Ok(())
1467    /// # }
1468    /// ```
1469    pub fn expires(mut self, expires: Duration) -> Self {
1470        self.expires = expires;
1471        self
1472    }
1473
1474    /// Sets overflow threshold for minimum pending messages before this stream will start getting
1475    /// messages for a [Consumer].
1476    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and [PriorityPolicy::Overflow] set.
1477    ///
1478    /// # Examples
1479    ///
1480    /// ```no_run
1481    /// # #[tokio::main]
1482    /// # async fn main() -> Result<(), async_nats::Error>  {
1483    /// use async_nats::jetstream::consumer::PullConsumer;
1484    /// use futures::StreamExt;
1485    /// let client = async_nats::connect("localhost:4222").await?;
1486    /// let jetstream = async_nats::jetstream::new(client);
1487    ///
1488    /// let consumer: PullConsumer = jetstream
1489    ///     .get_stream("events")
1490    ///     .await?
1491    ///     .get_consumer("pull")
1492    ///     .await?;
1493    ///
1494    /// let mut messages = consumer
1495    ///     .stream()
1496    ///     .expires(std::time::Duration::from_secs(30))
1497    ///     .group("A")
1498    ///     .min_pending(100)
1499    ///     .messages()
1500    ///     .await?;
1501    ///
1502    /// while let Some(message) = messages.next().await {
1503    ///     let message = message?;
1504    ///     println!("message: {:?}", message);
1505    ///     message.ack().await?;
1506    /// }
1507    /// # Ok(())
1508    /// # }
1509    /// ```
1510    pub fn min_pending(mut self, min_pending: usize) -> Self {
1511        self.min_pending = Some(min_pending);
1512        self
1513    }
1514
1515    /// Sets overflow threshold for minimum pending acknowledgements before this stream will start getting
1516    /// messages for a [Consumer].
1517    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and [PriorityPolicy::Overflow] set.
1518    ///
1519    /// # Examples
1520    ///
1521    /// ```no_run
1522    /// # #[tokio::main]
1523    /// # async fn main() -> Result<(), async_nats::Error>  {
1524    /// use async_nats::jetstream::consumer::PullConsumer;
1525    /// use futures::StreamExt;
1526    /// let client = async_nats::connect("localhost:4222").await?;
1527    /// let jetstream = async_nats::jetstream::new(client);
1528    ///
1529    /// let consumer: PullConsumer = jetstream
1530    ///     .get_stream("events")
1531    ///     .await?
1532    ///     .get_consumer("pull")
1533    ///     .await?;
1534    ///
1535    /// let mut messages = consumer
1536    ///     .stream()
1537    ///     .expires(std::time::Duration::from_secs(30))
1538    ///     .group("A")
1539    ///     .min_ack_pending(100)
1540    ///     .messages()
1541    ///     .await?;
1542    ///
1543    /// while let Some(message) = messages.next().await {
1544    ///     let message = message?;
1545    ///     println!("message: {:?}", message);
1546    ///     message.ack().await?;
1547    /// }
1548    /// # Ok(())
1549    /// # }
1550    /// ```
1551    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1552        self.min_ack_pending = Some(min_ack_pending);
1553        self
1554    }
1555
1556    /// Setting group when using [Consumer] with [Config::priority_groups].
1557    ///
1558    /// # Examples
1559    ///
1560    /// ```no_run
1561    /// # #[tokio::main]
1562    /// # async fn main() -> Result<(), async_nats::Error>  {
1563    /// use async_nats::jetstream::consumer::PullConsumer;
1564    /// use futures::StreamExt;
1565    /// let client = async_nats::connect("localhost:4222").await?;
1566    /// let jetstream = async_nats::jetstream::new(client);
1567    ///
1568    /// let consumer: PullConsumer = jetstream
1569    ///     .get_stream("events")
1570    ///     .await?
1571    ///     .get_consumer("pull")
1572    ///     .await?;
1573    ///
1574    /// let mut messages = consumer
1575    ///     .stream()
1576    ///     .expires(std::time::Duration::from_secs(30))
1577    ///     .group("A")
1578    ///     .min_ack_pending(100)
1579    ///     .messages()
1580    ///     .await?;
1581    ///
1582    /// while let Some(message) = messages.next().await {
1583    ///     let message = message?;
1584    ///     println!("message: {:?}", message);
1585    ///     message.ack().await?;
1586    /// }
1587    /// # Ok(())
1588    /// # }
1589    /// ```
1590    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1591        self.group = Some(group.into());
1592        self
1593    }
1594
1595    /// Creates actual [Stream] with provided configuration.
1596    ///
1597    /// # Examples
1598    ///
1599    /// ```no_run
1600    /// # #[tokio::main]
1601    /// # async fn main() -> Result<(), async_nats::Error>  {
1602    /// use async_nats::jetstream::consumer::PullConsumer;
1603    /// use futures::StreamExt;
1604    /// let client = async_nats::connect("localhost:4222").await?;
1605    /// let jetstream = async_nats::jetstream::new(client);
1606    ///
1607    /// let consumer: PullConsumer = jetstream
1608    ///     .get_stream("events")
1609    ///     .await?
1610    ///     .get_consumer("pull")
1611    ///     .await?;
1612    ///
1613    /// let mut messages = consumer
1614    ///     .stream()
1615    ///     .max_messages_per_batch(100)
1616    ///     .messages()
1617    ///     .await?;
1618    ///
1619    /// while let Some(message) = messages.next().await {
1620    ///     let message = message?;
1621    ///     println!("message: {:?}", message);
1622    ///     message.ack().await?;
1623    /// }
1624    /// # Ok(())
1625    /// # }
1626    /// ```
1627    pub async fn messages(self) -> Result<Stream, StreamError> {
1628        Stream::stream(
1629            BatchConfig {
1630                batch: self.batch,
1631                expires: Some(self.expires),
1632                no_wait: false,
1633                max_bytes: self.max_bytes,
1634                idle_heartbeat: self.heartbeat,
1635                min_pending: self.min_pending,
1636                group: self.group,
1637                min_ack_pending: self.min_ack_pending,
1638            },
1639            self.consumer,
1640        )
1641        .await
1642    }
1643}
1644
1645/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by a [FetchBuilder] on a [Consumer].
1646///
1647/// # Examples
1648///
1649/// ```no_run
1650/// # #[tokio::main]
1651/// # async fn main() -> Result<(), async_nats::Error>  {
1652/// use async_nats::jetstream::consumer::PullConsumer;
1653/// use futures::StreamExt;
1654/// let client = async_nats::connect("localhost:4222").await?;
1655/// let jetstream = async_nats::jetstream::new(client);
1656///
1657/// let consumer: PullConsumer = jetstream
1658///     .get_stream("events")
1659///     .await?
1660///     .get_consumer("pull")
1661///     .await?;
1662///
1663/// let mut messages = consumer
1664///     .fetch()
1665///     .max_messages(100)
1666///     .max_bytes(1024)
1667///     .messages()
1668///     .await?;
1669///
1670/// while let Some(message) = messages.next().await {
1671///     let message = message?;
1672///     println!("message: {:?}", message);
1673///     message.ack().await?;
1674/// }
1675/// # Ok(())
1676/// # }
1677/// ```
1678pub struct FetchBuilder<'a> {
1679    batch: usize,
1680    max_bytes: usize,
1681    heartbeat: Duration,
1682    expires: Option<Duration>,
1683    min_pending: Option<usize>,
1684    min_ack_pending: Option<usize>,
1685    group: Option<String>,
1686    consumer: &'a Consumer<Config>,
1687}
1688
1689impl<'a> FetchBuilder<'a> {
1690    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1691        FetchBuilder {
1692            consumer,
1693            batch: 200,
1694            max_bytes: 0,
1695            expires: None,
1696            min_pending: None,
1697            min_ack_pending: None,
1698            group: None,
1699            heartbeat: Duration::default(),
1700        }
1701    }
1702
1703    /// Sets max bytes that can be buffered on the Client while processing already received
1704    /// messages.
1705    /// Higher values will yield better performance, but also potentially increase memory usage if
1706    /// application is acknowledging messages much slower than they arrive.
1707    ///
1708    /// Default values should provide reasonable balance between performance and memory usage.
1709    ///
1710    /// # Examples
1711    ///
1712    /// ```no_run
1713    /// # #[tokio::main]
1714    /// # async fn main() -> Result<(), async_nats::Error>  {
1715    /// use futures::StreamExt;
1716    /// let client = async_nats::connect("localhost:4222").await?;
1717    /// let jetstream = async_nats::jetstream::new(client);
1718    ///
1719    /// let consumer = jetstream
1720    ///     .get_stream("events")
1721    ///     .await?
1722    ///     .get_consumer("pull")
1723    ///     .await?;
1724    ///
1725    /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1726    ///
1727    /// while let Some(message) = messages.next().await {
1728    ///     let message = message?;
1729    ///     println!("message: {:?}", message);
1730    ///     message.ack().await?;
1731    /// }
1732    /// # Ok(())
1733    /// # }
1734    /// ```
1735    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1736        self.max_bytes = max_bytes;
1737        self
1738    }
1739
1740    /// Sets max number of messages that can be buffered on the Client while processing already received
1741    /// messages.
1742    /// Higher values will yield better performance, but also potentially increase memory usage if
1743    /// application is acknowledging messages much slower than they arrive.
1744    ///
1745    /// Default values should provide reasonable balance between performance and memory usage.
1746    ///
1747    /// # Examples
1748    ///
1749    /// ```no_run
1750    /// # #[tokio::main]
1751    /// # async fn main() -> Result<(), async_nats::Error>  {
1752    /// use futures::StreamExt;
1753    /// let client = async_nats::connect("localhost:4222").await?;
1754    /// let jetstream = async_nats::jetstream::new(client);
1755    ///
1756    /// let consumer = jetstream
1757    ///     .get_stream("events")
1758    ///     .await?
1759    ///     .get_consumer("pull")
1760    ///     .await?;
1761    ///
1762    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1763    ///
1764    /// while let Some(message) = messages.next().await {
1765    ///     let message = message?;
1766    ///     println!("message: {:?}", message);
1767    ///     message.ack().await?;
1768    /// }
1769    /// # Ok(())
1770    /// # }
1771    /// ```
1772    pub fn max_messages(mut self, batch: usize) -> Self {
1773        self.batch = batch;
1774        self
1775    }
1776
1777    /// Sets heartbeat which will be send by the server if there are no messages for a given
1778    /// [Consumer] pending.
1779    ///
1780    /// # Examples
1781    ///
1782    /// ```no_run
1783    /// # #[tokio::main]
1784    /// # async fn main() -> Result<(), async_nats::Error>  {
1785    /// use async_nats::jetstream::consumer::PullConsumer;
1786    /// use futures::StreamExt;
1787    /// let client = async_nats::connect("localhost:4222").await?;
1788    /// let jetstream = async_nats::jetstream::new(client);
1789    ///
1790    /// let consumer = jetstream
1791    ///     .get_stream("events")
1792    ///     .await?
1793    ///     .get_consumer("pull")
1794    ///     .await?;
1795    ///
1796    /// let mut messages = consumer
1797    ///     .fetch()
1798    ///     .heartbeat(std::time::Duration::from_secs(10))
1799    ///     .messages()
1800    ///     .await?;
1801    ///
1802    /// while let Some(message) = messages.next().await {
1803    ///     let message = message?;
1804    ///     println!("message: {:?}", message);
1805    ///     message.ack().await?;
1806    /// }
1807    /// # Ok(())
1808    /// # }
1809    /// ```
1810    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1811        self.heartbeat = heartbeat;
1812        self
1813    }
1814
1815    /// Low level API that does not need tweaking for most use cases.
1816    /// Sets how long each batch request waits for whole batch of messages before timing out.
1817    /// [Consumer] pending.
1818    ///
1819    /// # Examples
1820    ///
1821    /// ```no_run
1822    /// # #[tokio::main]
1823    /// # async fn main() -> Result<(), async_nats::Error>  {
1824    /// use async_nats::jetstream::consumer::PullConsumer;
1825    /// use futures::StreamExt;
1826    ///
1827    /// let client = async_nats::connect("localhost:4222").await?;
1828    /// let jetstream = async_nats::jetstream::new(client);
1829    ///
1830    /// let consumer: PullConsumer = jetstream
1831    ///     .get_stream("events")
1832    ///     .await?
1833    ///     .get_consumer("pull")
1834    ///     .await?;
1835    ///
1836    /// let mut messages = consumer
1837    ///     .fetch()
1838    ///     .expires(std::time::Duration::from_secs(30))
1839    ///     .messages()
1840    ///     .await?;
1841    ///
1842    /// while let Some(message) = messages.next().await {
1843    ///     let message = message?;
1844    ///     println!("message: {:?}", message);
1845    ///     message.ack().await?;
1846    /// }
1847    /// # Ok(())
1848    /// # }
1849    /// ```
1850    pub fn expires(mut self, expires: Duration) -> Self {
1851        self.expires = Some(expires);
1852        self
1853    }
1854
1855    /// Sets overflow threshold for minimum pending messages before this stream will start getting
1856    /// messages.
1857    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
1858    /// [PriorityPolicy::Overflow] set.
1859    ///
1860    /// # Examples
1861    ///
1862    /// ```no_run
1863    /// # #[tokio::main]
1864    /// # async fn main() -> Result<(), async_nats::Error>  {
1865    /// use async_nats::jetstream::consumer::PullConsumer;
1866    /// use futures::StreamExt;
1867    ///
1868    /// let client = async_nats::connect("localhost:4222").await?;
1869    /// let jetstream = async_nats::jetstream::new(client);
1870    ///
1871    /// let consumer: PullConsumer = jetstream
1872    ///     .get_stream("events")
1873    ///     .await?
1874    ///     .get_consumer("pull")
1875    ///     .await?;
1876    ///
1877    /// let mut messages = consumer
1878    ///     .fetch()
1879    ///     .expires(std::time::Duration::from_secs(30))
1880    ///     .group("A")
1881    ///     .min_pending(100)
1882    ///     .messages()
1883    ///     .await?;
1884    ///
1885    /// while let Some(message) = messages.next().await {
1886    ///     let message = message?;
1887    ///     println!("message: {:?}", message);
1888    ///     message.ack().await?;
1889    /// }
1890    /// # Ok(())
1891    /// # }
1892    /// ```
1893    pub fn min_pending(mut self, min_pending: usize) -> Self {
1894        self.min_pending = Some(min_pending);
1895        self
1896    }
1897
1898    /// Sets overflow threshold for minimum pending acknowledgments before this stream will start getting
1899    /// messages.
1900    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
1901    /// [PriorityPolicy::Overflow] set.
1902    ///
1903    /// # Examples
1904    ///
1905    /// ```no_run
1906    /// # #[tokio::main]
1907    /// # async fn main() -> Result<(), async_nats::Error>  {
1908    /// use async_nats::jetstream::consumer::PullConsumer;
1909    /// use futures::StreamExt;
1910    ///
1911    /// let client = async_nats::connect("localhost:4222").await?;
1912    /// let jetstream = async_nats::jetstream::new(client);
1913    ///
1914    /// let consumer: PullConsumer = jetstream
1915    ///     .get_stream("events")
1916    ///     .await?
1917    ///     .get_consumer("pull")
1918    ///     .await?;
1919    ///
1920    /// let mut messages = consumer
1921    ///     .fetch()
1922    ///     .expires(std::time::Duration::from_secs(30))
1923    ///     .group("A")
1924    ///     .min_ack_pending(100)
1925    ///     .messages()
1926    ///     .await?;
1927    ///
1928    /// while let Some(message) = messages.next().await {
1929    ///     let message = message?;
1930    ///     println!("message: {:?}", message);
1931    ///     message.ack().await?;
1932    /// }
1933    /// # Ok(())
1934    /// # }
1935    /// ```
1936    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1937        self.min_ack_pending = Some(min_ack_pending);
1938        self
1939    }
1940
1941    /// Setting group when using [Consumer] with [PriorityPolicy].
1942    ///
1943    /// # Examples
1944    ///
1945    /// ```no_run
1946    /// # #[tokio::main]
1947    /// # async fn main() -> Result<(), async_nats::Error>  {
1948    /// use async_nats::jetstream::consumer::PullConsumer;
1949    /// use futures::StreamExt;
1950    ///
1951    /// let client = async_nats::connect("localhost:4222").await?;
1952    /// let jetstream = async_nats::jetstream::new(client);
1953    ///
1954    /// let consumer: PullConsumer = jetstream
1955    ///     .get_stream("events")
1956    ///     .await?
1957    ///     .get_consumer("pull")
1958    ///     .await?;
1959    ///
1960    /// let mut messages = consumer
1961    ///     .fetch()
1962    ///     .expires(std::time::Duration::from_secs(30))
1963    ///     .group("A")
1964    ///     .min_ack_pending(100)
1965    ///     .messages()
1966    ///     .await?;
1967    ///
1968    /// while let Some(message) = messages.next().await {
1969    ///     let message = message?;
1970    ///     println!("message: {:?}", message);
1971    ///     message.ack().await?;
1972    /// }
1973    /// # Ok(())
1974    /// # }
1975    /// ```
1976    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1977        self.group = Some(group.into());
1978        self
1979    }
1980
1981    /// Creates actual [Stream] with provided configuration.
1982    ///
1983    /// # Examples
1984    ///
1985    /// ```no_run
1986    /// # #[tokio::main]
1987    /// # async fn main() -> Result<(), async_nats::Error>  {
1988    /// use async_nats::jetstream::consumer::PullConsumer;
1989    /// use futures::StreamExt;
1990    /// let client = async_nats::connect("localhost:4222").await?;
1991    /// let jetstream = async_nats::jetstream::new(client);
1992    ///
1993    /// let consumer: PullConsumer = jetstream
1994    ///     .get_stream("events")
1995    ///     .await?
1996    ///     .get_consumer("pull")
1997    ///     .await?;
1998    ///
1999    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
2000    ///
2001    /// while let Some(message) = messages.next().await {
2002    ///     let message = message?;
2003    ///     println!("message: {:?}", message);
2004    ///     message.ack().await?;
2005    /// }
2006    /// # Ok(())
2007    /// # }
2008    /// ```
2009    pub async fn messages(self) -> Result<Batch, BatchError> {
2010        Batch::batch(
2011            BatchConfig {
2012                batch: self.batch,
2013                expires: self.expires,
2014                no_wait: true,
2015                max_bytes: self.max_bytes,
2016                idle_heartbeat: self.heartbeat,
2017                min_pending: self.min_pending,
2018                min_ack_pending: self.min_ack_pending,
2019                group: self.group,
2020            },
2021            self.consumer,
2022        )
2023        .await
2024    }
2025}
2026
2027/// Used for building configuration for a [Batch]. Created by a [Consumer::batch] on a [Consumer].
2028///
2029/// # Examples
2030///
2031/// ```no_run
2032/// # #[tokio::main]
2033/// # async fn main() -> Result<(), async_nats::Error>  {
2034/// use async_nats::jetstream::consumer::PullConsumer;
2035/// use futures::StreamExt;
2036/// let client = async_nats::connect("localhost:4222").await?;
2037/// let jetstream = async_nats::jetstream::new(client);
2038///
2039/// let consumer: PullConsumer = jetstream
2040///     .get_stream("events")
2041///     .await?
2042///     .get_consumer("pull")
2043///     .await?;
2044///
2045/// let mut messages = consumer
2046///     .batch()
2047///     .max_messages(100)
2048///     .max_bytes(1024)
2049///     .messages()
2050///     .await?;
2051///
2052/// while let Some(message) = messages.next().await {
2053///     let message = message?;
2054///     println!("message: {:?}", message);
2055///     message.ack().await?;
2056/// }
2057/// # Ok(())
2058/// # }
2059/// ```
2060pub struct BatchBuilder<'a> {
2061    batch: usize,
2062    max_bytes: usize,
2063    heartbeat: Duration,
2064    expires: Duration,
2065    min_pending: Option<usize>,
2066    min_ack_pending: Option<usize>,
2067    group: Option<String>,
2068    consumer: &'a Consumer<Config>,
2069}
2070
2071impl<'a> BatchBuilder<'a> {
2072    pub fn new(consumer: &'a Consumer<Config>) -> Self {
2073        BatchBuilder {
2074            consumer,
2075            batch: 200,
2076            max_bytes: 0,
2077            expires: Duration::ZERO,
2078            heartbeat: Duration::default(),
2079            min_pending: None,
2080            min_ack_pending: None,
2081            group: None,
2082        }
2083    }
2084
2085    /// Sets max bytes that can be buffered on the Client while processing already received
2086    /// messages.
2087    /// Higher values will yield better performance, but also potentially increase memory usage if
2088    /// application is acknowledging messages much slower than they arrive.
2089    ///
2090    /// Default values should provide reasonable balance between performance and memory usage.
2091    ///
2092    /// # Examples
2093    ///
2094    /// ```no_run
2095    /// # #[tokio::main]
2096    /// # async fn main() -> Result<(), async_nats::Error>  {
2097    /// use async_nats::jetstream::consumer::PullConsumer;
2098    /// use futures::StreamExt;
2099    /// let client = async_nats::connect("localhost:4222").await?;
2100    /// let jetstream = async_nats::jetstream::new(client);
2101    ///
2102    /// let consumer: PullConsumer = jetstream
2103    ///     .get_stream("events")
2104    ///     .await?
2105    ///     .get_consumer("pull")
2106    ///     .await?;
2107    ///
2108    /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
2109    ///
2110    /// while let Some(message) = messages.next().await {
2111    ///     let message = message?;
2112    ///     println!("message: {:?}", message);
2113    ///     message.ack().await?;
2114    /// }
2115    /// # Ok(())
2116    /// # }
2117    /// ```
2118    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
2119        self.max_bytes = max_bytes;
2120        self
2121    }
2122
2123    /// Sets max number of messages that can be buffered on the Client while processing already received
2124    /// messages.
2125    /// Higher values will yield better performance, but also potentially increase memory usage if
2126    /// application is acknowledging messages much slower than they arrive.
2127    ///
2128    /// Default values should provide reasonable balance between performance and memory usage.
2129    ///
2130    /// # Examples
2131    ///
2132    /// ```no_run
2133    /// # #[tokio::main]
2134    /// # async fn main() -> Result<(), async_nats::Error>  {
2135    /// use async_nats::jetstream::consumer::PullConsumer;
2136    /// use futures::StreamExt;
2137    /// let client = async_nats::connect("localhost:4222").await?;
2138    /// let jetstream = async_nats::jetstream::new(client);
2139    ///
2140    /// let consumer: PullConsumer = jetstream
2141    ///     .get_stream("events")
2142    ///     .await?
2143    ///     .get_consumer("pull")
2144    ///     .await?;
2145    ///
2146    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2147    ///
2148    /// while let Some(message) = messages.next().await {
2149    ///     let message = message?;
2150    ///     println!("message: {:?}", message);
2151    ///     message.ack().await?;
2152    /// }
2153    /// # Ok(())
2154    /// # }
2155    /// ```
2156    pub fn max_messages(mut self, batch: usize) -> Self {
2157        self.batch = batch;
2158        self
2159    }
2160
2161    /// Sets heartbeat which will be send by the server if there are no messages for a given
2162    /// [Consumer] pending.
2163    ///
2164    /// # Examples
2165    ///
2166    /// ```no_run
2167    /// # #[tokio::main]
2168    /// # async fn main() -> Result<(), async_nats::Error>  {
2169    /// use async_nats::jetstream::consumer::PullConsumer;
2170    /// use futures::StreamExt;
2171    /// let client = async_nats::connect("localhost:4222").await?;
2172    /// let jetstream = async_nats::jetstream::new(client);
2173    ///
2174    /// let consumer: PullConsumer = jetstream
2175    ///     .get_stream("events")
2176    ///     .await?
2177    ///     .get_consumer("pull")
2178    ///     .await?;
2179    ///
2180    /// let mut messages = consumer
2181    ///     .batch()
2182    ///     .heartbeat(std::time::Duration::from_secs(10))
2183    ///     .messages()
2184    ///     .await?;
2185    ///
2186    /// while let Some(message) = messages.next().await {
2187    ///     let message = message?;
2188    ///     println!("message: {:?}", message);
2189    ///     message.ack().await?;
2190    /// }
2191    /// # Ok(())
2192    /// # }
2193    /// ```
2194    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
2195        self.heartbeat = heartbeat;
2196        self
2197    }
2198
2199    /// Sets overflow threshold for minimum pending messages before this stream will start getting
2200    /// messages.
2201    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
2202    /// [PriorityPolicy::Overflow] set.
2203    ///
2204    /// # Examples
2205    ///
2206    /// ```no_run
2207    /// # #[tokio::main]
2208    /// # async fn main() -> Result<(), async_nats::Error>  {
2209    /// use async_nats::jetstream::consumer::PullConsumer;
2210    /// use futures::StreamExt;
2211    ///
2212    /// let client = async_nats::connect("localhost:4222").await?;
2213    /// let jetstream = async_nats::jetstream::new(client);
2214    ///
2215    /// let consumer: PullConsumer = jetstream
2216    ///     .get_stream("events")
2217    ///     .await?
2218    ///     .get_consumer("pull")
2219    ///     .await?;
2220    ///
2221    /// let mut messages = consumer
2222    ///     .batch()
2223    ///     .expires(std::time::Duration::from_secs(30))
2224    ///     .group("A")
2225    ///     .min_pending(100)
2226    ///     .messages()
2227    ///     .await?;
2228    ///
2229    /// while let Some(message) = messages.next().await {
2230    ///     let message = message?;
2231    ///     println!("message: {:?}", message);
2232    ///     message.ack().await?;
2233    /// }
2234    /// # Ok(())
2235    /// # }
2236    /// ```
2237    pub fn min_pending(mut self, min_pending: usize) -> Self {
2238        self.min_pending = Some(min_pending);
2239        self
2240    }
2241
2242    /// Sets overflow threshold for minimum pending acknowledgments before this stream will start getting
2243    /// messages.
2244    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
2245    /// [PriorityPolicy::Overflow] set.
2246    ///
2247    /// # Examples
2248    ///
2249    /// ```no_run
2250    /// # #[tokio::main]
2251    /// # async fn main() -> Result<(), async_nats::Error>  {
2252    /// use async_nats::jetstream::consumer::PullConsumer;
2253    /// use futures::StreamExt;
2254    ///
2255    /// let client = async_nats::connect("localhost:4222").await?;
2256    /// let jetstream = async_nats::jetstream::new(client);
2257    ///
2258    /// let consumer: PullConsumer = jetstream
2259    ///     .get_stream("events")
2260    ///     .await?
2261    ///     .get_consumer("pull")
2262    ///     .await?;
2263    ///
2264    /// let mut messages = consumer
2265    ///     .batch()
2266    ///     .expires(std::time::Duration::from_secs(30))
2267    ///     .group("A")
2268    ///     .min_ack_pending(100)
2269    ///     .messages()
2270    ///     .await?;
2271    ///
2272    /// while let Some(message) = messages.next().await {
2273    ///     let message = message?;
2274    ///     println!("message: {:?}", message);
2275    ///     message.ack().await?;
2276    /// }
2277    /// # Ok(())
2278    /// # }
2279    /// ```
2280    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
2281        self.min_ack_pending = Some(min_ack_pending);
2282        self
2283    }
2284
2285    /// Setting group when using [Consumer] with [PriorityPolicy].
2286    ///
2287    /// # Examples
2288    ///
2289    /// ```no_run
2290    /// # #[tokio::main]
2291    /// # async fn main() -> Result<(), async_nats::Error>  {
2292    /// use async_nats::jetstream::consumer::PullConsumer;
2293    /// use futures::StreamExt;
2294    ///
2295    /// let client = async_nats::connect("localhost:4222").await?;
2296    /// let jetstream = async_nats::jetstream::new(client);
2297    ///
2298    /// let consumer: PullConsumer = jetstream
2299    ///     .get_stream("events")
2300    ///     .await?
2301    ///     .get_consumer("pull")
2302    ///     .await?;
2303    ///
2304    /// let mut messages = consumer
2305    ///     .batch()
2306    ///     .expires(std::time::Duration::from_secs(30))
2307    ///     .group("A")
2308    ///     .min_ack_pending(100)
2309    ///     .messages()
2310    ///     .await?;
2311    ///
2312    /// while let Some(message) = messages.next().await {
2313    ///     let message = message?;
2314    ///     println!("message: {:?}", message);
2315    ///     message.ack().await?;
2316    /// }
2317    /// # Ok(())
2318    /// # }
2319    /// ```
2320    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2321        self.group = Some(group.into());
2322        self
2323    }
2324
2325    /// Low level API that does not need tweaking for most use cases.
2326    /// Sets how long each batch request waits for whole batch of messages before timing out.
2327    /// [Consumer] pending.
2328    ///
2329    /// # Examples
2330    ///
2331    /// ```no_run
2332    /// # #[tokio::main]
2333    /// # async fn main() -> Result<(), async_nats::Error>  {
2334    /// use async_nats::jetstream::consumer::PullConsumer;
2335    /// use futures::StreamExt;
2336    /// let client = async_nats::connect("localhost:4222").await?;
2337    /// let jetstream = async_nats::jetstream::new(client);
2338    ///
2339    /// let consumer: PullConsumer = jetstream
2340    ///     .get_stream("events")
2341    ///     .await?
2342    ///     .get_consumer("pull")
2343    ///     .await?;
2344    ///
2345    /// let mut messages = consumer
2346    ///     .batch()
2347    ///     .expires(std::time::Duration::from_secs(30))
2348    ///     .messages()
2349    ///     .await?;
2350    ///
2351    /// while let Some(message) = messages.next().await {
2352    ///     let message = message?;
2353    ///     println!("message: {:?}", message);
2354    ///     message.ack().await?;
2355    /// }
2356    /// # Ok(())
2357    /// # }
2358    /// ```
2359    pub fn expires(mut self, expires: Duration) -> Self {
2360        self.expires = expires;
2361        self
2362    }
2363
2364    /// Creates actual [Stream] with provided configuration.
2365    ///
2366    /// # Examples
2367    ///
2368    /// ```no_run
2369    /// # #[tokio::main]
2370    /// # async fn main() -> Result<(), async_nats::Error>  {
2371    /// use async_nats::jetstream::consumer::PullConsumer;
2372    /// use futures::StreamExt;
2373    /// let client = async_nats::connect("localhost:4222").await?;
2374    /// let jetstream = async_nats::jetstream::new(client);
2375    ///
2376    /// let consumer: PullConsumer = jetstream
2377    ///     .get_stream("events")
2378    ///     .await?
2379    ///     .get_consumer("pull")
2380    ///     .await?;
2381    ///
2382    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2383    ///
2384    /// while let Some(message) = messages.next().await {
2385    ///     let message = message?;
2386    ///     println!("message: {:?}", message);
2387    ///     message.ack().await?;
2388    /// }
2389    /// # Ok(())
2390    /// # }
2391    /// ```
2392    pub async fn messages(self) -> Result<Batch, BatchError> {
2393        let config = BatchConfig {
2394            batch: self.batch,
2395            expires: Some(self.expires),
2396            no_wait: false,
2397            max_bytes: self.max_bytes,
2398            idle_heartbeat: self.heartbeat,
2399            min_pending: self.min_pending,
2400            min_ack_pending: self.min_ack_pending,
2401            group: self.group,
2402        };
2403        Batch::batch(config, self.consumer).await
2404    }
2405}
2406
2407/// Used for next Pull Request for Pull Consumer
2408#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
2409pub struct BatchConfig {
2410    /// The number of messages that are being requested to be delivered.
2411    pub batch: usize,
2412    /// The optional number of nanoseconds that the server will store this next request for
2413    /// before forgetting about the pending batch size.
2414    #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
2415    pub expires: Option<Duration>,
2416    /// This optionally causes the server not to store this pending request at all, but when there are no
2417    /// messages to deliver will send a nil bytes message with a Status header of 404, this way you
2418    /// can know when you reached the end of the stream for example. A 409 is returned if the
2419    /// Consumer has reached MaxAckPending limits.
2420    #[serde(skip_serializing_if = "is_default")]
2421    pub no_wait: bool,
2422
2423    /// Sets max number of bytes in total in given batch size. This works together with `batch`.
2424    /// Whichever value is reached first, batch will complete.
2425    pub max_bytes: usize,
2426
2427    /// Setting this other than zero will cause the server to send 100 Idle Heartbeat status to the
2428    /// client
2429    #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
2430    pub idle_heartbeat: Duration,
2431
2432    pub min_pending: Option<usize>,
2433    pub min_ack_pending: Option<usize>,
2434    pub group: Option<String>,
2435}
2436
2437fn is_default<T: Default + Eq>(t: &T) -> bool {
2438    t == &T::default()
2439}
2440
2441#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2442pub struct Config {
2443    /// Setting `durable_name` to `Some(...)` will cause this consumer
2444    /// to be "durable". This may be a good choice for workloads that
2445    /// benefit from the `JetStream` server or cluster remembering the
2446    /// progress of consumers for fault tolerance purposes. If a consumer
2447    /// crashes, the `JetStream` server or cluster will remember which
2448    /// messages the consumer acknowledged. When the consumer recovers,
2449    /// this information will allow the consumer to resume processing
2450    /// where it left off. If you're unsure, set this to `Some(...)`.
2451    ///
2452    /// Setting `durable_name` to `None` will cause this consumer to
2453    /// be "ephemeral". This may be a good choice for workloads where
2454    /// you don't need the `JetStream` server to remember the consumer's
2455    /// progress in the case of a crash, such as certain "high churn"
2456    /// workloads or workloads where a crashed instance is not required
2457    /// to recover.
2458    #[serde(default, skip_serializing_if = "Option::is_none")]
2459    pub durable_name: Option<String>,
2460    /// A name of the consumer. Can be specified for both durable and ephemeral
2461    /// consumers.
2462    #[serde(default, skip_serializing_if = "Option::is_none")]
2463    pub name: Option<String>,
2464    /// A short description of the purpose of this consumer.
2465    #[serde(default, skip_serializing_if = "Option::is_none")]
2466    pub description: Option<String>,
2467    /// Allows for a variety of options that determine how this consumer will receive messages
2468    #[serde(flatten)]
2469    pub deliver_policy: DeliverPolicy,
2470    /// How messages should be acknowledged
2471    pub ack_policy: AckPolicy,
2472    /// How long to allow messages to remain un-acknowledged before attempting redelivery
2473    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2474    pub ack_wait: Duration,
2475    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2476    #[serde(default, skip_serializing_if = "is_default")]
2477    pub max_deliver: i64,
2478    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2479    #[serde(default, skip_serializing_if = "is_default")]
2480    pub filter_subject: String,
2481    #[cfg(feature = "server_2_10")]
2482    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2483    #[serde(default, skip_serializing_if = "is_default")]
2484    pub filter_subjects: Vec<String>,
2485    /// Whether messages are sent as quickly as possible or at the rate of receipt
2486    pub replay_policy: ReplayPolicy,
2487    /// The rate of message delivery in bits per second
2488    #[serde(default, skip_serializing_if = "is_default")]
2489    pub rate_limit: u64,
2490    /// What percentage of acknowledgments should be samples for observability, 0-100
2491    #[serde(
2492        rename = "sample_freq",
2493        with = "super::sample_freq_deser",
2494        default,
2495        skip_serializing_if = "is_default"
2496    )]
2497    pub sample_frequency: u8,
2498    /// The maximum number of waiting consumers.
2499    #[serde(default, skip_serializing_if = "is_default")]
2500    pub max_waiting: i64,
2501    /// The maximum number of unacknowledged messages that may be
2502    /// in-flight before pausing sending additional messages to
2503    /// this consumer.
2504    #[serde(default, skip_serializing_if = "is_default")]
2505    pub max_ack_pending: i64,
2506    /// Only deliver headers without payloads.
2507    #[serde(default, skip_serializing_if = "is_default")]
2508    pub headers_only: bool,
2509    /// Maximum size of a request batch
2510    #[serde(default, skip_serializing_if = "is_default")]
2511    pub max_batch: i64,
2512    /// Maximum value of request max_bytes
2513    #[serde(default, skip_serializing_if = "is_default")]
2514    pub max_bytes: i64,
2515    /// Maximum value for request expiration
2516    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2517    pub max_expires: Duration,
2518    /// Threshold for consumer inactivity
2519    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2520    pub inactive_threshold: Duration,
2521    /// Number of consumer replicas
2522    #[serde(default, skip_serializing_if = "is_default")]
2523    pub num_replicas: usize,
2524    /// Force consumer to use memory storage.
2525    #[serde(default, skip_serializing_if = "is_default")]
2526    pub memory_storage: bool,
2527    #[cfg(feature = "server_2_10")]
2528    // Additional consumer metadata.
2529    #[serde(default, skip_serializing_if = "is_default")]
2530    pub metadata: HashMap<String, String>,
2531    /// Custom backoff for missed acknowledgments.
2532    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2533    pub backoff: Vec<Duration>,
2534
2535    /// Priority policy for this consumer. Requires [Config::priority_groups] to be set.
2536    #[cfg(feature = "server_2_11")]
2537    #[serde(default, skip_serializing_if = "is_default")]
2538    pub priority_policy: PriorityPolicy,
2539    /// Priority groups for this consumer. Currently only one group is supported and is used
2540    /// in conjunction with [Config::priority_policy].
2541    #[cfg(feature = "server_2_11")]
2542    #[serde(default, skip_serializing_if = "is_default")]
2543    pub priority_groups: Vec<String>,
2544    /// For suspending the consumer until the deadline.
2545    #[cfg(feature = "server_2_11")]
2546    #[serde(
2547        default,
2548        with = "rfc3339::option",
2549        skip_serializing_if = "Option::is_none"
2550    )]
2551    pub pause_until: Option<OffsetDateTime>,
2552}
2553
2554impl IntoConsumerConfig for &Config {
2555    fn into_consumer_config(self) -> consumer::Config {
2556        self.clone().into_consumer_config()
2557    }
2558}
2559
2560impl IntoConsumerConfig for Config {
2561    fn into_consumer_config(self) -> consumer::Config {
2562        jetstream::consumer::Config {
2563            deliver_subject: None,
2564            name: self.name,
2565            durable_name: self.durable_name,
2566            description: self.description,
2567            deliver_group: None,
2568            deliver_policy: self.deliver_policy,
2569            ack_policy: self.ack_policy,
2570            ack_wait: self.ack_wait,
2571            max_deliver: self.max_deliver,
2572            filter_subject: self.filter_subject,
2573            #[cfg(feature = "server_2_10")]
2574            filter_subjects: self.filter_subjects,
2575            replay_policy: self.replay_policy,
2576            rate_limit: self.rate_limit,
2577            sample_frequency: self.sample_frequency,
2578            max_waiting: self.max_waiting,
2579            max_ack_pending: self.max_ack_pending,
2580            headers_only: self.headers_only,
2581            flow_control: false,
2582            idle_heartbeat: Duration::default(),
2583            max_batch: self.max_batch,
2584            max_bytes: self.max_bytes,
2585            max_expires: self.max_expires,
2586            inactive_threshold: self.inactive_threshold,
2587            num_replicas: self.num_replicas,
2588            memory_storage: self.memory_storage,
2589            #[cfg(feature = "server_2_10")]
2590            metadata: self.metadata,
2591            backoff: self.backoff,
2592            #[cfg(feature = "server_2_11")]
2593            priority_policy: self.priority_policy,
2594            #[cfg(feature = "server_2_11")]
2595            priority_groups: self.priority_groups,
2596            #[cfg(feature = "server_2_11")]
2597            pause_until: self.pause_until,
2598        }
2599    }
2600}
2601impl FromConsumer for Config {
2602    fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2603        if config.deliver_subject.is_some() {
2604            return Err(Box::new(std::io::Error::new(
2605                std::io::ErrorKind::Other,
2606                "pull consumer cannot have delivery subject",
2607            )));
2608        }
2609        Ok(Config {
2610            durable_name: config.durable_name,
2611            name: config.name,
2612            description: config.description,
2613            deliver_policy: config.deliver_policy,
2614            ack_policy: config.ack_policy,
2615            ack_wait: config.ack_wait,
2616            max_deliver: config.max_deliver,
2617            filter_subject: config.filter_subject,
2618            #[cfg(feature = "server_2_10")]
2619            filter_subjects: config.filter_subjects,
2620            replay_policy: config.replay_policy,
2621            rate_limit: config.rate_limit,
2622            sample_frequency: config.sample_frequency,
2623            max_waiting: config.max_waiting,
2624            max_ack_pending: config.max_ack_pending,
2625            headers_only: config.headers_only,
2626            max_batch: config.max_batch,
2627            max_bytes: config.max_bytes,
2628            max_expires: config.max_expires,
2629            inactive_threshold: config.inactive_threshold,
2630            num_replicas: config.num_replicas,
2631            memory_storage: config.memory_storage,
2632            #[cfg(feature = "server_2_10")]
2633            metadata: config.metadata,
2634            backoff: config.backoff,
2635            #[cfg(feature = "server_2_11")]
2636            priority_policy: config.priority_policy,
2637            #[cfg(feature = "server_2_11")]
2638            priority_groups: config.priority_groups,
2639            #[cfg(feature = "server_2_11")]
2640            pause_until: config.pause_until,
2641        })
2642    }
2643}
2644
2645#[derive(Clone, Copy, Debug, PartialEq)]
2646pub enum BatchRequestErrorKind {
2647    Publish,
2648    Flush,
2649    Serialize,
2650}
2651
2652impl std::fmt::Display for BatchRequestErrorKind {
2653    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2654        match self {
2655            Self::Publish => write!(f, "publish failed"),
2656            Self::Flush => write!(f, "flush failed"),
2657            Self::Serialize => write!(f, "serialize failed"),
2658        }
2659    }
2660}
2661
2662pub type BatchRequestError = Error<BatchRequestErrorKind>;
2663
2664#[derive(Clone, Copy, Debug, PartialEq)]
2665pub enum BatchErrorKind {
2666    Subscribe,
2667    Pull,
2668    Flush,
2669    Serialize,
2670}
2671
2672impl std::fmt::Display for BatchErrorKind {
2673    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2674        match self {
2675            Self::Pull => write!(f, "pull request failed"),
2676            Self::Flush => write!(f, "flush failed"),
2677            Self::Serialize => write!(f, "serialize failed"),
2678            Self::Subscribe => write!(f, "subscribe failed"),
2679        }
2680    }
2681}
2682
2683pub type BatchError = Error<BatchErrorKind>;
2684
2685impl From<SubscribeError> for BatchError {
2686    fn from(err: SubscribeError) -> Self {
2687        BatchError::with_source(BatchErrorKind::Subscribe, err)
2688    }
2689}
2690
2691impl From<BatchRequestError> for BatchError {
2692    fn from(err: BatchRequestError) -> Self {
2693        BatchError::with_source(BatchErrorKind::Pull, err)
2694    }
2695}
2696
2697#[derive(Clone, Copy, Debug, PartialEq)]
2698pub enum ConsumerRecreateErrorKind {
2699    GetStream,
2700    Recreate,
2701    TimedOut,
2702}
2703
2704impl std::fmt::Display for ConsumerRecreateErrorKind {
2705    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2706        match self {
2707            Self::GetStream => write!(f, "error getting stream"),
2708            Self::Recreate => write!(f, "consumer creation failed"),
2709            Self::TimedOut => write!(f, "timed out"),
2710        }
2711    }
2712}
2713
2714pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2715
2716async fn recreate_consumer_stream(
2717    context: &Context,
2718    config: &OrderedConfig,
2719    stream_name: &str,
2720    consumer_name: &str,
2721    sequence: u64,
2722) -> Result<Stream, ConsumerRecreateError> {
2723    let span = tracing::span!(
2724        tracing::Level::DEBUG,
2725        "recreate_ordered_consumer",
2726        stream_name = stream_name,
2727        consumer_name = consumer_name,
2728        sequence = sequence
2729    );
2730    let _span_handle = span.enter();
2731    let config = config.to_owned();
2732    trace!("delete old consumer before creating new one");
2733
2734    tokio::time::timeout(
2735        Duration::from_secs(5),
2736        context.delete_consumer_from_stream(consumer_name, stream_name),
2737    )
2738    .await
2739    .ok();
2740
2741    let deliver_policy = {
2742        if sequence == 0 {
2743            DeliverPolicy::All
2744        } else {
2745            DeliverPolicy::ByStartSequence {
2746                start_sequence: sequence + 1,
2747            }
2748        }
2749    };
2750    trace!("create the new ordered consumer for sequence {}", sequence);
2751    let consumer = tokio::time::timeout(
2752        Duration::from_secs(5),
2753        context.create_consumer_on_stream(
2754            jetstream::consumer::pull::OrderedConfig {
2755                deliver_policy,
2756                ..config.clone()
2757            },
2758            stream_name,
2759        ),
2760    )
2761    .await
2762    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2763    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2764
2765    let config = Consumer {
2766        config: config.clone().into(),
2767        context: context.clone(),
2768        info: consumer.info,
2769    };
2770
2771    trace!("create iterator");
2772    let stream = tokio::time::timeout(
2773        Duration::from_secs(5),
2774        Stream::stream(
2775            BatchConfig {
2776                batch: 500,
2777                expires: Some(Duration::from_secs(30)),
2778                no_wait: false,
2779                max_bytes: 0,
2780                idle_heartbeat: Duration::from_secs(15),
2781                min_pending: None,
2782                min_ack_pending: None,
2783                group: None,
2784            },
2785            &config,
2786        ),
2787    )
2788    .await
2789    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2790    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2791    trace!("recreated consumer");
2792    stream
2793}