async_nats/jetstream/consumer/
pull.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures_util::{
16    future::{BoxFuture, Either},
17    FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_11")]
21use time::{serde::rfc3339, OffsetDateTime};
22
23#[cfg(feature = "server_2_10")]
24use std::collections::HashMap;
25use std::{future, pin::Pin, task::Poll, time::Duration};
26use tokio::{task::JoinHandle, time::Sleep};
27
28use serde::{Deserialize, Serialize};
29use tracing::{debug, trace};
30
31use crate::{
32    connection::State,
33    error::Error,
34    jetstream::{self, Context},
35    StatusCode, SubscribeError, Subscriber,
36};
37
38use crate::subject::Subject;
39
40#[cfg(feature = "server_2_11")]
41use super::PriorityPolicy;
42
43use super::{
44    backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
45    StreamError, StreamErrorKind,
46};
47use jetstream::consumer;
48
49impl Consumer<Config> {
50    /// Returns a stream of messages for Pull Consumer.
51    ///
52    /// # Example
53    ///
54    /// ```no_run
55    /// # #[tokio::main]
56    /// # async fn mains() -> Result<(), async_nats::Error> {
57    /// use futures_util::StreamExt;
58    /// use futures_util::TryStreamExt;
59    ///
60    /// let client = async_nats::connect("localhost:4222").await?;
61    /// let jetstream = async_nats::jetstream::new(client);
62    ///
63    /// let stream = jetstream
64    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
65    ///         name: "events".to_string(),
66    ///         max_messages: 10_000,
67    ///         ..Default::default()
68    ///     })
69    ///     .await?;
70    ///
71    /// jetstream.publish("events", "data".into()).await?;
72    ///
73    /// let consumer = stream
74    ///     .get_or_create_consumer(
75    ///         "consumer",
76    ///         async_nats::jetstream::consumer::pull::Config {
77    ///             durable_name: Some("consumer".to_string()),
78    ///             ..Default::default()
79    ///         },
80    ///     )
81    ///     .await?;
82    ///
83    /// let mut messages = consumer.messages().await?.take(100);
84    /// while let Some(Ok(message)) = messages.next().await {
85    ///     println!("got message {:?}", message);
86    ///     message.ack().await?;
87    /// }
88    /// Ok(())
89    /// # }
90    /// ```
91    pub async fn messages(&self) -> Result<Stream, StreamError> {
92        Stream::stream(
93            BatchConfig {
94                batch: 200,
95                expires: Some(Duration::from_secs(30)),
96                no_wait: false,
97                max_bytes: 0,
98                idle_heartbeat: Duration::from_secs(15),
99                min_pending: None,
100                min_ack_pending: None,
101                group: None,
102                #[cfg(feature = "server_2_12")]
103                priority: None,
104            },
105            self,
106        )
107        .await
108    }
109
110    /// Enables customization of [Stream] by setting timeouts, heartbeats, maximum number of
111    /// messages or bytes buffered.
112    ///
113    /// # Examples
114    ///
115    /// ```no_run
116    /// # #[tokio::main]
117    /// # async fn main() -> Result<(), async_nats::Error>  {
118    /// use async_nats::jetstream::consumer::PullConsumer;
119    /// use futures_util::StreamExt;
120    /// let client = async_nats::connect("localhost:4222").await?;
121    /// let jetstream = async_nats::jetstream::new(client);
122    ///
123    /// let consumer: PullConsumer = jetstream
124    ///     .get_stream("events")
125    ///     .await?
126    ///     .get_consumer("pull")
127    ///     .await?;
128    ///
129    /// let mut messages = consumer
130    ///     .stream()
131    ///     .max_messages_per_batch(100)
132    ///     .max_bytes_per_batch(1024)
133    ///     .messages()
134    ///     .await?;
135    ///
136    /// while let Some(message) = messages.next().await {
137    ///     let message = message?;
138    ///     println!("message: {:?}", message);
139    ///     message.ack().await?;
140    /// }
141    /// # Ok(())
142    /// # }
143    /// ```
    pub fn stream(&self) -> StreamBuilder<'_> {
        // The returned builder borrows this consumer for its whole lifetime.
        StreamBuilder::new(self)
    }
147
148    pub async fn request_batch<I: Into<BatchConfig>>(
149        &self,
150        batch: I,
151        inbox: Subject,
152    ) -> Result<(), BatchRequestError> {
153        debug!("sending batch");
154        let subject = format!(
155            "{}.CONSUMER.MSG.NEXT.{}.{}",
156            self.context.prefix, self.info.stream_name, self.info.name
157        );
158
159        let payload = serde_json::to_vec(&batch.into())
160            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
161
162        self.context
163            .client
164            .publish_with_reply(subject, inbox, payload.into())
165            .await
166            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
167        debug!("batch request sent");
168        Ok(())
169    }
170
    /// Returns a batch of the specified number of messages, or, if there are fewer messages on the
    /// [Stream] than requested, returns all available messages.
173    ///
174    /// # Example
175    ///
176    /// ```no_run
177    /// # #[tokio::main]
178    /// # async fn mains() -> Result<(), async_nats::Error> {
179    /// use futures_util::StreamExt;
180    /// use futures_util::TryStreamExt;
181    ///
182    /// let client = async_nats::connect("localhost:4222").await?;
183    /// let jetstream = async_nats::jetstream::new(client);
184    ///
185    /// let stream = jetstream
186    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
187    ///         name: "events".to_string(),
188    ///         max_messages: 10_000,
189    ///         ..Default::default()
190    ///     })
191    ///     .await?;
192    ///
193    /// jetstream.publish("events", "data".into()).await?;
194    ///
195    /// let consumer = stream
196    ///     .get_or_create_consumer(
197    ///         "consumer",
198    ///         async_nats::jetstream::consumer::pull::Config {
199    ///             durable_name: Some("consumer".to_string()),
200    ///             ..Default::default()
201    ///         },
202    ///     )
203    ///     .await?;
204    ///
205    /// for _ in 0..100 {
206    ///     jetstream.publish("events", "data".into()).await?;
207    /// }
208    ///
209    /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
210    /// // will finish after 100 messages, as that is the number of messages available on the
211    /// // stream.
212    /// while let Some(Ok(message)) = messages.next().await {
213    ///     println!("got message {:?}", message);
214    ///     message.ack().await?;
215    /// }
216    /// Ok(())
217    /// # }
218    /// ```
    pub fn fetch(&self) -> FetchBuilder<'_> {
        // The returned builder borrows this consumer for its whole lifetime.
        FetchBuilder::new(self)
    }
222
223    /// Returns a batch of specified number of messages unless timeout happens first.
224    ///
225    /// # Example
226    ///
227    /// ```no_run
228    /// # #[tokio::main]
229    /// # async fn mains() -> Result<(), async_nats::Error> {
230    /// use futures_util::StreamExt;
231    /// use futures_util::TryStreamExt;
232    ///
233    /// let client = async_nats::connect("localhost:4222").await?;
234    /// let jetstream = async_nats::jetstream::new(client);
235    ///
236    /// let stream = jetstream
237    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
238    ///         name: "events".to_string(),
239    ///         max_messages: 10_000,
240    ///         ..Default::default()
241    ///     })
242    ///     .await?;
243    ///
244    /// jetstream.publish("events", "data".into()).await?;
245    ///
246    /// let consumer = stream
247    ///     .get_or_create_consumer(
248    ///         "consumer",
249    ///         async_nats::jetstream::consumer::pull::Config {
250    ///             durable_name: Some("consumer".to_string()),
251    ///             ..Default::default()
252    ///         },
253    ///     )
254    ///     .await?;
255    ///
256    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
257    /// while let Some(Ok(message)) = messages.next().await {
258    ///     println!("got message {:?}", message);
259    ///     message.ack().await?;
260    /// }
261    /// Ok(())
262    /// # }
263    /// ```
    pub fn batch(&self) -> BatchBuilder<'_> {
        // The returned builder borrows this consumer for its whole lifetime.
        BatchBuilder::new(self)
    }
