async_nats/jetstream/consumer/
pull.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16    future::{BoxFuture, Either},
17    FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_11")]
21use time::{serde::rfc3339, OffsetDateTime};
22
23#[cfg(feature = "server_2_10")]
24use std::collections::HashMap;
25use std::{future, pin::Pin, task::Poll, time::Duration};
26use tokio::{task::JoinHandle, time::Sleep};
27
28use serde::{Deserialize, Serialize};
29use tracing::{debug, trace};
30
31use crate::{
32    connection::State,
33    error::Error,
34    jetstream::{self, Context},
35    StatusCode, SubscribeError, Subscriber,
36};
37
38use crate::subject::Subject;
39
40#[cfg(feature = "server_2_11")]
41use super::PriorityPolicy;
42
43use super::{
44    AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
45    StreamError, StreamErrorKind,
46};
47use jetstream::consumer;
48
49impl Consumer<Config> {
50    /// Returns a stream of messages for Pull Consumer.
51    ///
52    /// # Example
53    ///
54    /// ```no_run
55    /// # #[tokio::main]
56    /// # async fn mains() -> Result<(), async_nats::Error> {
57    /// use futures::StreamExt;
58    /// use futures::TryStreamExt;
59    ///
60    /// let client = async_nats::connect("localhost:4222").await?;
61    /// let jetstream = async_nats::jetstream::new(client);
62    ///
63    /// let stream = jetstream
64    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
65    ///         name: "events".to_string(),
66    ///         max_messages: 10_000,
67    ///         ..Default::default()
68    ///     })
69    ///     .await?;
70    ///
71    /// jetstream.publish("events", "data".into()).await?;
72    ///
73    /// let consumer = stream
74    ///     .get_or_create_consumer(
75    ///         "consumer",
76    ///         async_nats::jetstream::consumer::pull::Config {
77    ///             durable_name: Some("consumer".to_string()),
78    ///             ..Default::default()
79    ///         },
80    ///     )
81    ///     .await?;
82    ///
83    /// let mut messages = consumer.messages().await?.take(100);
84    /// while let Some(Ok(message)) = messages.next().await {
85    ///     println!("got message {:?}", message);
86    ///     message.ack().await?;
87    /// }
88    /// Ok(())
89    /// # }
90    /// ```
91    pub async fn messages(&self) -> Result<Stream, StreamError> {
92        Stream::stream(
93            BatchConfig {
94                batch: 200,
95                expires: Some(Duration::from_secs(30)),
96                no_wait: false,
97                max_bytes: 0,
98                idle_heartbeat: Duration::from_secs(15),
99                min_pending: None,
100                min_ack_pending: None,
101                group: None,
102            },
103            self,
104        )
105        .await
106    }
107
108    /// Enables customization of [Stream] by setting timeouts, heartbeats, maximum number of
109    /// messages or bytes buffered.
110    ///
111    /// # Examples
112    ///
113    /// ```no_run
114    /// # #[tokio::main]
115    /// # async fn main() -> Result<(), async_nats::Error>  {
116    /// use async_nats::jetstream::consumer::PullConsumer;
117    /// use futures::StreamExt;
118    /// let client = async_nats::connect("localhost:4222").await?;
119    /// let jetstream = async_nats::jetstream::new(client);
120    ///
121    /// let consumer: PullConsumer = jetstream
122    ///     .get_stream("events")
123    ///     .await?
124    ///     .get_consumer("pull")
125    ///     .await?;
126    ///
127    /// let mut messages = consumer
128    ///     .stream()
129    ///     .max_messages_per_batch(100)
130    ///     .max_bytes_per_batch(1024)
131    ///     .messages()
132    ///     .await?;
133    ///
134    /// while let Some(message) = messages.next().await {
135    ///     let message = message?;
136    ///     println!("message: {:?}", message);
137    ///     message.ack().await?;
138    /// }
139    /// # Ok(())
140    /// # }
141    /// ```
    pub fn stream(&self) -> StreamBuilder<'_> {
        // Pure delegation: the builder is constructed from a borrow of this consumer.
        StreamBuilder::new(self)
    }
145
146    pub async fn request_batch<I: Into<BatchConfig>>(
147        &self,
148        batch: I,
149        inbox: Subject,
150    ) -> Result<(), BatchRequestError> {
151        debug!("sending batch");
152        let subject = format!(
153            "{}.CONSUMER.MSG.NEXT.{}.{}",
154            self.context.prefix, self.info.stream_name, self.info.name
155        );
156
157        let payload = serde_json::to_vec(&batch.into())
158            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
159
160        self.context
161            .client
162            .publish_with_reply(subject, inbox, payload.into())
163            .await
164            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
165        debug!("batch request sent");
166        Ok(())
167    }
168
    /// Returns a batch of specified number of messages, or if there are fewer messages on the
    /// [Stream] than requested, returns all available messages.
