async_nats_wrpc/jetstream/consumer/
pull.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16    future::{BoxFuture, Either},
17    FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_10")]
21use std::collections::HashMap;
22use std::{future, pin::Pin, task::Poll, time::Duration};
23use tokio::{task::JoinHandle, time::Sleep};
24
25use serde::{Deserialize, Serialize};
26use tracing::{debug, trace};
27
28use crate::{
29    connection::State,
30    error::Error,
31    jetstream::{self, Context},
32    StatusCode, SubscribeError, Subscriber,
33};
34
35use crate::subject::Subject;
36
37use super::{
38    AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
39    StreamError, StreamErrorKind,
40};
41use jetstream::consumer;
42
43impl Consumer<Config> {
44    /// Returns a stream of messages for Pull Consumer.
45    ///
46    /// # Example
47    ///
48    /// ```no_run
49    /// # #[tokio::main]
50    /// # async fn mains() -> Result<(), async_nats::Error> {
51    /// use futures::StreamExt;
52    /// use futures::TryStreamExt;
53    ///
54    /// let client = async_nats::connect("localhost:4222").await?;
55    /// let jetstream = async_nats::jetstream::new(client);
56    ///
57    /// let stream = jetstream
58    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
59    ///         name: "events".to_string(),
60    ///         max_messages: 10_000,
61    ///         ..Default::default()
62    ///     })
63    ///     .await?;
64    ///
65    /// jetstream.publish("events", "data".into()).await?;
66    ///
67    /// let consumer = stream
68    ///     .get_or_create_consumer(
69    ///         "consumer",
70    ///         async_nats::jetstream::consumer::pull::Config {
71    ///             durable_name: Some("consumer".to_string()),
72    ///             ..Default::default()
73    ///         },
74    ///     )
75    ///     .await?;
76    ///
77    /// let mut messages = consumer.messages().await?.take(100);
78    /// while let Some(Ok(message)) = messages.next().await {
79    ///     println!("got message {:?}", message);
80    ///     message.ack().await?;
81    /// }
82    /// Ok(())
83    /// # }
84    /// ```
85    pub async fn messages(&self) -> Result<Stream, StreamError> {
86        Stream::stream(
87            BatchConfig {
88                batch: 200,
89                expires: Some(Duration::from_secs(30)),
90                no_wait: false,
91                max_bytes: 0,
92                idle_heartbeat: Duration::from_secs(15),
93            },
94            self,
95        )
96        .await
97    }
98
99    /// Enables customization of [Stream] by setting timeouts, heartbeats, maximum number of
100    /// messages or bytes buffered.
101    ///
102    /// # Examples
103    ///
104    /// ```no_run
105    /// # #[tokio::main]
106    /// # async fn main() -> Result<(), async_nats::Error>  {
107    /// use async_nats::jetstream::consumer::PullConsumer;
108    /// use futures::StreamExt;
109    /// let client = async_nats::connect("localhost:4222").await?;
110    /// let jetstream = async_nats::jetstream::new(client);
111    ///
112    /// let consumer: PullConsumer = jetstream
113    ///     .get_stream("events")
114    ///     .await?
115    ///     .get_consumer("pull")
116    ///     .await?;
117    ///
118    /// let mut messages = consumer
119    ///     .stream()
120    ///     .max_messages_per_batch(100)
121    ///     .max_bytes_per_batch(1024)
122    ///     .messages()
123    ///     .await?;
124    ///
125    /// while let Some(message) = messages.next().await {
126    ///     let message = message?;
127    ///     println!("message: {:?}", message);
128    ///     message.ack().await?;
129    /// }
130    /// # Ok(())
131    /// # }
132    /// ```
    pub fn stream(&self) -> StreamBuilder<'_> {
        // Hand a borrow of this consumer to a fresh builder; the options are then set
        // through the builder's own methods (see the example above).
        StreamBuilder::new(self)
    }
136
137    pub(crate) async fn request_batch<I: Into<BatchConfig>>(
138        &self,
139        batch: I,
140        inbox: Subject,
141    ) -> Result<(), BatchRequestError> {
142        debug!("sending batch");
143        let subject = format!(
144            "{}.CONSUMER.MSG.NEXT.{}.{}",
145            self.context.prefix, self.info.stream_name, self.info.name
146        );
147
148        let payload = serde_json::to_vec(&batch.into())
149            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
150
151        self.context
152            .client
153            .publish_with_reply(subject, inbox, payload.into())
154            .await
155            .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
156        debug!("batch request sent");
157        Ok(())
158    }
159
    /// Returns a batch of the specified number of messages, or, if there are fewer messages on
    /// the [Stream] than requested, returns all available messages.