267
268    /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
269    /// messages in those batches.
270    ///
271    /// # Example
272    ///
273    /// ```no_run
274    /// # #[tokio::main]
275    /// # async fn mains() -> Result<(), async_nats::Error> {
276    /// use futures_util::StreamExt;
277    /// use futures_util::TryStreamExt;
278    ///
279    /// let client = async_nats::connect("localhost:4222").await?;
280    /// let jetstream = async_nats::jetstream::new(client);
281    ///
282    /// let stream = jetstream
283    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
284    ///         name: "events".to_string(),
285    ///         max_messages: 10_000,
286    ///         ..Default::default()
287    ///     })
288    ///     .await?;
289    ///
290    /// jetstream.publish("events", "data".into()).await?;
291    ///
292    /// let consumer = stream
293    ///     .get_or_create_consumer(
294    ///         "consumer",
295    ///         async_nats::jetstream::consumer::pull::Config {
296    ///             durable_name: Some("consumer".to_string()),
297    ///             ..Default::default()
298    ///         },
299    ///     )
300    ///     .await?;
301    ///
302    /// let mut iter = consumer.sequence(50).unwrap().take(10);
303    /// while let Ok(Some(mut batch)) = iter.try_next().await {
304    ///     while let Ok(Some(message)) = batch.try_next().await {
305    ///         println!("message received: {:?}", message);
306    ///     }
307    /// }
308    /// Ok(())
309    /// # }
310    /// ```
311    pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
312        let context = self.context.clone();
313        let subject = format!(
314            "{}.CONSUMER.MSG.NEXT.{}.{}",
315            self.context.prefix, self.info.stream_name, self.info.name
316        );
317
318        let request = serde_json::to_vec(&BatchConfig {
319            batch,
320            expires: Some(Duration::from_secs(60)),
321            ..Default::default()
322        })
323        .map(Bytes::from)
324        .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
325
326        Ok(Sequence {
327            context,
328            subject,
329            request,
330            pending_messages: batch,
331            next: None,
332        })
333    }
334}
335
/// A finite stream of messages produced by a single pull request.
///
/// The stream ends once the requested number of messages has been yielded, when a
/// terminating status message arrives on the subscription, or when the client-side
/// timeout elapses.
pub struct Batch {
    /// Number of messages still expected; decremented for every message yielded.
    pending_messages: usize,
    /// Subscription on which messages and status replies for the request arrive.
    subscriber: Subscriber,
    /// JetStream context attached to every yielded message.
    context: Context,
    /// Optional client-side deadline; the batch ends when it elapses.
    timeout: Option<Pin<Box<Sleep>>>,
    /// Set once the batch has ended; every later poll returns `None`.
    terminated: bool,
}
343
344impl Batch {
345    async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
346        let inbox = Subject::from(consumer.context.client.new_inbox());
347        let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
348        consumer.request_batch(batch.clone(), inbox.clone()).await?;
349
350        let sleep = batch.expires.map(|expires| {
351            Box::pin(tokio::time::sleep(
352                expires.saturating_add(Duration::from_secs(5)),
353            ))
354        });
355
356        Ok(Batch {
357            pending_messages: batch.batch,
358            subscriber: subscription,
359            context: consumer.context.clone(),
360            terminated: false,
361            timeout: sleep,
362        })
363    }
364}
365
366impl futures_util::Stream for Batch {
367    type Item = Result<jetstream::Message, crate::Error>;
368
369    fn poll_next(
370        mut self: std::pin::Pin<&mut Self>,
371        cx: &mut std::task::Context<'_>,
372    ) -> std::task::Poll<Option<Self::Item>> {
373        if self.terminated {
374            return Poll::Ready(None);
375        }
376        if self.pending_messages == 0 {
377            self.terminated = true;
378            return Poll::Ready(None);
379        }
380        if let Some(sleep) = self.timeout.as_mut() {
381            match sleep.poll_unpin(cx) {
382                Poll::Ready(_) => {
383                    debug!("batch timeout timer triggered");
384                    // TODO(tp): Maybe we can be smarter here and before timing out, check if
385                    // we consumed all the messages from the subscription buffer in case of user
386                    // slowly consuming messages. Keep in mind that we time out here only if
387                    // for some reason we missed timeout from the server and few seconds have
388                    // passed since expected timeout message.
389                    self.terminated = true;
390                    return Poll::Ready(None);
391                }
392                Poll::Pending => (),
393            }
394        }
395        match self.subscriber.receiver.poll_recv(cx) {
396            Poll::Ready(maybe_message) => match maybe_message {
397                Some(message) => match message.status.unwrap_or(StatusCode::OK) {
398                    StatusCode::TIMEOUT => {
399                        debug!("received timeout. Iterator done");
400                        self.terminated = true;
401                        Poll::Ready(None)
402                    }
403                    StatusCode::IDLE_HEARTBEAT => {
404                        debug!("received heartbeat");
405                        Poll::Pending
406                    }
407                    // If this is fetch variant, terminate on no more messages.
408                    // We do not need to check if this is a fetch, not batch,
409                    // as only fetch will send back `NO_MESSAGES` status.
410                    StatusCode::NOT_FOUND => {
411                        debug!("received `NO_MESSAGES`. Iterator done");
412                        self.terminated = true;
413                        Poll::Ready(None)
414                    }
415                    StatusCode::OK => {
416                        debug!("received message");
417                        self.pending_messages -= 1;
418                        Poll::Ready(Some(Ok(jetstream::Message {
419                            context: self.context.clone(),
420                            message,
421                        })))
422                    }
423                    status => {
424                        debug!("received error");
425                        self.terminated = true;
426                        Poll::Ready(Some(Err(Box::new(std::io::Error::other(format!(
427                            "error while processing messages from the stream: {}, {:?}",
428                            status, message.description
429                        ))))))
430                    }
431                },
432                None => Poll::Ready(None),
433            },
434            std::task::Poll::Pending => std::task::Poll::Pending,
435        }
436    }
437}
438
/// A stream of [Batch]es, each created by re-sending the same pull request.
pub struct Sequence {
    /// JetStream context used to subscribe and publish, and handed to every [Batch].
    context: Context,
    /// Subject the pull request is published to.
    subject: String,
    /// Pre-serialized pull request, re-sent for every batch.
    request: Bytes,
    /// Number of messages each produced [Batch] starts out expecting.
    pending_messages: usize,
    /// In-flight future creating the next [Batch]; `None` when no request is in progress.
    next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
}
446
447impl futures_util::Stream for Sequence {
448    type Item = Result<Batch, MessagesError>;
449
450    fn poll_next(
451        mut self: std::pin::Pin<&mut Self>,
452        cx: &mut std::task::Context<'_>,
453    ) -> std::task::Poll<Option<Self::Item>> {
454        match self.next.as_mut() {
455            None => {
456                let context = self.context.clone();
457                let subject = self.subject.clone();
458                let request = self.request.clone();
459                let pending_messages = self.pending_messages;
460
461                let next = self.next.insert(Box::pin(async move {
462                    let inbox = context.client.new_inbox();
463                    let subscriber = context
464                        .client
465                        .subscribe(inbox.clone())
466                        .await
467                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
468
469                    context
470                        .client
471                        .publish_with_reply(subject, inbox, request)
472                        .await
473                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
474
475                    // TODO(tp): Add timeout config and defaults.
476                    Ok(Batch {
477                        pending_messages,
478                        subscriber,
479                        context,
480                        terminated: false,
481                        timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
482                    })
483                }));
484
485                match next.as_mut().poll(cx) {
486                    Poll::Ready(result) => {
487                        self.next = None;
488                        Poll::Ready(Some(result.map_err(|err| {
489                            MessagesError::with_source(MessagesErrorKind::Pull, err)
490                        })))
491                    }
492                    Poll::Pending => Poll::Pending,
493                }
494            }
495
496            Some(next) => match next.as_mut().poll(cx) {
497                Poll::Ready(result) => {
498                    self.next = None;
499                    Poll::Ready(Some(result.map_err(|err| {
500                        MessagesError::with_source(MessagesErrorKind::Pull, err)
501                    })))
502                }
503                Poll::Pending => Poll::Pending,
504            },
505        }
506    }
507}
508
509impl Consumer<OrderedConfig> {
510    /// Returns a stream of messages for Ordered Pull Consumer.
511    ///
    /// An ordered consumer uses a single-replica ephemeral consumer, no matter the replication
    /// factor of the Stream. It does not use acks; instead it tracks sequences and recreates
    /// itself whenever it sees a mismatch.
515    ///
516    /// # Example
517    ///
518    /// ```no_run
519    /// # #[tokio::main]
520    /// # async fn mains() -> Result<(), async_nats::Error> {
521    /// use futures_util::StreamExt;
522    /// use futures_util::TryStreamExt;
523    ///
524    /// let client = async_nats::connect("localhost:4222").await?;
525    /// let jetstream = async_nats::jetstream::new(client);
526    ///
527    /// let stream = jetstream
528    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
529    ///         name: "events".to_string(),
530    ///         max_messages: 10_000,
531    ///         ..Default::default()
532    ///     })
533    ///     .await?;
534    ///
535    /// jetstream.publish("events", "data".into()).await?;
536    ///
537    /// let consumer = stream
538    ///     .get_or_create_consumer(
539    ///         "consumer",
540    ///         async_nats::jetstream::consumer::pull::OrderedConfig {
541    ///             name: Some("consumer".to_string()),
542    ///             ..Default::default()
543    ///         },
544    ///     )
545    ///     .await?;
546    ///
547    /// let mut messages = consumer.messages().await?.take(100);
548    /// while let Some(Ok(message)) = messages.next().await {
549    ///     println!("got message {:?}", message);
550    /// }
551    /// Ok(())
552    /// # }
553    /// ```
554    pub async fn messages(self) -> Result<Ordered, StreamError> {
555        let config = Consumer {
556            config: self.config.clone().into(),
557            context: self.context.clone(),
558            info: self.info.clone(),
559        };
560        let stream = Stream::stream(
561            BatchConfig {
562                batch: 500,
563                expires: Some(Duration::from_secs(30)),
564                no_wait: false,
565                max_bytes: 0,
566                idle_heartbeat: Duration::from_secs(15),
567                min_pending: None,
568                min_ack_pending: None,
569                group: None,
570                #[cfg(feature = "server_2_12")]
571                priority: None,
572            },
573            &config,
574        )
575        .await?;
576
577        Ok(Ordered {
578            consumer_sequence: 0,
579            stream_sequence: 0,
580            missed_heartbeats: false,
581            create_stream: None,
582            context: self.context.clone(),
583            consumer_name: self
584                .config
585                .name
586                .clone()
587                .unwrap_or_else(|| self.context.client.new_inbox()),
588            consumer: self.config,
589            stream: Some(stream),
590            stream_name: self.info.stream_name.clone(),
591        })
592    }
593}
594
/// Configuration for ordered pull consumers.
///
/// Compared to [Config], this type has no durability, acknowledgement or replication
/// settings. The conversions defined in this module always produce a configuration with no
/// durable name, `AckPolicy::None`, `max_deliver` of 1, a single replica, memory storage and
/// a 30 second inactive threshold.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct OrderedConfig {
    /// A name of the consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Passed through unchanged as the `filter_subject` of the resulting consumer
    /// configuration.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Maximum number of messages that can be requested in a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    ///
    /// NOTE(review): the `IntoConsumerConfig` impl in this file sends 0 for this field
    /// regardless of the value set here, while `From<OrderedConfig> for Config` forwards it —
    /// confirm which is intended.
    pub max_batch: i64,
    /// Maximum number of bytes that can be requested in a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    ///
    /// NOTE(review): same forwarding caveat as `max_batch`.
    pub max_bytes: i64,
    /// Maximum expiry that can be set for a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    ///
    /// NOTE(review): same forwarding caveat as `max_batch`.
    pub max_expires: Duration,
}
652
653impl From<OrderedConfig> for Config {
654    fn from(config: OrderedConfig) -> Self {
655        Config {
656            durable_name: None,
657            name: config.name,
658            description: config.description,
659            deliver_policy: config.deliver_policy,
660            ack_policy: AckPolicy::None,
661            ack_wait: Duration::default(),
662            max_deliver: 1,
663            filter_subject: config.filter_subject,
664            #[cfg(feature = "server_2_10")]
665            filter_subjects: config.filter_subjects,
666            replay_policy: config.replay_policy,
667            rate_limit: config.rate_limit,
668            sample_frequency: config.sample_frequency,
669            max_waiting: config.max_waiting,
670            max_ack_pending: 0,
671            headers_only: config.headers_only,
672            max_batch: config.max_batch,
673            max_bytes: config.max_bytes,
674            max_expires: config.max_expires,
675            inactive_threshold: Duration::from_secs(30),
676            num_replicas: 1,
677            memory_storage: true,
678            #[cfg(feature = "server_2_10")]
679            metadata: config.metadata,
680            backoff: Vec::new(),
681            #[cfg(feature = "server_2_11")]
682            priority_policy: PriorityPolicy::None,
683            #[cfg(feature = "server_2_11")]
684            priority_groups: Vec::new(),
685            #[cfg(feature = "server_2_11")]
686            pause_until: None,
687        }
688    }
689}
690
691impl FromConsumer for OrderedConfig {
692    fn try_from_consumer_config(
693        config: crate::jetstream::consumer::Config,
694    ) -> Result<Self, crate::Error>
695    where
696        Self: Sized,
697    {
698        Ok(OrderedConfig {
699            name: config.name,
700            description: config.description,
701            filter_subject: config.filter_subject,
702            #[cfg(feature = "server_2_10")]
703            filter_subjects: config.filter_subjects,
704            replay_policy: config.replay_policy,
705            rate_limit: config.rate_limit,
706            sample_frequency: config.sample_frequency,
707            headers_only: config.headers_only,
708            deliver_policy: config.deliver_policy,
709            max_waiting: config.max_waiting,
710            #[cfg(feature = "server_2_10")]
711            metadata: config.metadata,
712            max_batch: config.max_batch,
713            max_bytes: config.max_bytes,
714            max_expires: config.max_expires,
715        })
716    }
717}
718
719impl IntoConsumerConfig for OrderedConfig {
720    fn into_consumer_config(self) -> super::Config {
721        jetstream::consumer::Config {
722            deliver_subject: None,
723            durable_name: None,
724            name: self.name,
725            description: self.description,
726            deliver_group: None,
727            deliver_policy: self.deliver_policy,
728            ack_policy: AckPolicy::None,
729            ack_wait: Duration::default(),
730            max_deliver: 1,
731            filter_subject: self.filter_subject,
732            #[cfg(feature = "server_2_10")]
733            filter_subjects: self.filter_subjects,
734            replay_policy: self.replay_policy,
735            rate_limit: self.rate_limit,
736            sample_frequency: self.sample_frequency,
737            max_waiting: self.max_waiting,
738            max_ack_pending: 0,
739            headers_only: self.headers_only,
740            flow_control: false,
741            idle_heartbeat: Duration::default(),
742            max_batch: 0,
743            max_bytes: 0,
744            max_expires: Duration::default(),
745            inactive_threshold: Duration::from_secs(30),
746            num_replicas: 1,
747            memory_storage: true,
748            #[cfg(feature = "server_2_10")]
749            metadata: self.metadata,
750            backoff: Vec::new(),
751            #[cfg(feature = "server_2_11")]
752            priority_policy: PriorityPolicy::None,
753            #[cfg(feature = "server_2_11")]
754            priority_groups: Vec::new(),
755            #[cfg(feature = "server_2_11")]
756            pause_until: None,
757        }
758    }
759}
760
/// Stream of messages of an ordered pull consumer.
///
/// Wraps a pull [Stream] and checks the consumer sequence of every message. On a sequence
/// mismatch, a second consecutive missed heartbeat, or a deleted / unreachable consumer,
/// the inner stream is dropped and a new one is created.
pub struct Ordered {
    /// JetStream context used when recreating the consumer stream.
    context: Context,
    /// Name of the stream the consumer is attached to.
    stream_name: String,
    /// Ordered configuration reused whenever the consumer stream is recreated.
    consumer: OrderedConfig,
    /// Consumer name passed to the recreation routine.
    consumer_name: String,
    /// Currently active message stream; set to `None` while a recreation is scheduled.
    stream: Option<Stream>,
    /// In-flight future recreating the consumer stream, if any.
    create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
    /// Consumer sequence of the last accepted message; reset to 0 when a recreation is
    /// triggered.
    consumer_sequence: u64,
    /// Stream sequence of the last accepted message; passed to the recreation routine.
    stream_sequence: u64,
    /// Set after one missed heartbeat and cleared by the next message; a missed heartbeat
    /// while this is set triggers a recreation.
    missed_heartbeats: bool,
}
772
773impl futures_util::Stream for Ordered {
774    type Item = Result<jetstream::Message, OrderedError>;
775
776    fn poll_next(
777        mut self: Pin<&mut Self>,
778        cx: &mut std::task::Context<'_>,
779    ) -> Poll<Option<Self::Item>> {
780        let mut recreate = false;
781        // Poll messages
782        if let Some(stream) = self.stream.as_mut() {
783            match stream.poll_next_unpin(cx) {
784                Poll::Ready(message) => match message {
785                    Some(message) => match message {
786                        Ok(message) => {
787                            self.missed_heartbeats = false;
788                            let info = message.info().map_err(|err| {
789                                OrderedError::with_source(OrderedErrorKind::Other, err)
790                            })?;
791                            trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
792                                           self.consumer_sequence,
793                                           self.stream_sequence,
794                                           info.consumer_sequence,
795                                           info.stream_sequence);
796                            if info.consumer_sequence != self.consumer_sequence + 1 {
797                                debug!(
798                                    "ordered consumer mismatch. current {}, info: {}",
799                                    self.consumer_sequence, info.consumer_sequence
800                                );
801                                recreate = true;
802                                self.consumer_sequence = 0;
803                            } else {
804                                self.stream_sequence = info.stream_sequence;
805                                self.consumer_sequence = info.consumer_sequence;
806                                return Poll::Ready(Some(Ok(message)));
807                            }
808                        }
809                        Err(err) => match err.kind() {
810                            MessagesErrorKind::MissingHeartbeat => {
811                                // If we have missed heartbeats set, it means this is a second
812                                // missed heartbeat, so we need to recreate consumer.
813                                if self.missed_heartbeats {
814                                    self.consumer_sequence = 0;
815                                    recreate = true;
816                                } else {
817                                    self.missed_heartbeats = true;
818                                }
819                            }
820                            MessagesErrorKind::ConsumerDeleted
821                            | MessagesErrorKind::NoResponders => {
822                                recreate = true;
823                                self.consumer_sequence = 0;
824                            }
825                            MessagesErrorKind::Pull
826                            | MessagesErrorKind::PushBasedConsumer
827                            | MessagesErrorKind::Other => {
828                                return Poll::Ready(Some(Err(err.into())));
829                            }
830                        },
831                    },
832                    None => return Poll::Ready(None),
833                },
834                Poll::Pending => (),
835            }
836        }
837        // Recreate consumer if needed
838        if recreate {
839            self.stream = None;
840            self.create_stream = Some(Box::pin({
841                let context = self.context.clone();
842                let config = self.consumer.clone();
843                let stream_name = self.stream_name.clone();
844                let consumer_name = self.consumer_name.clone();
845                let sequence = self.stream_sequence;
846                async move {
847                    tryhard::retry_fn(|| {
848                        recreate_consumer_stream(
849                            &context,
850                            &config,
851                            &stream_name,
852                            &consumer_name,
853                            sequence,
854                        )
855                    })
856                    .retries(u32::MAX)
857                    .custom_backoff(backoff)
858                    .await
859                }
860            }))
861        }
862        // check for recreation future
863        if let Some(result) = self.create_stream.as_mut() {
864            match result.poll_unpin(cx) {
865                Poll::Ready(result) => match result {
866                    Ok(stream) => {
867                        self.create_stream = None;
868                        self.stream = Some(stream);
869                        return self.poll_next(cx);
870                    }
871                    Err(err) => {
872                        return Poll::Ready(Some(Err(OrderedError::with_source(
873                            OrderedErrorKind::Recreate,
874                            err,
875                        ))))
876                    }
877                },
878                Poll::Pending => (),
879            }
880        }
881        Poll::Pending
882    }
883}
884
/// Stream of messages obtained by continuously issuing pull requests for a pull consumer.
///
/// The pull requests themselves are published by a background task; this struct keeps the
/// accounting that decides when the next request is needed and receives the replies.
pub struct Stream {
    /// Number of requested messages not yet accounted for. Raised by the batch size when a
    /// pull request is sent, lowered for every delivered message and by the remainder
    /// reported when a request times out or is terminated.
    pending_messages: usize,
    /// Byte counterpart of `pending_messages`, tracked against `batch_config.max_bytes`.
    pending_bytes: usize,
    /// Receives the outcome of every pull request published by the background task.
    /// `Ok(true)` asks for the pending counters to be reset to one batch instead of being
    /// incremented by one batch.
    request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
    /// Signals the background task that another pull request should be published.
    request_tx: tokio::sync::watch::Sender<()>,
    /// Subscription on the inbox that is used as the reply subject of the pull requests.
    subscriber: Subscriber,
    /// Parameters of the pull requests; also the source of the refill thresholds and of the
    /// idle heartbeat interval.
    batch_config: BatchConfig,
    /// JetStream context cloned into every yielded message.
    context: Context,
    /// `true` from the moment a pull request was asked for until its result was received.
    pending_request: bool,
    /// Handle of the background task publishing pull requests; aborted on drop.
    task_handle: JoinHandle<()>,
    /// Set once the consumer was reported as deleted or push based; the stream then ends.
    terminated: bool,
    /// Timer covering twice the idle heartbeat interval. Cleared whenever the subscription
    /// yields anything.
    heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
    /// Fired on the first poll to let the background task start working.
    started: Option<tokio::sync::oneshot::Sender<()>>,
}
899
impl Drop for Stream {
    /// Aborts the background task that publishes pull requests when the stream is dropped.
    fn drop(&mut self) {
        self.task_handle.abort();
    }
}
905
906impl Stream {
907    async fn stream(
908        batch_config: BatchConfig,
909        consumer: &Consumer<Config>,
910    ) -> Result<Stream, StreamError> {
911        let inbox = consumer.context.client.new_inbox();
912        let subscription = consumer
913            .context
914            .client
915            .subscribe(inbox.clone())
916            .await
917            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
918        let subject = format!(
919            "{}.CONSUMER.MSG.NEXT.{}.{}",
920            consumer.context.prefix, consumer.info.stream_name, consumer.info.name
921        );
922
923        let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
924        let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
925        let (started_tx, started_rx) = tokio::sync::oneshot::channel();
926        let task_handle = tokio::task::spawn({
927            let batch = batch_config.clone();
928            let consumer = consumer.clone();
929            let mut context = consumer.context.clone();
930            let inbox = inbox.clone();
931            async move {
932                started_rx.await.ok();
933                loop {
934                    // this is just in edge case of missing response for some reason.
935                    let expires = batch_config
936                        .expires
937                        .map(|expires| {
938                            if expires.is_zero() {
939                                Either::Left(future::pending())
940                            } else {
941                                Either::Right(tokio::time::sleep(
942                                    expires.saturating_add(Duration::from_secs(5)),
943                                ))
944                            }
945                        })
946                        .unwrap_or_else(|| Either::Left(future::pending()));
947                    // Need to check previous state, as `changed` will always fire on first
948                    // call.
949                    let prev_state = context.client.state.borrow().to_owned();
950                    let mut pending_reset = false;
951
952                    tokio::select! {
953                       _ = context.client.state.changed() => {
954                            let state = context.client.state.borrow().to_owned();
955                            if !(state == crate::connection::State::Connected
956                                && prev_state != State::Connected) {
957                                    continue;
958                                }
959                            debug!("detected !Connected -> Connected state change");
960
961                            match tryhard::retry_fn(|| consumer.get_info())
962                                .retries(5).custom_backoff(backoff).await
963                                .map_err(|err| crate::RequestError::with_source(crate::RequestErrorKind::Other, err).into()) {
964                                    Ok(info) => {
965                                        if info.num_waiting == 0 {
966                                            pending_reset = true;
967                                        }
968                                    }
969                                    Err(err) => {
970                                         if let Err(err) = request_result_tx.send(Err(err)).await {
971                                            debug!("failed to sent request result: {}", err);
972                                        }
973                                    },
974                            }
975                        },
976                        _ = request_rx.changed() => debug!("task received request request"),
977                        _ = expires => {
978                            pending_reset = true;
979                            debug!("expired pull request")},
980                    }
981
982                    let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
983                    let result = context
984                        .client
985                        .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
986                        .await
987                        .map(|_| pending_reset);
988                    // TODO: add tracing instead of ignoring this.
989                    request_result_tx
990                        .send(result.map(|_| pending_reset).map_err(|err| {
991                            crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
992                                .into()
993                        }))
994                        .await
995                        .ok();
996                    trace!("result send over tx");
997                }
998            }
999        });
1000
1001        Ok(Stream {
1002            task_handle,
1003            request_result_rx,
1004            request_tx,
1005            batch_config,
1006            pending_messages: 0,
1007            pending_bytes: 0,
1008            subscriber: subscription,
1009            context: consumer.context.clone(),
1010            pending_request: false,
1011            terminated: false,
1012            heartbeat_timeout: None,
1013            started: Some(started_tx),
1014        })
1015    }
1016}
1017
/// Kinds of errors carried by [OrderedError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum OrderedErrorKind {
    /// An idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// A pull request failed.
    Pull,
    /// The consumer is push based and cannot be used here.
    PushBasedConsumer,
    /// Recreating the consumer failed.
    Recreate,
    /// There were no responders.
    NoResponders,
    /// Any other error.
    Other,
}
1028
1029impl std::fmt::Display for OrderedErrorKind {
1030    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1031        match self {
1032            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1033            Self::ConsumerDeleted => write!(f, "consumer deleted"),
1034            Self::Pull => write!(f, "pull request failed"),
1035            Self::Other => write!(f, "error"),
1036            Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1037            Self::Recreate => write!(f, "consumer recreation failed"),
1038            Self::NoResponders => write!(f, "no responders"),
1039        }
1040    }
1041}
1042
/// Error pairing an [OrderedErrorKind] with an optional source error.
pub type OrderedError = Error<OrderedErrorKind>;
1044
1045impl From<MessagesError> for OrderedError {
1046    fn from(err: MessagesError) -> Self {
1047        match err.kind() {
1048            MessagesErrorKind::MissingHeartbeat => {
1049                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1050            }
1051            MessagesErrorKind::ConsumerDeleted => {
1052                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1053            }
1054            MessagesErrorKind::Pull => OrderedError {
1055                kind: OrderedErrorKind::Pull,
1056                source: err.source,
1057            },
1058            MessagesErrorKind::PushBasedConsumer => {
1059                OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1060            }
1061            MessagesErrorKind::Other => OrderedError {
1062                kind: OrderedErrorKind::Other,
1063                source: err.source,
1064            },
1065            MessagesErrorKind::NoResponders => OrderedError::new(OrderedErrorKind::NoResponders),
1066        }
1067    }
1068}
1069
/// Kinds of errors carried by [MessagesError].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MessagesErrorKind {
    /// An idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// A pull request failed.
    Pull,
    /// The consumer is push based and cannot be used here.
    PushBasedConsumer,
    /// There were no responders.
    NoResponders,
    /// Any other error.
    Other,
}
1079
1080impl std::fmt::Display for MessagesErrorKind {
1081    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1082        match self {
1083            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1084            Self::ConsumerDeleted => write!(f, "consumer deleted"),
1085            Self::Pull => write!(f, "pull request failed"),
1086            Self::Other => write!(f, "error"),
1087            Self::NoResponders => write!(f, "no responders"),
1088            Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1089        }
1090    }
1091}
1092
/// Error yielded by [Stream], pairing a [MessagesErrorKind] with an optional source error.
pub type MessagesError = Error<MessagesErrorKind>;
1094
1095impl futures_util::Stream for Stream {
1096    type Item = Result<jetstream::Message, MessagesError>;
1097
1098    fn poll_next(
1099        mut self: std::pin::Pin<&mut Self>,
1100        cx: &mut std::task::Context<'_>,
1101    ) -> std::task::Poll<Option<Self::Item>> {
1102        if let Some(started) = self.started.take() {
1103            trace!("stream started, sending started signal");
1104            if started.send(()).is_err() {
1105                debug!("failed to send started signal");
1106            }
1107        }
1108        if self.terminated {
1109            return Poll::Ready(None);
1110        }
1111
1112        if !self.batch_config.idle_heartbeat.is_zero() {
1113            trace!("checking idle hearbeats");
1114            let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1115            match self
1116                .heartbeat_timeout
1117                .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1118                .poll_unpin(cx)
1119            {
1120                Poll::Ready(_) => {
1121                    self.heartbeat_timeout = None;
1122                    return Poll::Ready(Some(Err(MessagesError::new(
1123                        MessagesErrorKind::MissingHeartbeat,
1124                    ))));
1125                }
1126                Poll::Pending => (),
1127            }
1128        }
1129
1130        loop {
1131            trace!("pending messages: {}", self.pending_messages);
1132            if (self.pending_messages <= self.batch_config.batch / 2
1133                || (self.batch_config.max_bytes > 0
1134                    && self.pending_bytes <= self.batch_config.max_bytes / 2))
1135                && !self.pending_request
1136            {
1137                debug!("pending messages reached threshold to send new fetch request");
1138                self.request_tx.send(()).ok();
1139                self.pending_request = true;
1140            }
1141
1142            match self.request_result_rx.poll_recv(cx) {
1143                Poll::Ready(resp) => match resp {
1144                    Some(resp) => match resp {
1145                        Ok(reset) => {
1146                            trace!("request response: {:?}", reset);
1147                            debug!("request sent, setting pending messages");
1148                            if reset {
1149                                self.pending_messages = self.batch_config.batch;
1150                                self.pending_bytes = self.batch_config.max_bytes;
1151                            } else {
1152                                self.pending_messages += self.batch_config.batch;
1153                                self.pending_bytes += self.batch_config.max_bytes;
1154                            }
1155                            self.pending_request = false;
1156                            continue;
1157                        }
1158                        Err(err) => {
1159                            return Poll::Ready(Some(Err(MessagesError::with_source(
1160                                MessagesErrorKind::Pull,
1161                                err,
1162                            ))))
1163                        }
1164                    },
1165                    None => return Poll::Ready(None),
1166                },
1167                Poll::Pending => {
1168                    trace!("pending result");
1169                }
1170            }
1171
1172            trace!("polling subscriber");
1173            match self.subscriber.receiver.poll_recv(cx) {
1174                Poll::Ready(maybe_message) => {
1175                    self.heartbeat_timeout = None;
1176                    match maybe_message {
1177                        Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1178                            StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1179                                debug!("received status message: {:?}", message);
1180                                // If consumer has been deleted, error and shutdown the iterator.
1181                                if message.description.as_deref() == Some("Consumer Deleted") {
1182                                    self.terminated = true;
1183                                    return Poll::Ready(Some(Err(MessagesError::new(
1184                                        MessagesErrorKind::ConsumerDeleted,
1185                                    ))));
1186                                }
1187                                // If consumer is not pull based, error and shutdown the iterator.
1188                                if message.description.as_deref() == Some("Consumer is push based")
1189                                {
1190                                    self.terminated = true;
1191                                    return Poll::Ready(Some(Err(MessagesError::new(
1192                                        MessagesErrorKind::PushBasedConsumer,
1193                                    ))));
1194                                }
1195
1196                                // Do accounting for messages left after terminated/completed pull request.
1197                                let pending_messages = message
1198                                    .headers
1199                                    .as_ref()
1200                                    .and_then(|headers| headers.get("Nats-Pending-Messages"))
1201                                    .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1202                                    .map_err(|err| {
1203                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1204                                    })?;
1205
1206                                let pending_bytes = message
1207                                    .headers
1208                                    .as_ref()
1209                                    .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1210                                    .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1211                                    .map_err(|err| {
1212                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1213                                    })?;
1214
1215                                debug!(
1216                                    "timeout reached. remaining messages: {}, bytes {}",
1217                                    pending_messages, pending_bytes
1218                                );
1219                                self.pending_messages =
1220                                    self.pending_messages.saturating_sub(pending_messages);
1221                                trace!("message bytes len: {}", pending_bytes);
1222                                self.pending_bytes =
1223                                    self.pending_bytes.saturating_sub(pending_bytes);
1224                                continue;
1225                            }
1226                            // Idle Hearbeat means we have no messages, but consumer is fine.
1227                            StatusCode::IDLE_HEARTBEAT => {
1228                                debug!("received idle heartbeat");
1229                                continue;
1230                            }
1231                            // We got an message from a stream.
1232                            StatusCode::OK => {
1233                                trace!("message received");
1234                                self.pending_messages = self.pending_messages.saturating_sub(1);
1235                                self.pending_bytes =
1236                                    self.pending_bytes.saturating_sub(message.length);
1237                                return Poll::Ready(Some(Ok(jetstream::Message {
1238                                    context: self.context.clone(),
1239                                    message,
1240                                })));
1241                            }
1242                            StatusCode::NO_RESPONDERS => {
1243                                debug!("received no responders");
1244                                return Poll::Ready(Some(Err(MessagesError::new(
1245                                    MessagesErrorKind::NoResponders,
1246                                ))));
1247                            }
1248                            status => {
1249                                debug!("received unknown message: {:?}", message);
1250                                return Poll::Ready(Some(Err(MessagesError::with_source(
1251                                    MessagesErrorKind::Other,
1252                                    format!(
1253                                        "error while processing messages from the stream: {}, {:?}",
1254                                        status, message.description
1255                                    ),
1256                                ))));
1257                            }
1258                        },
1259                        None => return Poll::Ready(None),
1260                    }
1261                }
1262                Poll::Pending => {
1263                    debug!("subscriber still pending");
1264                    return std::task::Poll::Pending;
1265                }
1266            }
1267        }
1268    }
1269}
1270
/// Used for building configuration for a [Stream]. Created by a [Consumer::stream] on a [Consumer].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error>  {
/// use futures_util::StreamExt;
/// use async_nats::jetstream::consumer::PullConsumer;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer: PullConsumer = jetstream
///     .get_stream("events").await?
///     .get_consumer("pull").await?;
///
/// let mut messages = consumer.stream()
///     .max_messages_per_batch(100)
///     .max_bytes_per_batch(1024)
///     .messages().await?;
///
/// while let Some(message) = messages.next().await {
///     let message = message?;
///     println!("message: {:?}", message);
///     message.ack().await?;
/// }
/// # Ok(())
/// # }
/// ```
pub struct StreamBuilder<'a> {
    /// Set by [StreamBuilder::max_messages_per_batch]; defaults to 200.
    batch: usize,
    /// Set by [StreamBuilder::max_bytes_per_batch]; defaults to 0.
    max_bytes: usize,
    /// Set by [StreamBuilder::heartbeat]; defaults to a zero duration.
    heartbeat: Duration,
    /// Set by [StreamBuilder::expires]; defaults to 30 seconds.
    expires: Duration,
    /// Set by [StreamBuilder::group]; unset by default.
    group: Option<String>,
    /// Set by [StreamBuilder::min_pending]; unset by default.
    min_pending: Option<usize>,
    /// Set by [StreamBuilder::min_ack_pending]; unset by default.
    min_ack_pending: Option<usize>,
    /// Set by [StreamBuilder::priority]; unset by default.
    #[cfg(feature = "server_2_12")]
    priority: Option<usize>,
    /// Consumer the stream is built for.
    consumer: &'a Consumer<Config>,
}
1311
1312impl<'a> StreamBuilder<'a> {
1313    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1314        StreamBuilder {
1315            consumer,
1316            batch: 200,
1317            max_bytes: 0,
1318            expires: Duration::from_secs(30),
1319            heartbeat: Duration::default(),
1320            group: None,
1321            min_pending: None,
1322            min_ack_pending: None,
1323            #[cfg(feature = "server_2_12")]
1324            priority: None,
1325        }
1326    }
1327
1328    /// Sets max bytes that can be buffered on the Client while processing already received
1329    /// messages.
1330    /// Higher values will yield better performance, but also potentially increase memory usage if
1331    /// application is acknowledging messages much slower than they arrive.
1332    ///
1333    /// Default values should provide reasonable balance between performance and memory usage.
1334    ///
1335    /// # Examples
1336    ///
1337    /// ```no_run
1338    /// # #[tokio::main]
1339    /// # async fn main() -> Result<(), async_nats::Error>  {
1340    /// use async_nats::jetstream::consumer::PullConsumer;
1341    /// use futures_util::StreamExt;
1342    /// let client = async_nats::connect("localhost:4222").await?;
1343    /// let jetstream = async_nats::jetstream::new(client);
1344    ///
1345    /// let consumer: PullConsumer = jetstream
1346    ///     .get_stream("events")
1347    ///     .await?
1348    ///     .get_consumer("pull")
1349    ///     .await?;
1350    ///
1351    /// let mut messages = consumer
1352    ///     .stream()
1353    ///     .max_bytes_per_batch(1024)
1354    ///     .messages()
1355    ///     .await?;
1356    ///
1357    /// while let Some(message) = messages.next().await {
1358    ///     let message = message?;
1359    ///     println!("message: {:?}", message);
1360    ///     message.ack().await?;
1361    /// }
1362    /// # Ok(())
1363    /// # }
1364    /// ```
1365    pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1366        self.max_bytes = max_bytes;
1367        self
1368    }
1369
1370    /// Sets max number of messages that can be buffered on the Client while processing already received
1371    /// messages.
1372    /// Higher values will yield better performance, but also potentially increase memory usage if
1373    /// application is acknowledging messages much slower than they arrive.
1374    ///
1375    /// Default values should provide reasonable balance between performance and memory usage.
1376    ///
1377    /// # Examples
1378    ///
1379    /// ```no_run
1380    /// # #[tokio::main]
1381    /// # async fn main() -> Result<(), async_nats::Error>  {
1382    /// use async_nats::jetstream::consumer::PullConsumer;
1383    /// use futures_util::StreamExt;
1384    /// let client = async_nats::connect("localhost:4222").await?;
1385    /// let jetstream = async_nats::jetstream::new(client);
1386    ///
1387    /// let consumer: PullConsumer = jetstream
1388    ///     .get_stream("events")
1389    ///     .await?
1390    ///     .get_consumer("pull")
1391    ///     .await?;
1392    ///
1393    /// let mut messages = consumer
1394    ///     .stream()
1395    ///     .max_messages_per_batch(100)
1396    ///     .messages()
1397    ///     .await?;
1398    ///
1399    /// while let Some(message) = messages.next().await {
1400    ///     let message = message?;
1401    ///     println!("message: {:?}", message);
1402    ///     message.ack().await?;
1403    /// }
1404    /// # Ok(())
1405    /// # }
1406    /// ```
1407    pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1408        self.batch = batch;
1409        self
1410    }
1411
1412    /// Sets heartbeat which will be send by the server if there are no messages for a given
1413    /// [Consumer] pending.
1414    ///
1415    /// # Examples
1416    ///
1417    /// ```no_run
1418    /// # #[tokio::main]
1419    /// # async fn main() -> Result<(), async_nats::Error>  {
1420    /// use async_nats::jetstream::consumer::PullConsumer;
1421    /// use futures_util::StreamExt;
1422    /// let client = async_nats::connect("localhost:4222").await?;
1423    /// let jetstream = async_nats::jetstream::new(client);
1424    ///
1425    /// let consumer: PullConsumer = jetstream
1426    ///     .get_stream("events")
1427    ///     .await?
1428    ///     .get_consumer("pull")
1429    ///     .await?;
1430    ///
1431    /// let mut messages = consumer
1432    ///     .stream()
1433    ///     .heartbeat(std::time::Duration::from_secs(10))
1434    ///     .messages()
1435    ///     .await?;
1436    ///
1437    /// while let Some(message) = messages.next().await {
1438    ///     let message = message?;
1439    ///     println!("message: {:?}", message);
1440    ///     message.ack().await?;
1441    /// }
1442    /// # Ok(())
1443    /// # }
1444    /// ```
1445    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1446        self.heartbeat = heartbeat;
1447        self
1448    }
1449
1450    /// Low level API that does not need tweaking for most use cases.
1451    /// Sets how long each batch request waits for whole batch of messages before timing out.
1452    /// [Consumer] pending.
1453    ///
1454    /// # Examples
1455    ///
1456    /// ```no_run
1457    /// # #[tokio::main]
1458    /// # async fn main() -> Result<(), async_nats::Error>  {
1459    /// use async_nats::jetstream::consumer::PullConsumer;
1460    /// use futures_util::StreamExt;
1461    /// let client = async_nats::connect("localhost:4222").await?;
1462    /// let jetstream = async_nats::jetstream::new(client);
1463    ///
1464    /// let consumer: PullConsumer = jetstream
1465    ///     .get_stream("events")
1466    ///     .await?
1467    ///     .get_consumer("pull")
1468    ///     .await?;
1469    ///
1470    /// let mut messages = consumer
1471    ///     .stream()
1472    ///     .expires(std::time::Duration::from_secs(30))
1473    ///     .messages()
1474    ///     .await?;
1475    ///
1476    /// while let Some(message) = messages.next().await {
1477    ///     let message = message?;
1478    ///     println!("message: {:?}", message);
1479    ///     message.ack().await?;
1480    /// }
1481    /// # Ok(())
1482    /// # }
1483    /// ```
1484    pub fn expires(mut self, expires: Duration) -> Self {
1485        self.expires = expires;
1486        self
1487    }
1488
1489    /// Sets overflow threshold for minimum pending messages before this stream will start getting
1490    /// messages for a [Consumer].
1491    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and [PriorityPolicy::Overflow] set.
1492    ///
1493    /// # Examples
1494    ///
1495    /// ```no_run
1496    /// # #[tokio::main]
1497    /// # async fn main() -> Result<(), async_nats::Error>  {
1498    /// use async_nats::jetstream::consumer::PullConsumer;
1499    /// use futures_util::StreamExt;
1500    /// let client = async_nats::connect("localhost:4222").await?;
1501    /// let jetstream = async_nats::jetstream::new(client);
1502    ///
1503    /// let consumer: PullConsumer = jetstream
1504    ///     .get_stream("events")
1505    ///     .await?
1506    ///     .get_consumer("pull")
1507    ///     .await?;
1508    ///
1509    /// let mut messages = consumer
1510    ///     .stream()
1511    ///     .expires(std::time::Duration::from_secs(30))
1512    ///     .group("A")
1513    ///     .min_pending(100)
1514    ///     .messages()
1515    ///     .await?;
1516    ///
1517    /// while let Some(message) = messages.next().await {
1518    ///     let message = message?;
1519    ///     println!("message: {:?}", message);
1520    ///     message.ack().await?;
1521    /// }
1522    /// # Ok(())
1523    /// # }
1524    /// ```
1525    pub fn min_pending(mut self, min_pending: usize) -> Self {
1526        self.min_pending = Some(min_pending);
1527        self
1528    }
1529
1530    /// Sets the priority at which this stream will get messages. If there are any requests with
1531    /// lower priority number, this stream will not get messages until those are satisfied.
1532    #[cfg(feature = "server_2_12")]
1533    pub fn priority(mut self, priority: usize) -> Self {
1534        self.priority = Some(priority);
1535        self
1536    }
1537
1538    /// Sets overflow threshold for minimum pending acknowledgements before this stream will start getting
1539    /// messages for a [Consumer].
1540    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and [PriorityPolicy::Overflow] set.
1541    ///
1542    /// # Examples
1543    ///
1544    /// ```no_run
1545    /// # #[tokio::main]
1546    /// # async fn main() -> Result<(), async_nats::Error>  {
1547    /// use async_nats::jetstream::consumer::PullConsumer;
1548    /// use futures_util::StreamExt;
1549    /// let client = async_nats::connect("localhost:4222").await?;
1550    /// let jetstream = async_nats::jetstream::new(client);
1551    ///
1552    /// let consumer: PullConsumer = jetstream
1553    ///     .get_stream("events")
1554    ///     .await?
1555    ///     .get_consumer("pull")
1556    ///     .await?;
1557    ///
1558    /// let mut messages = consumer
1559    ///     .stream()
1560    ///     .expires(std::time::Duration::from_secs(30))
1561    ///     .group("A")
1562    ///     .min_ack_pending(100)
1563    ///     .messages()
1564    ///     .await?;
1565    ///
1566    /// while let Some(message) = messages.next().await {
1567    ///     let message = message?;
1568    ///     println!("message: {:?}", message);
1569    ///     message.ack().await?;
1570    /// }
1571    /// # Ok(())
1572    /// # }
1573    /// ```
1574    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1575        self.min_ack_pending = Some(min_ack_pending);
1576        self
1577    }
1578
1579    /// Setting group when using [Consumer] with [Config::priority_groups].
1580    ///
1581    /// # Examples
1582    ///
1583    /// ```no_run
1584    /// # #[tokio::main]
1585    /// # async fn main() -> Result<(), async_nats::Error>  {
1586    /// use async_nats::jetstream::consumer::PullConsumer;
1587    /// use futures_util::StreamExt;
1588    /// let client = async_nats::connect("localhost:4222").await?;
1589    /// let jetstream = async_nats::jetstream::new(client);
1590    ///
1591    /// let consumer: PullConsumer = jetstream
1592    ///     .get_stream("events")
1593    ///     .await?
1594    ///     .get_consumer("pull")
1595    ///     .await?;
1596    ///
1597    /// let mut messages = consumer
1598    ///     .stream()
1599    ///     .expires(std::time::Duration::from_secs(30))
1600    ///     .group("A")
1601    ///     .min_ack_pending(100)
1602    ///     .messages()
1603    ///     .await?;
1604    ///
1605    /// while let Some(message) = messages.next().await {
1606    ///     let message = message?;
1607    ///     println!("message: {:?}", message);
1608    ///     message.ack().await?;
1609    /// }
1610    /// # Ok(())
1611    /// # }
1612    /// ```
1613    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1614        self.group = Some(group.into());
1615        self
1616    }
1617
1618    /// Creates actual [Stream] with provided configuration.
1619    ///
1620    /// # Examples
1621    ///
1622    /// ```no_run
1623    /// # #[tokio::main]
1624    /// # async fn main() -> Result<(), async_nats::Error>  {
1625    /// use async_nats::jetstream::consumer::PullConsumer;
1626    /// use futures_util::StreamExt;
1627    /// let client = async_nats::connect("localhost:4222").await?;
1628    /// let jetstream = async_nats::jetstream::new(client);
1629    ///
1630    /// let consumer: PullConsumer = jetstream
1631    ///     .get_stream("events")
1632    ///     .await?
1633    ///     .get_consumer("pull")
1634    ///     .await?;
1635    ///
1636    /// let mut messages = consumer
1637    ///     .stream()
1638    ///     .max_messages_per_batch(100)
1639    ///     .messages()
1640    ///     .await?;
1641    ///
1642    /// while let Some(message) = messages.next().await {
1643    ///     let message = message?;
1644    ///     println!("message: {:?}", message);
1645    ///     message.ack().await?;
1646    /// }
1647    /// # Ok(())
1648    /// # }
1649    /// ```
1650    pub async fn messages(self) -> Result<Stream, StreamError> {
1651        Stream::stream(
1652            BatchConfig {
1653                batch: self.batch,
1654                expires: Some(self.expires),
1655                no_wait: false,
1656                max_bytes: self.max_bytes,
1657                idle_heartbeat: self.heartbeat,
1658                min_pending: self.min_pending,
1659                group: self.group,
1660                min_ack_pending: self.min_ack_pending,
1661                #[cfg(feature = "server_2_12")]
1662                priority: self.priority,
1663            },
1664            self.consumer,
1665        )
1666        .await
1667    }
1668}
1669
/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by [Consumer::fetch] on a [Consumer].
1671///
1672/// # Examples
1673///
1674/// ```no_run
1675/// # #[tokio::main]
1676/// # async fn main() -> Result<(), async_nats::Error>  {
1677/// use async_nats::jetstream::consumer::PullConsumer;
1678/// use futures_util::StreamExt;
1679/// let client = async_nats::connect("localhost:4222").await?;
1680/// let jetstream = async_nats::jetstream::new(client);
1681///
1682/// let consumer: PullConsumer = jetstream
1683///     .get_stream("events")
1684///     .await?
1685///     .get_consumer("pull")
1686///     .await?;
1687///
1688/// let mut messages = consumer
1689///     .fetch()
1690///     .max_messages(100)
1691///     .max_bytes(1024)
1692///     .messages()
1693///     .await?;
1694///
1695/// while let Some(message) = messages.next().await {
1696///     let message = message?;
1697///     println!("message: {:?}", message);
1698///     message.ack().await?;
1699/// }
1700/// # Ok(())
1701/// # }
1702/// ```
1703pub struct FetchBuilder<'a> {
1704    batch: usize,
1705    max_bytes: usize,
1706    heartbeat: Duration,
1707    expires: Option<Duration>,
1708    min_pending: Option<usize>,
1709    min_ack_pending: Option<usize>,
1710    group: Option<String>,
1711    consumer: &'a Consumer<Config>,
1712}
1713
1714impl<'a> FetchBuilder<'a> {
1715    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1716        FetchBuilder {
1717            consumer,
1718            batch: 200,
1719            max_bytes: 0,
1720            expires: None,
1721            min_pending: None,
1722            min_ack_pending: None,
1723            group: None,
1724            heartbeat: Duration::default(),
1725        }
1726    }
1727
1728    /// Sets max bytes that can be buffered on the Client while processing already received
1729    /// messages.
1730    /// Higher values will yield better performance, but also potentially increase memory usage if
1731    /// application is acknowledging messages much slower than they arrive.
1732    ///
1733    /// Default values should provide reasonable balance between performance and memory usage.
1734    ///
1735    /// # Examples
1736    ///
1737    /// ```no_run
1738    /// # #[tokio::main]
1739    /// # async fn main() -> Result<(), async_nats::Error>  {
1740    /// use futures_util::StreamExt;
1741    /// let client = async_nats::connect("localhost:4222").await?;
1742    /// let jetstream = async_nats::jetstream::new(client);
1743    ///
1744    /// let consumer = jetstream
1745    ///     .get_stream("events")
1746    ///     .await?
1747    ///     .get_consumer("pull")
1748    ///     .await?;
1749    ///
1750    /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1751    ///
1752    /// while let Some(message) = messages.next().await {
1753    ///     let message = message?;
1754    ///     println!("message: {:?}", message);
1755    ///     message.ack().await?;
1756    /// }
1757    /// # Ok(())
1758    /// # }
1759    /// ```
1760    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1761        self.max_bytes = max_bytes;
1762        self
1763    }
1764
1765    /// Sets max number of messages that can be buffered on the Client while processing already received
1766    /// messages.
1767    /// Higher values will yield better performance, but also potentially increase memory usage if
1768    /// application is acknowledging messages much slower than they arrive.
1769    ///
1770    /// Default values should provide reasonable balance between performance and memory usage.
1771    ///
1772    /// # Examples
1773    ///
1774    /// ```no_run
1775    /// # #[tokio::main]
1776    /// # async fn main() -> Result<(), async_nats::Error>  {
1777    /// use futures_util::StreamExt;
1778    /// let client = async_nats::connect("localhost:4222").await?;
1779    /// let jetstream = async_nats::jetstream::new(client);
1780    ///
1781    /// let consumer = jetstream
1782    ///     .get_stream("events")
1783    ///     .await?
1784    ///     .get_consumer("pull")
1785    ///     .await?;
1786    ///
1787    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1788    ///
1789    /// while let Some(message) = messages.next().await {
1790    ///     let message = message?;
1791    ///     println!("message: {:?}", message);
1792    ///     message.ack().await?;
1793    /// }
1794    /// # Ok(())
1795    /// # }
1796    /// ```
1797    pub fn max_messages(mut self, batch: usize) -> Self {
1798        self.batch = batch;
1799        self
1800    }
1801
1802    /// Sets heartbeat which will be send by the server if there are no messages for a given
1803    /// [Consumer] pending.
1804    ///
1805    /// # Examples
1806    ///
1807    /// ```no_run
1808    /// # #[tokio::main]
1809    /// # async fn main() -> Result<(), async_nats::Error>  {
1810    /// use async_nats::jetstream::consumer::PullConsumer;
1811    /// use futures_util::StreamExt;
1812    /// let client = async_nats::connect("localhost:4222").await?;
1813    /// let jetstream = async_nats::jetstream::new(client);
1814    ///
1815    /// let consumer = jetstream
1816    ///     .get_stream("events")
1817    ///     .await?
1818    ///     .get_consumer("pull")
1819    ///     .await?;
1820    ///
1821    /// let mut messages = consumer
1822    ///     .fetch()
1823    ///     .heartbeat(std::time::Duration::from_secs(10))
1824    ///     .messages()
1825    ///     .await?;
1826    ///
1827    /// while let Some(message) = messages.next().await {
1828    ///     let message = message?;
1829    ///     println!("message: {:?}", message);
1830    ///     message.ack().await?;
1831    /// }
1832    /// # Ok(())
1833    /// # }
1834    /// ```
1835    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1836        self.heartbeat = heartbeat;
1837        self
1838    }
1839
1840    /// Low level API that does not need tweaking for most use cases.
1841    /// Sets how long each batch request waits for whole batch of messages before timing out.
1842    /// [Consumer] pending.
1843    ///
1844    /// # Examples
1845    ///
1846    /// ```no_run
1847    /// # #[tokio::main]
1848    /// # async fn main() -> Result<(), async_nats::Error>  {
1849    /// use async_nats::jetstream::consumer::PullConsumer;
1850    /// use futures_util::StreamExt;
1851    ///
1852    /// let client = async_nats::connect("localhost:4222").await?;
1853    /// let jetstream = async_nats::jetstream::new(client);
1854    ///
1855    /// let consumer: PullConsumer = jetstream
1856    ///     .get_stream("events")
1857    ///     .await?
1858    ///     .get_consumer("pull")
1859    ///     .await?;
1860    ///
1861    /// let mut messages = consumer
1862    ///     .fetch()
1863    ///     .expires(std::time::Duration::from_secs(30))
1864    ///     .messages()
1865    ///     .await?;
1866    ///
1867    /// while let Some(message) = messages.next().await {
1868    ///     let message = message?;
1869    ///     println!("message: {:?}", message);
1870    ///     message.ack().await?;
1871    /// }
1872    /// # Ok(())
1873    /// # }
1874    /// ```
1875    pub fn expires(mut self, expires: Duration) -> Self {
1876        self.expires = Some(expires);
1877        self
1878    }
1879
1880    /// Sets overflow threshold for minimum pending messages before this stream will start getting
1881    /// messages.
1882    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
1883    /// [PriorityPolicy::Overflow] set.
1884    ///
1885    /// # Examples
1886    ///
1887    /// ```no_run
1888    /// # #[tokio::main]
1889    /// # async fn main() -> Result<(), async_nats::Error>  {
1890    /// use async_nats::jetstream::consumer::PullConsumer;
1891    /// use futures_util::StreamExt;
1892    ///
1893    /// let client = async_nats::connect("localhost:4222").await?;
1894    /// let jetstream = async_nats::jetstream::new(client);
1895    ///
1896    /// let consumer: PullConsumer = jetstream
1897    ///     .get_stream("events")
1898    ///     .await?
1899    ///     .get_consumer("pull")
1900    ///     .await?;
1901    ///
1902    /// let mut messages = consumer
1903    ///     .fetch()
1904    ///     .expires(std::time::Duration::from_secs(30))
1905    ///     .group("A")
1906    ///     .min_pending(100)
1907    ///     .messages()
1908    ///     .await?;
1909    ///
1910    /// while let Some(message) = messages.next().await {
1911    ///     let message = message?;
1912    ///     println!("message: {:?}", message);
1913    ///     message.ack().await?;
1914    /// }
1915    /// # Ok(())
1916    /// # }
1917    /// ```
1918    pub fn min_pending(mut self, min_pending: usize) -> Self {
1919        self.min_pending = Some(min_pending);
1920        self
1921    }
1922
1923    /// Sets the priority at which this stream will get messages. If there are any requests with
1924    /// lower priority number, this stream will not get messages until those are satisfied.
1925    #[cfg(feature = "server_2_12")]
1926    pub fn priority(mut self, priority: usize) -> Self {
1927        self.batch = priority;
1928        self
1929    }
1930
1931    /// Sets overflow threshold for minimum pending acknowledgments before this stream will start getting
1932    /// messages.
1933    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
1934    /// [PriorityPolicy::Overflow] set.
1935    ///
1936    /// # Examples
1937    ///
1938    /// ```no_run
1939    /// # #[tokio::main]
1940    /// # async fn main() -> Result<(), async_nats::Error>  {
1941    /// use async_nats::jetstream::consumer::PullConsumer;
1942    /// use futures_util::StreamExt;
1943    ///
1944    /// let client = async_nats::connect("localhost:4222").await?;
1945    /// let jetstream = async_nats::jetstream::new(client);
1946    ///
1947    /// let consumer: PullConsumer = jetstream
1948    ///     .get_stream("events")
1949    ///     .await?
1950    ///     .get_consumer("pull")
1951    ///     .await?;
1952    ///
1953    /// let mut messages = consumer
1954    ///     .fetch()
1955    ///     .expires(std::time::Duration::from_secs(30))
1956    ///     .group("A")
1957    ///     .min_ack_pending(100)
1958    ///     .messages()
1959    ///     .await?;
1960    ///
1961    /// while let Some(message) = messages.next().await {
1962    ///     let message = message?;
1963    ///     println!("message: {:?}", message);
1964    ///     message.ack().await?;
1965    /// }
1966    /// # Ok(())
1967    /// # }
1968    /// ```
1969    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1970        self.min_ack_pending = Some(min_ack_pending);
1971        self
1972    }
1973
1974    /// Setting group when using [Consumer] with [PriorityPolicy].
1975    ///
1976    /// # Examples
1977    ///
1978    /// ```no_run
1979    /// # #[tokio::main]
1980    /// # async fn main() -> Result<(), async_nats::Error>  {
1981    /// use async_nats::jetstream::consumer::PullConsumer;
1982    /// use futures_util::StreamExt;
1983    ///
1984    /// let client = async_nats::connect("localhost:4222").await?;
1985    /// let jetstream = async_nats::jetstream::new(client);
1986    ///
1987    /// let consumer: PullConsumer = jetstream
1988    ///     .get_stream("events")
1989    ///     .await?
1990    ///     .get_consumer("pull")
1991    ///     .await?;
1992    ///
1993    /// let mut messages = consumer
1994    ///     .fetch()
1995    ///     .expires(std::time::Duration::from_secs(30))
1996    ///     .group("A")
1997    ///     .min_ack_pending(100)
1998    ///     .messages()
1999    ///     .await?;
2000    ///
2001    /// while let Some(message) = messages.next().await {
2002    ///     let message = message?;
2003    ///     println!("message: {:?}", message);
2004    ///     message.ack().await?;
2005    /// }
2006    /// # Ok(())
2007    /// # }
2008    /// ```
2009    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2010        self.group = Some(group.into());
2011        self
2012    }
2013
2014    /// Creates actual [Stream] with provided configuration.
2015    ///
2016    /// # Examples
2017    ///
2018    /// ```no_run
2019    /// # #[tokio::main]
2020    /// # async fn main() -> Result<(), async_nats::Error>  {
2021    /// use async_nats::jetstream::consumer::PullConsumer;
2022    /// use futures_util::StreamExt;
2023    /// let client = async_nats::connect("localhost:4222").await?;
2024    /// let jetstream = async_nats::jetstream::new(client);
2025    ///
2026    /// let consumer: PullConsumer = jetstream
2027    ///     .get_stream("events")
2028    ///     .await?
2029    ///     .get_consumer("pull")
2030    ///     .await?;
2031    ///
2032    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
2033    ///
2034    /// while let Some(message) = messages.next().await {
2035    ///     let message = message?;
2036    ///     println!("message: {:?}", message);
2037    ///     message.ack().await?;
2038    /// }
2039    /// # Ok(())
2040    /// # }
2041    /// ```
2042    pub async fn messages(self) -> Result<Batch, BatchError> {
2043        Batch::batch(
2044            BatchConfig {
2045                batch: self.batch,
2046                expires: self.expires,
2047                no_wait: true,
2048                max_bytes: self.max_bytes,
2049                idle_heartbeat: self.heartbeat,
2050                min_pending: self.min_pending,
2051                min_ack_pending: self.min_ack_pending,
2052                group: self.group,
2053                #[cfg(feature = "server_2_12")]
2054                priority: None,
2055            },
2056            self.consumer,
2057        )
2058        .await
2059    }
2060}
2061
2062/// Used for building configuration for a [Batch]. Created by a [Consumer::batch] on a [Consumer].
2063///
2064/// # Examples
2065///
2066/// ```no_run
2067/// # #[tokio::main]
2068/// # async fn main() -> Result<(), async_nats::Error>  {
2069/// use async_nats::jetstream::consumer::PullConsumer;
2070/// use futures_util::StreamExt;
2071/// let client = async_nats::connect("localhost:4222").await?;
2072/// let jetstream = async_nats::jetstream::new(client);
2073///
2074/// let consumer: PullConsumer = jetstream
2075///     .get_stream("events")
2076///     .await?
2077///     .get_consumer("pull")
2078///     .await?;
2079///
2080/// let mut messages = consumer
2081///     .batch()
2082///     .max_messages(100)
2083///     .max_bytes(1024)
2084///     .messages()
2085///     .await?;
2086///
2087/// while let Some(message) = messages.next().await {
2088///     let message = message?;
2089///     println!("message: {:?}", message);
2090///     message.ack().await?;
2091/// }
2092/// # Ok(())
2093/// # }
2094/// ```
pub struct BatchBuilder<'a> {
    /// Maximum number of messages to request; set with `max_messages` (200 by default).
    batch: usize,
    /// Maximum number of bytes to request; set with `max_bytes` (0 by default).
    max_bytes: usize,
    /// Idle heartbeat interval forwarded to the pull request; set with `heartbeat`.
    heartbeat: Duration,
    /// Request expiry; always sent with the request, `Duration::ZERO` unless `expires` is called.
    expires: Duration,
    /// Overflow threshold on pending messages; set with `min_pending`.
    min_pending: Option<usize>,
    /// Overflow threshold on pending acknowledgments; set with `min_ack_pending`.
    min_ack_pending: Option<usize>,
    /// Priority group name; set with `group`.
    group: Option<String>,
    /// Consumer the batch request is issued against.
    consumer: &'a Consumer<Config>,
}
2105
2106impl<'a> BatchBuilder<'a> {
2107    pub fn new(consumer: &'a Consumer<Config>) -> Self {
2108        BatchBuilder {
2109            consumer,
2110            batch: 200,
2111            max_bytes: 0,
2112            expires: Duration::ZERO,
2113            heartbeat: Duration::default(),
2114            min_pending: None,
2115            min_ack_pending: None,
2116            group: None,
2117        }
2118    }
2119
2120    /// Sets max bytes that can be buffered on the Client while processing already received
2121    /// messages.
2122    /// Higher values will yield better performance, but also potentially increase memory usage if
2123    /// application is acknowledging messages much slower than they arrive.
2124    ///
2125    /// Default values should provide reasonable balance between performance and memory usage.
2126    ///
2127    /// # Examples
2128    ///
2129    /// ```no_run
2130    /// # #[tokio::main]
2131    /// # async fn main() -> Result<(), async_nats::Error>  {
2132    /// use async_nats::jetstream::consumer::PullConsumer;
2133    /// use futures_util::StreamExt;
2134    /// let client = async_nats::connect("localhost:4222").await?;
2135    /// let jetstream = async_nats::jetstream::new(client);
2136    ///
2137    /// let consumer: PullConsumer = jetstream
2138    ///     .get_stream("events")
2139    ///     .await?
2140    ///     .get_consumer("pull")
2141    ///     .await?;
2142    ///
2143    /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
2144    ///
2145    /// while let Some(message) = messages.next().await {
2146    ///     let message = message?;
2147    ///     println!("message: {:?}", message);
2148    ///     message.ack().await?;
2149    /// }
2150    /// # Ok(())
2151    /// # }
2152    /// ```
2153    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
2154        self.max_bytes = max_bytes;
2155        self
2156    }
2157
2158    /// Sets max number of messages that can be buffered on the Client while processing already received
2159    /// messages.
2160    /// Higher values will yield better performance, but also potentially increase memory usage if
2161    /// application is acknowledging messages much slower than they arrive.
2162    ///
2163    /// Default values should provide reasonable balance between performance and memory usage.
2164    ///
2165    /// # Examples
2166    ///
2167    /// ```no_run
2168    /// # #[tokio::main]
2169    /// # async fn main() -> Result<(), async_nats::Error>  {
2170    /// use async_nats::jetstream::consumer::PullConsumer;
2171    /// use futures_util::StreamExt;
2172    /// let client = async_nats::connect("localhost:4222").await?;
2173    /// let jetstream = async_nats::jetstream::new(client);
2174    ///
2175    /// let consumer: PullConsumer = jetstream
2176    ///     .get_stream("events")
2177    ///     .await?
2178    ///     .get_consumer("pull")
2179    ///     .await?;
2180    ///
2181    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2182    ///
2183    /// while let Some(message) = messages.next().await {
2184    ///     let message = message?;
2185    ///     println!("message: {:?}", message);
2186    ///     message.ack().await?;
2187    /// }
2188    /// # Ok(())
2189    /// # }
2190    /// ```
2191    pub fn max_messages(mut self, batch: usize) -> Self {
2192        self.batch = batch;
2193        self
2194    }
2195
    /// Sets heartbeat which will be sent by the server if there are no messages for a given
    /// [Consumer] pending.