171    ///
172    /// # Example
173    ///
174    /// ```no_run
175    /// # #[tokio::main]
176    /// # async fn mains() -> Result<(), async_nats::Error> {
177    /// use futures::StreamExt;
178    /// use futures::TryStreamExt;
179    ///
180    /// let client = async_nats::connect("localhost:4222").await?;
181    /// let jetstream = async_nats::jetstream::new(client);
182    ///
183    /// let stream = jetstream
184    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
185    ///         name: "events".to_string(),
186    ///         max_messages: 10_000,
187    ///         ..Default::default()
188    ///     })
189    ///     .await?;
190    ///
191    /// jetstream.publish("events", "data".into()).await?;
192    ///
193    /// let consumer = stream
194    ///     .get_or_create_consumer(
195    ///         "consumer",
196    ///         async_nats::jetstream::consumer::pull::Config {
197    ///             durable_name: Some("consumer".to_string()),
198    ///             ..Default::default()
199    ///         },
200    ///     )
201    ///     .await?;
202    ///
203    /// for _ in 0..100 {
204    ///     jetstream.publish("events", "data".into()).await?;
205    /// }
206    ///
207    /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
208    /// // will finish after 100 messages, as that is the number of messages available on the
209    /// // stream.
210    /// while let Some(Ok(message)) = messages.next().await {
211    ///     println!("got message {:?}", message);
212    ///     message.ack().await?;
213    /// }
214    /// Ok(())
215    /// # }
216    /// ```
217    pub fn fetch(&self) -> FetchBuilder {
218        FetchBuilder::new(self)
219    }
220
221    /// Returns a batch of specified number of messages unless timeout happens first.
222    ///
223    /// # Example
224    ///
225    /// ```no_run
226    /// # #[tokio::main]
227    /// # async fn mains() -> Result<(), async_nats::Error> {
228    /// use futures::StreamExt;
229    /// use futures::TryStreamExt;
230    ///
231    /// let client = async_nats::connect("localhost:4222").await?;
232    /// let jetstream = async_nats::jetstream::new(client);
233    ///
234    /// let stream = jetstream
235    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
236    ///         name: "events".to_string(),
237    ///         max_messages: 10_000,
238    ///         ..Default::default()
239    ///     })
240    ///     .await?;
241    ///
242    /// jetstream.publish("events", "data".into()).await?;
243    ///
244    /// let consumer = stream
245    ///     .get_or_create_consumer(
246    ///         "consumer",
247    ///         async_nats::jetstream::consumer::pull::Config {
248    ///             durable_name: Some("consumer".to_string()),
249    ///             ..Default::default()
250    ///         },
251    ///     )
252    ///     .await?;
253    ///
254    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
255    /// while let Some(Ok(message)) = messages.next().await {
256    ///     println!("got message {:?}", message);
257    ///     message.ack().await?;
258    /// }
259    /// Ok(())
260    /// # }
261    /// ```
262    pub fn batch(&self) -> BatchBuilder {
263        BatchBuilder::new(self)
264    }
265
266    /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
267    /// messages in those batches.
268    ///
269    /// # Example
270    ///
271    /// ```no_run
272    /// # #[tokio::main]
273    /// # async fn mains() -> Result<(), async_nats::Error> {
274    /// use futures::StreamExt;
275    /// use futures::TryStreamExt;
276    ///
277    /// let client = async_nats::connect("localhost:4222").await?;
278    /// let jetstream = async_nats::jetstream::new(client);
279    ///
280    /// let stream = jetstream
281    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
282    ///         name: "events".to_string(),
283    ///         max_messages: 10_000,
284    ///         ..Default::default()
285    ///     })
286    ///     .await?;
287    ///
288    /// jetstream.publish("events", "data".into()).await?;
289    ///
290    /// let consumer = stream
291    ///     .get_or_create_consumer(
292    ///         "consumer",
293    ///         async_nats::jetstream::consumer::pull::Config {
294    ///             durable_name: Some("consumer".to_string()),
295    ///             ..Default::default()
296    ///         },
297    ///     )
298    ///     .await?;
299    ///
300    /// let mut iter = consumer.sequence(50).unwrap().take(10);
301    /// while let Ok(Some(mut batch)) = iter.try_next().await {
302    ///     while let Ok(Some(message)) = batch.try_next().await {
303    ///         println!("message received: {:?}", message);
304    ///     }
305    /// }
306    /// Ok(())
307    /// # }
308    /// ```
309    pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
310        let context = self.context.clone();
311        let subject = format!(
312            "{}.CONSUMER.MSG.NEXT.{}.{}",
313            self.context.prefix, self.info.stream_name, self.info.name
314        );
315
316        let request = serde_json::to_vec(&BatchConfig {
317            batch,
318            expires: Some(Duration::from_secs(60)),
319            ..Default::default()
320        })
321        .map(Bytes::from)
322        .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
323
324        Ok(Sequence {
325            context,
326            subject,
327            request,
328            pending_messages: batch,
329            next: None,
330        })
331    }
332}
333
/// A stream of messages received in reply to a single pull request.
///
/// The stream ends once the expected number of messages was yielded, a terminating status
/// was received, the subscription closed, or the local timeout fired.
pub struct Batch {
    /// Number of messages still expected; decremented for every message yielded.
    pending_messages: usize,
    /// Subscription on which replies to the pull request arrive.
    subscriber: Subscriber,
    /// JetStream context cloned into every yielded message.
    context: Context,
    /// Optional local timer; when it completes the stream ends.
    timeout: Option<Pin<Box<Sleep>>>,
    /// Set once the stream has finished, so later polls return `None` right away.
    terminated: bool,
}
341
342impl Batch {
343    async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
344        let inbox = Subject::from(consumer.context.client.new_inbox());
345        let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
346        consumer.request_batch(batch.clone(), inbox.clone()).await?;
347
348        let sleep = batch.expires.map(|expires| {
349            Box::pin(tokio::time::sleep(
350                expires.saturating_add(Duration::from_secs(5)),
351            ))
352        });
353
354        Ok(Batch {
355            pending_messages: batch.batch,
356            subscriber: subscription,
357            context: consumer.context.clone(),
358            terminated: false,
359            timeout: sleep,
360        })
361    }
362}
363
364impl futures::Stream for Batch {
365    type Item = Result<jetstream::Message, crate::Error>;
366
367    fn poll_next(
368        mut self: std::pin::Pin<&mut Self>,
369        cx: &mut std::task::Context<'_>,
370    ) -> std::task::Poll<Option<Self::Item>> {
371        if self.terminated {
372            return Poll::Ready(None);
373        }
374        if self.pending_messages == 0 {
375            self.terminated = true;
376            return Poll::Ready(None);
377        }
378        if let Some(sleep) = self.timeout.as_mut() {
379            match sleep.poll_unpin(cx) {
380                Poll::Ready(_) => {
381                    debug!("batch timeout timer triggered");
382                    // TODO(tp): Maybe we can be smarter here and before timing out, check if
383                    // we consumed all the messages from the subscription buffer in case of user
384                    // slowly consuming messages. Keep in mind that we time out here only if
385                    // for some reason we missed timeout from the server and few seconds have
386                    // passed since expected timeout message.
387                    self.terminated = true;
388                    return Poll::Ready(None);
389                }
390                Poll::Pending => (),
391            }
392        }
393        match self.subscriber.receiver.poll_recv(cx) {
394            Poll::Ready(maybe_message) => match maybe_message {
395                Some(message) => match message.status.unwrap_or(StatusCode::OK) {
396                    StatusCode::TIMEOUT => {
397                        debug!("received timeout. Iterator done");
398                        self.terminated = true;
399                        Poll::Ready(None)
400                    }
401                    StatusCode::IDLE_HEARTBEAT => {
402                        debug!("received heartbeat");
403                        Poll::Pending
404                    }
405                    // If this is fetch variant, terminate on no more messages.
406                    // We do not need to check if this is a fetch, not batch,
407                    // as only fetch will send back `NO_MESSAGES` status.
408                    StatusCode::NOT_FOUND => {
409                        debug!("received `NO_MESSAGES`. Iterator done");
410                        self.terminated = true;
411                        Poll::Ready(None)
412                    }
413                    StatusCode::OK => {
414                        debug!("received message");
415                        self.pending_messages -= 1;
416                        Poll::Ready(Some(Ok(jetstream::Message {
417                            context: self.context.clone(),
418                            message,
419                        })))
420                    }
421                    status => {
422                        debug!("received error");
423                        self.terminated = true;
424                        Poll::Ready(Some(Err(Box::new(std::io::Error::new(
425                            std::io::ErrorKind::Other,
426                            format!(
427                                "error while processing messages from the stream: {}, {:?}",
428                                status, message.description
429                            ),
430                        )))))
431                    }
432                },
433                None => Poll::Ready(None),
434            },
435            std::task::Poll::Pending => std::task::Poll::Pending,
436        }
437    }
438}
439
/// A stream of [Batch]es, where each item is produced by publishing the same pull request
/// again on a fresh inbox.
pub struct Sequence {
    /// JetStream context used to subscribe and publish; moved into each created [Batch].
    context: Context,
    /// Subject the pull request is published to.
    subject: String,
    /// Pre-serialized pull request payload, reused for every batch.
    request: Bytes,
    /// Number of messages each created [Batch] expects.
    pending_messages: usize,
    /// In-flight future setting up the next [Batch], if one was started and has not
    /// completed yet.
    next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
}
447
448impl futures::Stream for Sequence {
449    type Item = Result<Batch, MessagesError>;
450
451    fn poll_next(
452        mut self: std::pin::Pin<&mut Self>,
453        cx: &mut std::task::Context<'_>,
454    ) -> std::task::Poll<Option<Self::Item>> {
455        match self.next.as_mut() {
456            None => {
457                let context = self.context.clone();
458                let subject = self.subject.clone();
459                let request = self.request.clone();
460                let pending_messages = self.pending_messages;
461
462                let next = self.next.insert(Box::pin(async move {
463                    let inbox = context.client.new_inbox();
464                    let subscriber = context
465                        .client
466                        .subscribe(inbox.clone())
467                        .await
468                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
469
470                    context
471                        .client
472                        .publish_with_reply(subject, inbox, request)
473                        .await
474                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
475
476                    // TODO(tp): Add timeout config and defaults.
477                    Ok(Batch {
478                        pending_messages,
479                        subscriber,
480                        context,
481                        terminated: false,
482                        timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
483                    })
484                }));
485
486                match next.as_mut().poll(cx) {
487                    Poll::Ready(result) => {
488                        self.next = None;
489                        Poll::Ready(Some(result.map_err(|err| {
490                            MessagesError::with_source(MessagesErrorKind::Pull, err)
491                        })))
492                    }
493                    Poll::Pending => Poll::Pending,
494                }
495            }
496
497            Some(next) => match next.as_mut().poll(cx) {
498                Poll::Ready(result) => {
499                    self.next = None;
500                    Poll::Ready(Some(result.map_err(|err| {
501                        MessagesError::with_source(MessagesErrorKind::Pull, err)
502                    })))
503                }
504                Poll::Pending => Poll::Pending,
505            },
506        }
507    }
508}
509
510impl Consumer<OrderedConfig> {
511    /// Returns a stream of messages for Ordered Pull Consumer.
512    ///
    /// Ordered consumers use a single-replica ephemeral consumer, no matter the replication factor of the
    /// Stream. They do not use acks; instead they track sequences and recreate themselves whenever they
    /// see a mismatch.
516    ///
517    /// # Example
518    ///
519    /// ```no_run
520    /// # #[tokio::main]
521    /// # async fn mains() -> Result<(), async_nats::Error> {
522    /// use futures::StreamExt;
523    /// use futures::TryStreamExt;
524    ///
525    /// let client = async_nats::connect("localhost:4222").await?;
526    /// let jetstream = async_nats::jetstream::new(client);
527    ///
528    /// let stream = jetstream
529    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
530    ///         name: "events".to_string(),
531    ///         max_messages: 10_000,
532    ///         ..Default::default()
533    ///     })
534    ///     .await?;
535    ///
536    /// jetstream.publish("events", "data".into()).await?;
537    ///
538    /// let consumer = stream
539    ///     .get_or_create_consumer(
540    ///         "consumer",
541    ///         async_nats::jetstream::consumer::pull::OrderedConfig {
542    ///             name: Some("consumer".to_string()),
543    ///             ..Default::default()
544    ///         },
545    ///     )
546    ///     .await?;
547    ///
548    /// let mut messages = consumer.messages().await?.take(100);
549    /// while let Some(Ok(message)) = messages.next().await {
550    ///     println!("got message {:?}", message);
551    /// }
552    /// Ok(())
553    /// # }
554    /// ```
555    pub async fn messages(self) -> Result<Ordered, StreamError> {
556        let config = Consumer {
557            config: self.config.clone().into(),
558            context: self.context.clone(),
559            info: self.info.clone(),
560        };
561        let stream = Stream::stream(
562            BatchConfig {
563                batch: 500,
564                expires: Some(Duration::from_secs(30)),
565                no_wait: false,
566                max_bytes: 0,
567                idle_heartbeat: Duration::from_secs(15),
568                min_pending: None,
569                min_ack_pending: None,
570                group: None,
571            },
572            &config,
573        )
574        .await?;
575
576        Ok(Ordered {
577            consumer_sequence: 0,
578            stream_sequence: 0,
579            missed_heartbeats: false,
580            create_stream: None,
581            context: self.context.clone(),
582            consumer_name: self
583                .config
584                .name
585                .clone()
586                .unwrap_or_else(|| self.context.client.new_inbox()),
587            consumer: self.config,
588            stream: Some(stream),
589            stream_name: self.info.stream_name.clone(),
590        })
591    }
592}
593
/// Configuration for ordered pull consumers.
///
/// Unlike the regular pull [Config], this type has no `durable_name` or acknowledgment
/// settings: the conversions in this module always produce a consumer configuration with
/// `durable_name: None`, `AckPolicy::None`, `max_deliver: 1`, a single replica and memory
/// storage.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct OrderedConfig {
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Subject used to filter the messages this consumer receives.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Maximum number of messages that can be requested in single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages` and
    /// `stream`
    pub max_batch: i64,
    /// Maximum number of bytes that can be requested in single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages` and
    /// `stream`
    pub max_bytes: i64,
    /// Maximum expiry that can be set for a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages` and
    /// `stream`
    pub max_expires: Duration,
}
651
652impl From<OrderedConfig> for Config {
653    fn from(config: OrderedConfig) -> Self {
654        Config {
655            durable_name: None,
656            name: config.name,
657            description: config.description,
658            deliver_policy: config.deliver_policy,
659            ack_policy: AckPolicy::None,
660            ack_wait: Duration::default(),
661            max_deliver: 1,
662            filter_subject: config.filter_subject,
663            #[cfg(feature = "server_2_10")]
664            filter_subjects: config.filter_subjects,
665            replay_policy: config.replay_policy,
666            rate_limit: config.rate_limit,
667            sample_frequency: config.sample_frequency,
668            max_waiting: config.max_waiting,
669            max_ack_pending: 0,
670            headers_only: config.headers_only,
671            max_batch: config.max_batch,
672            max_bytes: config.max_bytes,
673            max_expires: config.max_expires,
674            inactive_threshold: Duration::from_secs(30),
675            num_replicas: 1,
676            memory_storage: true,
677            #[cfg(feature = "server_2_10")]
678            metadata: config.metadata,
679            backoff: Vec::new(),
680            #[cfg(feature = "server_2_11")]
681            priority_policy: PriorityPolicy::None,
682            #[cfg(feature = "server_2_11")]
683            priority_groups: Vec::new(),
684            #[cfg(feature = "server_2_11")]
685            pause_until: None,
686        }
687    }
688}
689
690impl FromConsumer for OrderedConfig {
691    fn try_from_consumer_config(
692        config: crate::jetstream::consumer::Config,
693    ) -> Result<Self, crate::Error>
694    where
695        Self: Sized,
696    {
697        Ok(OrderedConfig {
698            name: config.name,
699            description: config.description,
700            filter_subject: config.filter_subject,
701            #[cfg(feature = "server_2_10")]
702            filter_subjects: config.filter_subjects,
703            replay_policy: config.replay_policy,
704            rate_limit: config.rate_limit,
705            sample_frequency: config.sample_frequency,
706            headers_only: config.headers_only,
707            deliver_policy: config.deliver_policy,
708            max_waiting: config.max_waiting,
709            #[cfg(feature = "server_2_10")]
710            metadata: config.metadata,
711            max_batch: config.max_batch,
712            max_bytes: config.max_bytes,
713            max_expires: config.max_expires,
714        })
715    }
716}
717
718impl IntoConsumerConfig for OrderedConfig {
719    fn into_consumer_config(self) -> super::Config {
720        jetstream::consumer::Config {
721            deliver_subject: None,
722            durable_name: None,
723            name: self.name,
724            description: self.description,
725            deliver_group: None,
726            deliver_policy: self.deliver_policy,
727            ack_policy: AckPolicy::None,
728            ack_wait: Duration::default(),
729            max_deliver: 1,
730            filter_subject: self.filter_subject,
731            #[cfg(feature = "server_2_10")]
732            filter_subjects: self.filter_subjects,
733            replay_policy: self.replay_policy,
734            rate_limit: self.rate_limit,
735            sample_frequency: self.sample_frequency,
736            max_waiting: self.max_waiting,
737            max_ack_pending: 0,
738            headers_only: self.headers_only,
739            flow_control: false,
740            idle_heartbeat: Duration::default(),
741            max_batch: 0,
742            max_bytes: 0,
743            max_expires: Duration::default(),
744            inactive_threshold: Duration::from_secs(30),
745            num_replicas: 1,
746            memory_storage: true,
747            #[cfg(feature = "server_2_10")]
748            metadata: self.metadata,
749            backoff: Vec::new(),
750            #[cfg(feature = "server_2_11")]
751            priority_policy: PriorityPolicy::None,
752            #[cfg(feature = "server_2_11")]
753            priority_groups: Vec::new(),
754            #[cfg(feature = "server_2_11")]
755            pause_until: None,
756        }
757    }
758}
759
/// Stream of messages for an ordered pull consumer.
///
/// Tracks consumer and stream sequences of delivered messages and recreates the underlying
/// consumer stream when a sequence mismatch, repeated missed heartbeats, or a deleted
/// consumer is detected.
pub struct Ordered {
    /// JetStream context, passed to the consumer recreation routine.
    context: Context,
    /// Name of the stream the consumer is attached to.
    stream_name: String,
    /// Ordered consumer configuration, passed to the consumer recreation routine.
    consumer: OrderedConfig,
    /// Name of the consumer, passed to the consumer recreation routine.
    consumer_name: String,
    /// Currently active message stream; `None` while the consumer is being recreated.
    stream: Option<Stream>,
    /// In-flight future recreating the consumer stream, if a recreation was started.
    create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
    /// Consumer sequence of the last delivered message; reset to 0 when a recreation is
    /// triggered.
    consumer_sequence: u64,
    /// Stream sequence of the last delivered message; passed to the recreation routine.
    stream_sequence: u64,
    /// Set after the first missed-heartbeat error and cleared when a message arrives; a
    /// second consecutive miss triggers a recreation.
    missed_heartbeats: bool,
}
771
772impl futures::Stream for Ordered {
773    type Item = Result<jetstream::Message, OrderedError>;
774
775    fn poll_next(
776        mut self: Pin<&mut Self>,
777        cx: &mut std::task::Context<'_>,
778    ) -> Poll<Option<Self::Item>> {
779        let mut recreate = false;
780        // Poll messages
781        if let Some(stream) = self.stream.as_mut() {
782            match stream.poll_next_unpin(cx) {
783                Poll::Ready(message) => match message {
784                    Some(message) => match message {
785                        Ok(message) => {
786                            self.missed_heartbeats = false;
787                            let info = message.info().map_err(|err| {
788                                OrderedError::with_source(OrderedErrorKind::Other, err)
789                            })?;
790                            trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
791                                           self.consumer_sequence,
792                                           self.stream_sequence,
793                                           info.consumer_sequence,
794                                           info.stream_sequence);
795                            if info.consumer_sequence != self.consumer_sequence + 1 {
796                                debug!(
797                                    "ordered consumer mismatch. current {}, info: {}",
798                                    self.consumer_sequence, info.consumer_sequence
799                                );
800                                recreate = true;
801                                self.consumer_sequence = 0;
802                            } else {
803                                self.stream_sequence = info.stream_sequence;
804                                self.consumer_sequence = info.consumer_sequence;
805                                return Poll::Ready(Some(Ok(message)));
806                            }
807                        }
808                        Err(err) => match err.kind() {
809                            MessagesErrorKind::MissingHeartbeat => {
810                                // If we have missed heartbeats set, it means this is a second
811                                // missed heartbeat, so we need to recreate consumer.
812                                if self.missed_heartbeats {
813                                    self.consumer_sequence = 0;
814                                    recreate = true;
815                                } else {
816                                    self.missed_heartbeats = true;
817                                }
818                            }
819                            MessagesErrorKind::ConsumerDeleted => {
820                                recreate = true;
821                                self.consumer_sequence = 0;
822                            }
823                            MessagesErrorKind::Pull
824                            | MessagesErrorKind::PushBasedConsumer
825                            | MessagesErrorKind::Other => {
826                                return Poll::Ready(Some(Err(err.into())));
827                            }
828                        },
829                    },
830                    None => return Poll::Ready(None),
831                },
832                Poll::Pending => (),
833            }
834        }
835        // Recreate consumer if needed
836        if recreate {
837            self.stream = None;
838            self.create_stream = Some(Box::pin({
839                let context = self.context.clone();
840                let config = self.consumer.clone();
841                let stream_name = self.stream_name.clone();
842                let consumer_name = self.consumer_name.clone();
843                let sequence = self.stream_sequence;
844                async move {
845                    tryhard::retry_fn(|| {
846                        recreate_consumer_stream(
847                            &context,
848                            &config,
849                            &stream_name,
850                            &consumer_name,
851                            sequence,
852                        )
853                    })
854                    .retries(5)
855                    .exponential_backoff(Duration::from_millis(500))
856                    .await
857                }
858            }))
859        }
860        // check for recreation future
861        if let Some(result) = self.create_stream.as_mut() {
862            match result.poll_unpin(cx) {
863                Poll::Ready(result) => match result {
864                    Ok(stream) => {
865                        self.create_stream = None;
866                        self.stream = Some(stream);
867                        return self.poll_next(cx);
868                    }
869                    Err(err) => {
870                        return Poll::Ready(Some(Err(OrderedError::with_source(
871                            OrderedErrorKind::Recreate,
872                            err,
873                        ))))
874                    }
875                },
876                Poll::Pending => (),
877            }
878        }
879        Poll::Pending
880    }
881}
882
883pub struct Stream {
884    pending_messages: usize,
885    pending_bytes: usize,
886    request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
887    request_tx: tokio::sync::watch::Sender<()>,
888    subscriber: Subscriber,
889    batch_config: BatchConfig,
890    context: Context,
891    pending_request: bool,
892    task_handle: JoinHandle<()>,
893    terminated: bool,
894    heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
895}
896
897impl Drop for Stream {
898    fn drop(&mut self) {
899        self.task_handle.abort();
900    }
901}
902
903impl Stream {
904    async fn stream(
905        batch_config: BatchConfig,
906        consumer: &Consumer<Config>,
907    ) -> Result<Stream, StreamError> {
908        let inbox = consumer.context.client.new_inbox();
909        let subscription = consumer
910            .context
911            .client
912            .subscribe(inbox.clone())
913            .await
914            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
915        let subject = format!(
916            "{}.CONSUMER.MSG.NEXT.{}.{}",
917            consumer.context.prefix, consumer.info.stream_name, consumer.info.name
918        );
919
920        let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
921        let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
922        let task_handle = tokio::task::spawn({
923            let batch = batch_config.clone();
924            let consumer = consumer.clone();
925            let mut context = consumer.context.clone();
926            let inbox = inbox.clone();
927            async move {
928                loop {
929                    // this is just in edge case of missing response for some reason.
930                    let expires = batch_config
931                        .expires
932                        .map(|expires| {
933                            if expires.is_zero() {
934                                Either::Left(future::pending())
935                            } else {
936                                Either::Right(tokio::time::sleep(
937                                    expires.saturating_add(Duration::from_secs(5)),
938                                ))
939                            }
940                        })
941                        .unwrap_or_else(|| Either::Left(future::pending()));
942                    // Need to check previous state, as `changed` will always fire on first
943                    // call.
944                    let prev_state = context.client.state.borrow().to_owned();
945                    let mut pending_reset = false;
946
947                    tokio::select! {
948                       _ = context.client.state.changed() => {
949                            let state = context.client.state.borrow().to_owned();
950                            if !(state == crate::connection::State::Connected
951                                && prev_state != State::Connected) {
952                                    continue;
953                                }
954                            debug!("detected !Connected -> Connected state change");
955
956                            match tryhard::retry_fn(|| consumer.fetch_info()).retries(5).exponential_backoff(Duration::from_millis(500)).await {
957                                Ok(info) => {
958                                    if info.num_waiting == 0 {
959                                        pending_reset = true;
960                                    }
961                                }
962                                Err(err) => {
963                                     if let Err(err) = request_result_tx.send(Err(err)).await {
964                                        debug!("failed to sent request result: {}", err);
965                                    }
966                                },
967                            }
968                        },
969                        _ = request_rx.changed() => debug!("task received request request"),
970                        _ = expires => {
971                            pending_reset = true;
972                            debug!("expired pull request")},
973                    }
974
975                    let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
976                    let result = context
977                        .client
978                        .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
979                        .await
980                        .map(|_| pending_reset);
981                    // TODO: add tracing instead of ignoring this.
982                    request_result_tx
983                        .send(result.map(|_| pending_reset).map_err(|err| {
984                            crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
985                                .into()
986                        }))
987                        .await
988                        .ok();
989                    trace!("result send over tx");
990                }
991            }
992        });
993
994        Ok(Stream {
995            task_handle,
996            request_result_rx,
997            request_tx,
998            batch_config,
999            pending_messages: 0,
1000            pending_bytes: 0,
1001            subscriber: subscription,
1002            context: consumer.context.clone(),
1003            pending_request: false,
1004            terminated: false,
1005            heartbeat_timeout: None,
1006        })
1007    }
1008}
1009
/// Kinds of errors yielded while polling messages from an ordered consumer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OrderedErrorKind {
    /// No idle heartbeat arrived in time.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// Sending a pull request failed.
    Pull,
    /// The consumer is push based and cannot be used for pulling.
    PushBasedConsumer,
    /// Recreating the consumer failed.
    Recreate,
    /// Any other error.
    Other,
}
1019
1020impl std::fmt::Display for OrderedErrorKind {
1021    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1022        match self {
1023            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1024            Self::ConsumerDeleted => write!(f, "consumer deleted"),
1025            Self::Pull => write!(f, "pull request failed"),
1026            Self::Other => write!(f, "error"),
1027            Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1028            Self::Recreate => write!(f, "consumer recreation failed"),
1029        }
1030    }
1031}
1032
1033pub type OrderedError = Error<OrderedErrorKind>;
1034
1035impl From<MessagesError> for OrderedError {
1036    fn from(err: MessagesError) -> Self {
1037        match err.kind() {
1038            MessagesErrorKind::MissingHeartbeat => {
1039                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1040            }
1041            MessagesErrorKind::ConsumerDeleted => {
1042                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1043            }
1044            MessagesErrorKind::Pull => OrderedError {
1045                kind: OrderedErrorKind::Pull,
1046                source: err.source,
1047            },
1048            MessagesErrorKind::PushBasedConsumer => {
1049                OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1050            }
1051            MessagesErrorKind::Other => OrderedError {
1052                kind: OrderedErrorKind::Other,
1053                source: err.source,
1054            },
1055        }
1056    }
1057}
1058
/// Kinds of errors yielded while polling messages from a pull consumer [Stream].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MessagesErrorKind {
    /// No message, status or idle heartbeat arrived within twice the configured idle
    /// heartbeat interval.
    MissingHeartbeat,
    /// The server reported that the consumer was deleted.
    ConsumerDeleted,
    /// Sending a pull request failed.
    Pull,
    /// The server reported that the consumer is push based.
    PushBasedConsumer,
    /// Any other error, for example an unexpected status or a malformed header.
    Other,
}
1067
1068impl std::fmt::Display for MessagesErrorKind {
1069    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1070        match self {
1071            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1072            Self::ConsumerDeleted => write!(f, "consumer deleted"),
1073            Self::Pull => write!(f, "pull request failed"),
1074            Self::Other => write!(f, "error"),
1075            Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1076        }
1077    }
1078}
1079
1080pub type MessagesError = Error<MessagesErrorKind>;
1081
1082impl futures::Stream for Stream {
1083    type Item = Result<jetstream::Message, MessagesError>;
1084
1085    fn poll_next(
1086        mut self: std::pin::Pin<&mut Self>,
1087        cx: &mut std::task::Context<'_>,
1088    ) -> std::task::Poll<Option<Self::Item>> {
1089        if self.terminated {
1090            return Poll::Ready(None);
1091        }
1092
1093        if !self.batch_config.idle_heartbeat.is_zero() {
1094            trace!("checking idle hearbeats");
1095            let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1096            match self
1097                .heartbeat_timeout
1098                .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1099                .poll_unpin(cx)
1100            {
1101                Poll::Ready(_) => {
1102                    self.heartbeat_timeout = None;
1103                    return Poll::Ready(Some(Err(MessagesError::new(
1104                        MessagesErrorKind::MissingHeartbeat,
1105                    ))));
1106                }
1107                Poll::Pending => (),
1108            }
1109        }
1110
1111        loop {
1112            trace!("pending messages: {}", self.pending_messages);
1113            if (self.pending_messages <= self.batch_config.batch / 2
1114                || (self.batch_config.max_bytes > 0
1115                    && self.pending_bytes <= self.batch_config.max_bytes / 2))
1116                && !self.pending_request
1117            {
1118                debug!("pending messages reached threshold to send new fetch request");
1119                self.request_tx.send(()).ok();
1120                self.pending_request = true;
1121            }
1122
1123            match self.request_result_rx.poll_recv(cx) {
1124                Poll::Ready(resp) => match resp {
1125                    Some(resp) => match resp {
1126                        Ok(reset) => {
1127                            trace!("request response: {:?}", reset);
1128                            debug!("request sent, setting pending messages");
1129                            if reset {
1130                                self.pending_messages = self.batch_config.batch;
1131                                self.pending_bytes = self.batch_config.max_bytes;
1132                            } else {
1133                                self.pending_messages += self.batch_config.batch;
1134                                self.pending_bytes += self.batch_config.max_bytes;
1135                            }
1136                            self.pending_request = false;
1137                            continue;
1138                        }
1139                        Err(err) => {
1140                            return Poll::Ready(Some(Err(MessagesError::with_source(
1141                                MessagesErrorKind::Pull,
1142                                err,
1143                            ))))
1144                        }
1145                    },
1146                    None => return Poll::Ready(None),
1147                },
1148                Poll::Pending => {
1149                    trace!("pending result");
1150                }
1151            }
1152
1153            trace!("polling subscriber");
1154            match self.subscriber.receiver.poll_recv(cx) {
1155                Poll::Ready(maybe_message) => {
1156                    self.heartbeat_timeout = None;
1157                    match maybe_message {
1158                        Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1159                            StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1160                                debug!("received status message: {:?}", message);
1161                                // If consumer has been deleted, error and shutdown the iterator.
1162                                if message.description.as_deref() == Some("Consumer Deleted") {
1163                                    self.terminated = true;
1164                                    return Poll::Ready(Some(Err(MessagesError::new(
1165                                        MessagesErrorKind::ConsumerDeleted,
1166                                    ))));
1167                                }
1168                                // If consumer is not pull based, error and shutdown the iterator.
1169                                if message.description.as_deref() == Some("Consumer is push based")
1170                                {
1171                                    self.terminated = true;
1172                                    return Poll::Ready(Some(Err(MessagesError::new(
1173                                        MessagesErrorKind::PushBasedConsumer,
1174                                    ))));
1175                                }
1176
1177                                // Do accounting for messages left after terminated/completed pull request.
1178                                let pending_messages = message
1179                                    .headers
1180                                    .as_ref()
1181                                    .and_then(|headers| headers.get("Nats-Pending-Messages"))
1182                                    .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1183                                    .map_err(|err| {
1184                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1185                                    })?;
1186
1187                                let pending_bytes = message
1188                                    .headers
1189                                    .as_ref()
1190                                    .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1191                                    .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1192                                    .map_err(|err| {
1193                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1194                                    })?;
1195
1196                                debug!(
1197                                    "timeout reached. remaining messages: {}, bytes {}",
1198                                    pending_messages, pending_bytes
1199                                );
1200                                self.pending_messages =
1201                                    self.pending_messages.saturating_sub(pending_messages);
1202                                trace!("message bytes len: {}", pending_bytes);
1203                                self.pending_bytes =
1204                                    self.pending_bytes.saturating_sub(pending_bytes);
1205                                continue;
1206                            }
1207                            // Idle Hearbeat means we have no messages, but consumer is fine.
1208                            StatusCode::IDLE_HEARTBEAT => {
1209                                debug!("received idle heartbeat");
1210                                continue;
1211                            }
1212                            // We got an message from a stream.
1213                            StatusCode::OK => {
1214                                trace!("message received");
1215                                self.pending_messages = self.pending_messages.saturating_sub(1);
1216                                self.pending_bytes =
1217                                    self.pending_bytes.saturating_sub(message.length);
1218                                return Poll::Ready(Some(Ok(jetstream::Message {
1219                                    context: self.context.clone(),
1220                                    message,
1221                                })));
1222                            }
1223                            status => {
1224                                debug!("received unknown message: {:?}", message);
1225                                return Poll::Ready(Some(Err(MessagesError::with_source(
1226                                    MessagesErrorKind::Other,
1227                                    format!(
1228                                        "error while processing messages from the stream: {}, {:?}",
1229                                        status, message.description
1230                                    ),
1231                                ))));
1232                            }
1233                        },
1234                        None => return Poll::Ready(None),
1235                    }
1236                }
1237                Poll::Pending => {
1238                    debug!("subscriber still pending");
1239                    return std::task::Poll::Pending;
1240                }
1241            }
1242        }
1243    }
1244}
1245
1246/// Used for building configuration for a [Stream]. Created by a [Consumer::stream] on a [Consumer].
1247///
1248/// # Examples
1249///
1250/// ```no_run
1251/// # #[tokio::main]
1252/// # async fn main() -> Result<(), async_nats::Error>  {
1253/// use futures::StreamExt;
1254/// use async_nats::jetstream::consumer::PullConsumer;
1255/// let client = async_nats::connect("localhost:4222").await?;
1256/// let jetstream = async_nats::jetstream::new(client);
1257///
1258/// let consumer: PullConsumer = jetstream
1259///     .get_stream("events").await?
1260///     .get_consumer("pull").await?;
1261///
1262/// let mut messages = consumer.stream()
1263///     .max_messages_per_batch(100)
1264///     .max_bytes_per_batch(1024)
1265///     .messages().await?;
1266///
1267/// while let Some(message) = messages.next().await {
1268///     let message = message?;
1269///     println!("message: {:?}", message);
1270///     message.ack().await?;
1271/// }
1272/// # Ok(())
1273/// # }
1274pub struct StreamBuilder<'a> {
1275    batch: usize,
1276    max_bytes: usize,
1277    heartbeat: Duration,
1278    expires: Duration,
1279    group: Option<String>,
1280    min_pending: Option<usize>,
1281    min_ack_pending: Option<usize>,
1282    consumer: &'a Consumer<Config>,
1283}
1284
1285impl<'a> StreamBuilder<'a> {
1286    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1287        StreamBuilder {
1288            consumer,
1289            batch: 200,
1290            max_bytes: 0,
1291            expires: Duration::from_secs(30),
1292            heartbeat: Duration::default(),
1293            group: None,
1294            min_pending: None,
1295            min_ack_pending: None,
1296        }
1297    }
1298
1299    /// Sets max bytes that can be buffered on the Client while processing already received
1300    /// messages.
1301    /// Higher values will yield better performance, but also potentially increase memory usage if
1302    /// application is acknowledging messages much slower than they arrive.
1303    ///
1304    /// Default values should provide reasonable balance between performance and memory usage.
1305    ///
1306    /// # Examples
1307    ///
1308    /// ```no_run
1309    /// # #[tokio::main]
1310    /// # async fn main() -> Result<(), async_nats::Error>  {
1311    /// use async_nats::jetstream::consumer::PullConsumer;
1312    /// use futures::StreamExt;
1313    /// let client = async_nats::connect("localhost:4222").await?;
1314    /// let jetstream = async_nats::jetstream::new(client);
1315    ///
1316    /// let consumer: PullConsumer = jetstream
1317    ///     .get_stream("events")
1318    ///     .await?
1319    ///     .get_consumer("pull")
1320    ///     .await?;
1321    ///
1322    /// let mut messages = consumer
1323    ///     .stream()
1324    ///     .max_bytes_per_batch(1024)
1325    ///     .messages()
1326    ///     .await?;
1327    ///
1328    /// while let Some(message) = messages.next().await {
1329    ///     let message = message?;
1330    ///     println!("message: {:?}", message);
1331    ///     message.ack().await?;
1332    /// }
1333    /// # Ok(())
1334    /// # }
1335    /// ```
1336    pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1337        self.max_bytes = max_bytes;
1338        self
1339    }
1340
1341    /// Sets max number of messages that can be buffered on the Client while processing already received
1342    /// messages.
1343    /// Higher values will yield better performance, but also potentially increase memory usage if
1344    /// application is acknowledging messages much slower than they arrive.
1345    ///
1346    /// Default values should provide reasonable balance between performance and memory usage.
1347    ///
1348    /// # Examples
1349    ///
1350    /// ```no_run
1351    /// # #[tokio::main]
1352    /// # async fn main() -> Result<(), async_nats::Error>  {
1353    /// use async_nats::jetstream::consumer::PullConsumer;
1354    /// use futures::StreamExt;
1355    /// let client = async_nats::connect("localhost:4222").await?;
1356    /// let jetstream = async_nats::jetstream::new(client);
1357    ///
1358    /// let consumer: PullConsumer = jetstream
1359    ///     .get_stream("events")
1360    ///     .await?
1361    ///     .get_consumer("pull")
1362    ///     .await?;
1363    ///
1364    /// let mut messages = consumer
1365    ///     .stream()
1366    ///     .max_messages_per_batch(100)
1367    ///     .messages()
1368    ///     .await?;
1369    ///
1370    /// while let Some(message) = messages.next().await {
1371    ///     let message = message?;
1372    ///     println!("message: {:?}", message);
1373    ///     message.ack().await?;
1374    /// }
1375    /// # Ok(())
1376    /// # }
1377    /// ```
1378    pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1379        self.batch = batch;
1380        self
1381    }
1382
1383    /// Sets heartbeat which will be send by the server if there are no messages for a given
1384    /// [Consumer] pending.
1385    ///
1386    /// # Examples
1387    ///
1388    /// ```no_run
1389    /// # #[tokio::main]
1390    /// # async fn main() -> Result<(), async_nats::Error>  {
1391    /// use async_nats::jetstream::consumer::PullConsumer;
1392    /// use futures::StreamExt;
1393    /// let client = async_nats::connect("localhost:4222").await?;
1394    /// let jetstream = async_nats::jetstream::new(client);
1395    ///
1396    /// let consumer: PullConsumer = jetstream
1397    ///     .get_stream("events")
1398    ///     .await?
1399    ///     .get_consumer("pull")
1400    ///     .await?;
1401    ///
1402    /// let mut messages = consumer
1403    ///     .stream()
1404    ///     .heartbeat(std::time::Duration::from_secs(10))
1405    ///     .messages()
1406    ///     .await?;
1407    ///
1408    /// while let Some(message) = messages.next().await {
1409    ///     let message = message?;
1410    ///     println!("message: {:?}", message);
1411    ///     message.ack().await?;
1412    /// }
1413    /// # Ok(())
1414    /// # }
1415    /// ```
1416    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1417        self.heartbeat = heartbeat;
1418        self
1419    }
1420
1421    /// Low level API that does not need tweaking for most use cases.
1422    /// Sets how long each batch request waits for whole batch of messages before timing out.
1423    /// [Consumer] pending.
1424    ///
1425    /// # Examples
1426    ///
1427    /// ```no_run
1428    /// # #[tokio::main]
1429    /// # async fn main() -> Result<(), async_nats::Error>  {
1430    /// use async_nats::jetstream::consumer::PullConsumer;
1431    /// use futures::StreamExt;
1432    /// let client = async_nats::connect("localhost:4222").await?;
1433    /// let jetstream = async_nats::jetstream::new(client);
1434    ///
1435    /// let consumer: PullConsumer = jetstream
1436    ///     .get_stream("events")
1437    ///     .await?
1438    ///     .get_consumer("pull")
1439    ///     .await?;
1440    ///
1441    /// let mut messages = consumer
1442    ///     .stream()
1443    ///     .expires(std::time::Duration::from_secs(30))
1444    ///     .messages()
1445    ///     .await?;
1446    ///
1447    /// while let Some(message) = messages.next().await {
1448    ///     let message = message?;
1449    ///     println!("message: {:?}", message);
1450    ///     message.ack().await?;
1451    /// }
1452    /// # Ok(())
1453    /// # }
1454    /// ```
1455    pub fn expires(mut self, expires: Duration) -> Self {
1456        self.expires = expires;
1457        self
1458    }
1459
1460    /// Sets overflow threshold for minimum pending messages before this stream will start getting
1461    /// messages for a [Consumer].
1462    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and [PriorityPolicy::Overflow] set.
1463    ///
1464    /// # Examples
1465    ///
1466    /// ```no_run
1467    /// # #[tokio::main]
1468    /// # async fn main() -> Result<(), async_nats::Error>  {
1469    /// use async_nats::jetstream::consumer::PullConsumer;
1470    /// use futures::StreamExt;
1471    /// let client = async_nats::connect("localhost:4222").await?;
1472    /// let jetstream = async_nats::jetstream::new(client);
1473    ///
1474    /// let consumer: PullConsumer = jetstream
1475    ///     .get_stream("events")
1476    ///     .await?
1477    ///     .get_consumer("pull")
1478    ///     .await?;
1479    ///
1480    /// let mut messages = consumer
1481    ///     .stream()
1482    ///     .expires(std::time::Duration::from_secs(30))
1483    ///     .group("A")
1484    ///     .min_pending(100)
1485    ///     .messages()
1486    ///     .await?;
1487    ///
1488    /// while let Some(message) = messages.next().await {
1489    ///     let message = message?;
1490    ///     println!("message: {:?}", message);
1491    ///     message.ack().await?;
1492    /// }
1493    /// # Ok(())
1494    /// # }
1495    /// ```
1496    pub fn min_pending(mut self, min_pending: usize) -> Self {
1497        self.min_pending = Some(min_pending);
1498        self
1499    }
1500
1501    /// Sets overflow threshold for minimum pending acknowledgements before this stream will start getting
1502    /// messages for a [Consumer].
1503    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and [PriorityPolicy::Overflow] set.
1504    ///
1505    /// # Examples
1506    ///
1507    /// ```no_run
1508    /// # #[tokio::main]
1509    /// # async fn main() -> Result<(), async_nats::Error>  {
1510    /// use async_nats::jetstream::consumer::PullConsumer;
1511    /// use futures::StreamExt;
1512    /// let client = async_nats::connect("localhost:4222").await?;
1513    /// let jetstream = async_nats::jetstream::new(client);
1514    ///
1515    /// let consumer: PullConsumer = jetstream
1516    ///     .get_stream("events")
1517    ///     .await?
1518    ///     .get_consumer("pull")
1519    ///     .await?;
1520    ///
1521    /// let mut messages = consumer
1522    ///     .stream()
1523    ///     .expires(std::time::Duration::from_secs(30))
1524    ///     .group("A")
1525    ///     .min_ack_pending(100)
1526    ///     .messages()
1527    ///     .await?;
1528    ///
1529    /// while let Some(message) = messages.next().await {
1530    ///     let message = message?;
1531    ///     println!("message: {:?}", message);
1532    ///     message.ack().await?;
1533    /// }
1534    /// # Ok(())
1535    /// # }
1536    /// ```
1537    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1538        self.min_ack_pending = Some(min_ack_pending);
1539        self
1540    }
1541
1542    /// Setting group when using [Consumer] with [Config::priority_groups].
1543    ///
1544    /// # Examples
1545    ///
1546    /// ```no_run
1547    /// # #[tokio::main]
1548    /// # async fn main() -> Result<(), async_nats::Error>  {
1549    /// use async_nats::jetstream::consumer::PullConsumer;
1550    /// use futures::StreamExt;
1551    /// let client = async_nats::connect("localhost:4222").await?;
1552    /// let jetstream = async_nats::jetstream::new(client);
1553    ///
1554    /// let consumer: PullConsumer = jetstream
1555    ///     .get_stream("events")
1556    ///     .await?
1557    ///     .get_consumer("pull")
1558    ///     .await?;
1559    ///
1560    /// let mut messages = consumer
1561    ///     .stream()
1562    ///     .expires(std::time::Duration::from_secs(30))
1563    ///     .group("A")
1564    ///     .min_ack_pending(100)
1565    ///     .messages()
1566    ///     .await?;
1567    ///
1568    /// while let Some(message) = messages.next().await {
1569    ///     let message = message?;
1570    ///     println!("message: {:?}", message);
1571    ///     message.ack().await?;
1572    /// }
1573    /// # Ok(())
1574    /// # }
1575    /// ```
1576    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1577        self.group = Some(group.into());
1578        self
1579    }
1580
1581    /// Creates actual [Stream] with provided configuration.
1582    ///
1583    /// # Examples
1584    ///
1585    /// ```no_run
1586    /// # #[tokio::main]
1587    /// # async fn main() -> Result<(), async_nats::Error>  {
1588    /// use async_nats::jetstream::consumer::PullConsumer;
1589    /// use futures::StreamExt;
1590    /// let client = async_nats::connect("localhost:4222").await?;
1591    /// let jetstream = async_nats::jetstream::new(client);
1592    ///
1593    /// let consumer: PullConsumer = jetstream
1594    ///     .get_stream("events")
1595    ///     .await?
1596    ///     .get_consumer("pull")
1597    ///     .await?;
1598    ///
1599    /// let mut messages = consumer
1600    ///     .stream()
1601    ///     .max_messages_per_batch(100)
1602    ///     .messages()
1603    ///     .await?;
1604    ///
1605    /// while let Some(message) = messages.next().await {
1606    ///     let message = message?;
1607    ///     println!("message: {:?}", message);
1608    ///     message.ack().await?;
1609    /// }
1610    /// # Ok(())
1611    /// # }
1612    /// ```
1613    pub async fn messages(self) -> Result<Stream, StreamError> {
1614        Stream::stream(
1615            BatchConfig {
1616                batch: self.batch,
1617                expires: Some(self.expires),
1618                no_wait: false,
1619                max_bytes: self.max_bytes,
1620                idle_heartbeat: self.heartbeat,
1621                min_pending: self.min_pending,
1622                group: self.group,
1623                min_ack_pending: self.min_ack_pending,
1624            },
1625            self.consumer,
1626        )
1627        .await
1628    }
1629}
1630
1631/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by a [FetchBuilder] on a [Consumer].
1632///
1633/// # Examples
1634///
1635/// ```no_run
1636/// # #[tokio::main]
1637/// # async fn main() -> Result<(), async_nats::Error>  {
1638/// use async_nats::jetstream::consumer::PullConsumer;
1639/// use futures::StreamExt;
1640/// let client = async_nats::connect("localhost:4222").await?;
1641/// let jetstream = async_nats::jetstream::new(client);
1642///
1643/// let consumer: PullConsumer = jetstream
1644///     .get_stream("events")
1645///     .await?
1646///     .get_consumer("pull")
1647///     .await?;
1648///
1649/// let mut messages = consumer
1650///     .fetch()
1651///     .max_messages(100)
1652///     .max_bytes(1024)
1653///     .messages()
1654///     .await?;
1655///
1656/// while let Some(message) = messages.next().await {
1657///     let message = message?;
1658///     println!("message: {:?}", message);
1659///     message.ack().await?;
1660/// }
1661/// # Ok(())
1662/// # }
1663/// ```
1664pub struct FetchBuilder<'a> {
1665    batch: usize,
1666    max_bytes: usize,
1667    heartbeat: Duration,
1668    expires: Option<Duration>,
1669    min_pending: Option<usize>,
1670    min_ack_pending: Option<usize>,
1671    group: Option<String>,
1672    consumer: &'a Consumer<Config>,
1673}
1674
1675impl<'a> FetchBuilder<'a> {
1676    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1677        FetchBuilder {
1678            consumer,
1679            batch: 200,
1680            max_bytes: 0,
1681            expires: None,
1682            min_pending: None,
1683            min_ack_pending: None,
1684            group: None,
1685            heartbeat: Duration::default(),
1686        }
1687    }
1688
1689    /// Sets max bytes that can be buffered on the Client while processing already received
1690    /// messages.
1691    /// Higher values will yield better performance, but also potentially increase memory usage if
1692    /// application is acknowledging messages much slower than they arrive.
1693    ///
1694    /// Default values should provide reasonable balance between performance and memory usage.
1695    ///
1696    /// # Examples
1697    ///
1698    /// ```no_run
1699    /// # #[tokio::main]
1700    /// # async fn main() -> Result<(), async_nats::Error>  {
1701    /// use futures::StreamExt;
1702    /// let client = async_nats::connect("localhost:4222").await?;
1703    /// let jetstream = async_nats::jetstream::new(client);
1704    ///
1705    /// let consumer = jetstream
1706    ///     .get_stream("events")
1707    ///     .await?
1708    ///     .get_consumer("pull")
1709    ///     .await?;
1710    ///
1711    /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1712    ///
1713    /// while let Some(message) = messages.next().await {
1714    ///     let message = message?;
1715    ///     println!("message: {:?}", message);
1716    ///     message.ack().await?;
1717    /// }
1718    /// # Ok(())
1719    /// # }
1720    /// ```
1721    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1722        self.max_bytes = max_bytes;
1723        self
1724    }
1725
1726    /// Sets max number of messages that can be buffered on the Client while processing already received
1727    /// messages.
1728    /// Higher values will yield better performance, but also potentially increase memory usage if
1729    /// application is acknowledging messages much slower than they arrive.
1730    ///
1731    /// Default values should provide reasonable balance between performance and memory usage.
1732    ///
1733    /// # Examples
1734    ///
1735    /// ```no_run
1736    /// # #[tokio::main]
1737    /// # async fn main() -> Result<(), async_nats::Error>  {
1738    /// use futures::StreamExt;
1739    /// let client = async_nats::connect("localhost:4222").await?;
1740    /// let jetstream = async_nats::jetstream::new(client);
1741    ///
1742    /// let consumer = jetstream
1743    ///     .get_stream("events")
1744    ///     .await?
1745    ///     .get_consumer("pull")
1746    ///     .await?;
1747    ///
1748    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1749    ///
1750    /// while let Some(message) = messages.next().await {
1751    ///     let message = message?;
1752    ///     println!("message: {:?}", message);
1753    ///     message.ack().await?;
1754    /// }
1755    /// # Ok(())
1756    /// # }
1757    /// ```
1758    pub fn max_messages(mut self, batch: usize) -> Self {
1759        self.batch = batch;
1760        self
1761    }
1762
1763    /// Sets heartbeat which will be send by the server if there are no messages for a given
1764    /// [Consumer] pending.
1765    ///
1766    /// # Examples
1767    ///
1768    /// ```no_run
1769    /// # #[tokio::main]
1770    /// # async fn main() -> Result<(), async_nats::Error>  {
1771    /// use async_nats::jetstream::consumer::PullConsumer;
1772    /// use futures::StreamExt;
1773    /// let client = async_nats::connect("localhost:4222").await?;
1774    /// let jetstream = async_nats::jetstream::new(client);
1775    ///
1776    /// let consumer = jetstream
1777    ///     .get_stream("events")
1778    ///     .await?
1779    ///     .get_consumer("pull")
1780    ///     .await?;
1781    ///
1782    /// let mut messages = consumer
1783    ///     .fetch()
1784    ///     .heartbeat(std::time::Duration::from_secs(10))
1785    ///     .messages()
1786    ///     .await?;
1787    ///
1788    /// while let Some(message) = messages.next().await {
1789    ///     let message = message?;
1790    ///     println!("message: {:?}", message);
1791    ///     message.ack().await?;
1792    /// }
1793    /// # Ok(())
1794    /// # }
1795    /// ```
1796    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1797        self.heartbeat = heartbeat;
1798        self
1799    }
1800
1801    /// Low level API that does not need tweaking for most use cases.
1802    /// Sets how long each batch request waits for whole batch of messages before timing out.
1803    /// [Consumer] pending.
1804    ///
1805    /// # Examples
1806    ///
1807    /// ```no_run
1808    /// # #[tokio::main]
1809    /// # async fn main() -> Result<(), async_nats::Error>  {
1810    /// use async_nats::jetstream::consumer::PullConsumer;
1811    /// use futures::StreamExt;
1812    ///
1813    /// let client = async_nats::connect("localhost:4222").await?;
1814    /// let jetstream = async_nats::jetstream::new(client);
1815    ///
1816    /// let consumer: PullConsumer = jetstream
1817    ///     .get_stream("events")
1818    ///     .await?
1819    ///     .get_consumer("pull")
1820    ///     .await?;
1821    ///
1822    /// let mut messages = consumer
1823    ///     .fetch()
1824    ///     .expires(std::time::Duration::from_secs(30))
1825    ///     .messages()
1826    ///     .await?;
1827    ///
1828    /// while let Some(message) = messages.next().await {
1829    ///     let message = message?;
1830    ///     println!("message: {:?}", message);
1831    ///     message.ack().await?;
1832    /// }
1833    /// # Ok(())
1834    /// # }
1835    /// ```
1836    pub fn expires(mut self, expires: Duration) -> Self {
1837        self.expires = Some(expires);
1838        self
1839    }
1840
1841    /// Sets overflow threshold for minimum pending messages before this stream will start getting
1842    /// messages.
1843    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
1844    /// [PriorityPolicy::Overflow] set.
1845    ///
1846    /// # Examples
1847    ///
1848    /// ```no_run
1849    /// # #[tokio::main]
1850    /// # async fn main() -> Result<(), async_nats::Error>  {
1851    /// use async_nats::jetstream::consumer::PullConsumer;
1852    /// use futures::StreamExt;
1853    ///
1854    /// let client = async_nats::connect("localhost:4222").await?;
1855    /// let jetstream = async_nats::jetstream::new(client);
1856    ///
1857    /// let consumer: PullConsumer = jetstream
1858    ///     .get_stream("events")
1859    ///     .await?
1860    ///     .get_consumer("pull")
1861    ///     .await?;
1862    ///
1863    /// let mut messages = consumer
1864    ///     .fetch()
1865    ///     .expires(std::time::Duration::from_secs(30))
1866    ///     .group("A")
1867    ///     .min_pending(100)
1868    ///     .messages()
1869    ///     .await?;
1870    ///
1871    /// while let Some(message) = messages.next().await {
1872    ///     let message = message?;
1873    ///     println!("message: {:?}", message);
1874    ///     message.ack().await?;
1875    /// }
1876    /// # Ok(())
1877    /// # }
1878    /// ```
1879    pub fn min_pending(mut self, min_pending: usize) -> Self {
1880        self.min_pending = Some(min_pending);
1881        self
1882    }
1883
1884    /// Sets overflow threshold for minimum pending acknowledgments before this stream will start getting
1885    /// messages.
1886    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
1887    /// [PriorityPolicy::Overflow] set.
1888    ///
1889    /// # Examples
1890    ///
1891    /// ```no_run
1892    /// # #[tokio::main]
1893    /// # async fn main() -> Result<(), async_nats::Error>  {
1894    /// use async_nats::jetstream::consumer::PullConsumer;
1895    /// use futures::StreamExt;
1896    ///
1897    /// let client = async_nats::connect("localhost:4222").await?;
1898    /// let jetstream = async_nats::jetstream::new(client);
1899    ///
1900    /// let consumer: PullConsumer = jetstream
1901    ///     .get_stream("events")
1902    ///     .await?
1903    ///     .get_consumer("pull")
1904    ///     .await?;
1905    ///
1906    /// let mut messages = consumer
1907    ///     .fetch()
1908    ///     .expires(std::time::Duration::from_secs(30))
1909    ///     .group("A")
1910    ///     .min_ack_pending(100)
1911    ///     .messages()
1912    ///     .await?;
1913    ///
1914    /// while let Some(message) = messages.next().await {
1915    ///     let message = message?;
1916    ///     println!("message: {:?}", message);
1917    ///     message.ack().await?;
1918    /// }
1919    /// # Ok(())
1920    /// # }
1921    /// ```
1922    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1923        self.min_ack_pending = Some(min_ack_pending);
1924        self
1925    }
1926
1927    /// Setting group when using [Consumer] with [PriorityPolicy].
1928    ///
1929    /// # Examples
1930    ///
1931    /// ```no_run
1932    /// # #[tokio::main]
1933    /// # async fn main() -> Result<(), async_nats::Error>  {
1934    /// use async_nats::jetstream::consumer::PullConsumer;
1935    /// use futures::StreamExt;
1936    ///
1937    /// let client = async_nats::connect("localhost:4222").await?;
1938    /// let jetstream = async_nats::jetstream::new(client);
1939    ///
1940    /// let consumer: PullConsumer = jetstream
1941    ///     .get_stream("events")
1942    ///     .await?
1943    ///     .get_consumer("pull")
1944    ///     .await?;
1945    ///
1946    /// let mut messages = consumer
1947    ///     .fetch()
1948    ///     .expires(std::time::Duration::from_secs(30))
1949    ///     .group("A")
1950    ///     .min_ack_pending(100)
1951    ///     .messages()
1952    ///     .await?;
1953    ///
1954    /// while let Some(message) = messages.next().await {
1955    ///     let message = message?;
1956    ///     println!("message: {:?}", message);
1957    ///     message.ack().await?;
1958    /// }
1959    /// # Ok(())
1960    /// # }
1961    /// ```
1962    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1963        self.group = Some(group.into());
1964        self
1965    }
1966
1967    /// Creates actual [Stream] with provided configuration.
1968    ///
1969    /// # Examples
1970    ///
1971    /// ```no_run
1972    /// # #[tokio::main]
1973    /// # async fn main() -> Result<(), async_nats::Error>  {
1974    /// use async_nats::jetstream::consumer::PullConsumer;
1975    /// use futures::StreamExt;
1976    /// let client = async_nats::connect("localhost:4222").await?;
1977    /// let jetstream = async_nats::jetstream::new(client);
1978    ///
1979    /// let consumer: PullConsumer = jetstream
1980    ///     .get_stream("events")
1981    ///     .await?
1982    ///     .get_consumer("pull")
1983    ///     .await?;
1984    ///
1985    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1986    ///
1987    /// while let Some(message) = messages.next().await {
1988    ///     let message = message?;
1989    ///     println!("message: {:?}", message);
1990    ///     message.ack().await?;
1991    /// }
1992    /// # Ok(())
1993    /// # }
1994    /// ```
1995    pub async fn messages(self) -> Result<Batch, BatchError> {
1996        Batch::batch(
1997            BatchConfig {
1998                batch: self.batch,
1999                expires: self.expires,
2000                no_wait: true,
2001                max_bytes: self.max_bytes,
2002                idle_heartbeat: self.heartbeat,
2003                min_pending: self.min_pending,
2004                min_ack_pending: self.min_ack_pending,
2005                group: self.group,
2006            },
2007            self.consumer,
2008        )
2009        .await
2010    }
2011}
2012
2013/// Used for building configuration for a [Batch]. Created by a [Consumer::batch] on a [Consumer].
2014///
2015/// # Examples
2016///
2017/// ```no_run
2018/// # #[tokio::main]
2019/// # async fn main() -> Result<(), async_nats::Error>  {
2020/// use async_nats::jetstream::consumer::PullConsumer;
2021/// use futures::StreamExt;
2022/// let client = async_nats::connect("localhost:4222").await?;
2023/// let jetstream = async_nats::jetstream::new(client);
2024///
2025/// let consumer: PullConsumer = jetstream
2026///     .get_stream("events")
2027///     .await?
2028///     .get_consumer("pull")
2029///     .await?;
2030///
2031/// let mut messages = consumer
2032///     .batch()
2033///     .max_messages(100)
2034///     .max_bytes(1024)
2035///     .messages()
2036///     .await?;
2037///
2038/// while let Some(message) = messages.next().await {
2039///     let message = message?;
2040///     println!("message: {:?}", message);
2041///     message.ack().await?;
2042/// }
2043/// # Ok(())
2044/// # }
2045/// ```
2046pub struct BatchBuilder<'a> {
2047    batch: usize,
2048    max_bytes: usize,
2049    heartbeat: Duration,
2050    expires: Duration,
2051    min_pending: Option<usize>,
2052    min_ack_pending: Option<usize>,
2053    group: Option<String>,
2054    consumer: &'a Consumer<Config>,
2055}
2056
2057impl<'a> BatchBuilder<'a> {
2058    pub fn new(consumer: &'a Consumer<Config>) -> Self {
2059        BatchBuilder {
2060            consumer,
2061            batch: 200,
2062            max_bytes: 0,
2063            expires: Duration::ZERO,
2064            heartbeat: Duration::default(),
2065            min_pending: None,
2066            min_ack_pending: None,
2067            group: None,
2068        }
2069    }
2070
2071    /// Sets max bytes that can be buffered on the Client while processing already received
2072    /// messages.
2073    /// Higher values will yield better performance, but also potentially increase memory usage if
2074    /// application is acknowledging messages much slower than they arrive.
2075    ///
2076    /// Default values should provide reasonable balance between performance and memory usage.
2077    ///
2078    /// # Examples
2079    ///
2080    /// ```no_run
2081    /// # #[tokio::main]
2082    /// # async fn main() -> Result<(), async_nats::Error>  {
2083    /// use async_nats::jetstream::consumer::PullConsumer;
2084    /// use futures::StreamExt;
2085    /// let client = async_nats::connect("localhost:4222").await?;
2086    /// let jetstream = async_nats::jetstream::new(client);
2087    ///
2088    /// let consumer: PullConsumer = jetstream
2089    ///     .get_stream("events")
2090    ///     .await?
2091    ///     .get_consumer("pull")
2092    ///     .await?;
2093    ///
2094    /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
2095    ///
2096    /// while let Some(message) = messages.next().await {
2097    ///     let message = message?;
2098    ///     println!("message: {:?}", message);
2099    ///     message.ack().await?;
2100    /// }
2101    /// # Ok(())
2102    /// # }
2103    /// ```
2104    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
2105        self.max_bytes = max_bytes;
2106        self
2107    }
2108
2109    /// Sets max number of messages that can be buffered on the Client while processing already received
2110    /// messages.
2111    /// Higher values will yield better performance, but also potentially increase memory usage if
2112    /// application is acknowledging messages much slower than they arrive.
2113    ///
2114    /// Default values should provide reasonable balance between performance and memory usage.
2115    ///
2116    /// # Examples
2117    ///
2118    /// ```no_run
2119    /// # #[tokio::main]
2120    /// # async fn main() -> Result<(), async_nats::Error>  {
2121    /// use async_nats::jetstream::consumer::PullConsumer;
2122    /// use futures::StreamExt;
2123    /// let client = async_nats::connect("localhost:4222").await?;
2124    /// let jetstream = async_nats::jetstream::new(client);
2125    ///
2126    /// let consumer: PullConsumer = jetstream
2127    ///     .get_stream("events")
2128    ///     .await?
2129    ///     .get_consumer("pull")
2130    ///     .await?;
2131    ///
2132    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2133    ///
2134    /// while let Some(message) = messages.next().await {
2135    ///     let message = message?;
2136    ///     println!("message: {:?}", message);
2137    ///     message.ack().await?;
2138    /// }
2139    /// # Ok(())
2140    /// # }
2141    /// ```
2142    pub fn max_messages(mut self, batch: usize) -> Self {
2143        self.batch = batch;
2144        self
2145    }
2146
2147    /// Sets heartbeat which will be send by the server if there are no messages for a given
2148    /// [Consumer] pending.
2149    ///
2150    /// # Examples
2151    ///
2152    /// ```no_run
2153    /// # #[tokio::main]
2154    /// # async fn main() -> Result<(), async_nats::Error>  {
2155    /// use async_nats::jetstream::consumer::PullConsumer;
2156    /// use futures::StreamExt;
2157    /// let client = async_nats::connect("localhost:4222").await?;
2158    /// let jetstream = async_nats::jetstream::new(client);
2159    ///
2160    /// let consumer: PullConsumer = jetstream
2161    ///     .get_stream("events")
2162    ///     .await?
2163    ///     .get_consumer("pull")
2164    ///     .await?;
2165    ///
2166    /// let mut messages = consumer
2167    ///     .batch()
2168    ///     .heartbeat(std::time::Duration::from_secs(10))
2169    ///     .messages()
2170    ///     .await?;
2171    ///
2172    /// while let Some(message) = messages.next().await {
2173    ///     let message = message?;
2174    ///     println!("message: {:?}", message);
2175    ///     message.ack().await?;
2176    /// }
2177    /// # Ok(())
2178    /// # }
2179    /// ```
2180    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
2181        self.heartbeat = heartbeat;
2182        self
2183    }
2184
2185    /// Sets overflow threshold for minimum pending messages before this stream will start getting
2186    /// messages.
2187    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
2188    /// [PriorityPolicy::Overflow] set.
2189    ///
2190    /// # Examples
2191    ///
2192    /// ```no_run
2193    /// # #[tokio::main]
2194    /// # async fn main() -> Result<(), async_nats::Error>  {
2195    /// use async_nats::jetstream::consumer::PullConsumer;
2196    /// use futures::StreamExt;
2197    ///
2198    /// let client = async_nats::connect("localhost:4222").await?;
2199    /// let jetstream = async_nats::jetstream::new(client);
2200    ///
2201    /// let consumer: PullConsumer = jetstream
2202    ///     .get_stream("events")
2203    ///     .await?
2204    ///     .get_consumer("pull")
2205    ///     .await?;
2206    ///
2207    /// let mut messages = consumer
2208    ///     .batch()
2209    ///     .expires(std::time::Duration::from_secs(30))
2210    ///     .group("A")
2211    ///     .min_pending(100)
2212    ///     .messages()
2213    ///     .await?;
2214    ///
2215    /// while let Some(message) = messages.next().await {
2216    ///     let message = message?;
2217    ///     println!("message: {:?}", message);
2218    ///     message.ack().await?;
2219    /// }
2220    /// # Ok(())
2221    /// # }
2222    /// ```
2223    pub fn min_pending(mut self, min_pending: usize) -> Self {
2224        self.min_pending = Some(min_pending);
2225        self
2226    }
2227
2228    /// Sets overflow threshold for minimum pending acknowledgments before this stream will start getting
2229    /// messages.
2230    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
2231    /// [PriorityPolicy::Overflow] set.
2232    ///
2233    /// # Examples
2234    ///
2235    /// ```no_run
2236    /// # #[tokio::main]
2237    /// # async fn main() -> Result<(), async_nats::Error>  {
2238    /// use async_nats::jetstream::consumer::PullConsumer;
2239    /// use futures::StreamExt;
2240    ///
2241    /// let client = async_nats::connect("localhost:4222").await?;
2242    /// let jetstream = async_nats::jetstream::new(client);
2243    ///
2244    /// let consumer: PullConsumer = jetstream
2245    ///     .get_stream("events")
2246    ///     .await?
2247    ///     .get_consumer("pull")
2248    ///     .await?;
2249    ///
2250    /// let mut messages = consumer
2251    ///     .batch()
2252    ///     .expires(std::time::Duration::from_secs(30))
2253    ///     .group("A")
2254    ///     .min_ack_pending(100)
2255    ///     .messages()
2256    ///     .await?;
2257    ///
2258    /// while let Some(message) = messages.next().await {
2259    ///     let message = message?;
2260    ///     println!("message: {:?}", message);
2261    ///     message.ack().await?;
2262    /// }
2263    /// # Ok(())
2264    /// # }
2265    /// ```
2266    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
2267        self.min_ack_pending = Some(min_ack_pending);
2268        self
2269    }
2270
2271    /// Setting group when using [Consumer] with [PriorityPolicy].
2272    ///
2273    /// # Examples
2274    ///
2275    /// ```no_run
2276    /// # #[tokio::main]
2277    /// # async fn main() -> Result<(), async_nats::Error>  {
2278    /// use async_nats::jetstream::consumer::PullConsumer;
2279    /// use futures::StreamExt;
2280    ///
2281    /// let client = async_nats::connect("localhost:4222").await?;
2282    /// let jetstream = async_nats::jetstream::new(client);
2283    ///
2284    /// let consumer: PullConsumer = jetstream
2285    ///     .get_stream("events")
2286    ///     .await?
2287    ///     .get_consumer("pull")
2288    ///     .await?;
2289    ///
2290    /// let mut messages = consumer
2291    ///     .batch()
2292    ///     .expires(std::time::Duration::from_secs(30))
2293    ///     .group("A")
2294    ///     .min_ack_pending(100)
2295    ///     .messages()
2296    ///     .await?;
2297    ///
2298    /// while let Some(message) = messages.next().await {
2299    ///     let message = message?;
2300    ///     println!("message: {:?}", message);
2301    ///     message.ack().await?;
2302    /// }
2303    /// # Ok(())
2304    /// # }
2305    /// ```
2306    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2307        self.group = Some(group.into());
2308        self
2309    }
2310
2311    /// Low level API that does not need tweaking for most use cases.
2312    /// Sets how long each batch request waits for whole batch of messages before timing out.
2313    /// [Consumer] pending.
2314    ///
2315    /// # Examples
2316    ///
2317    /// ```no_run
2318    /// # #[tokio::main]
2319    /// # async fn main() -> Result<(), async_nats::Error>  {
2320    /// use async_nats::jetstream::consumer::PullConsumer;
2321    /// use futures::StreamExt;
2322    /// let client = async_nats::connect("localhost:4222").await?;
2323    /// let jetstream = async_nats::jetstream::new(client);
2324    ///
2325    /// let consumer: PullConsumer = jetstream
2326    ///     .get_stream("events")
2327    ///     .await?
2328    ///     .get_consumer("pull")
2329    ///     .await?;
2330    ///
2331    /// let mut messages = consumer
2332    ///     .batch()
2333    ///     .expires(std::time::Duration::from_secs(30))
2334    ///     .messages()
2335    ///     .await?;
2336    ///
2337    /// while let Some(message) = messages.next().await {
2338    ///     let message = message?;
2339    ///     println!("message: {:?}", message);
2340    ///     message.ack().await?;
2341    /// }
2342    /// # Ok(())
2343    /// # }
2344    /// ```
2345    pub fn expires(mut self, expires: Duration) -> Self {
2346        self.expires = expires;
2347        self
2348    }
2349
2350    /// Creates actual [Stream] with provided configuration.
2351    ///
2352    /// # Examples
2353    ///
2354    /// ```no_run
2355    /// # #[tokio::main]
2356    /// # async fn main() -> Result<(), async_nats::Error>  {
2357    /// use async_nats::jetstream::consumer::PullConsumer;
2358    /// use futures::StreamExt;
2359    /// let client = async_nats::connect("localhost:4222").await?;
2360    /// let jetstream = async_nats::jetstream::new(client);
2361    ///
2362    /// let consumer: PullConsumer = jetstream
2363    ///     .get_stream("events")
2364    ///     .await?
2365    ///     .get_consumer("pull")
2366    ///     .await?;
2367    ///
2368    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2369    ///
2370    /// while let Some(message) = messages.next().await {
2371    ///     let message = message?;
2372    ///     println!("message: {:?}", message);
2373    ///     message.ack().await?;
2374    /// }
2375    /// # Ok(())
2376    /// # }
2377    /// ```
2378    pub async fn messages(self) -> Result<Batch, BatchError> {
2379        let config = BatchConfig {
2380            batch: self.batch,
2381            expires: Some(self.expires),
2382            no_wait: false,
2383            max_bytes: self.max_bytes,
2384            idle_heartbeat: self.heartbeat,
2385            min_pending: self.min_pending,
2386            min_ack_pending: self.min_ack_pending,
2387            group: self.group,
2388        };
2389        Batch::batch(config, self.consumer).await
2390    }
2391}
2392
2393/// Used for next Pull Request for Pull Consumer
2394#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
2395pub struct BatchConfig {
2396    /// The number of messages that are being requested to be delivered.
2397    pub batch: usize,
2398    /// The optional number of nanoseconds that the server will store this next request for
2399    /// before forgetting about the pending batch size.
2400    #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
2401    pub expires: Option<Duration>,
2402    /// This optionally causes the server not to store this pending request at all, but when there are no
2403    /// messages to deliver will send a nil bytes message with a Status header of 404, this way you
2404    /// can know when you reached the end of the stream for example. A 409 is returned if the
2405    /// Consumer has reached MaxAckPending limits.
2406    #[serde(skip_serializing_if = "is_default")]
2407    pub no_wait: bool,
2408
2409    /// Sets max number of bytes in total in given batch size. This works together with `batch`.
2410    /// Whichever value is reached first, batch will complete.
2411    pub max_bytes: usize,
2412
2413    /// Setting this other than zero will cause the server to send 100 Idle Heartbeat status to the
2414    /// client
2415    #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
2416    pub idle_heartbeat: Duration,
2417
2418    pub min_pending: Option<usize>,
2419    pub min_ack_pending: Option<usize>,
2420    pub group: Option<String>,
2421}
2422
/// Reports whether `t` equals the `Default` value of its type.
///
/// Serves as the predicate for `#[serde(skip_serializing_if = "is_default")]`, so fields
/// still holding their default value are left out of the serialized output.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let default_value = T::default();
    *t == default_value
}
2426
2427#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2428pub struct Config {
2429    /// Setting `durable_name` to `Some(...)` will cause this consumer
2430    /// to be "durable". This may be a good choice for workloads that
2431    /// benefit from the `JetStream` server or cluster remembering the
2432    /// progress of consumers for fault tolerance purposes. If a consumer
2433    /// crashes, the `JetStream` server or cluster will remember which
2434    /// messages the consumer acknowledged. When the consumer recovers,
2435    /// this information will allow the consumer to resume processing
2436    /// where it left off. If you're unsure, set this to `Some(...)`.
2437    ///
2438    /// Setting `durable_name` to `None` will cause this consumer to
2439    /// be "ephemeral". This may be a good choice for workloads where
2440    /// you don't need the `JetStream` server to remember the consumer's
2441    /// progress in the case of a crash, such as certain "high churn"
2442    /// workloads or workloads where a crashed instance is not required
2443    /// to recover.
2444    #[serde(default, skip_serializing_if = "Option::is_none")]
2445    pub durable_name: Option<String>,
2446    /// A name of the consumer. Can be specified for both durable and ephemeral
2447    /// consumers.
2448    #[serde(default, skip_serializing_if = "Option::is_none")]
2449    pub name: Option<String>,
2450    /// A short description of the purpose of this consumer.
2451    #[serde(default, skip_serializing_if = "Option::is_none")]
2452    pub description: Option<String>,
2453    /// Allows for a variety of options that determine how this consumer will receive messages
2454    #[serde(flatten)]
2455    pub deliver_policy: DeliverPolicy,
2456    /// How messages should be acknowledged
2457    pub ack_policy: AckPolicy,
2458    /// How long to allow messages to remain un-acknowledged before attempting redelivery
2459    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2460    pub ack_wait: Duration,
2461    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2462    #[serde(default, skip_serializing_if = "is_default")]
2463    pub max_deliver: i64,
2464    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2465    #[serde(default, skip_serializing_if = "is_default")]
2466    pub filter_subject: String,
2467    #[cfg(feature = "server_2_10")]
2468    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2469    #[serde(default, skip_serializing_if = "is_default")]
2470    pub filter_subjects: Vec<String>,
2471    /// Whether messages are sent as quickly as possible or at the rate of receipt
2472    pub replay_policy: ReplayPolicy,
2473    /// The rate of message delivery in bits per second
2474    #[serde(default, skip_serializing_if = "is_default")]
2475    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100
2477    #[serde(
2478        rename = "sample_freq",
2479        with = "super::sample_freq_deser",
2480        default,
2481        skip_serializing_if = "is_default"
2482    )]
2483    pub sample_frequency: u8,
2484    /// The maximum number of waiting consumers.
2485    #[serde(default, skip_serializing_if = "is_default")]
2486    pub max_waiting: i64,
2487    /// The maximum number of unacknowledged messages that may be
2488    /// in-flight before pausing sending additional messages to
2489    /// this consumer.
2490    #[serde(default, skip_serializing_if = "is_default")]
2491    pub max_ack_pending: i64,
2492    /// Only deliver headers without payloads.
2493    #[serde(default, skip_serializing_if = "is_default")]
2494    pub headers_only: bool,
2495    /// Maximum size of a request batch
2496    #[serde(default, skip_serializing_if = "is_default")]
2497    pub max_batch: i64,
2498    /// Maximum value of request max_bytes
2499    #[serde(default, skip_serializing_if = "is_default")]
2500    pub max_bytes: i64,
2501    /// Maximum value for request expiration
2502    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2503    pub max_expires: Duration,
2504    /// Threshold for consumer inactivity
2505    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2506    pub inactive_threshold: Duration,
2507    /// Number of consumer replicas
2508    #[serde(default, skip_serializing_if = "is_default")]
2509    pub num_replicas: usize,
2510    /// Force consumer to use memory storage.
2511    #[serde(default, skip_serializing_if = "is_default")]
2512    pub memory_storage: bool,
2513    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
2515    #[serde(default, skip_serializing_if = "is_default")]
2516    pub metadata: HashMap<String, String>,
2517    /// Custom backoff for missed acknowledgments.
2518    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2519    pub backoff: Vec<Duration>,
2520
2521    /// Priority policy for this consumer. Requires [Config::priority_groups] to be set.
2522    #[cfg(feature = "server_2_11")]
2523    #[serde(default, skip_serializing_if = "is_default")]
2524    pub priority_policy: PriorityPolicy,
2525    /// Priority groups for this consumer. Currently only one group is supported and is used
2526    /// in conjunction with [Config::priority_policy].
2527    #[cfg(feature = "server_2_11")]
2528    #[serde(default, skip_serializing_if = "is_default")]
2529    pub priority_groups: Vec<String>,
2530    /// For suspending the consumer until the deadline.
2531    #[cfg(feature = "server_2_11")]
2532    #[serde(
2533        default,
2534        with = "rfc3339::option",
2535        skip_serializing_if = "Option::is_none"
2536    )]
2537    pub pause_until: Option<OffsetDateTime>,
2538}
2539
2540impl IntoConsumerConfig for &Config {
2541    fn into_consumer_config(self) -> consumer::Config {
2542        self.clone().into_consumer_config()
2543    }
2544}
2545
2546impl IntoConsumerConfig for Config {
2547    fn into_consumer_config(self) -> consumer::Config {
2548        jetstream::consumer::Config {
2549            deliver_subject: None,
2550            name: self.name,
2551            durable_name: self.durable_name,
2552            description: self.description,
2553            deliver_group: None,
2554            deliver_policy: self.deliver_policy,
2555            ack_policy: self.ack_policy,
2556            ack_wait: self.ack_wait,
2557            max_deliver: self.max_deliver,
2558            filter_subject: self.filter_subject,
2559            #[cfg(feature = "server_2_10")]
2560            filter_subjects: self.filter_subjects,
2561            replay_policy: self.replay_policy,
2562            rate_limit: self.rate_limit,
2563            sample_frequency: self.sample_frequency,
2564            max_waiting: self.max_waiting,
2565            max_ack_pending: self.max_ack_pending,
2566            headers_only: self.headers_only,
2567            flow_control: false,
2568            idle_heartbeat: Duration::default(),
2569            max_batch: self.max_batch,
2570            max_bytes: self.max_bytes,
2571            max_expires: self.max_expires,
2572            inactive_threshold: self.inactive_threshold,
2573            num_replicas: self.num_replicas,
2574            memory_storage: self.memory_storage,
2575            #[cfg(feature = "server_2_10")]
2576            metadata: self.metadata,
2577            backoff: self.backoff,
2578            #[cfg(feature = "server_2_11")]
2579            priority_policy: self.priority_policy,
2580            #[cfg(feature = "server_2_11")]
2581            priority_groups: self.priority_groups,
2582            #[cfg(feature = "server_2_11")]
2583            pause_until: self.pause_until,
2584        }
2585    }
2586}
2587impl FromConsumer for Config {
2588    fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2589        if config.deliver_subject.is_some() {
2590            return Err(Box::new(std::io::Error::new(
2591                std::io::ErrorKind::Other,
2592                "pull consumer cannot have delivery subject",
2593            )));
2594        }
2595        Ok(Config {
2596            durable_name: config.durable_name,
2597            name: config.name,
2598            description: config.description,
2599            deliver_policy: config.deliver_policy,
2600            ack_policy: config.ack_policy,
2601            ack_wait: config.ack_wait,
2602            max_deliver: config.max_deliver,
2603            filter_subject: config.filter_subject,
2604            #[cfg(feature = "server_2_10")]
2605            filter_subjects: config.filter_subjects,
2606            replay_policy: config.replay_policy,
2607            rate_limit: config.rate_limit,
2608            sample_frequency: config.sample_frequency,
2609            max_waiting: config.max_waiting,
2610            max_ack_pending: config.max_ack_pending,
2611            headers_only: config.headers_only,
2612            max_batch: config.max_batch,
2613            max_bytes: config.max_bytes,
2614            max_expires: config.max_expires,
2615            inactive_threshold: config.inactive_threshold,
2616            num_replicas: config.num_replicas,
2617            memory_storage: config.memory_storage,
2618            #[cfg(feature = "server_2_10")]
2619            metadata: config.metadata,
2620            backoff: config.backoff,
2621            #[cfg(feature = "server_2_11")]
2622            priority_policy: config.priority_policy,
2623            #[cfg(feature = "server_2_11")]
2624            priority_groups: config.priority_groups,
2625            #[cfg(feature = "server_2_11")]
2626            pause_until: config.pause_until,
2627        })
2628    }
2629}
2630
/// The ways a batch request can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchRequestErrorKind {
    /// Displayed as "publish failed".
    Publish,
    /// Displayed as "flush failed".
    Flush,
    /// Displayed as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchRequestErrorKind {
    /// Writes the fixed, lower-case description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Publish => "publish failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(message)
    }
}
2647
/// Error type whose kind is a [`BatchRequestErrorKind`].
pub type BatchRequestError = Error<BatchRequestErrorKind>;
2649
/// The ways fetching a batch of messages can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchErrorKind {
    /// Displayed as "subscribe failed".
    Subscribe,
    /// Displayed as "pull request failed".
    Pull,
    /// Displayed as "flush failed".
    Flush,
    /// Displayed as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchErrorKind {
    /// Writes the fixed, lower-case description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Subscribe => "subscribe failed",
            Self::Pull => "pull request failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(message)
    }
}
2668
/// Error type whose kind is a [`BatchErrorKind`].
pub type BatchError = Error<BatchErrorKind>;
2670
2671impl From<SubscribeError> for BatchError {
2672    fn from(err: SubscribeError) -> Self {
2673        BatchError::with_source(BatchErrorKind::Subscribe, err)
2674    }
2675}
2676
2677impl From<BatchRequestError> for BatchError {
2678    fn from(err: BatchRequestError) -> Self {
2679        BatchError::with_source(BatchErrorKind::Pull, err)
2680    }
2681}
2682
/// The ways recreating a consumer can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConsumerRecreateErrorKind {
    /// Displayed as "error getting stream".
    GetStream,
    /// Displayed as "consumer creation failed".
    Recreate,
    /// Displayed as "timed out".
    TimedOut,
}