162    ///
163    /// # Example
164    ///
165    /// ```no_run
166    /// # #[tokio::main]
167    /// # async fn mains() -> Result<(), async_nats::Error> {
168    /// use futures::StreamExt;
169    /// use futures::TryStreamExt;
170    ///
171    /// let client = async_nats::connect("localhost:4222").await?;
172    /// let jetstream = async_nats::jetstream::new(client);
173    ///
174    /// let stream = jetstream
175    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
176    ///         name: "events".to_string(),
177    ///         max_messages: 10_000,
178    ///         ..Default::default()
179    ///     })
180    ///     .await?;
181    ///
182    /// jetstream.publish("events", "data".into()).await?;
183    ///
184    /// let consumer = stream
185    ///     .get_or_create_consumer(
186    ///         "consumer",
187    ///         async_nats::jetstream::consumer::pull::Config {
188    ///             durable_name: Some("consumer".to_string()),
189    ///             ..Default::default()
190    ///         },
191    ///     )
192    ///     .await?;
193    ///
194    /// for _ in 0..100 {
195    ///     jetstream.publish("events", "data".into()).await?;
196    /// }
197    ///
198    /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
199    /// // will finish after 100 messages, as that is the number of messages available on the
200    /// // stream.
201    /// while let Some(Ok(message)) = messages.next().await {
202    ///     println!("got message {:?}", message);
203    ///     message.ack().await?;
204    /// }
205    /// Ok(())
206    /// # }
207    /// ```
208    pub fn fetch(&self) -> FetchBuilder {
209        FetchBuilder::new(self)
210    }
211
212    /// Returns a batch of specified number of messages unless timeout happens first.
213    ///
214    /// # Example
215    ///
216    /// ```no_run
217    /// # #[tokio::main]
218    /// # async fn mains() -> Result<(), async_nats::Error> {
219    /// use futures::StreamExt;
220    /// use futures::TryStreamExt;
221    ///
222    /// let client = async_nats::connect("localhost:4222").await?;
223    /// let jetstream = async_nats::jetstream::new(client);
224    ///
225    /// let stream = jetstream
226    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
227    ///         name: "events".to_string(),
228    ///         max_messages: 10_000,
229    ///         ..Default::default()
230    ///     })
231    ///     .await?;
232    ///
233    /// jetstream.publish("events", "data".into()).await?;
234    ///
235    /// let consumer = stream
236    ///     .get_or_create_consumer(
237    ///         "consumer",
238    ///         async_nats::jetstream::consumer::pull::Config {
239    ///             durable_name: Some("consumer".to_string()),
240    ///             ..Default::default()
241    ///         },
242    ///     )
243    ///     .await?;
244    ///
245    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
246    /// while let Some(Ok(message)) = messages.next().await {
247    ///     println!("got message {:?}", message);
248    ///     message.ack().await?;
249    /// }
250    /// Ok(())
251    /// # }
252    /// ```
253    pub fn batch(&self) -> BatchBuilder {
254        BatchBuilder::new(self)
255    }
256
257    /// Returns a sequence of [Batches][Batch] allowing for iterating over batches, and then over
258    /// messages in those batches.
259    ///
260    /// # Example
261    ///
262    /// ```no_run
263    /// # #[tokio::main]
264    /// # async fn mains() -> Result<(), async_nats::Error> {
265    /// use futures::StreamExt;
266    /// use futures::TryStreamExt;
267    ///
268    /// let client = async_nats::connect("localhost:4222").await?;
269    /// let jetstream = async_nats::jetstream::new(client);
270    ///
271    /// let stream = jetstream
272    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
273    ///         name: "events".to_string(),
274    ///         max_messages: 10_000,
275    ///         ..Default::default()
276    ///     })
277    ///     .await?;
278    ///
279    /// jetstream.publish("events", "data".into()).await?;
280    ///
281    /// let consumer = stream
282    ///     .get_or_create_consumer(
283    ///         "consumer",
284    ///         async_nats::jetstream::consumer::pull::Config {
285    ///             durable_name: Some("consumer".to_string()),
286    ///             ..Default::default()
287    ///         },
288    ///     )
289    ///     .await?;
290    ///
291    /// let mut iter = consumer.sequence(50).unwrap().take(10);
292    /// while let Ok(Some(mut batch)) = iter.try_next().await {
293    ///     while let Ok(Some(message)) = batch.try_next().await {
294    ///         println!("message received: {:?}", message);
295    ///     }
296    /// }
297    /// Ok(())
298    /// # }
299    /// ```
300    pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
301        let context = self.context.clone();
302        let subject = format!(
303            "{}.CONSUMER.MSG.NEXT.{}.{}",
304            self.context.prefix, self.info.stream_name, self.info.name
305        );
306
307        let request = serde_json::to_vec(&BatchConfig {
308            batch,
309            expires: Some(Duration::from_secs(60)),
310            ..Default::default()
311        })
312        .map(Bytes::from)
313        .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
314
315        Ok(Sequence {
316            context,
317            subject,
318            request,
319            pending_messages: batch,
320            next: None,
321        })
322    }
323}
324
/// A stream over the messages delivered in response to a single pull request.
///
/// The stream ends once the requested number of messages has been yielded, when a
/// terminating status message arrives, or when the local timeout timer fires.
pub struct Batch {
    /// Number of messages still expected; decremented for every message yielded.
    pending_messages: usize,
    /// Subscription from which the responses to the pull request are read.
    subscriber: Subscriber,
    /// JetStream context cloned into every yielded message.
    context: Context,
    /// Optional local timer; when it completes the stream is terminated.
    timeout: Option<Pin<Box<Sleep>>>,
    /// Set once the stream has finished, so later polls return `None` right away.
    terminated: bool,
}
332
333impl<'a> Batch {
334    async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
335        let inbox = Subject::from(consumer.context.client.new_inbox());
336        let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
337        consumer.request_batch(batch, inbox.clone()).await?;
338
339        let sleep = batch.expires.map(|expires| {
340            Box::pin(tokio::time::sleep(
341                expires.saturating_add(Duration::from_secs(5)),
342            ))
343        });
344
345        Ok(Batch {
346            pending_messages: batch.batch,
347            subscriber: subscription,
348            context: consumer.context.clone(),
349            terminated: false,
350            timeout: sleep,
351        })
352    }
353}
354
355impl futures::Stream for Batch {
356    type Item = Result<jetstream::Message, crate::Error>;
357
358    fn poll_next(
359        mut self: std::pin::Pin<&mut Self>,
360        cx: &mut std::task::Context<'_>,
361    ) -> std::task::Poll<Option<Self::Item>> {
362        if self.terminated {
363            return Poll::Ready(None);
364        }
365        if self.pending_messages == 0 {
366            self.terminated = true;
367            return Poll::Ready(None);
368        }
369        if let Some(sleep) = self.timeout.as_mut() {
370            match sleep.poll_unpin(cx) {
371                Poll::Ready(_) => {
372                    debug!("batch timeout timer triggered");
373                    // TODO(tp): Maybe we can be smarter here and before timing out, check if
374                    // we consumed all the messages from the subscription buffer in case of user
375                    // slowly consuming messages. Keep in mind that we time out here only if
376                    // for some reason we missed timeout from the server and few seconds have
377                    // passed since expected timeout message.
378                    self.terminated = true;
379                    return Poll::Ready(None);
380                }
381                Poll::Pending => (),
382            }
383        }
384        match self.subscriber.receiver.poll_recv(cx) {
385            Poll::Ready(maybe_message) => match maybe_message {
386                Some(message) => match message.status.unwrap_or(StatusCode::OK) {
387                    StatusCode::TIMEOUT => {
388                        debug!("received timeout. Iterator done");
389                        self.terminated = true;
390                        Poll::Ready(None)
391                    }
392                    StatusCode::IDLE_HEARTBEAT => {
393                        debug!("received heartbeat");
394                        Poll::Pending
395                    }
396                    // If this is fetch variant, terminate on no more messages.
397                    // We do not need to check if this is a fetch, not batch,
398                    // as only fetch will send back `NO_MESSAGES` status.
399                    StatusCode::NOT_FOUND => {
400                        debug!("received `NO_MESSAGES`. Iterator done");
401                        self.terminated = true;
402                        Poll::Ready(None)
403                    }
404                    StatusCode::OK => {
405                        debug!("received message");
406                        self.pending_messages -= 1;
407                        Poll::Ready(Some(Ok(jetstream::Message {
408                            context: self.context.clone(),
409                            message,
410                        })))
411                    }
412                    status => {
413                        debug!("received error");
414                        self.terminated = true;
415                        Poll::Ready(Some(Err(Box::new(std::io::Error::new(
416                            std::io::ErrorKind::Other,
417                            format!(
418                                "error while processing messages from the stream: {}, {:?}",
419                                status, message.description
420                            ),
421                        )))))
422                    }
423                },
424                None => Poll::Ready(None),
425            },
426            std::task::Poll::Pending => std::task::Poll::Pending,
427        }
428    }
429}
430
/// A stream of [Batch]es: every item is a new batch obtained by re-sending the same pull
/// request.
pub struct Sequence {
    /// JetStream context used to subscribe and publish, and handed to every batch.
    context: Context,
    /// Subject the pull request is published to.
    subject: String,
    /// Serialized pull request, re-sent for every batch.
    request: Bytes,
    /// Number of messages each created batch starts out expecting.
    pending_messages: usize,
    /// In-flight future creating the next batch, if one has been started.
    next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
}
438
439impl futures::Stream for Sequence {
440    type Item = Result<Batch, MessagesError>;
441
442    fn poll_next(
443        mut self: std::pin::Pin<&mut Self>,
444        cx: &mut std::task::Context<'_>,
445    ) -> std::task::Poll<Option<Self::Item>> {
446        match self.next.as_mut() {
447            None => {
448                let context = self.context.clone();
449                let subject = self.subject.clone();
450                let request = self.request.clone();
451                let pending_messages = self.pending_messages;
452
453                let next = self.next.insert(Box::pin(async move {
454                    let inbox = context.client.new_inbox();
455                    let subscriber = context
456                        .client
457                        .subscribe(inbox.clone())
458                        .await
459                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
460
461                    context
462                        .client
463                        .publish_with_reply(subject, inbox, request)
464                        .await
465                        .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
466
467                    // TODO(tp): Add timeout config and defaults.
468                    Ok(Batch {
469                        pending_messages,
470                        subscriber,
471                        context,
472                        terminated: false,
473                        timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
474                    })
475                }));
476
477                match next.as_mut().poll(cx) {
478                    Poll::Ready(result) => {
479                        self.next = None;
480                        Poll::Ready(Some(result.map_err(|err| {
481                            MessagesError::with_source(MessagesErrorKind::Pull, err)
482                        })))
483                    }
484                    Poll::Pending => Poll::Pending,
485                }
486            }
487
488            Some(next) => match next.as_mut().poll(cx) {
489                Poll::Ready(result) => {
490                    self.next = None;
491                    Poll::Ready(Some(result.map_err(|err| {
492                        MessagesError::with_source(MessagesErrorKind::Pull, err)
493                    })))
494                }
495                Poll::Pending => Poll::Pending,
496            },
497        }
498    }
499}
500
501impl Consumer<OrderedConfig> {
502    /// Returns a stream of messages for Ordered Pull Consumer.
503    ///
    /// An ordered consumer uses a single-replica ephemeral consumer, no matter the replication
    /// factor of the Stream. It does not use acks; instead it tracks sequences and recreates
    /// itself whenever it sees a mismatch.
507    ///
508    /// # Example
509    ///
510    /// ```no_run
511    /// # #[tokio::main]
512    /// # async fn mains() -> Result<(), async_nats::Error> {
513    /// use futures::StreamExt;
514    /// use futures::TryStreamExt;
515    ///
516    /// let client = async_nats::connect("localhost:4222").await?;
517    /// let jetstream = async_nats::jetstream::new(client);
518    ///
519    /// let stream = jetstream
520    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
521    ///         name: "events".to_string(),
522    ///         max_messages: 10_000,
523    ///         ..Default::default()
524    ///     })
525    ///     .await?;
526    ///
527    /// jetstream.publish("events", "data".into()).await?;
528    ///
529    /// let consumer = stream
530    ///     .get_or_create_consumer(
531    ///         "consumer",
532    ///         async_nats::jetstream::consumer::pull::OrderedConfig {
533    ///             name: Some("consumer".to_string()),
534    ///             ..Default::default()
535    ///         },
536    ///     )
537    ///     .await?;
538    ///
539    /// let mut messages = consumer.messages().await?.take(100);
540    /// while let Some(Ok(message)) = messages.next().await {
541    ///     println!("got message {:?}", message);
542    /// }
543    /// Ok(())
544    /// # }
545    /// ```
546    pub async fn messages(self) -> Result<Ordered, StreamError> {
547        let config = Consumer {
548            config: self.config.clone().into(),
549            context: self.context.clone(),
550            info: self.info.clone(),
551        };
552        let stream = Stream::stream(
553            BatchConfig {
554                batch: 500,
555                expires: Some(Duration::from_secs(30)),
556                no_wait: false,
557                max_bytes: 0,
558                idle_heartbeat: Duration::from_secs(15),
559            },
560            &config,
561        )
562        .await?;
563
564        Ok(Ordered {
565            consumer_sequence: 0,
566            stream_sequence: 0,
567            missed_heartbeats: false,
568            create_stream: None,
569            context: self.context.clone(),
570            consumer_name: self
571                .config
572                .name
573                .clone()
574                .unwrap_or_else(|| self.context.client.new_inbox()),
575            consumer: self.config,
576            stream: Some(stream),
577            stream_name: self.info.stream_name.clone(),
578        })
579    }
580}
581
/// Configuration for ordered pull consumers.
///
/// Only a subset of the general consumer settings is exposed here. The remaining ones
/// (for example ack policy, number of replicas and memory storage) are filled in with
/// fixed values when this configuration is converted into a full consumer configuration.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct OrderedConfig {
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Subject filter applied to this consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100
    #[serde(default, skip_serializing_if = "is_default")]
    pub sample_frequency: u8,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Maximum number of messages that can be requested in a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    ///
    /// NOTE(review): `into_consumer_config` currently emits `0` instead of this value.
    pub max_batch: i64,
    /// Maximum number of bytes that can be requested in a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    ///
    /// NOTE(review): `into_consumer_config` currently emits `0` instead of this value.
    pub max_bytes: i64,
    /// Maximum expiry that can be set for a single Pull Request.
    /// This is used explicitly by `batch` and `fetch`, but also, under the hood, by `messages`
    /// and `stream`.
    ///
    /// NOTE(review): `into_consumer_config` currently emits the default duration instead of
    /// this value.
    pub max_expires: Duration,
}
634
635impl From<OrderedConfig> for Config {
636    fn from(config: OrderedConfig) -> Self {
637        Config {
638            durable_name: None,
639            name: config.name,
640            description: config.description,
641            deliver_policy: config.deliver_policy,
642            ack_policy: AckPolicy::None,
643            ack_wait: Duration::default(),
644            max_deliver: 1,
645            filter_subject: config.filter_subject,
646            #[cfg(feature = "server_2_10")]
647            filter_subjects: config.filter_subjects,
648            replay_policy: config.replay_policy,
649            rate_limit: config.rate_limit,
650            sample_frequency: config.sample_frequency,
651            max_waiting: config.max_waiting,
652            max_ack_pending: 0,
653            headers_only: config.headers_only,
654            max_batch: config.max_batch,
655            max_bytes: config.max_bytes,
656            max_expires: config.max_expires,
657            inactive_threshold: Duration::from_secs(30),
658            num_replicas: 1,
659            memory_storage: true,
660            #[cfg(feature = "server_2_10")]
661            metadata: config.metadata,
662            backoff: Vec::new(),
663        }
664    }
665}
666
667impl FromConsumer for OrderedConfig {
668    fn try_from_consumer_config(
669        config: crate::jetstream::consumer::Config,
670    ) -> Result<Self, crate::Error>
671    where
672        Self: Sized,
673    {
674        Ok(OrderedConfig {
675            name: config.name,
676            description: config.description,
677            filter_subject: config.filter_subject,
678            #[cfg(feature = "server_2_10")]
679            filter_subjects: config.filter_subjects,
680            replay_policy: config.replay_policy,
681            rate_limit: config.rate_limit,
682            sample_frequency: config.sample_frequency,
683            headers_only: config.headers_only,
684            deliver_policy: config.deliver_policy,
685            max_waiting: config.max_waiting,
686            #[cfg(feature = "server_2_10")]
687            metadata: config.metadata,
688            max_batch: config.max_batch,
689            max_bytes: config.max_bytes,
690            max_expires: config.max_expires,
691        })
692    }
693}
694
695impl IntoConsumerConfig for OrderedConfig {
696    fn into_consumer_config(self) -> super::Config {
697        jetstream::consumer::Config {
698            deliver_subject: None,
699            durable_name: None,
700            name: self.name,
701            description: self.description,
702            deliver_group: None,
703            deliver_policy: self.deliver_policy,
704            ack_policy: AckPolicy::None,
705            ack_wait: Duration::default(),
706            max_deliver: 1,
707            filter_subject: self.filter_subject,
708            #[cfg(feature = "server_2_10")]
709            filter_subjects: self.filter_subjects,
710            replay_policy: self.replay_policy,
711            rate_limit: self.rate_limit,
712            sample_frequency: self.sample_frequency,
713            max_waiting: self.max_waiting,
714            max_ack_pending: 0,
715            headers_only: self.headers_only,
716            flow_control: false,
717            idle_heartbeat: Duration::default(),
718            max_batch: 0,
719            max_bytes: 0,
720            max_expires: Duration::default(),
721            inactive_threshold: Duration::from_secs(30),
722            num_replicas: 1,
723            memory_storage: true,
724            #[cfg(feature = "server_2_10")]
725            metadata: self.metadata,
726            backoff: Vec::new(),
727        }
728    }
729}
730
/// Stream of messages of an ordered pull consumer.
///
/// It wraps a regular pull [Stream], checks the consumer sequence of every message, and
/// recreates the underlying consumer stream when a gap, a deleted consumer or repeated
/// missed heartbeats are detected.
pub struct Ordered {
    /// JetStream context used when the consumer stream is recreated.
    context: Context,
    /// Name of the stream the consumer belongs to.
    stream_name: String,
    /// Ordered configuration used when the consumer stream is recreated.
    consumer: OrderedConfig,
    /// Name of the consumer, used when it is recreated.
    consumer_name: String,
    /// Currently active pull stream; `None` while a recreation is in progress.
    stream: Option<Stream>,
    /// In-flight recreation of the consumer stream, if any.
    create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
    /// Consumer sequence of the last yielded message; reset to 0 when a recreation is needed.
    consumer_sequence: u64,
    /// Stream sequence of the last yielded message; passed on when recreating the consumer.
    stream_sequence: u64,
    /// Set after one missed heartbeat; a second one in a row triggers a recreation.
    missed_heartbeats: bool,
}
742
743impl futures::Stream for Ordered {
744    type Item = Result<jetstream::Message, OrderedError>;
745
746    fn poll_next(
747        mut self: Pin<&mut Self>,
748        cx: &mut std::task::Context<'_>,
749    ) -> Poll<Option<Self::Item>> {
750        let mut recreate = false;
751        // Poll messages
752        if let Some(stream) = self.stream.as_mut() {
753            match stream.poll_next_unpin(cx) {
754                Poll::Ready(message) => match message {
755                    Some(message) => match message {
756                        Ok(message) => {
757                            self.missed_heartbeats = false;
758                            let info = message.info().map_err(|err| {
759                                OrderedError::with_source(OrderedErrorKind::Other, err)
760                            })?;
761                            trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
762                                           self.consumer_sequence,
763                                           self.stream_sequence,
764                                           info.consumer_sequence,
765                                           info.stream_sequence);
766                            if info.consumer_sequence != self.consumer_sequence + 1 {
767                                debug!(
768                                    "ordered consumer mismatch. current {}, info: {}",
769                                    self.consumer_sequence, info.consumer_sequence
770                                );
771                                recreate = true;
772                                self.consumer_sequence = 0;
773                            } else {
774                                self.stream_sequence = info.stream_sequence;
775                                self.consumer_sequence = info.consumer_sequence;
776                                return Poll::Ready(Some(Ok(message)));
777                            }
778                        }
779                        Err(err) => match err.kind() {
780                            MessagesErrorKind::MissingHeartbeat => {
781                                // If we have missed heartbeats set, it means this is a second
782                                // missed heartbeat, so we need to recreate consumer.
783                                if self.missed_heartbeats {
784                                    self.consumer_sequence = 0;
785                                    recreate = true;
786                                } else {
787                                    self.missed_heartbeats = true;
788                                }
789                            }
790                            MessagesErrorKind::ConsumerDeleted => {
791                                recreate = true;
792                                self.consumer_sequence = 0;
793                            }
794                            MessagesErrorKind::Pull
795                            | MessagesErrorKind::PushBasedConsumer
796                            | MessagesErrorKind::Other => {
797                                return Poll::Ready(Some(Err(err.into())));
798                            }
799                        },
800                    },
801                    None => return Poll::Ready(None),
802                },
803                Poll::Pending => (),
804            }
805        }
806        // Recreate consumer if needed
807        if recreate {
808            self.stream = None;
809            self.create_stream = Some(Box::pin({
810                let context = self.context.clone();
811                let config = self.consumer.clone();
812                let stream_name = self.stream_name.clone();
813                let consumer_name = self.consumer_name.clone();
814                let sequence = self.stream_sequence;
815                async move {
816                    tryhard::retry_fn(|| {
817                        recreate_consumer_stream(
818                            &context,
819                            &config,
820                            &stream_name,
821                            &consumer_name,
822                            sequence,
823                        )
824                    })
825                    .retries(5)
826                    .exponential_backoff(Duration::from_millis(500))
827                    .await
828                }
829            }))
830        }
831        // check for recreation future
832        if let Some(result) = self.create_stream.as_mut() {
833            match result.poll_unpin(cx) {
834                Poll::Ready(result) => match result {
835                    Ok(stream) => {
836                        self.create_stream = None;
837                        self.stream = Some(stream);
838                        return self.poll_next(cx);
839                    }
840                    Err(err) => {
841                        return Poll::Ready(Some(Err(OrderedError::with_source(
842                            OrderedErrorKind::Recreate,
843                            err,
844                        ))))
845                    }
846                },
847                Poll::Pending => (),
848            }
849        }
850        Poll::Pending
851    }
852}
853
854pub struct Stream {
855    pending_messages: usize,
856    pending_bytes: usize,
857    request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
858    request_tx: tokio::sync::watch::Sender<()>,
859    subscriber: Subscriber,
860    batch_config: BatchConfig,
861    context: Context,
862    pending_request: bool,
863    task_handle: JoinHandle<()>,
864    terminated: bool,
865    heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
866}
867
868impl Drop for Stream {
869    fn drop(&mut self) {
870        self.task_handle.abort();
871    }
872}
873
874impl Stream {
875    async fn stream(
876        batch_config: BatchConfig,
877        consumer: &Consumer<Config>,
878    ) -> Result<Stream, StreamError> {
879        let inbox = consumer.context.client.new_inbox();
880        let subscription = consumer
881            .context
882            .client
883            .subscribe(inbox.clone())
884            .await
885            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
886        let subject = format!(
887            "{}.CONSUMER.MSG.NEXT.{}.{}",
888            consumer.context.prefix, consumer.info.stream_name, consumer.info.name
889        );
890
891        let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
892        let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
893        let task_handle = tokio::task::spawn({
894            let batch = batch_config;
895            let consumer = consumer.clone();
896            let mut context = consumer.context.clone();
897            let inbox = inbox.clone();
898            async move {
899                loop {
900                    // this is just in edge case of missing response for some reason.
901                    let expires = batch_config
902                        .expires
903                        .map(|expires| {
904                            if expires.is_zero() {
905                                Either::Left(future::pending())
906                            } else {
907                                Either::Right(tokio::time::sleep(
908                                    expires.saturating_add(Duration::from_secs(5)),
909                                ))
910                            }
911                        })
912                        .unwrap_or_else(|| Either::Left(future::pending()));
913                    // Need to check previous state, as `changed` will always fire on first
914                    // call.
915                    let prev_state = context.client.state.borrow().to_owned();
916                    let mut pending_reset = false;
917
918                    tokio::select! {
919                       _ = context.client.state.changed() => {
920                            let state = context.client.state.borrow().to_owned();
921                            if !(state == crate::connection::State::Connected
922                                && prev_state != State::Connected) {
923                                    continue;
924                                }
925                            debug!("detected !Connected -> Connected state change");
926
927                            match tryhard::retry_fn(|| consumer.fetch_info()).retries(5).exponential_backoff(Duration::from_millis(500)).await {
928                                Ok(info) => {
929                                    if info.num_waiting == 0 {
930                                        pending_reset = true;
931                                    }
932                                }
933                                Err(err) => {
934                                     if let Err(err) = request_result_tx.send(Err(err)).await {
935                                        debug!("failed to sent request result: {}", err);
936                                    }
937                                },
938                            }
939                        },
940                        _ = request_rx.changed() => debug!("task received request request"),
941                        _ = expires => {
942                            pending_reset = true;
943                            debug!("expired pull request")},
944                    }
945
946                    let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
947                    let result = context
948                        .client
949                        .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
950                        .await
951                        .map(|_| pending_reset);
952                    // TODO: add tracing instead of ignoring this.
953                    request_result_tx
954                        .send(result.map(|_| pending_reset).map_err(|err| {
955                            crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
956                                .into()
957                        }))
958                        .await
959                        .unwrap();
960                    trace!("result send over tx");
961                }
962            }
963        });
964
965        Ok(Stream {
966            task_handle,
967            request_result_rx,
968            request_tx,
969            batch_config,
970            pending_messages: 0,
971            pending_bytes: 0,
972            subscriber: subscription,
973            context: consumer.context.clone(),
974            pending_request: false,
975            terminated: false,
976            heartbeat_timeout: None,
977        })
978    }
979}
980
/// Kinds of errors yielded while consuming messages from an ordered pull consumer.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OrderedErrorKind {
    /// An expected idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// A pull request failed.
    Pull,
    /// The consumer is push based and cannot be used here.
    PushBasedConsumer,
    /// Recreating the consumer failed.
    Recreate,
    /// Any other error.
    Other,
}