2198    ///
2199    /// # Examples
2200    ///
2201    /// ```no_run
2202    /// # #[tokio::main]
2203    /// # async fn main() -> Result<(), async_nats::Error>  {
2204    /// use async_nats::jetstream::consumer::PullConsumer;
2205    /// use futures_util::StreamExt;
2206    /// let client = async_nats::connect("localhost:4222").await?;
2207    /// let jetstream = async_nats::jetstream::new(client);
2208    ///
2209    /// let consumer: PullConsumer = jetstream
2210    ///     .get_stream("events")
2211    ///     .await?
2212    ///     .get_consumer("pull")
2213    ///     .await?;
2214    ///
2215    /// let mut messages = consumer
2216    ///     .batch()
2217    ///     .heartbeat(std::time::Duration::from_secs(10))
2218    ///     .messages()
2219    ///     .await?;
2220    ///
2221    /// while let Some(message) = messages.next().await {
2222    ///     let message = message?;
2223    ///     println!("message: {:?}", message);
2224    ///     message.ack().await?;
2225    /// }
2226    /// # Ok(())
2227    /// # }
2228    /// ```
2229    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
2230        self.heartbeat = heartbeat;
2231        self
2232    }
2233
2234    /// Sets overflow threshold for minimum pending messages before this stream will start getting
2235    /// messages.
2236    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
2237    /// [PriorityPolicy::Overflow] set.
2238    ///
2239    /// # Examples
2240    ///
2241    /// ```no_run
2242    /// # #[tokio::main]
2243    /// # async fn main() -> Result<(), async_nats::Error>  {
2244    /// use async_nats::jetstream::consumer::PullConsumer;
2245    /// use futures_util::StreamExt;
2246    ///
2247    /// let client = async_nats::connect("localhost:4222").await?;
2248    /// let jetstream = async_nats::jetstream::new(client);
2249    ///
2250    /// let consumer: PullConsumer = jetstream
2251    ///     .get_stream("events")
2252    ///     .await?
2253    ///     .get_consumer("pull")
2254    ///     .await?;
2255    ///
2256    /// let mut messages = consumer
2257    ///     .batch()
2258    ///     .expires(std::time::Duration::from_secs(30))
2259    ///     .group("A")
2260    ///     .min_pending(100)
2261    ///     .messages()
2262    ///     .await?;
2263    ///
2264    /// while let Some(message) = messages.next().await {
2265    ///     let message = message?;
2266    ///     println!("message: {:?}", message);
2267    ///     message.ack().await?;
2268    /// }
2269    /// # Ok(())
2270    /// # }
2271    /// ```
2272    pub fn min_pending(mut self, min_pending: usize) -> Self {
2273        self.min_pending = Some(min_pending);
2274        self
2275    }
2276
2277    /// Sets overflow threshold for minimum pending acknowledgments before this stream will start getting
2278    /// messages.
2279    /// To use overflow, [Consumer] needs to have enabled [Config::priority_groups] and
2280    /// [PriorityPolicy::Overflow] set.
2281    ///
2282    /// # Examples
2283    ///
2284    /// ```no_run
2285    /// # #[tokio::main]
2286    /// # async fn main() -> Result<(), async_nats::Error>  {
2287    /// use async_nats::jetstream::consumer::PullConsumer;
2288    /// use futures_util::StreamExt;
2289    ///
2290    /// let client = async_nats::connect("localhost:4222").await?;
2291    /// let jetstream = async_nats::jetstream::new(client);
2292    ///
2293    /// let consumer: PullConsumer = jetstream
2294    ///     .get_stream("events")
2295    ///     .await?
2296    ///     .get_consumer("pull")
2297    ///     .await?;
2298    ///
2299    /// let mut messages = consumer
2300    ///     .batch()
2301    ///     .expires(std::time::Duration::from_secs(30))
2302    ///     .group("A")
2303    ///     .min_ack_pending(100)
2304    ///     .messages()
2305    ///     .await?;
2306    ///
2307    /// while let Some(message) = messages.next().await {
2308    ///     let message = message?;
2309    ///     println!("message: {:?}", message);
2310    ///     message.ack().await?;
2311    /// }
2312    /// # Ok(())
2313    /// # }
2314    /// ```
2315    pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
2316        self.min_ack_pending = Some(min_ack_pending);
2317        self
2318    }
2319
2320    /// Setting group when using [Consumer] with [PriorityPolicy].
2321    ///
2322    /// # Examples
2323    ///
2324    /// ```no_run
2325    /// # #[tokio::main]
2326    /// # async fn main() -> Result<(), async_nats::Error>  {
2327    /// use async_nats::jetstream::consumer::PullConsumer;
2328    /// use futures_util::StreamExt;
2329    ///
2330    /// let client = async_nats::connect("localhost:4222").await?;
2331    /// let jetstream = async_nats::jetstream::new(client);
2332    ///
2333    /// let consumer: PullConsumer = jetstream
2334    ///     .get_stream("events")
2335    ///     .await?
2336    ///     .get_consumer("pull")
2337    ///     .await?;
2338    ///
2339    /// let mut messages = consumer
2340    ///     .batch()
2341    ///     .expires(std::time::Duration::from_secs(30))
2342    ///     .group("A")
2343    ///     .min_ack_pending(100)
2344    ///     .messages()
2345    ///     .await?;
2346    ///
2347    /// while let Some(message) = messages.next().await {
2348    ///     let message = message?;
2349    ///     println!("message: {:?}", message);
2350    ///     message.ack().await?;
2351    /// }
2352    /// # Ok(())
2353    /// # }
2354    /// ```
2355    pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2356        self.group = Some(group.into());
2357        self
2358    }
2359
    /// Low level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for whole batch of messages before timing out.
