async_nats_wrpc/jetstream/consumer/
push.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use super::{
15    AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16    StreamError, StreamErrorKind,
17};
18use crate::{
19    connection::State,
20    error::Error,
21    jetstream::{self, Context, Message},
22    StatusCode, Subscriber,
23};
24
25use bytes::Bytes;
26use futures::{future::BoxFuture, FutureExt};
27use portable_atomic::AtomicU64;
28use serde::{Deserialize, Serialize};
29#[cfg(feature = "server_2_10")]
30use std::collections::HashMap;
31use std::task::{self, Poll};
32use std::{
33    io::{self, ErrorKind},
34    pin::Pin,
35    sync::Arc,
36};
37use std::{sync::atomic::Ordering, time::Duration};
38use tokio::{sync::oneshot::error::TryRecvError, task::JoinHandle};
39use tracing::{debug, trace};
40
41const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
42
43impl Consumer<Config> {
44    /// Returns a stream of messages for Push Consumer.
45    ///
46    /// # Example
47    ///
48    /// ```no_run
49    /// # #[tokio::main]
50    /// # async fn mains() -> Result<(), async_nats::Error> {
51    /// use async_nats::jetstream::consumer::PushConsumer;
52    /// use futures::StreamExt;
53    /// use futures::TryStreamExt;
54    ///
55    /// let client = async_nats::connect("localhost:4222").await?;
56    /// let jetstream = async_nats::jetstream::new(client);
57    ///
58    /// let stream = jetstream
59    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
60    ///         name: "events".to_string(),
61    ///         max_messages: 10_000,
62    ///         ..Default::default()
63    ///     })
64    ///     .await?;
65    ///
66    /// jetstream.publish("events", "data".into()).await?;
67    ///
68    /// let consumer: PushConsumer = stream
69    ///     .get_or_create_consumer(
70    ///         "consumer",
71    ///         async_nats::jetstream::consumer::push::Config {
72    ///             durable_name: Some("consumer".to_string()),
73    ///             deliver_subject: "deliver".to_string(),
74    ///             ..Default::default()
75    ///         },
76    ///     )
77    ///     .await?;
78    ///
79    /// let mut messages = consumer.messages().await?.take(100);
80    /// while let Some(Ok(message)) = messages.next().await {
81    ///     println!("got message {:?}", message);
82    ///     message.ack().await?;
83    /// }
84    /// Ok(())
85    /// # }
86    /// ```
87    pub async fn messages(&self) -> Result<Messages, StreamError> {
88        let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
89        let subscriber = if let Some(ref group) = self.info.config.deliver_group {
90            self.context
91                .client
92                .queue_subscribe(deliver_subject, group.to_owned())
93                .await
94                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
95        } else {
96            self.context
97                .client
98                .subscribe(deliver_subject)
99                .await
100                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
101        };
102
103        Ok(Messages {
104            context: self.context.clone(),
105            config: self.config.clone(),
106            subscriber,
107            heartbeat_sleep: None,
108        })
109    }
110}
111
112pub struct Messages {
113    context: Context,
114    subscriber: Subscriber,
115    config: Config,
116    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
117}
118
119impl futures::Stream for Messages {
120    type Item = Result<Message, MessagesError>;
121
122    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
123        if !self.config.idle_heartbeat.is_zero() {
124            let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
125            match self
126                .heartbeat_sleep
127                .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
128                .poll_unpin(cx)
129            {
130                Poll::Ready(_) => {
131                    self.heartbeat_sleep = None;
132                    return Poll::Ready(Some(Err(MessagesError::new(
133                        MessagesErrorKind::MissingHeartbeat,
134                    ))));
135                }
136                Poll::Pending => (),
137            }
138        }
139        loop {
140            match self.subscriber.receiver.poll_recv(cx) {
141                Poll::Ready(maybe_message) => {
142                    self.heartbeat_sleep = None;
143                    match maybe_message {
144                        Some(message) => match message.status {
145                            Some(StatusCode::IDLE_HEARTBEAT) => {
146                                if let Some(subject) = message.reply {
147                                    // TODO store pending_publish as a future and return errors from it
148                                    let client = self.context.client.clone();
149                                    tokio::task::spawn(async move {
150                                        client
151                                            .publish(subject, Bytes::from_static(b""))
152                                            .await
153                                            .unwrap();
154                                    });
155                                }
156
157                                continue;
158                            }
159                            Some(_) => {
160                                continue;
161                            }
162                            None => {
163                                return Poll::Ready(Some(Ok(jetstream::Message {
164                                    context: self.context.clone(),
165                                    message,
166                                })))
167                            }
168                        },
169                        None => return Poll::Ready(None),
170                    }
171                }
172                Poll::Pending => return Poll::Pending,
173            }
174        }
175    }
176}
177
178/// Configuration for consumers. From a high level, the
179/// `durable_name` and `deliver_subject` fields have a particularly
180/// strong influence on the consumer's overall behavior.
181#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
182pub struct Config {
183    /// The delivery subject used by the push consumer.
184    #[serde(default)]
185    pub deliver_subject: String,
186    /// Setting `durable_name` to `Some(...)` will cause this consumer
187    /// to be "durable". This may be a good choice for workloads that
188    /// benefit from the `JetStream` server or cluster remembering the
189    /// progress of consumers for fault tolerance purposes. If a consumer
190    /// crashes, the `JetStream` server or cluster will remember which
191    /// messages the consumer acknowledged. When the consumer recovers,
192    /// this information will allow the consumer to resume processing
193    /// where it left off. If you're unsure, set this to `Some(...)`.
194    ///
195    /// Setting `durable_name` to `None` will cause this consumer to
196    /// be "ephemeral". This may be a good choice for workloads where
197    /// you don't need the `JetStream` server to remember the consumer's
198    /// progress in the case of a crash, such as certain "high churn"
199    /// workloads or workloads where a crashed instance is not required
200    /// to recover.
201    #[serde(default, skip_serializing_if = "Option::is_none")]
202    pub durable_name: Option<String>,
203    /// A name of the consumer. Can be specified for both durable and ephemeral
204    /// consumers.
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub name: Option<String>,
207    /// A short description of the purpose of this consumer.
208    #[serde(default, skip_serializing_if = "Option::is_none")]
209    pub description: Option<String>,
210    #[serde(default, skip_serializing_if = "Option::is_none")]
211    /// Deliver group to use.
212    pub deliver_group: Option<String>,
213    /// Allows for a variety of options that determine how this consumer will receive messages
214    #[serde(flatten)]
215    pub deliver_policy: DeliverPolicy,
216    /// How messages should be acknowledged
217    pub ack_policy: AckPolicy,
218    /// How long to allow messages to remain un-acknowledged before attempting redelivery
219    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
220    pub ack_wait: Duration,
221    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
222    #[serde(default, skip_serializing_if = "is_default")]
223    pub max_deliver: i64,
224    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
225    #[serde(default, skip_serializing_if = "is_default")]
226    pub filter_subject: String,
227    #[cfg(feature = "server_2_10")]
228    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
229    #[serde(default, skip_serializing_if = "is_default")]
230    pub filter_subjects: Vec<String>,
231    /// Whether messages are sent as quickly as possible or at the rate of receipt
232    pub replay_policy: ReplayPolicy,
233    /// The rate of message delivery in bits per second
234    #[serde(default, skip_serializing_if = "is_default")]
235    pub rate_limit: u64,
236    /// What percentage of acknowledgments should be samples for observability, 0-100
237    #[serde(default, skip_serializing_if = "is_default")]
238    pub sample_frequency: u8,
239    /// The maximum number of waiting consumers.
240    #[serde(default, skip_serializing_if = "is_default")]
241    pub max_waiting: i64,
242    /// The maximum number of unacknowledged messages that may be
243    /// in-flight before pausing sending additional messages to
244    /// this consumer.
245    #[serde(default, skip_serializing_if = "is_default")]
246    pub max_ack_pending: i64,
247    /// Only deliver headers without payloads.
248    #[serde(default, skip_serializing_if = "is_default")]
249    pub headers_only: bool,
250    /// Enable flow control messages
251    #[serde(default, skip_serializing_if = "is_default")]
252    pub flow_control: bool,
253    /// Enable idle heartbeat messages
254    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
255    pub idle_heartbeat: Duration,
256    /// Number of consumer replicas
257    #[serde(default, skip_serializing_if = "is_default")]
258    pub num_replicas: usize,
259    /// Force consumer to use memory storage.
260    #[serde(default, skip_serializing_if = "is_default")]
261    pub memory_storage: bool,
262    #[cfg(feature = "server_2_10")]
263    // Additional consumer metadata.
264    #[serde(default, skip_serializing_if = "is_default")]
265    pub metadata: HashMap<String, String>,
266    /// Custom backoff for missed acknowledgments.
267    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
268    pub backoff: Vec<Duration>,
269    /// Threshold for consumer inactivity
270    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
271    pub inactive_threshold: Duration,
272}
273
274impl FromConsumer for Config {
275    fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
276        if config.deliver_subject.is_none() {
277            return Err(Box::new(io::Error::new(
278                ErrorKind::Other,
279                "push consumer must have delivery subject",
280            )));
281        }
282
283        Ok(Config {
284            deliver_subject: config.deliver_subject.unwrap(),
285            durable_name: config.durable_name,
286            name: config.name,
287            description: config.description,
288            deliver_group: config.deliver_group,
289            deliver_policy: config.deliver_policy,
290            ack_policy: config.ack_policy,
291            ack_wait: config.ack_wait,
292            max_deliver: config.max_deliver,
293            filter_subject: config.filter_subject,
294            #[cfg(feature = "server_2_10")]
295            filter_subjects: config.filter_subjects,
296            replay_policy: config.replay_policy,
297            rate_limit: config.rate_limit,
298            sample_frequency: config.sample_frequency,
299            max_waiting: config.max_waiting,
300            max_ack_pending: config.max_ack_pending,
301            headers_only: config.headers_only,
302            flow_control: config.flow_control,
303            idle_heartbeat: config.idle_heartbeat,
304            num_replicas: config.num_replicas,
305            memory_storage: config.memory_storage,
306            #[cfg(feature = "server_2_10")]
307            metadata: config.metadata,
308            backoff: config.backoff,
309            inactive_threshold: config.inactive_threshold,
310        })
311    }
312}
313
314impl IntoConsumerConfig for Config {
315    fn into_consumer_config(self) -> jetstream::consumer::Config {
316        jetstream::consumer::Config {
317            deliver_subject: Some(self.deliver_subject),
318            durable_name: self.durable_name,
319            name: self.name,
320            description: self.description,
321            deliver_group: self.deliver_group,
322            deliver_policy: self.deliver_policy,
323            ack_policy: self.ack_policy,
324            ack_wait: self.ack_wait,
325            max_deliver: self.max_deliver,
326            filter_subject: self.filter_subject,
327            #[cfg(feature = "server_2_10")]
328            filter_subjects: self.filter_subjects,
329            replay_policy: self.replay_policy,
330            rate_limit: self.rate_limit,
331            sample_frequency: self.sample_frequency,
332            max_waiting: self.max_waiting,
333            max_ack_pending: self.max_ack_pending,
334            headers_only: self.headers_only,
335            flow_control: self.flow_control,
336            idle_heartbeat: self.idle_heartbeat,
337            max_batch: 0,
338            max_bytes: 0,
339            max_expires: Duration::default(),
340            inactive_threshold: self.inactive_threshold,
341            num_replicas: self.num_replicas,
342            memory_storage: self.memory_storage,
343            #[cfg(feature = "server_2_10")]
344            metadata: self.metadata,
345            backoff: self.backoff,
346        }
347    }
348}
349impl IntoConsumerConfig for &Config {
350    fn into_consumer_config(self) -> jetstream::consumer::Config {
351        self.clone().into_consumer_config()
352    }
353}
354fn is_default<T: Default + Eq>(t: &T) -> bool {
355    t == &T::default()
356}
357
358/// Configuration for consumers. From a high level, the
359/// `durable_name` and `deliver_subject` fields have a particularly
360/// strong influence on the consumer's overall behavior.
361#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
362pub struct OrderedConfig {
363    /// The delivery subject used by the push consumer.
364    #[serde(default)]
365    pub deliver_subject: String,
366    /// A name of the consumer. Can be specified for both durable and ephemeral
367    /// consumers.
368    #[serde(default, skip_serializing_if = "Option::is_none")]
369    pub name: Option<String>,
370    /// A short description of the purpose of this consumer.
371    #[serde(default, skip_serializing_if = "Option::is_none")]
372    pub description: Option<String>,
373    #[serde(default, skip_serializing_if = "is_default")]
374    pub filter_subject: String,
375    #[cfg(feature = "server_2_10")]
376    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
377    #[serde(default, skip_serializing_if = "is_default")]
378    pub filter_subjects: Vec<String>,
379    /// Whether messages are sent as quickly as possible or at the rate of receipt
380    pub replay_policy: ReplayPolicy,
381    /// The rate of message delivery in bits per second
382    #[serde(default, skip_serializing_if = "is_default")]
383    pub rate_limit: u64,
384    /// What percentage of acknowledgments should be samples for observability, 0-100
385    #[serde(default, skip_serializing_if = "is_default")]
386    pub sample_frequency: u8,
387    /// Only deliver headers without payloads.
388    #[serde(default, skip_serializing_if = "is_default")]
389    pub headers_only: bool,
390    /// Allows for a variety of options that determine how this consumer will receive messages
391    #[serde(flatten)]
392    pub deliver_policy: DeliverPolicy,
393    /// The maximum number of waiting consumers.
394    #[serde(default, skip_serializing_if = "is_default")]
395    pub max_waiting: i64,
396    #[cfg(feature = "server_2_10")]
397    // Additional consumer metadata.
398    #[serde(default, skip_serializing_if = "is_default")]
399    pub metadata: HashMap<String, String>,
400}
401
402impl FromConsumer for OrderedConfig {
403    fn try_from_consumer_config(
404        config: crate::jetstream::consumer::Config,
405    ) -> Result<Self, crate::Error>
406    where
407        Self: Sized,
408    {
409        if config.deliver_subject.is_none() {
410            return Err(Box::new(io::Error::new(
411                ErrorKind::Other,
412                "push consumer must have delivery subject",
413            )));
414        }
415        Ok(OrderedConfig {
416            name: config.name,
417            deliver_subject: config.deliver_subject.unwrap(),
418            description: config.description,
419            filter_subject: config.filter_subject,
420            #[cfg(feature = "server_2_10")]
421            filter_subjects: config.filter_subjects,
422            replay_policy: config.replay_policy,
423            rate_limit: config.rate_limit,
424            sample_frequency: config.sample_frequency,
425            headers_only: config.headers_only,
426            deliver_policy: config.deliver_policy,
427            max_waiting: config.max_waiting,
428            #[cfg(feature = "server_2_10")]
429            metadata: config.metadata,
430        })
431    }
432}
433
434impl IntoConsumerConfig for OrderedConfig {
435    fn into_consumer_config(self) -> super::Config {
436        jetstream::consumer::Config {
437            deliver_subject: Some(self.deliver_subject),
438            durable_name: None,
439            name: self.name,
440            description: self.description,
441            deliver_group: None,
442            deliver_policy: self.deliver_policy,
443            ack_policy: AckPolicy::None,
444            ack_wait: Duration::default(),
445            max_deliver: 1,
446            filter_subject: self.filter_subject,
447            #[cfg(feature = "server_2_10")]
448            filter_subjects: self.filter_subjects,
449            replay_policy: self.replay_policy,
450            rate_limit: self.rate_limit,
451            sample_frequency: self.sample_frequency,
452            max_waiting: self.max_waiting,
453            max_ack_pending: 0,
454            headers_only: self.headers_only,
455            flow_control: true,
456            idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
457            max_batch: 0,
458            max_bytes: 0,
459            max_expires: Duration::default(),
460            inactive_threshold: Duration::from_secs(30),
461            num_replicas: 1,
462            memory_storage: true,
463            #[cfg(feature = "server_2_10")]
464            metadata: self.metadata,
465            backoff: Vec::new(),
466        }
467    }
468}
469
470impl Consumer<OrderedConfig> {
471    pub async fn messages<'a>(self) -> Result<Ordered, StreamError> {
472        let subscriber = self
473            .context
474            .client
475            .subscribe(self.info.config.deliver_subject.clone().unwrap())
476            .await
477            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
478
479        let last_sequence = Arc::new(AtomicU64::new(0));
480        let consumer_sequence = Arc::new(AtomicU64::new(0));
481        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
482        let handle = tokio::task::spawn({
483            let stream_name = self.info.stream_name.clone();
484            let config = self.config.clone();
485            let mut context = self.context.clone();
486            let last_sequence = last_sequence.clone();
487            let consumer_sequence = consumer_sequence.clone();
488            let state = self.context.client.state.clone();
489            async move {
490                loop {
491                    let current_state = state.borrow().to_owned();
492
493                    context.client.state.changed().await.unwrap();
494                    // State change notification received within the timeout
495                    if state.borrow().to_owned() != State::Connected
496                        || current_state == State::Connected
497                    {
498                        continue;
499                    }
500                    debug!("reconnected. trigger consumer recreation");
501
502                    debug!(
503                        "idle heartbeats expired. recreating consumer s: {},  {:?}",
504                        stream_name, config
505                    );
506                    let consumer = tryhard::retry_fn(|| {
507                        recreate_ephemeral_consumer(
508                            context.clone(),
509                            config.clone(),
510                            stream_name.clone(),
511                            last_sequence.load(Ordering::Relaxed),
512                        )
513                    })
514                    .retries(5)
515                    .exponential_backoff(Duration::from_millis(500))
516                    .await;
517                    if let Err(err) = consumer {
518                        shutdown_tx.send(err).unwrap();
519                        break;
520                    }
521                    debug!("resetting consume sequence to 0");
522                    consumer_sequence.store(0, Ordering::Relaxed);
523                }
524            }
525        });
526
527        Ok(Ordered {
528            context: self.context.clone(),
529            consumer: self,
530            subscriber: Some(subscriber),
531            subscriber_future: None,
532            stream_sequence: last_sequence,
533            consumer_sequence,
534            shutdown: shutdown_rx,
535            handle,
536            heartbeat_sleep: None,
537        })
538    }
539}
540
541pub struct Ordered {
542    context: Context,
543    consumer: Consumer<OrderedConfig>,
544    subscriber: Option<Subscriber>,
545    subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
546    stream_sequence: Arc<AtomicU64>,
547    consumer_sequence: Arc<AtomicU64>,
548    shutdown: tokio::sync::oneshot::Receiver<ConsumerRecreateError>,
549    handle: JoinHandle<()>,
550    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
551}
552
553impl Drop for Ordered {
554    fn drop(&mut self) {
555        // Stop trying to recreate the consumer
556        self.handle.abort()
557    }
558}
559
560impl futures::Stream for Ordered {
561    type Item = Result<Message, OrderedError>;
562
563    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
564        match self
565            .heartbeat_sleep
566            .get_or_insert_with(|| {
567                Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2)))
568            })
569            .poll_unpin(cx)
570        {
571            Poll::Ready(_) => {
572                self.heartbeat_sleep = None;
573                return Poll::Ready(Some(Err(OrderedError::new(
574                    OrderedErrorKind::MissingHeartbeat,
575                ))));
576            }
577            Poll::Pending => (),
578        }
579
580        loop {
581            match self.shutdown.try_recv() {
582                Ok(err) => {
583                    return Poll::Ready(Some(Err(OrderedError::with_source(
584                        OrderedErrorKind::Other,
585                        err,
586                    ))))
587                }
588                Err(TryRecvError::Closed) => {
589                    return Poll::Ready(Some(Err(OrderedError::with_source(
590                        OrderedErrorKind::Other,
591                        "consumer task closed",
592                    ))))
593                }
594                Err(TryRecvError::Empty) => {}
595            }
596            if self.subscriber.is_none() {
597                match self.subscriber_future.as_mut() {
598                    None => {
599                        trace!(
600                            "subscriber and subscriber future are None. Recreating the consumer"
601                        );
602                        let context = self.context.clone();
603                        let sequence = self.stream_sequence.clone();
604                        let config = self.consumer.config.clone();
605                        let stream_name = self.consumer.info.stream_name.clone();
606                        let subscriber_future =
607                            self.subscriber_future.insert(Box::pin(async move {
608                                recreate_consumer_and_subscription(
609                                    context,
610                                    config,
611                                    stream_name,
612                                    sequence.load(Ordering::Relaxed),
613                                )
614                                .await
615                            }));
616                        match subscriber_future.as_mut().poll(cx) {
617                            Poll::Ready(subscriber) => {
618                                self.subscriber_future = None;
619                                self.consumer_sequence.store(0, Ordering::Relaxed);
620                                self.subscriber = Some(subscriber.map_err(|err| {
621                                    OrderedError::with_source(OrderedErrorKind::Recreate, err)
622                                })?);
623                            }
624                            Poll::Pending => {
625                                return Poll::Pending;
626                            }
627                        }
628                    }
629                    Some(subscriber) => match subscriber.as_mut().poll(cx) {
630                        Poll::Ready(subscriber) => {
631                            self.subscriber_future = None;
632                            self.consumer_sequence.store(0, Ordering::Relaxed);
633                            self.subscriber = Some(subscriber.map_err(|err| {
634                                OrderedError::with_source(OrderedErrorKind::Recreate, err)
635                            })?);
636                        }
637                        Poll::Pending => {
638                            return Poll::Pending;
639                        }
640                    },
641                }
642            }
643            if let Some(subscriber) = self.subscriber.as_mut() {
644                match subscriber.receiver.poll_recv(cx) {
645                    Poll::Ready(maybe_message) => match maybe_message {
646                        Some(message) => {
647                            self.heartbeat_sleep = None;
648                            match message.status {
649                                Some(StatusCode::IDLE_HEARTBEAT) => {
650                                    debug!("received idle heartbeats");
651                                    if let Some(headers) = message.headers.as_ref() {
652                                        if let Some(sequence) =
653                                            headers.get_last(crate::header::NATS_LAST_CONSUMER)
654                                        {
655                                            let sequence: u64 =
656                                                sequence.as_str().parse().map_err(|err| {
657                                                    OrderedError::with_source(
658                                                        OrderedErrorKind::Other,
659                                                        err,
660                                                    )
661                                                })?;
662
663                                            let last_sequence =
664                                                self.consumer_sequence.load(Ordering::Relaxed);
665
666                                            if sequence != last_sequence {
667                                                debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
668                                                self.subscriber = None;
669                                            }
670                                        }
671                                    }
672                                    // flow control.
673                                    if let Some(subject) = message.reply.clone() {
674                                        trace!("received flow control message");
675                                        let client = self.context.client.clone();
676                                        tokio::task::spawn(async move {
677                                            client
678                                                .publish(subject, Bytes::from_static(b""))
679                                                .await
680                                                .ok();
681                                        });
682                                    }
683                                    continue;
684                                }
685                                Some(status) => {
686                                    debug!("received status message: {}", status);
687                                    continue;
688                                }
689                                None => {
690                                    trace!("received a message");
691                                    let jetstream_message = jetstream::message::Message {
692                                        message,
693                                        context: self.context.clone(),
694                                    };
695
696                                    let info = jetstream_message.info().map_err(|err| {
697                                        OrderedError::with_source(OrderedErrorKind::Other, err)
698                                    })?;
699                                    trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
700                                               self.consumer_sequence,
701                                               self.stream_sequence,
702                                               info.consumer_sequence,
703                                               info.stream_sequence);
704                                    if info.consumer_sequence
705                                        != self.consumer_sequence.load(Ordering::Relaxed) + 1
706                                    {
707                                        debug!(
708                                            "ordered consumer mismatch. current {}, info: {}",
709                                            self.consumer_sequence.load(Ordering::Relaxed),
710                                            info.consumer_sequence
711                                        );
712                                        self.subscriber = None;
713                                        self.consumer_sequence.store(0, Ordering::Relaxed);
714                                        continue;
715                                    }
716                                    self.stream_sequence
717                                        .store(info.stream_sequence, Ordering::Relaxed);
718                                    self.consumer_sequence
719                                        .store(info.consumer_sequence, Ordering::Relaxed);
720                                    return Poll::Ready(Some(Ok(jetstream_message)));
721                                }
722                            }
723                        }
724                        None => {
725                            return Poll::Ready(None);
726                        }
727                    },
728                    Poll::Pending => return Poll::Pending,
729                }
730            }
731        }
732    }
733}
734
735#[derive(Clone, Debug, PartialEq)]
736pub enum OrderedErrorKind {
737    MissingHeartbeat,
738    ConsumerDeleted,
739    PullBasedConsumer,
740    Recreate,
741    Other,
742}
743
744impl std::fmt::Display for OrderedErrorKind {
745    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746        match self {
747            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
748            Self::ConsumerDeleted => write!(f, "consumer deleted"),
749            Self::Other => write!(f, "error"),
750            Self::PullBasedConsumer => write!(f, "cannot use with push consumer"),
751            Self::Recreate => write!(f, "consumer recreation failed"),
752        }
753    }
754}
755
756pub type OrderedError = Error<OrderedErrorKind>;
757
758impl From<MessagesError> for OrderedError {
759    fn from(err: MessagesError) -> Self {
760        match err.kind() {
761            MessagesErrorKind::MissingHeartbeat => {
762                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
763            }
764            MessagesErrorKind::ConsumerDeleted => {
765                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
766            }
767            MessagesErrorKind::PullBasedConsumer => {
768                OrderedError::new(OrderedErrorKind::PullBasedConsumer)
769            }
770            MessagesErrorKind::Other => OrderedError {
771                kind: OrderedErrorKind::Other,
772                source: err.source,
773            },
774        }
775    }
776}
777
778#[derive(Clone, Copy, Debug, PartialEq)]
779pub enum MessagesErrorKind {
780    MissingHeartbeat,
781    ConsumerDeleted,
782    PullBasedConsumer,
783    Other,
784}
785
786impl std::fmt::Display for MessagesErrorKind {
787    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
788        match self {
789            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
790            Self::ConsumerDeleted => write!(f, "consumer deleted"),
791            Self::Other => write!(f, "error"),
792            Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
793        }
794    }
795}
796
797pub type MessagesError = Error<MessagesErrorKind>;
798
799#[derive(Clone, Copy, Debug, PartialEq)]
800pub enum ConsumerRecreateErrorKind {
801    GetStream,
802    Subscription,
803    Recreate,
804    TimedOut,
805}
806
807impl std::fmt::Display for ConsumerRecreateErrorKind {
808    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
809        match self {
810            Self::GetStream => write!(f, "error getting stream"),
811            Self::Recreate => write!(f, "consumer creation failed"),
812            Self::TimedOut => write!(f, "timed out"),
813            Self::Subscription => write!(f, "failed to resubscribe"),
814        }
815    }
816}
817
818pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
819
820async fn recreate_consumer_and_subscription(
821    context: Context,
822    mut config: OrderedConfig,
823    stream_name: String,
824    sequence: u64,
825) -> Result<Subscriber, ConsumerRecreateError> {
826    let delivery_subject = context.client.new_inbox();
827    config.deliver_subject = delivery_subject;
828
829    let subscriber = context
830        .client
831        .subscribe(config.deliver_subject.clone())
832        .await
833        .map_err(|err| {
834            ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
835        })?;
836
837    recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
838    Ok(subscriber)
839}
840async fn recreate_ephemeral_consumer(
841    context: Context,
842    config: OrderedConfig,
843    stream_name: String,
844    sequence: u64,
845) -> Result<(), ConsumerRecreateError> {
846    let strategy =
847        tryhard::RetryFutureConfig::new(5).exponential_backoff(Duration::from_millis(500));
848
849    let stream = tryhard::retry_fn(|| context.get_stream(stream_name.clone()))
850        .with_config(strategy)
851        .await
852        .map_err(|err| {
853            ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
854        })?;
855
856    let deliver_policy = {
857        if sequence == 0 {
858            DeliverPolicy::All
859        } else {
860            DeliverPolicy::ByStartSequence {
861                start_sequence: sequence + 1,
862            }
863        }
864    };
865
866    tryhard::retry_fn(|| {
867        let config = config.clone();
868        tokio::time::timeout(
869            Duration::from_secs(5),
870            stream.create_consumer(jetstream::consumer::push::OrderedConfig {
871                deliver_policy,
872                ..config
873            }),
874        )
875    })
876    .with_config(strategy)
877    .await
878    .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
879    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
880
881    Ok(())
882}