impl std::fmt::Display for OrderedErrorKind {
    /// Writes the short, human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MissingHeartbeat => "missed idle heartbeat",
            Self::ConsumerDeleted => "consumer deleted",
            Self::Pull => "pull request failed",
            Self::PushBasedConsumer => "cannot use with push consumer",
            Self::Recreate => "consumer recreation failed",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1003
1004pub type OrderedError = Error<OrderedErrorKind>;
1005
1006impl From<MessagesError> for OrderedError {
1007    fn from(err: MessagesError) -> Self {
1008        match err.kind() {
1009            MessagesErrorKind::MissingHeartbeat => {
1010                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1011            }
1012            MessagesErrorKind::ConsumerDeleted => {
1013                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1014            }
1015            MessagesErrorKind::Pull => OrderedError {
1016                kind: OrderedErrorKind::Pull,
1017                source: err.source,
1018            },
1019            MessagesErrorKind::PushBasedConsumer => {
1020                OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1021            }
1022            MessagesErrorKind::Other => OrderedError {
1023                kind: OrderedErrorKind::Other,
1024                source: err.source,
1025            },
1026        }
1027    }
1028}
1029
/// Kinds of errors yielded while consuming messages from a pull consumer.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MessagesErrorKind {
    /// An expected idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// A pull request failed.
    Pull,
    /// The consumer is push based and cannot be used here.
    PushBasedConsumer,
    /// Any other error.
    Other,
}