2363    ///
2364    /// # Examples
2365    ///
2366    /// ```no_run
2367    /// # #[tokio::main]
2368    /// # async fn main() -> Result<(), async_nats::Error>  {
2369    /// use async_nats::jetstream::consumer::PullConsumer;
2370    /// use futures_util::StreamExt;
2371    /// let client = async_nats::connect("localhost:4222").await?;
2372    /// let jetstream = async_nats::jetstream::new(client);
2373    ///
2374    /// let consumer: PullConsumer = jetstream
2375    ///     .get_stream("events")
2376    ///     .await?
2377    ///     .get_consumer("pull")
2378    ///     .await?;
2379    ///
2380    /// let mut messages = consumer
2381    ///     .batch()
2382    ///     .expires(std::time::Duration::from_secs(30))
2383    ///     .messages()
2384    ///     .await?;
2385    ///
2386    /// while let Some(message) = messages.next().await {
2387    ///     let message = message?;
2388    ///     println!("message: {:?}", message);
2389    ///     message.ack().await?;
2390    /// }
2391    /// # Ok(())
2392    /// # }
2393    /// ```
2394    pub fn expires(mut self, expires: Duration) -> Self {
2395        self.expires = expires;
2396        self
2397    }
2398
    /// Creates actual [Batch] with provided configuration.