impl std::fmt::Display for ConsumerRecreateErrorKind {
    /// Writes the fixed, lower-case description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::GetStream => "error getting stream",
            Self::Recreate => "consumer creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(message)
    }
}
2699
/// Error type whose kind is a [`ConsumerRecreateErrorKind`].
pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2701
2702async fn recreate_consumer_stream(
2703    context: &Context,
2704    config: &OrderedConfig,
2705    stream_name: &str,
2706    consumer_name: &str,
2707    sequence: u64,
2708) -> Result<Stream, ConsumerRecreateError> {
2709    let span = tracing::span!(
2710        tracing::Level::DEBUG,
2711        "recreate_ordered_consumer",
2712        stream_name = stream_name,
2713        consumer_name = consumer_name,
2714        sequence = sequence
2715    );
2716    let _span_handle = span.enter();
2717    let config = config.to_owned();
2718    trace!("delete old consumer before creating new one");
2719
2720    tokio::time::timeout(
2721        Duration::from_secs(5),
2722        context.delete_consumer_from_stream(consumer_name, stream_name),
2723    )
2724    .await
2725    .ok();
2726
2727    let deliver_policy = {
2728        if sequence == 0 {
2729            DeliverPolicy::All
2730        } else {
2731            DeliverPolicy::ByStartSequence {
2732                start_sequence: sequence + 1,
2733            }
2734        }
2735    };
2736    trace!("create the new ordered consumer for sequence {}", sequence);
2737    let consumer = tokio::time::timeout(
2738        Duration::from_secs(5),
2739        context.create_consumer_on_stream(
2740            jetstream::consumer::pull::OrderedConfig {
2741                deliver_policy,
2742                ..config.clone()
2743            },
2744            stream_name,
2745        ),
2746    )
2747    .await
2748    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2749    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2750
2751    let config = Consumer {
2752        config: config.clone().into(),
2753        context: context.clone(),
2754        info: consumer.info,
2755    };
2756
2757    trace!("create iterator");
2758    let stream = tokio::time::timeout(
2759        Duration::from_secs(5),
2760        Stream::stream(
2761            BatchConfig {
2762                batch: 500,
2763                expires: Some(Duration::from_secs(30)),
2764                no_wait: false,
2765                max_bytes: 0,
2766                idle_heartbeat: Duration::from_secs(15),
2767                min_pending: None,
2768                min_ack_pending: None,
2769                group: None,
2770            },
2771            &config,
2772        ),
2773    )
2774    .await
2775    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2776    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2777    trace!("recreated consumer");
2778    stream
2779}