impl std::fmt::Display for MessagesErrorKind {
    /// Writes the short, human readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MissingHeartbeat => "missed idle heartbeat",
            Self::ConsumerDeleted => "consumer deleted",
            Self::Pull => "pull request failed",
            Self::PushBasedConsumer => "cannot use with push consumer",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
1050
1051pub type MessagesError = Error<MessagesErrorKind>;
1052
1053impl futures::Stream for Stream {
1054    type Item = Result<jetstream::Message, MessagesError>;
1055
1056    fn poll_next(
1057        mut self: std::pin::Pin<&mut Self>,
1058        cx: &mut std::task::Context<'_>,
1059    ) -> std::task::Poll<Option<Self::Item>> {
1060        if self.terminated {
1061            return Poll::Ready(None);
1062        }
1063
1064        if !self.batch_config.idle_heartbeat.is_zero() {
1065            trace!("checking idle hearbeats");
1066            let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1067            match self
1068                .heartbeat_timeout
1069                .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1070                .poll_unpin(cx)
1071            {
1072                Poll::Ready(_) => {
1073                    self.heartbeat_timeout = None;
1074                    return Poll::Ready(Some(Err(MessagesError::new(
1075                        MessagesErrorKind::MissingHeartbeat,
1076                    ))));
1077                }
1078                Poll::Pending => (),
1079            }
1080        }
1081
1082        loop {
1083            trace!("pending messages: {}", self.pending_messages);
1084            if (self.pending_messages <= self.batch_config.batch / 2
1085                || (self.batch_config.max_bytes > 0
1086                    && self.pending_bytes <= self.batch_config.max_bytes / 2))
1087                && !self.pending_request
1088            {
1089                debug!("pending messages reached threshold to send new fetch request");
1090                self.request_tx.send(()).unwrap();
1091                self.pending_request = true;
1092            }
1093
1094            match self.request_result_rx.poll_recv(cx) {
1095                Poll::Ready(resp) => match resp {
1096                    Some(resp) => match resp {
1097                        Ok(reset) => {
1098                            trace!("request response: {:?}", reset);
1099                            debug!("request sent, setting pending messages");
1100                            if reset {
1101                                self.pending_messages = self.batch_config.batch;
1102                                self.pending_bytes = self.batch_config.max_bytes;
1103                            } else {
1104                                self.pending_messages += self.batch_config.batch;
1105                                self.pending_bytes += self.batch_config.max_bytes;
1106                            }
1107                            self.pending_request = false;
1108                            continue;
1109                        }
1110                        Err(err) => {
1111                            return Poll::Ready(Some(Err(MessagesError::with_source(
1112                                MessagesErrorKind::Pull,
1113                                err,
1114                            ))))
1115                        }
1116                    },
1117                    None => return Poll::Ready(None),
1118                },
1119                Poll::Pending => {
1120                    trace!("pending result");
1121                }
1122            }
1123
1124            trace!("polling subscriber");
1125            match self.subscriber.receiver.poll_recv(cx) {
1126                Poll::Ready(maybe_message) => {
1127                    self.heartbeat_timeout = None;
1128                    match maybe_message {
1129                        Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1130                            StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1131                                debug!("received status message: {:?}", message);
1132                                // If consumer has been deleted, error and shutdown the iterator.
1133                                if message.description.as_deref() == Some("Consumer Deleted") {
1134                                    self.terminated = true;
1135                                    return Poll::Ready(Some(Err(MessagesError::new(
1136                                        MessagesErrorKind::ConsumerDeleted,
1137                                    ))));
1138                                }
1139                                // If consumer is not pull based, error and shutdown the iterator.
1140                                if message.description.as_deref() == Some("Consumer is push based")
1141                                {
1142                                    self.terminated = true;
1143                                    return Poll::Ready(Some(Err(MessagesError::new(
1144                                        MessagesErrorKind::PushBasedConsumer,
1145                                    ))));
1146                                }
1147
1148                                // Do accounting for messages left after terminated/completed pull request.
1149                                let pending_messages = message
1150                                    .headers
1151                                    .as_ref()
1152                                    .and_then(|headers| headers.get("Nats-Pending-Messages"))
1153                                    .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1154                                    .map_err(|err| {
1155                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1156                                    })?;
1157
1158                                let pending_bytes = message
1159                                    .headers
1160                                    .as_ref()
1161                                    .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1162                                    .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1163                                    .map_err(|err| {
1164                                        MessagesError::with_source(MessagesErrorKind::Other, err)
1165                                    })?;
1166
1167                                debug!(
1168                                    "timeout reached. remaining messages: {}, bytes {}",
1169                                    pending_messages, pending_bytes
1170                                );
1171                                self.pending_messages =
1172                                    self.pending_messages.saturating_sub(pending_messages);
1173                                trace!("message bytes len: {}", pending_bytes);
1174                                self.pending_bytes =
1175                                    self.pending_bytes.saturating_sub(pending_bytes);
1176                                continue;
1177                            }
1178                            // Idle Hearbeat means we have no messages, but consumer is fine.
1179                            StatusCode::IDLE_HEARTBEAT => {
1180                                debug!("received idle heartbeat");
1181                                continue;
1182                            }
1183                            // We got an message from a stream.
1184                            StatusCode::OK => {
1185                                trace!("message received");
1186                                self.pending_messages = self.pending_messages.saturating_sub(1);
1187                                self.pending_bytes =
1188                                    self.pending_bytes.saturating_sub(message.length);
1189                                return Poll::Ready(Some(Ok(jetstream::Message {
1190                                    context: self.context.clone(),
1191                                    message,
1192                                })));
1193                            }
1194                            status => {
1195                                debug!("received unknown message: {:?}", message);
1196                                return Poll::Ready(Some(Err(MessagesError::with_source(
1197                                    MessagesErrorKind::Other,
1198                                    format!(
1199                                        "error while processing messages from the stream: {}, {:?}",
1200                                        status, message.description
1201                                    ),
1202                                ))));
1203                            }
1204                        },
1205                        None => return Poll::Ready(None),
1206                    }
1207                }
1208                Poll::Pending => {
1209                    debug!("subscriber still pending");
1210                    return std::task::Poll::Pending;
1211                }
1212            }
1213        }
1214    }
1215}
1216
1217/// Used for building configuration for a [Stream]. Created by a [Consumer::stream] on a [Consumer].
1218///
1219/// # Examples
1220///
1221/// ```no_run
1222/// # #[tokio::main]
1223/// # async fn main() -> Result<(), async_nats::Error>  {
1224/// use futures::StreamExt;
1225/// use async_nats::jetstream::consumer::PullConsumer;
1226/// let client = async_nats::connect("localhost:4222").await?;
1227/// let jetstream = async_nats::jetstream::new(client);
1228///
1229/// let consumer: PullConsumer = jetstream
1230///     .get_stream("events").await?
1231///     .get_consumer("pull").await?;
1232///
1233/// let mut messages = consumer.stream()
1234///     .max_messages_per_batch(100)
1235///     .max_bytes_per_batch(1024)
1236///     .messages().await?;
1237///
1238/// while let Some(message) = messages.next().await {
1239///     let message = message?;
1240///     println!("message: {:?}", message);
1241///     message.ack().await?;
1242/// }
1243/// # Ok(())
1244/// # }
1245pub struct StreamBuilder<'a> {
1246    batch: usize,
1247    max_bytes: usize,
1248    heartbeat: Duration,
1249    expires: Duration,
1250    consumer: &'a Consumer<Config>,
1251}
1252
1253impl<'a> StreamBuilder<'a> {
1254    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1255        StreamBuilder {
1256            consumer,
1257            batch: 200,
1258            max_bytes: 0,
1259            expires: Duration::from_secs(30),
1260            heartbeat: Duration::default(),
1261        }
1262    }
1263
1264    /// Sets max bytes that can be buffered on the Client while processing already received
1265    /// messages.
1266    /// Higher values will yield better performance, but also potentially increase memory usage if
1267    /// application is acknowledging messages much slower than they arrive.
1268    ///
1269    /// Default values should provide reasonable balance between performance and memory usage.
1270    ///
1271    /// # Examples
1272    ///
1273    /// ```no_run
1274    /// # #[tokio::main]
1275    /// # async fn main() -> Result<(), async_nats::Error>  {
1276    /// use async_nats::jetstream::consumer::PullConsumer;
1277    /// use futures::StreamExt;
1278    /// let client = async_nats::connect("localhost:4222").await?;
1279    /// let jetstream = async_nats::jetstream::new(client);
1280    ///
1281    /// let consumer: PullConsumer = jetstream
1282    ///     .get_stream("events")
1283    ///     .await?
1284    ///     .get_consumer("pull")
1285    ///     .await?;
1286    ///
1287    /// let mut messages = consumer
1288    ///     .stream()
1289    ///     .max_bytes_per_batch(1024)
1290    ///     .messages()
1291    ///     .await?;
1292    ///
1293    /// while let Some(message) = messages.next().await {
1294    ///     let message = message?;
1295    ///     println!("message: {:?}", message);
1296    ///     message.ack().await?;
1297    /// }
1298    /// # Ok(())
1299    /// # }
1300    /// ```
1301    pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1302        self.max_bytes = max_bytes;
1303        self
1304    }
1305
1306    /// Sets max number of messages that can be buffered on the Client while processing already received
1307    /// messages.
1308    /// Higher values will yield better performance, but also potentially increase memory usage if
1309    /// application is acknowledging messages much slower than they arrive.
1310    ///
1311    /// Default values should provide reasonable balance between performance and memory usage.
1312    ///
1313    /// # Examples
1314    ///
1315    /// ```no_run
1316    /// # #[tokio::main]
1317    /// # async fn main() -> Result<(), async_nats::Error>  {
1318    /// use async_nats::jetstream::consumer::PullConsumer;
1319    /// use futures::StreamExt;
1320    /// let client = async_nats::connect("localhost:4222").await?;
1321    /// let jetstream = async_nats::jetstream::new(client);
1322    ///
1323    /// let consumer: PullConsumer = jetstream
1324    ///     .get_stream("events")
1325    ///     .await?
1326    ///     .get_consumer("pull")
1327    ///     .await?;
1328    ///
1329    /// let mut messages = consumer
1330    ///     .stream()
1331    ///     .max_messages_per_batch(100)
1332    ///     .messages()
1333    ///     .await?;
1334    ///
1335    /// while let Some(message) = messages.next().await {
1336    ///     let message = message?;
1337    ///     println!("message: {:?}", message);
1338    ///     message.ack().await?;
1339    /// }
1340    /// # Ok(())
1341    /// # }
1342    /// ```
1343    pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1344        self.batch = batch;
1345        self
1346    }
1347
1348    /// Sets heartbeat which will be send by the server if there are no messages for a given
1349    /// [Consumer] pending.
1350    ///
1351    /// # Examples
1352    ///
1353    /// ```no_run
1354    /// # #[tokio::main]
1355    /// # async fn main() -> Result<(), async_nats::Error>  {
1356    /// use async_nats::jetstream::consumer::PullConsumer;
1357    /// use futures::StreamExt;
1358    /// let client = async_nats::connect("localhost:4222").await?;
1359    /// let jetstream = async_nats::jetstream::new(client);
1360    ///
1361    /// let consumer: PullConsumer = jetstream
1362    ///     .get_stream("events")
1363    ///     .await?
1364    ///     .get_consumer("pull")
1365    ///     .await?;
1366    ///
1367    /// let mut messages = consumer
1368    ///     .stream()
1369    ///     .heartbeat(std::time::Duration::from_secs(10))
1370    ///     .messages()
1371    ///     .await?;
1372    ///
1373    /// while let Some(message) = messages.next().await {
1374    ///     let message = message?;
1375    ///     println!("message: {:?}", message);
1376    ///     message.ack().await?;
1377    /// }
1378    /// # Ok(())
1379    /// # }
1380    /// ```
1381    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1382        self.heartbeat = heartbeat;
1383        self
1384    }
1385
1386    /// Low level API that does not need tweaking for most use cases.
1387    /// Sets how long each batch request waits for whole batch of messages before timing out.
1388    /// [Consumer] pending.
1389    ///
1390    /// # Examples
1391    ///
1392    /// ```no_run
1393    /// # #[tokio::main]
1394    /// # async fn main() -> Result<(), async_nats::Error>  {
1395    /// use async_nats::jetstream::consumer::PullConsumer;
1396    /// use futures::StreamExt;
1397    /// let client = async_nats::connect("localhost:4222").await?;
1398    /// let jetstream = async_nats::jetstream::new(client);
1399    ///
1400    /// let consumer: PullConsumer = jetstream
1401    ///     .get_stream("events")
1402    ///     .await?
1403    ///     .get_consumer("pull")
1404    ///     .await?;
1405    ///
1406    /// let mut messages = consumer
1407    ///     .stream()
1408    ///     .expires(std::time::Duration::from_secs(30))
1409    ///     .messages()
1410    ///     .await?;
1411    ///
1412    /// while let Some(message) = messages.next().await {
1413    ///     let message = message?;
1414    ///     println!("message: {:?}", message);
1415    ///     message.ack().await?;
1416    /// }
1417    /// # Ok(())
1418    /// # }
1419    /// ```
1420    pub fn expires(mut self, expires: Duration) -> Self {
1421        self.expires = expires;
1422        self
1423    }
1424
1425    /// Creates actual [Stream] with provided configuration.
1426    ///
1427    /// # Examples
1428    ///
1429    /// ```no_run
1430    /// # #[tokio::main]
1431    /// # async fn main() -> Result<(), async_nats::Error>  {
1432    /// use async_nats::jetstream::consumer::PullConsumer;
1433    /// use futures::StreamExt;
1434    /// let client = async_nats::connect("localhost:4222").await?;
1435    /// let jetstream = async_nats::jetstream::new(client);
1436    ///
1437    /// let consumer: PullConsumer = jetstream
1438    ///     .get_stream("events")
1439    ///     .await?
1440    ///     .get_consumer("pull")
1441    ///     .await?;
1442    ///
1443    /// let mut messages = consumer
1444    ///     .stream()
1445    ///     .max_messages_per_batch(100)
1446    ///     .messages()
1447    ///     .await?;
1448    ///
1449    /// while let Some(message) = messages.next().await {
1450    ///     let message = message?;
1451    ///     println!("message: {:?}", message);
1452    ///     message.ack().await?;
1453    /// }
1454    /// # Ok(())
1455    /// # }
1456    /// ```
1457    pub async fn messages(self) -> Result<Stream, StreamError> {
1458        Stream::stream(
1459            BatchConfig {
1460                batch: self.batch,
1461                expires: Some(self.expires),
1462                no_wait: false,
1463                max_bytes: self.max_bytes,
1464                idle_heartbeat: self.heartbeat,
1465            },
1466            self.consumer,
1467        )
1468        .await
1469    }
1470}
1471
1472/// Used for building configuration for a [Batch] with `fetch()` semantics. Created by a [FetchBuilder] on a [Consumer].
1473///
1474/// # Examples
1475///
1476/// ```no_run
1477/// # #[tokio::main]
1478/// # async fn main() -> Result<(), async_nats::Error>  {
1479/// use async_nats::jetstream::consumer::PullConsumer;
1480/// use futures::StreamExt;
1481/// let client = async_nats::connect("localhost:4222").await?;
1482/// let jetstream = async_nats::jetstream::new(client);
1483///
1484/// let consumer: PullConsumer = jetstream
1485///     .get_stream("events")
1486///     .await?
1487///     .get_consumer("pull")
1488///     .await?;
1489///
1490/// let mut messages = consumer
1491///     .fetch()
1492///     .max_messages(100)
1493///     .max_bytes(1024)
1494///     .messages()
1495///     .await?;
1496///
1497/// while let Some(message) = messages.next().await {
1498///     let message = message?;
1499///     println!("message: {:?}", message);
1500///     message.ack().await?;
1501/// }
1502/// # Ok(())
1503/// # }
1504/// ```
1505pub struct FetchBuilder<'a> {
1506    batch: usize,
1507    max_bytes: usize,
1508    heartbeat: Duration,
1509    expires: Option<Duration>,
1510    consumer: &'a Consumer<Config>,
1511}
1512
1513impl<'a> FetchBuilder<'a> {
1514    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1515        FetchBuilder {
1516            consumer,
1517            batch: 200,
1518            max_bytes: 0,
1519            expires: None,
1520            heartbeat: Duration::default(),
1521        }
1522    }
1523
1524    /// Sets max bytes that can be buffered on the Client while processing already received
1525    /// messages.
1526    /// Higher values will yield better performance, but also potentially increase memory usage if
1527    /// application is acknowledging messages much slower than they arrive.
1528    ///
1529    /// Default values should provide reasonable balance between performance and memory usage.
1530    ///
1531    /// # Examples
1532    ///
1533    /// ```no_run
1534    /// # #[tokio::main]
1535    /// # async fn main() -> Result<(), async_nats::Error>  {
1536    /// use futures::StreamExt;
1537    /// let client = async_nats::connect("localhost:4222").await?;
1538    /// let jetstream = async_nats::jetstream::new(client);
1539    ///
1540    /// let consumer = jetstream
1541    ///     .get_stream("events")
1542    ///     .await?
1543    ///     .get_consumer("pull")
1544    ///     .await?;
1545    ///
1546    /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1547    ///
1548    /// while let Some(message) = messages.next().await {
1549    ///     let message = message?;
1550    ///     println!("message: {:?}", message);
1551    ///     message.ack().await?;
1552    /// }
1553    /// # Ok(())
1554    /// # }
1555    /// ```
1556    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1557        self.max_bytes = max_bytes;
1558        self
1559    }
1560
1561    /// Sets max number of messages that can be buffered on the Client while processing already received
1562    /// messages.
1563    /// Higher values will yield better performance, but also potentially increase memory usage if
1564    /// application is acknowledging messages much slower than they arrive.
1565    ///
1566    /// Default values should provide reasonable balance between performance and memory usage.
1567    ///
1568    /// # Examples
1569    ///
1570    /// ```no_run
1571    /// # #[tokio::main]
1572    /// # async fn main() -> Result<(), async_nats::Error>  {
1573    /// use futures::StreamExt;
1574    /// let client = async_nats::connect("localhost:4222").await?;
1575    /// let jetstream = async_nats::jetstream::new(client);
1576    ///
1577    /// let consumer = jetstream
1578    ///     .get_stream("events")
1579    ///     .await?
1580    ///     .get_consumer("pull")
1581    ///     .await?;
1582    ///
1583    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1584    ///
1585    /// while let Some(message) = messages.next().await {
1586    ///     let message = message?;
1587    ///     println!("message: {:?}", message);
1588    ///     message.ack().await?;
1589    /// }
1590    /// # Ok(())
1591    /// # }
1592    /// ```
1593    pub fn max_messages(mut self, batch: usize) -> Self {
1594        self.batch = batch;
1595        self
1596    }
1597
1598    /// Sets heartbeat which will be send by the server if there are no messages for a given
1599    /// [Consumer] pending.
1600    ///
1601    /// # Examples
1602    ///
1603    /// ```no_run
1604    /// # #[tokio::main]
1605    /// # async fn main() -> Result<(), async_nats::Error>  {
1606    /// use async_nats::jetstream::consumer::PullConsumer;
1607    /// use futures::StreamExt;
1608    /// let client = async_nats::connect("localhost:4222").await?;
1609    /// let jetstream = async_nats::jetstream::new(client);
1610    ///
1611    /// let consumer = jetstream
1612    ///     .get_stream("events")
1613    ///     .await?
1614    ///     .get_consumer("pull")
1615    ///     .await?;
1616    ///
1617    /// let mut messages = consumer
1618    ///     .fetch()
1619    ///     .heartbeat(std::time::Duration::from_secs(10))
1620    ///     .messages()
1621    ///     .await?;
1622    ///
1623    /// while let Some(message) = messages.next().await {
1624    ///     let message = message?;
1625    ///     println!("message: {:?}", message);
1626    ///     message.ack().await?;
1627    /// }
1628    /// # Ok(())
1629    /// # }
1630    /// ```
1631    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1632        self.heartbeat = heartbeat;
1633        self
1634    }
1635
1636    /// Low level API that does not need tweaking for most use cases.
1637    /// Sets how long each batch request waits for whole batch of messages before timing out.
1638    /// [Consumer] pending.
1639    ///
1640    /// # Examples
1641    ///
1642    /// ```no_run
1643    /// # #[tokio::main]
1644    /// # async fn main() -> Result<(), async_nats::Error>  {
1645    /// use async_nats::jetstream::consumer::PullConsumer;
1646    /// use futures::StreamExt;
1647    ///
1648    /// let client = async_nats::connect("localhost:4222").await?;
1649    /// let jetstream = async_nats::jetstream::new(client);
1650    ///
1651    /// let consumer: PullConsumer = jetstream
1652    ///     .get_stream("events")
1653    ///     .await?
1654    ///     .get_consumer("pull")
1655    ///     .await?;
1656    ///
1657    /// let mut messages = consumer
1658    ///     .fetch()
1659    ///     .expires(std::time::Duration::from_secs(30))
1660    ///     .messages()
1661    ///     .await?;
1662    ///
1663    /// while let Some(message) = messages.next().await {
1664    ///     let message = message?;
1665    ///     println!("message: {:?}", message);
1666    ///     message.ack().await?;
1667    /// }
1668    /// # Ok(())
1669    /// # }
1670    /// ```
1671    pub fn expires(mut self, expires: Duration) -> Self {
1672        self.expires = Some(expires);
1673        self
1674    }
1675
1676    /// Creates actual [Stream] with provided configuration.
1677    ///
1678    /// # Examples
1679    ///
1680    /// ```no_run
1681    /// # #[tokio::main]
1682    /// # async fn main() -> Result<(), async_nats::Error>  {
1683    /// use async_nats::jetstream::consumer::PullConsumer;
1684    /// use futures::StreamExt;
1685    /// let client = async_nats::connect("localhost:4222").await?;
1686    /// let jetstream = async_nats::jetstream::new(client);
1687    ///
1688    /// let consumer: PullConsumer = jetstream
1689    ///     .get_stream("events")
1690    ///     .await?
1691    ///     .get_consumer("pull")
1692    ///     .await?;
1693    ///
1694    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1695    ///
1696    /// while let Some(message) = messages.next().await {
1697    ///     let message = message?;
1698    ///     println!("message: {:?}", message);
1699    ///     message.ack().await?;
1700    /// }
1701    /// # Ok(())
1702    /// # }
1703    /// ```
1704    pub async fn messages(self) -> Result<Batch, BatchError> {
1705        Batch::batch(
1706            BatchConfig {
1707                batch: self.batch,
1708                expires: self.expires,
1709                no_wait: true,
1710                max_bytes: self.max_bytes,
1711                idle_heartbeat: self.heartbeat,
1712            },
1713            self.consumer,
1714        )
1715        .await
1716    }
1717}
1718
1719/// Used for building configuration for a [Batch]. Created by a [Consumer::batch] on a [Consumer].
1720///
1721/// # Examples
1722///
1723/// ```no_run
1724/// # #[tokio::main]
1725/// # async fn main() -> Result<(), async_nats::Error>  {
1726/// use async_nats::jetstream::consumer::PullConsumer;
1727/// use futures::StreamExt;
1728/// let client = async_nats::connect("localhost:4222").await?;
1729/// let jetstream = async_nats::jetstream::new(client);
1730///
1731/// let consumer: PullConsumer = jetstream
1732///     .get_stream("events")
1733///     .await?
1734///     .get_consumer("pull")
1735///     .await?;
1736///
1737/// let mut messages = consumer
1738///     .batch()
1739///     .max_messages(100)
1740///     .max_bytes(1024)
1741///     .messages()
1742///     .await?;
1743///
1744/// while let Some(message) = messages.next().await {
1745///     let message = message?;
1746///     println!("message: {:?}", message);
1747///     message.ack().await?;
1748/// }
1749/// # Ok(())
1750/// # }
1751/// ```
1752pub struct BatchBuilder<'a> {
1753    batch: usize,
1754    max_bytes: usize,
1755    heartbeat: Duration,
1756    expires: Duration,
1757    consumer: &'a Consumer<Config>,
1758}
1759
1760impl<'a> BatchBuilder<'a> {
1761    pub fn new(consumer: &'a Consumer<Config>) -> Self {
1762        BatchBuilder {
1763            consumer,
1764            batch: 200,
1765            max_bytes: 0,
1766            expires: Duration::ZERO,
1767            heartbeat: Duration::default(),
1768        }
1769    }
1770
1771    /// Sets max bytes that can be buffered on the Client while processing already received
1772    /// messages.
1773    /// Higher values will yield better performance, but also potentially increase memory usage if
1774    /// application is acknowledging messages much slower than they arrive.
1775    ///
1776    /// Default values should provide reasonable balance between performance and memory usage.
1777    ///
1778    /// # Examples
1779    ///
1780    /// ```no_run
1781    /// # #[tokio::main]
1782    /// # async fn main() -> Result<(), async_nats::Error>  {
1783    /// use async_nats::jetstream::consumer::PullConsumer;
1784    /// use futures::StreamExt;
1785    /// let client = async_nats::connect("localhost:4222").await?;
1786    /// let jetstream = async_nats::jetstream::new(client);
1787    ///
1788    /// let consumer: PullConsumer = jetstream
1789    ///     .get_stream("events")
1790    ///     .await?
1791    ///     .get_consumer("pull")
1792    ///     .await?;
1793    ///
1794    /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
1795    ///
1796    /// while let Some(message) = messages.next().await {
1797    ///     let message = message?;
1798    ///     println!("message: {:?}", message);
1799    ///     message.ack().await?;
1800    /// }
1801    /// # Ok(())
1802    /// # }
1803    /// ```
1804    pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1805        self.max_bytes = max_bytes;
1806        self
1807    }
1808
1809    /// Sets max number of messages that can be buffered on the Client while processing already received
1810    /// messages.
1811    /// Higher values will yield better performance, but also potentially increase memory usage if
1812    /// application is acknowledging messages much slower than they arrive.
1813    ///
1814    /// Default values should provide reasonable balance between performance and memory usage.
1815    ///
1816    /// # Examples
1817    ///
1818    /// ```no_run
1819    /// # #[tokio::main]
1820    /// # async fn main() -> Result<(), async_nats::Error>  {
1821    /// use async_nats::jetstream::consumer::PullConsumer;
1822    /// use futures::StreamExt;
1823    /// let client = async_nats::connect("localhost:4222").await?;
1824    /// let jetstream = async_nats::jetstream::new(client);
1825    ///
1826    /// let consumer: PullConsumer = jetstream
1827    ///     .get_stream("events")
1828    ///     .await?
1829    ///     .get_consumer("pull")
1830    ///     .await?;
1831    ///
1832    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
1833    ///
1834    /// while let Some(message) = messages.next().await {
1835    ///     let message = message?;
1836    ///     println!("message: {:?}", message);
1837    ///     message.ack().await?;
1838    /// }
1839    /// # Ok(())
1840    /// # }
1841    /// ```
1842    pub fn max_messages(mut self, batch: usize) -> Self {
1843        self.batch = batch;
1844        self
1845    }
1846
1847    /// Sets heartbeat which will be send by the server if there are no messages for a given
1848    /// [Consumer] pending.
1849    ///
1850    /// # Examples
1851    ///
1852    /// ```no_run
1853    /// # #[tokio::main]
1854    /// # async fn main() -> Result<(), async_nats::Error>  {
1855    /// use async_nats::jetstream::consumer::PullConsumer;
1856    /// use futures::StreamExt;
1857    /// let client = async_nats::connect("localhost:4222").await?;
1858    /// let jetstream = async_nats::jetstream::new(client);
1859    ///
1860    /// let consumer: PullConsumer = jetstream
1861    ///     .get_stream("events")
1862    ///     .await?
1863    ///     .get_consumer("pull")
1864    ///     .await?;
1865    ///
1866    /// let mut messages = consumer
1867    ///     .batch()
1868    ///     .heartbeat(std::time::Duration::from_secs(10))
1869    ///     .messages()
1870    ///     .await?;
1871    ///
1872    /// while let Some(message) = messages.next().await {
1873    ///     let message = message?;
1874    ///     println!("message: {:?}", message);
1875    ///     message.ack().await?;
1876    /// }
1877    /// # Ok(())
1878    /// # }
1879    /// ```
1880    pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1881        self.heartbeat = heartbeat;
1882        self
1883    }
1884
1885    /// Low level API that does not need tweaking for most use cases.
1886    /// Sets how long each batch request waits for whole batch of messages before timing out.
1887    /// [Consumer] pending.
1888    ///
1889    /// # Examples
1890    ///
1891    /// ```no_run
1892    /// # #[tokio::main]
1893    /// # async fn main() -> Result<(), async_nats::Error>  {
1894    /// use async_nats::jetstream::consumer::PullConsumer;
1895    /// use futures::StreamExt;
1896    /// let client = async_nats::connect("localhost:4222").await?;
1897    /// let jetstream = async_nats::jetstream::new(client);
1898    ///
1899    /// let consumer: PullConsumer = jetstream
1900    ///     .get_stream("events")
1901    ///     .await?
1902    ///     .get_consumer("pull")
1903    ///     .await?;
1904    ///
1905    /// let mut messages = consumer
1906    ///     .batch()
1907    ///     .expires(std::time::Duration::from_secs(30))
1908    ///     .messages()
1909    ///     .await?;
1910    ///
1911    /// while let Some(message) = messages.next().await {
1912    ///     let message = message?;
1913    ///     println!("message: {:?}", message);
1914    ///     message.ack().await?;
1915    /// }
1916    /// # Ok(())
1917    /// # }
1918    /// ```
1919    pub fn expires(mut self, expires: Duration) -> Self {
1920        self.expires = expires;
1921        self
1922    }
1923
1924    /// Creates actual [Stream] with provided configuration.
1925    ///
1926    /// # Examples
1927    ///
1928    /// ```no_run
1929    /// # #[tokio::main]
1930    /// # async fn main() -> Result<(), async_nats::Error>  {
1931    /// use async_nats::jetstream::consumer::PullConsumer;
1932    /// use futures::StreamExt;
1933    /// let client = async_nats::connect("localhost:4222").await?;
1934    /// let jetstream = async_nats::jetstream::new(client);
1935    ///
1936    /// let consumer: PullConsumer = jetstream
1937    ///     .get_stream("events")
1938    ///     .await?
1939    ///     .get_consumer("pull")
1940    ///     .await?;
1941    ///
1942    /// let mut messages = consumer.batch().max_messages(100).messages().await?;
1943    ///
1944    /// while let Some(message) = messages.next().await {
1945    ///     let message = message?;
1946    ///     println!("message: {:?}", message);
1947    ///     message.ack().await?;
1948    /// }
1949    /// # Ok(())
1950    /// # }
1951    /// ```
1952    pub async fn messages(self) -> Result<Batch, BatchError> {
1953        Batch::batch(
1954            BatchConfig {
1955                batch: self.batch,
1956                expires: Some(self.expires),
1957                no_wait: false,
1958                max_bytes: self.max_bytes,
1959                idle_heartbeat: self.heartbeat,
1960            },
1961            self.consumer,
1962        )
1963        .await
1964    }
1965}
1966
1967/// Used for next Pull Request for Pull Consumer
1968#[derive(Debug, Default, Serialize, Clone, Copy, PartialEq, Eq)]
1969pub struct BatchConfig {
1970    /// The number of messages that are being requested to be delivered.
1971    pub batch: usize,
1972    /// The optional number of nanoseconds that the server will store this next request for
1973    /// before forgetting about the pending batch size.
1974    #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
1975    pub expires: Option<Duration>,
1976    /// This optionally causes the server not to store this pending request at all, but when there are no
1977    /// messages to deliver will send a nil bytes message with a Status header of 404, this way you
1978    /// can know when you reached the end of the stream for example. A 409 is returned if the
1979    /// Consumer has reached MaxAckPending limits.
1980    #[serde(skip_serializing_if = "is_default")]
1981    pub no_wait: bool,
1982
1983    /// Sets max number of bytes in total in given batch size. This works together with `batch`.
1984    /// Whichever value is reached first, batch will complete.
1985    pub max_bytes: usize,
1986
1987    /// Setting this other than zero will cause the server to send 100 Idle Heartbeat status to the
1988    /// client
1989    #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
1990    pub idle_heartbeat: Duration,
1991}
1992
/// Returns `true` when `t` equals its type's `Default` value.
///
/// Used as a serde `skip_serializing_if` predicate so default-valued fields are omitted.
fn is_default<T>(t: &T) -> bool
where
    T: Default + Eq,
{
    *t == T::default()
}
1996
1997#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1998pub struct Config {
1999    /// Setting `durable_name` to `Some(...)` will cause this consumer
2000    /// to be "durable". This may be a good choice for workloads that
2001    /// benefit from the `JetStream` server or cluster remembering the
2002    /// progress of consumers for fault tolerance purposes. If a consumer
2003    /// crashes, the `JetStream` server or cluster will remember which
2004    /// messages the consumer acknowledged. When the consumer recovers,
2005    /// this information will allow the consumer to resume processing
2006    /// where it left off. If you're unsure, set this to `Some(...)`.
2007    ///
2008    /// Setting `durable_name` to `None` will cause this consumer to
2009    /// be "ephemeral". This may be a good choice for workloads where
2010    /// you don't need the `JetStream` server to remember the consumer's
2011    /// progress in the case of a crash, such as certain "high churn"
2012    /// workloads or workloads where a crashed instance is not required
2013    /// to recover.
2014    #[serde(default, skip_serializing_if = "Option::is_none")]
2015    pub durable_name: Option<String>,
2016    /// A name of the consumer. Can be specified for both durable and ephemeral
2017    /// consumers.
2018    #[serde(default, skip_serializing_if = "Option::is_none")]
2019    pub name: Option<String>,
2020    /// A short description of the purpose of this consumer.
2021    #[serde(default, skip_serializing_if = "Option::is_none")]
2022    pub description: Option<String>,
2023    /// Allows for a variety of options that determine how this consumer will receive messages
2024    #[serde(flatten)]
2025    pub deliver_policy: DeliverPolicy,
2026    /// How messages should be acknowledged
2027    pub ack_policy: AckPolicy,
2028    /// How long to allow messages to remain un-acknowledged before attempting redelivery
2029    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2030    pub ack_wait: Duration,
2031    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2032    #[serde(default, skip_serializing_if = "is_default")]
2033    pub max_deliver: i64,
2034    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2035    #[serde(default, skip_serializing_if = "is_default")]
2036    pub filter_subject: String,
2037    #[cfg(feature = "server_2_10")]
2038    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2039    #[serde(default, skip_serializing_if = "is_default")]
2040    pub filter_subjects: Vec<String>,
2041    /// Whether messages are sent as quickly as possible or at the rate of receipt
2042    pub replay_policy: ReplayPolicy,
2043    /// The rate of message delivery in bits per second
2044    #[serde(default, skip_serializing_if = "is_default")]
2045    pub rate_limit: u64,
2046    /// What percentage of acknowledgments should be samples for observability, 0-100
2047    #[serde(default, skip_serializing_if = "is_default")]
2048    pub sample_frequency: u8,
2049    /// The maximum number of waiting consumers.
2050    #[serde(default, skip_serializing_if = "is_default")]
2051    pub max_waiting: i64,
2052    /// The maximum number of unacknowledged messages that may be
2053    /// in-flight before pausing sending additional messages to
2054    /// this consumer.
2055    #[serde(default, skip_serializing_if = "is_default")]
2056    pub max_ack_pending: i64,
2057    /// Only deliver headers without payloads.
2058    #[serde(default, skip_serializing_if = "is_default")]
2059    pub headers_only: bool,
2060    /// Maximum size of a request batch
2061    #[serde(default, skip_serializing_if = "is_default")]
2062    pub max_batch: i64,
2063    /// Maximum value of request max_bytes
2064    #[serde(default, skip_serializing_if = "is_default")]
2065    pub max_bytes: i64,
2066    /// Maximum value for request expiration
2067    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2068    pub max_expires: Duration,
2069    /// Threshold for consumer inactivity
2070    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2071    pub inactive_threshold: Duration,
2072    /// Number of consumer replicas
2073    #[serde(default, skip_serializing_if = "is_default")]
2074    pub num_replicas: usize,
2075    /// Force consumer to use memory storage.
2076    #[serde(default, skip_serializing_if = "is_default")]
2077    pub memory_storage: bool,
2078    #[cfg(feature = "server_2_10")]
2079    // Additional consumer metadata.
2080    #[serde(default, skip_serializing_if = "is_default")]
2081    pub metadata: HashMap<String, String>,
2082    /// Custom backoff for missed acknowledgments.
2083    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2084    pub backoff: Vec<Duration>,
2085}
2086
2087impl IntoConsumerConfig for &Config {
2088    fn into_consumer_config(self) -> consumer::Config {
2089        self.clone().into_consumer_config()
2090    }
2091}
2092
2093impl IntoConsumerConfig for Config {
2094    fn into_consumer_config(self) -> consumer::Config {
2095        jetstream::consumer::Config {
2096            deliver_subject: None,
2097            name: self.name,
2098            durable_name: self.durable_name,
2099            description: self.description,
2100            deliver_group: None,
2101            deliver_policy: self.deliver_policy,
2102            ack_policy: self.ack_policy,
2103            ack_wait: self.ack_wait,
2104            max_deliver: self.max_deliver,
2105            filter_subject: self.filter_subject,
2106            #[cfg(feature = "server_2_10")]
2107            filter_subjects: self.filter_subjects,
2108            replay_policy: self.replay_policy,
2109            rate_limit: self.rate_limit,
2110            sample_frequency: self.sample_frequency,
2111            max_waiting: self.max_waiting,
2112            max_ack_pending: self.max_ack_pending,
2113            headers_only: self.headers_only,
2114            flow_control: false,
2115            idle_heartbeat: Duration::default(),
2116            max_batch: self.max_batch,
2117            max_bytes: self.max_bytes,
2118            max_expires: self.max_expires,
2119            inactive_threshold: self.inactive_threshold,
2120            num_replicas: self.num_replicas,
2121            memory_storage: self.memory_storage,
2122            #[cfg(feature = "server_2_10")]
2123            metadata: self.metadata,
2124            backoff: self.backoff,
2125        }
2126    }
2127}
2128impl FromConsumer for Config {
2129    fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2130        if config.deliver_subject.is_some() {
2131            return Err(Box::new(std::io::Error::new(
2132                std::io::ErrorKind::Other,
2133                "pull consumer cannot have delivery subject",
2134            )));
2135        }
2136        Ok(Config {
2137            durable_name: config.durable_name,
2138            name: config.name,
2139            description: config.description,
2140            deliver_policy: config.deliver_policy,
2141            ack_policy: config.ack_policy,
2142            ack_wait: config.ack_wait,
2143            max_deliver: config.max_deliver,
2144            filter_subject: config.filter_subject,
2145            #[cfg(feature = "server_2_10")]
2146            filter_subjects: config.filter_subjects,
2147            replay_policy: config.replay_policy,
2148            rate_limit: config.rate_limit,
2149            sample_frequency: config.sample_frequency,
2150            max_waiting: config.max_waiting,
2151            max_ack_pending: config.max_ack_pending,
2152            headers_only: config.headers_only,
2153            max_batch: config.max_batch,
2154            max_bytes: config.max_bytes,
2155            max_expires: config.max_expires,
2156            inactive_threshold: config.inactive_threshold,
2157            num_replicas: config.num_replicas,
2158            memory_storage: config.memory_storage,
2159            #[cfg(feature = "server_2_10")]
2160            metadata: config.metadata,
2161            backoff: config.backoff,
2162        })
2163    }
2164}
2165
/// Stage at which a batch (pull) request failed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BatchRequestErrorKind {
    /// Rendered as "publish failed".
    Publish,
    /// Rendered as "flush failed".
    Flush,
    /// Rendered as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchRequestErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Publish => "publish failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(description)
    }
}
2182
2183pub type BatchRequestError = Error<BatchRequestErrorKind>;
2184
/// Stage at which creating a batch of messages failed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BatchErrorKind {
    /// Rendered as "subscribe failed".
    Subscribe,
    /// Rendered as "pull request failed".
    Pull,
    /// Rendered as "flush failed".
    Flush,
    /// Rendered as "serialize failed".
    Serialize,
}