2400    ///
2401    /// # Examples
2402    ///
2403    /// ```no_run
2404    /// # #[tokio::main]
2405    /// # async fn main() -> Result<(), async_nats::Error>  {
2406    /// use async_nats::jetstream::consumer::PullConsumer;
2407    /// use futures_util::StreamExt;
2408    /// let client = async_nats::connect("localhost:4222").await?;
2409    /// let jetstream = async_nats::jetstream::new(client);
2410    ///
2411    /// let consumer: PullConsumer = jetstream
2412    ///     .get_stream("events")
2413    ///     .await?
2414    ///     .get_consumer("pull")
2415    ///     .await?;
2416    ///
2417    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2418    ///
2419    /// while let Some(message) = messages.next().await {
2420    ///     let message = message?;
2421    ///     println!("message: {:?}", message);
2422    ///     message.ack().await?;
2423    /// }
2424    /// # Ok(())
2425    /// # }
2426    /// ```
2427    pub async fn messages(self) -> Result<Batch, BatchError> {
2428        let config = BatchConfig {
2429            batch: self.batch,
2430            expires: Some(self.expires),
2431            no_wait: false,
2432            max_bytes: self.max_bytes,
2433            idle_heartbeat: self.heartbeat,
2434            min_pending: self.min_pending,
2435            min_ack_pending: self.min_ack_pending,
2436            group: self.group,
2437            #[cfg(feature = "server_2_12")]
2438            priority: None,
2439        };
2440        Batch::batch(config, self.consumer).await
2441    }
2442}
2443
/// Used for next Pull Request for Pull Consumer
///
/// Serialized with `serde` and populated by the builders' `messages()` methods.
#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// The number of messages that are being requested to be delivered.
    pub batch: usize,
    /// The optional number of nanoseconds that the server will store this next request for
    /// before forgetting about the pending batch size.
    #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
    pub expires: Option<Duration>,
    /// This optionally causes the server not to store this pending request at all, but when there are no
    /// messages to deliver will send a nil bytes message with a Status header of 404, this way you
    /// can know when you reached the end of the stream for example. A 409 is returned if the
    /// Consumer has reached MaxAckPending limits.
    #[serde(skip_serializing_if = "is_default")]
    pub no_wait: bool,

    /// Sets max number of bytes in total in given batch size. This works together with `batch`.
    /// Whichever value is reached first, batch will complete.
    pub max_bytes: usize,

    /// Setting this other than zero will cause the server to send 100 Idle Heartbeat status to the
    /// client
    #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
    pub idle_heartbeat: Duration,

    // NOTE(review): the optional fields below carry no `skip_serializing_if`, so `None`
    // is emitted as `null` rather than omitted — confirm this is intended.
    /// Overflow threshold: minimum number of pending messages before this request starts
    /// getting messages.
    pub min_pending: Option<usize>,
    /// Overflow threshold: minimum number of pending acknowledgments before this request
    /// starts getting messages.
    pub min_ack_pending: Option<usize>,
    /// Priority group this request belongs to.
    pub group: Option<String>,
    /// Priority of this request. Requests with a lower priority number are satisfied first.
    #[cfg(feature = "server_2_12")]
    pub priority: Option<usize>,
}
2475
/// Returns `true` when `t` equals its type's `Default` value.
///
/// Used as a serde `skip_serializing_if` predicate so that unset fields are omitted.
fn is_default<T>(t: &T) -> bool
where
    T: Default + Eq,
{
    *t == T::default()
}
2479
2480#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2481pub struct Config {
2482    /// Setting `durable_name` to `Some(...)` will cause this consumer
2483    /// to be "durable". This may be a good choice for workloads that
2484    /// benefit from the `JetStream` server or cluster remembering the
2485    /// progress of consumers for fault tolerance purposes. If a consumer
2486    /// crashes, the `JetStream` server or cluster will remember which
2487    /// messages the consumer acknowledged. When the consumer recovers,
2488    /// this information will allow the consumer to resume processing
2489    /// where it left off. If you're unsure, set this to `Some(...)`.
2490    ///
2491    /// Setting `durable_name` to `None` will cause this consumer to
2492    /// be "ephemeral". This may be a good choice for workloads where
2493    /// you don't need the `JetStream` server to remember the consumer's
2494    /// progress in the case of a crash, such as certain "high churn"
2495    /// workloads or workloads where a crashed instance is not required
2496    /// to recover.
2497    #[serde(default, skip_serializing_if = "Option::is_none")]
2498    pub durable_name: Option<String>,
2499    /// A name of the consumer. Can be specified for both durable and ephemeral
2500    /// consumers.
2501    #[serde(default, skip_serializing_if = "Option::is_none")]
2502    pub name: Option<String>,
2503    /// A short description of the purpose of this consumer.
2504    #[serde(default, skip_serializing_if = "Option::is_none")]
2505    pub description: Option<String>,
2506    /// Allows for a variety of options that determine how this consumer will receive messages
2507    #[serde(flatten)]
2508    pub deliver_policy: DeliverPolicy,
2509    /// How messages should be acknowledged
2510    pub ack_policy: AckPolicy,
2511    /// How long to allow messages to remain un-acknowledged before attempting redelivery
2512    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2513    pub ack_wait: Duration,
2514    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2515    #[serde(default, skip_serializing_if = "is_default")]
2516    pub max_deliver: i64,
2517    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2518    #[serde(default, skip_serializing_if = "is_default")]
2519    pub filter_subject: String,
2520    #[cfg(feature = "server_2_10")]
2521    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2522    #[serde(default, skip_serializing_if = "is_default")]
2523    pub filter_subjects: Vec<String>,
2524    /// Whether messages are sent as quickly as possible or at the rate of receipt
2525    pub replay_policy: ReplayPolicy,
2526    /// The rate of message delivery in bits per second
2527    #[serde(default, skip_serializing_if = "is_default")]
2528    pub rate_limit: u64,
2529    /// What percentage of acknowledgments should be samples for observability, 0-100
2530    #[serde(
2531        rename = "sample_freq",
2532        with = "super::sample_freq_deser",
2533        default,
2534        skip_serializing_if = "is_default"
2535    )]
2536    pub sample_frequency: u8,
2537    /// The maximum number of waiting consumers.
2538    #[serde(default, skip_serializing_if = "is_default")]
2539    pub max_waiting: i64,
2540    /// The maximum number of unacknowledged messages that may be
2541    /// in-flight before pausing sending additional messages to
2542    /// this consumer.
2543    #[serde(default, skip_serializing_if = "is_default")]
2544    pub max_ack_pending: i64,
2545    /// Only deliver headers without payloads.
2546    #[serde(default, skip_serializing_if = "is_default")]
2547    pub headers_only: bool,
2548    /// Maximum size of a request batch
2549    #[serde(default, skip_serializing_if = "is_default")]
2550    pub max_batch: i64,
2551    /// Maximum value of request max_bytes
2552    #[serde(default, skip_serializing_if = "is_default")]
2553    pub max_bytes: i64,
2554    /// Maximum value for request expiration
2555    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2556    pub max_expires: Duration,
2557    /// Threshold for consumer inactivity
2558    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2559    pub inactive_threshold: Duration,
2560    /// Number of consumer replicas
2561    #[serde(default, skip_serializing_if = "is_default")]
2562    pub num_replicas: usize,
2563    /// Force consumer to use memory storage.
2564    #[serde(default, skip_serializing_if = "is_default")]
2565    pub memory_storage: bool,
2566    #[cfg(feature = "server_2_10")]
2567    // Additional consumer metadata.
2568    #[serde(default, skip_serializing_if = "is_default")]
2569    pub metadata: HashMap<String, String>,
2570    /// Custom backoff for missed acknowledgments.
2571    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2572    pub backoff: Vec<Duration>,
2573
2574    /// Priority policy for this consumer. Requires [Config::priority_groups] to be set.
2575    #[cfg(feature = "server_2_11")]
2576    #[serde(default, skip_serializing_if = "is_default")]
2577    pub priority_policy: PriorityPolicy,
2578    /// Priority groups for this consumer. Currently only one group is supported and is used
2579    /// in conjunction with [Config::priority_policy].
2580    #[cfg(feature = "server_2_11")]
2581    #[serde(default, skip_serializing_if = "is_default")]
2582    pub priority_groups: Vec<String>,
2583    /// For suspending the consumer until the deadline.
2584    #[cfg(feature = "server_2_11")]
2585    #[serde(
2586        default,
2587        with = "rfc3339::option",
2588        skip_serializing_if = "Option::is_none"
2589    )]
2590    pub pause_until: Option<OffsetDateTime>,
2591}
2592
2593impl IntoConsumerConfig for &Config {
2594    fn into_consumer_config(self) -> consumer::Config {
2595        self.clone().into_consumer_config()
2596    }
2597}
2598
2599impl IntoConsumerConfig for Config {
2600    fn into_consumer_config(self) -> consumer::Config {
2601        jetstream::consumer::Config {
2602            deliver_subject: None,
2603            name: self.name,
2604            durable_name: self.durable_name,
2605            description: self.description,
2606            deliver_group: None,
2607            deliver_policy: self.deliver_policy,
2608            ack_policy: self.ack_policy,
2609            ack_wait: self.ack_wait,
2610            max_deliver: self.max_deliver,
2611            filter_subject: self.filter_subject,
2612            #[cfg(feature = "server_2_10")]
2613            filter_subjects: self.filter_subjects,
2614            replay_policy: self.replay_policy,
2615            rate_limit: self.rate_limit,
2616            sample_frequency: self.sample_frequency,
2617            max_waiting: self.max_waiting,
2618            max_ack_pending: self.max_ack_pending,
2619            headers_only: self.headers_only,
2620            flow_control: false,
2621            idle_heartbeat: Duration::default(),
2622            max_batch: self.max_batch,
2623            max_bytes: self.max_bytes,
2624            max_expires: self.max_expires,
2625            inactive_threshold: self.inactive_threshold,
2626            num_replicas: self.num_replicas,
2627            memory_storage: self.memory_storage,
2628            #[cfg(feature = "server_2_10")]
2629            metadata: self.metadata,
2630            backoff: self.backoff,
2631            #[cfg(feature = "server_2_11")]
2632            priority_policy: self.priority_policy,
2633            #[cfg(feature = "server_2_11")]
2634            priority_groups: self.priority_groups,
2635            #[cfg(feature = "server_2_11")]
2636            pause_until: self.pause_until,
2637        }
2638    }
2639}
2640impl FromConsumer for Config {
2641    fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2642        if config.deliver_subject.is_some() {
2643            return Err(Box::new(std::io::Error::other(
2644                "pull consumer cannot have delivery subject",
2645            )));
2646        }
2647        Ok(Config {
2648            durable_name: config.durable_name,
2649            name: config.name,
2650            description: config.description,
2651            deliver_policy: config.deliver_policy,
2652            ack_policy: config.ack_policy,
2653            ack_wait: config.ack_wait,
2654            max_deliver: config.max_deliver,
2655            filter_subject: config.filter_subject,
2656            #[cfg(feature = "server_2_10")]
2657            filter_subjects: config.filter_subjects,
2658            replay_policy: config.replay_policy,
2659            rate_limit: config.rate_limit,
2660            sample_frequency: config.sample_frequency,
2661            max_waiting: config.max_waiting,
2662            max_ack_pending: config.max_ack_pending,
2663            headers_only: config.headers_only,
2664            max_batch: config.max_batch,
2665            max_bytes: config.max_bytes,
2666            max_expires: config.max_expires,
2667            inactive_threshold: config.inactive_threshold,
2668            num_replicas: config.num_replicas,
2669            memory_storage: config.memory_storage,
2670            #[cfg(feature = "server_2_10")]
2671            metadata: config.metadata,
2672            backoff: config.backoff,
2673            #[cfg(feature = "server_2_11")]
2674            priority_policy: config.priority_policy,
2675            #[cfg(feature = "server_2_11")]
2676            priority_groups: config.priority_groups,
2677            #[cfg(feature = "server_2_11")]
2678            pause_until: config.pause_until,
2679        })
2680    }
2681}
2682
/// Reasons a batch (pull) request can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchRequestErrorKind {
    /// Displayed as "publish failed".
    Publish,
    /// Displayed as "flush failed".
    Flush,
    /// Displayed as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchRequestErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Publish => "publish failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(message)
    }
}
2699
2700pub type BatchRequestError = Error<BatchRequestErrorKind>;
2701
/// Reasons obtaining a batch of messages can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BatchErrorKind {
    /// Displayed as "subscribe failed".
    Subscribe,
    /// Displayed as "pull request failed".
    Pull,
    /// Displayed as "flush failed".
    Flush,
    /// Displayed as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Subscribe => "subscribe failed",
            Self::Pull => "pull request failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(message)
    }
}
2720
2721pub type BatchError = Error<BatchErrorKind>;
2722
2723impl From<SubscribeError> for BatchError {
2724    fn from(err: SubscribeError) -> Self {
2725        BatchError::with_source(BatchErrorKind::Subscribe, err)
2726    }
2727}
2728
2729impl From<BatchRequestError> for BatchError {
2730    fn from(err: BatchRequestError) -> Self {
2731        BatchError::with_source(BatchErrorKind::Pull, err)
2732    }
2733}
2734
/// Reasons re-creating a consumer can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConsumerRecreateErrorKind {
    /// Displayed as "error getting stream".
    GetStream,
    /// Displayed as "consumer creation failed".
    Recreate,
    /// Displayed as "timed out".
    TimedOut,
}

impl std::fmt::Display for ConsumerRecreateErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::GetStream => "error getting stream",
            Self::Recreate => "consumer creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(message)
    }
}
2751
2752pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2753
2754async fn recreate_consumer_stream(
2755    context: &Context,
2756    config: &OrderedConfig,
2757    stream_name: &str,
2758    consumer_name: &str,
2759    sequence: u64,
2760) -> Result<Stream, ConsumerRecreateError> {
2761    let span = tracing::span!(
2762        tracing::Level::DEBUG,
2763        "recreate_ordered_consumer",
2764        stream_name = stream_name,
2765        consumer_name = consumer_name,
2766        sequence = sequence
2767    );
2768    let _span_handle = span.enter();
2769    let config = config.to_owned();
2770    trace!("delete old consumer before creating new one");
2771
2772    tokio::time::timeout(
2773        Duration::from_secs(5),
2774        context.delete_consumer_from_stream(consumer_name, stream_name),
2775    )
2776    .await
2777    .ok();
2778
2779    let deliver_policy = {
2780        if sequence == 0 {
2781            DeliverPolicy::All
2782        } else {
2783            DeliverPolicy::ByStartSequence {
2784                start_sequence: sequence + 1,
2785            }
2786        }
2787    };
2788    trace!("create the new ordered consumer for sequence {}", sequence);
2789    let consumer = tokio::time::timeout(
2790        Duration::from_secs(5),
2791        context.create_consumer_on_stream(
2792            jetstream::consumer::pull::OrderedConfig {
2793                deliver_policy,
2794                ..config.clone()
2795            },
2796            stream_name,
2797        ),
2798    )
2799    .await
2800    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2801    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2802
2803    let config = Consumer {
2804        config: config.clone().into(),
2805        context: context.clone(),
2806        info: consumer.info,
2807    };
2808
2809    trace!("create iterator");
2810    let stream = tokio::time::timeout(
2811        Duration::from_secs(5),
2812        Stream::stream(
2813            BatchConfig {
2814                batch: 500,
2815                expires: Some(Duration::from_secs(30)),
2816                no_wait: false,
2817                max_bytes: 0,
2818                idle_heartbeat: Duration::from_secs(15),
2819                min_pending: None,
2820                min_ack_pending: None,
2821                group: None,
2822                #[cfg(feature = "server_2_12")]
2823                priority: None,
2824            },
2825            &config,
2826        ),
2827    )
2828    .await
2829    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2830    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2831    trace!("recreated consumer");
2832    stream
2833}