impl std::fmt::Display for BatchErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Subscribe => "subscribe failed",
            Self::Pull => "pull request failed",
            Self::Flush => "flush failed",
            Self::Serialize => "serialize failed",
        };
        f.write_str(description)
    }
}
2203
2204pub type BatchError = Error<BatchErrorKind>;
2205
2206impl From<SubscribeError> for BatchError {
2207    fn from(err: SubscribeError) -> Self {
2208        BatchError::with_source(BatchErrorKind::Subscribe, err)
2209    }
2210}
2211
2212impl From<BatchRequestError> for BatchError {
2213    fn from(err: BatchRequestError) -> Self {
2214        BatchError::with_source(BatchErrorKind::Pull, err)
2215    }
2216}
2217
/// Reason why recreating a consumer failed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConsumerRecreateErrorKind {
    /// Rendered as "error getting stream".
    GetStream,
    /// Rendered as "consumer creation failed".
    Recreate,
    /// Rendered as "timed out".
    TimedOut,
}

impl std::fmt::Display for ConsumerRecreateErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::GetStream => "error getting stream",
            Self::Recreate => "consumer creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
2234
2235pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2236
2237async fn recreate_consumer_stream(
2238    context: &Context,
2239    config: &OrderedConfig,
2240    stream_name: &str,
2241    consumer_name: &str,
2242    sequence: u64,
2243) -> Result<Stream, ConsumerRecreateError> {
2244    let span = tracing::span!(
2245        tracing::Level::DEBUG,
2246        "recreate_ordered_consumer",
2247        stream_name = stream_name,
2248        consumer_name = consumer_name,
2249        sequence = sequence
2250    );
2251    let _span_handle = span.enter();
2252    let config = config.to_owned();
2253    trace!("delete old consumer before creating new one");
2254
2255    tokio::time::timeout(
2256        Duration::from_secs(5),
2257        context.delete_consumer_from_stream(consumer_name, stream_name),
2258    )
2259    .await
2260    .ok();
2261
2262    let deliver_policy = {
2263        if sequence == 0 {
2264            DeliverPolicy::All
2265        } else {
2266            DeliverPolicy::ByStartSequence {
2267                start_sequence: sequence + 1,
2268            }
2269        }
2270    };
2271    trace!("create the new ordered consumer for sequence {}", sequence);
2272    let consumer = tokio::time::timeout(
2273        Duration::from_secs(5),
2274        context.create_consumer_on_stream(
2275            jetstream::consumer::pull::OrderedConfig {
2276                deliver_policy,
2277                ..config.clone()
2278            },
2279            stream_name,
2280        ),
2281    )
2282    .await
2283    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2284    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2285
2286    let config = Consumer {
2287        config: config.clone().into(),
2288        context: context.clone(),
2289        info: consumer.info,
2290    };
2291
2292    trace!("create iterator");
2293    let stream = tokio::time::timeout(
2294        Duration::from_secs(5),
2295        Stream::stream(
2296            BatchConfig {
2297                batch: 500,
2298                expires: Some(Duration::from_secs(30)),
2299                no_wait: false,
2300                max_bytes: 0,
2301                idle_heartbeat: Duration::from_secs(15),
2302            },
2303            &config,
2304        ),
2305    )
2306    .await
2307    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2308    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2309    trace!("recreated consumer");
2310    stream
2311}