async_nats/jetstream/consumer/
push.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use super::{
15    backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16    StreamError, StreamErrorKind,
17};
18
19#[cfg(feature = "server_2_11")]
20use super::PriorityPolicy;
21
22use crate::{
23    connection::State,
24    error::Error,
25    jetstream::{self, Context, Message},
26    StatusCode, Subscriber,
27};
28
29use bytes::Bytes;
30use futures::{future::BoxFuture, FutureExt};
31use portable_atomic::AtomicU64;
32use serde::{Deserialize, Serialize};
33#[cfg(feature = "server_2_10")]
34use std::collections::HashMap;
35use std::task::{self, Poll};
36use std::{
37    io::{self, ErrorKind},
38    pin::Pin,
39    sync::Arc,
40};
41use std::{sync::atomic::Ordering, time::Duration};
42#[cfg(feature = "server_2_11")]
43use time::{serde::rfc3339, OffsetDateTime};
44use tokio::{sync::oneshot::error::TryRecvError, task::JoinHandle};
45use tracing::{debug, trace};
46
/// Idle heartbeat interval used for ordered push consumers.
///
/// `OrderedConfig::into_consumer_config` sends this value as the consumer's
/// `idle_heartbeat`, and the `Ordered` stream arms its missing-heartbeat timer
/// with twice this interval.
const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
48
49impl Consumer<Config> {
50    /// Returns a stream of messages for Push Consumer.
51    ///
52    /// # Example
53    ///
54    /// ```no_run
55    /// # #[tokio::main]
56    /// # async fn mains() -> Result<(), async_nats::Error> {
57    /// use async_nats::jetstream::consumer::PushConsumer;
58    /// use futures::StreamExt;
59    /// use futures::TryStreamExt;
60    ///
61    /// let client = async_nats::connect("localhost:4222").await?;
62    /// let jetstream = async_nats::jetstream::new(client);
63    ///
64    /// let stream = jetstream
65    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
66    ///         name: "events".to_string(),
67    ///         max_messages: 10_000,
68    ///         ..Default::default()
69    ///     })
70    ///     .await?;
71    ///
72    /// jetstream.publish("events", "data".into()).await?;
73    ///
74    /// let consumer: PushConsumer = stream
75    ///     .get_or_create_consumer(
76    ///         "consumer",
77    ///         async_nats::jetstream::consumer::push::Config {
78    ///             durable_name: Some("consumer".to_string()),
79    ///             deliver_subject: "deliver".to_string(),
80    ///             ..Default::default()
81    ///         },
82    ///     )
83    ///     .await?;
84    ///
85    /// let mut messages = consumer.messages().await?.take(100);
86    /// while let Some(Ok(message)) = messages.next().await {
87    ///     println!("got message {:?}", message);
88    ///     message.ack().await?;
89    /// }
90    /// Ok(())
91    /// # }
92    /// ```
93    pub async fn messages(&self) -> Result<Messages, StreamError> {
94        let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
95        let subscriber = if let Some(ref group) = self.info.config.deliver_group {
96            self.context
97                .client
98                .queue_subscribe(deliver_subject, group.to_owned())
99                .await
100                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
101        } else {
102            self.context
103                .client
104                .subscribe(deliver_subject)
105                .await
106                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
107        };
108
109        Ok(Messages {
110            context: self.context.clone(),
111            config: self.config.clone(),
112            subscriber,
113            heartbeat_sleep: None,
114        })
115    }
116}
117
/// Stream of messages delivered to a push consumer.
///
/// Created by `Consumer<Config>::messages`.
pub struct Messages {
    /// JetStream context; cloned into every yielded message and used to reply
    /// to status messages that carry a reply subject.
    context: Context,
    /// Subscription on the consumer's deliver subject.
    subscriber: Subscriber,
    /// Consumer configuration; `idle_heartbeat` is read on every poll to
    /// decide whether the heartbeat timer is used.
    config: Config,
    /// Missing-heartbeat timer. `None` until the stream is first polled and
    /// again after it has been cleared by incoming traffic or by firing.
    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
}
124
125impl futures::Stream for Messages {
126    type Item = Result<Message, MessagesError>;
127
128    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
129        if !self.config.idle_heartbeat.is_zero() {
130            let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
131            match self
132                .heartbeat_sleep
133                .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
134                .poll_unpin(cx)
135            {
136                Poll::Ready(_) => {
137                    self.heartbeat_sleep = None;
138                    return Poll::Ready(Some(Err(MessagesError::new(
139                        MessagesErrorKind::MissingHeartbeat,
140                    ))));
141                }
142                Poll::Pending => (),
143            }
144        }
145        loop {
146            match self.subscriber.receiver.poll_recv(cx) {
147                Poll::Ready(maybe_message) => {
148                    self.heartbeat_sleep = None;
149                    match maybe_message {
150                        Some(message) => match message.status {
151                            Some(StatusCode::IDLE_HEARTBEAT) => {
152                                if let Some(subject) = message.reply {
153                                    // TODO store pending_publish as a future and return errors from it
154                                    let client = self.context.client.clone();
155                                    tokio::task::spawn(async move {
156                                        client
157                                            .publish(subject, Bytes::from_static(b""))
158                                            .await
159                                            .unwrap();
160                                    });
161                                }
162
163                                continue;
164                            }
165                            Some(_) => {
166                                continue;
167                            }
168                            None => {
169                                return Poll::Ready(Some(Ok(jetstream::Message {
170                                    context: self.context.clone(),
171                                    message,
172                                })))
173                            }
174                        },
175                        None => return Poll::Ready(None),
176                    }
177                }
178                Poll::Pending => return Poll::Pending,
179            }
180        }
181    }
182}
183
/// Configuration for push consumers. From a high level, the
/// `durable_name` and `deliver_subject` fields have a particularly
/// strong influence on the consumer's overall behavior.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// The delivery subject used by the push consumer.
    #[serde(default)]
    pub deliver_subject: String,
    /// Setting `durable_name` to `Some(...)` will cause this consumer
    /// to be "durable". This may be a good choice for workloads that
    /// benefit from the `JetStream` server or cluster remembering the
    /// progress of consumers for fault tolerance purposes. If a consumer
    /// crashes, the `JetStream` server or cluster will remember which
    /// messages the consumer acknowledged. When the consumer recovers,
    /// this information will allow the consumer to resume processing
    /// where it left off. If you're unsure, set this to `Some(...)`.
    ///
    /// Setting `durable_name` to `None` will cause this consumer to
    /// be "ephemeral". This may be a good choice for workloads where
    /// you don't need the `JetStream` server to remember the consumer's
    /// progress in the case of a crash, such as certain "high churn"
    /// workloads or workloads where a crashed instance is not required
    /// to recover.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub durable_name: Option<String>,
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Deliver group to use. When set, `messages()` creates a queue
    /// subscription on this group instead of a plain subscription.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deliver_group: Option<String>,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// How messages should be acknowledged
    pub ack_policy: AckPolicy,
    /// How long to allow messages to remain un-acknowledged before attempting redelivery
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub ack_wait: Duration,
    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_deliver: i64,
    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
    /// Serialized under the name `sample_freq`.
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    /// The maximum number of unacknowledged messages that may be
    /// in-flight before pausing sending additional messages to
    /// this consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_ack_pending: i64,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Enable flow control messages
    #[serde(default, skip_serializing_if = "is_default")]
    pub flow_control: bool,
    /// Enable idle heartbeat messages. A zero duration disables the
    /// missing-heartbeat check performed by the `Messages` stream.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub idle_heartbeat: Duration,
    /// Number of consumer replicas
    #[serde(default, skip_serializing_if = "is_default")]
    pub num_replicas: usize,
    /// Force consumer to use memory storage.
    #[serde(default, skip_serializing_if = "is_default")]
    pub memory_storage: bool,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Custom backoff for missed acknowledgments.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub backoff: Vec<Duration>,
    /// Threshold for consumer inactivity
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub inactive_threshold: Duration,
    /// Create a consumer paused until provided deadline.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,
}
292
293impl FromConsumer for Config {
294    fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
295        if config.deliver_subject.is_none() {
296            return Err(Box::new(io::Error::new(
297                ErrorKind::Other,
298                "push consumer must have delivery subject",
299            )));
300        }
301
302        Ok(Config {
303            deliver_subject: config.deliver_subject.unwrap(),
304            durable_name: config.durable_name,
305            name: config.name,
306            description: config.description,
307            deliver_group: config.deliver_group,
308            deliver_policy: config.deliver_policy,
309            ack_policy: config.ack_policy,
310            ack_wait: config.ack_wait,
311            max_deliver: config.max_deliver,
312            filter_subject: config.filter_subject,
313            #[cfg(feature = "server_2_10")]
314            filter_subjects: config.filter_subjects,
315            replay_policy: config.replay_policy,
316            rate_limit: config.rate_limit,
317            sample_frequency: config.sample_frequency,
318            max_waiting: config.max_waiting,
319            max_ack_pending: config.max_ack_pending,
320            headers_only: config.headers_only,
321            flow_control: config.flow_control,
322            idle_heartbeat: config.idle_heartbeat,
323            num_replicas: config.num_replicas,
324            memory_storage: config.memory_storage,
325            #[cfg(feature = "server_2_10")]
326            metadata: config.metadata,
327            backoff: config.backoff,
328            inactive_threshold: config.inactive_threshold,
329            #[cfg(feature = "server_2_11")]
330            pause_until: config.pause_until,
331        })
332    }
333}
334
335impl IntoConsumerConfig for Config {
336    fn into_consumer_config(self) -> jetstream::consumer::Config {
337        jetstream::consumer::Config {
338            deliver_subject: Some(self.deliver_subject),
339            durable_name: self.durable_name,
340            name: self.name,
341            description: self.description,
342            deliver_group: self.deliver_group,
343            deliver_policy: self.deliver_policy,
344            ack_policy: self.ack_policy,
345            ack_wait: self.ack_wait,
346            max_deliver: self.max_deliver,
347            filter_subject: self.filter_subject,
348            #[cfg(feature = "server_2_10")]
349            filter_subjects: self.filter_subjects,
350            replay_policy: self.replay_policy,
351            rate_limit: self.rate_limit,
352            sample_frequency: self.sample_frequency,
353            max_waiting: self.max_waiting,
354            max_ack_pending: self.max_ack_pending,
355            headers_only: self.headers_only,
356            flow_control: self.flow_control,
357            idle_heartbeat: self.idle_heartbeat,
358            max_batch: 0,
359            max_bytes: 0,
360            max_expires: Duration::default(),
361            inactive_threshold: self.inactive_threshold,
362            num_replicas: self.num_replicas,
363            memory_storage: self.memory_storage,
364            #[cfg(feature = "server_2_10")]
365            metadata: self.metadata,
366            backoff: self.backoff,
367            #[cfg(feature = "server_2_11")]
368            priority_policy: PriorityPolicy::None,
369            #[cfg(feature = "server_2_11")]
370            priority_groups: Vec::new(),
371            #[cfg(feature = "server_2_11")]
372            pause_until: self.pause_until,
373        }
374    }
375}
376impl IntoConsumerConfig for &Config {
377    fn into_consumer_config(self) -> jetstream::consumer::Config {
378        self.clone().into_consumer_config()
379    }
380}
/// Reports whether `t` is equal to the `Default` value of its type.
///
/// Referenced by the `skip_serializing_if = "is_default"` serde attributes in
/// this file, so fields holding their default value are left out when serializing.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let baseline = T::default();
    *t == baseline
}
384
/// Configuration for ordered push consumers.
///
/// Only a subset of the consumer options is exposed here. The remaining ones
/// (for example the ack policy, flow control, idle heartbeat and storage) are
/// fixed by this type's `into_consumer_config` implementation.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct OrderedConfig {
    /// The delivery subject used by the push consumer.
    #[serde(default)]
    pub deliver_subject: String,
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
    /// Serialized under the name `sample_freq`.
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
}
433
434impl FromConsumer for OrderedConfig {
435    fn try_from_consumer_config(
436        config: crate::jetstream::consumer::Config,
437    ) -> Result<Self, crate::Error>
438    where
439        Self: Sized,
440    {
441        if config.deliver_subject.is_none() {
442            return Err(Box::new(io::Error::new(
443                ErrorKind::Other,
444                "push consumer must have delivery subject",
445            )));
446        }
447        Ok(OrderedConfig {
448            name: config.name,
449            deliver_subject: config.deliver_subject.unwrap(),
450            description: config.description,
451            filter_subject: config.filter_subject,
452            #[cfg(feature = "server_2_10")]
453            filter_subjects: config.filter_subjects,
454            replay_policy: config.replay_policy,
455            rate_limit: config.rate_limit,
456            sample_frequency: config.sample_frequency,
457            headers_only: config.headers_only,
458            deliver_policy: config.deliver_policy,
459            max_waiting: config.max_waiting,
460            #[cfg(feature = "server_2_10")]
461            metadata: config.metadata,
462        })
463    }
464}
465
466impl IntoConsumerConfig for OrderedConfig {
467    fn into_consumer_config(self) -> super::Config {
468        jetstream::consumer::Config {
469            deliver_subject: Some(self.deliver_subject),
470            durable_name: None,
471            name: self.name,
472            description: self.description,
473            deliver_group: None,
474            deliver_policy: self.deliver_policy,
475            ack_policy: AckPolicy::None,
476            ack_wait: Duration::default(),
477            max_deliver: 1,
478            filter_subject: self.filter_subject,
479            #[cfg(feature = "server_2_10")]
480            filter_subjects: self.filter_subjects,
481            replay_policy: self.replay_policy,
482            rate_limit: self.rate_limit,
483            sample_frequency: self.sample_frequency,
484            max_waiting: self.max_waiting,
485            max_ack_pending: 0,
486            headers_only: self.headers_only,
487            flow_control: true,
488            idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
489            max_batch: 0,
490            max_bytes: 0,
491            max_expires: Duration::default(),
492            inactive_threshold: Duration::from_secs(30),
493            num_replicas: 1,
494            memory_storage: true,
495            #[cfg(feature = "server_2_10")]
496            metadata: self.metadata,
497            backoff: Vec::new(),
498            #[cfg(feature = "server_2_11")]
499            priority_policy: PriorityPolicy::None,
500            #[cfg(feature = "server_2_11")]
501            priority_groups: Vec::new(),
502            #[cfg(feature = "server_2_11")]
503            pause_until: None,
504        }
505    }
506}
507
508impl Consumer<OrderedConfig> {
509    pub async fn messages(self) -> Result<Ordered, StreamError> {
510        let subscriber = self
511            .context
512            .client
513            .subscribe(self.info.config.deliver_subject.clone().unwrap())
514            .await
515            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
516
517        let last_sequence = Arc::new(AtomicU64::new(0));
518        let consumer_sequence = Arc::new(AtomicU64::new(0));
519        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
520        let handle = tokio::task::spawn({
521            let stream_name = self.info.stream_name.clone();
522            let config = self.config.clone();
523            let mut context = self.context.clone();
524            let last_sequence = last_sequence.clone();
525            let consumer_sequence = consumer_sequence.clone();
526            let state = self.context.client.state.clone();
527            async move {
528                loop {
529                    let current_state = state.borrow().to_owned();
530
531                    context.client.state.changed().await.unwrap();
532                    // State change notification received within the timeout
533                    if state.borrow().to_owned() != State::Connected
534                        || current_state == State::Connected
535                    {
536                        continue;
537                    }
538                    debug!("reconnected. trigger consumer recreation");
539
540                    debug!(
541                        "idle heartbeats expired. recreating consumer s: {},  {:?}",
542                        stream_name, config
543                    );
544                    let consumer = tryhard::retry_fn(|| {
545                        recreate_ephemeral_consumer(
546                            context.clone(),
547                            config.clone(),
548                            stream_name.clone(),
549                            last_sequence.load(Ordering::Relaxed),
550                        )
551                    })
552                    .retries(u32::MAX)
553                    .custom_backoff(backoff)
554                    .await;
555                    if let Err(err) = consumer {
556                        shutdown_tx.send(err).unwrap();
557                        break;
558                    }
559                    debug!("resetting consume sequence to 0");
560                    consumer_sequence.store(0, Ordering::Relaxed);
561                }
562            }
563        });
564
565        Ok(Ordered {
566            context: self.context.clone(),
567            consumer: self,
568            subscriber: Some(subscriber),
569            subscriber_future: None,
570            stream_sequence: last_sequence,
571            consumer_sequence,
572            shutdown: shutdown_rx,
573            handle,
574            heartbeat_sleep: None,
575        })
576    }
577}
578
/// Stream of messages for an ordered push consumer.
///
/// Created by `Consumer<OrderedConfig>::messages`. Dropping it aborts the
/// background task stored in `handle`.
pub struct Ordered {
    /// JetStream context; cloned into yielded messages and used when the
    /// consumer and subscription have to be recreated.
    context: Context,
    /// The consumer this stream was created from; its config and stream name
    /// are reused when recreating the consumer.
    consumer: Consumer<OrderedConfig>,
    /// Current subscription on the deliver subject. Set to `None` when the
    /// consumer has to be reset, which triggers recreation on the next poll.
    subscriber: Option<Subscriber>,
    /// In-flight recreation of the consumer and its subscription, if any.
    subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
    /// Stream sequence handed to the recreation routines as the starting
    /// point; shared with the background reconnect task.
    stream_sequence: Arc<AtomicU64>,
    /// Consumer sequence that incoming messages and heartbeats are checked
    /// against; reset to 0 whenever the consumer is recreated.
    consumer_sequence: Arc<AtomicU64>,
    /// Receives the error with which the background task gave up recreating
    /// the consumer.
    shutdown: tokio::sync::oneshot::Receiver<ConsumerRecreateError>,
    /// Handle of the background reconnect task.
    handle: JoinHandle<()>,
    /// Missing-heartbeat timer; `None` until first polled and after it has
    /// been cleared by an incoming message or by firing.
    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
}
590
591impl Drop for Ordered {
592    fn drop(&mut self) {
593        // Stop trying to recreate the consumer
594        self.handle.abort()
595    }
596}
597
598impl futures::Stream for Ordered {
599    type Item = Result<Message, OrderedError>;
600
601    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
602        match self
603            .heartbeat_sleep
604            .get_or_insert_with(|| {
605                Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2)))
606            })
607            .poll_unpin(cx)
608        {
609            Poll::Ready(_) => {
610                self.heartbeat_sleep = None;
611                return Poll::Ready(Some(Err(OrderedError::new(
612                    OrderedErrorKind::MissingHeartbeat,
613                ))));
614            }
615            Poll::Pending => (),
616        }
617
618        loop {
619            match self.shutdown.try_recv() {
620                Ok(err) => {
621                    return Poll::Ready(Some(Err(OrderedError::with_source(
622                        OrderedErrorKind::Other,
623                        err,
624                    ))))
625                }
626                Err(TryRecvError::Closed) => {
627                    return Poll::Ready(Some(Err(OrderedError::with_source(
628                        OrderedErrorKind::Other,
629                        "consumer task closed",
630                    ))))
631                }
632                Err(TryRecvError::Empty) => {}
633            }
634            if self.subscriber.is_none() {
635                match self.subscriber_future.as_mut() {
636                    None => {
637                        trace!(
638                            "subscriber and subscriber future are None. Recreating the consumer"
639                        );
640                        let context = self.context.clone();
641                        let sequence = self.stream_sequence.clone();
642                        let config = self.consumer.config.clone();
643                        let stream_name = self.consumer.info.stream_name.clone();
644                        let subscriber_future =
645                            self.subscriber_future.insert(Box::pin(async move {
646                                recreate_consumer_and_subscription(
647                                    context,
648                                    config,
649                                    stream_name,
650                                    sequence.load(Ordering::Relaxed),
651                                )
652                                .await
653                            }));
654                        match subscriber_future.as_mut().poll(cx) {
655                            Poll::Ready(subscriber) => {
656                                self.subscriber_future = None;
657                                self.consumer_sequence.store(0, Ordering::Relaxed);
658                                self.subscriber = Some(subscriber.map_err(|err| {
659                                    OrderedError::with_source(OrderedErrorKind::Recreate, err)
660                                })?);
661                            }
662                            Poll::Pending => {
663                                return Poll::Pending;
664                            }
665                        }
666                    }
667                    Some(subscriber) => match subscriber.as_mut().poll(cx) {
668                        Poll::Ready(subscriber) => {
669                            self.subscriber_future = None;
670                            self.consumer_sequence.store(0, Ordering::Relaxed);
671                            self.subscriber = Some(subscriber.map_err(|err| {
672                                OrderedError::with_source(OrderedErrorKind::Recreate, err)
673                            })?);
674                        }
675                        Poll::Pending => {
676                            return Poll::Pending;
677                        }
678                    },
679                }
680            }
681            if let Some(subscriber) = self.subscriber.as_mut() {
682                match subscriber.receiver.poll_recv(cx) {
683                    Poll::Ready(maybe_message) => match maybe_message {
684                        Some(message) => {
685                            self.heartbeat_sleep = None;
686                            match message.status {
687                                Some(StatusCode::IDLE_HEARTBEAT) => {
688                                    debug!("received idle heartbeats");
689                                    if let Some(headers) = message.headers.as_ref() {
690                                        if let Some(sequence) =
691                                            headers.get_last(crate::header::NATS_LAST_CONSUMER)
692                                        {
693                                            let sequence: u64 =
694                                                sequence.as_str().parse().map_err(|err| {
695                                                    OrderedError::with_source(
696                                                        OrderedErrorKind::Other,
697                                                        err,
698                                                    )
699                                                })?;
700
701                                            let last_sequence =
702                                                self.consumer_sequence.load(Ordering::Relaxed);
703
704                                            if sequence != last_sequence {
705                                                debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
706                                                self.subscriber = None;
707                                            }
708                                        }
709                                    }
710                                    // flow control.
711                                    if let Some(subject) = message.reply.clone() {
712                                        trace!("received flow control message");
713                                        let client = self.context.client.clone();
714                                        tokio::task::spawn(async move {
715                                            client
716                                                .publish(subject, Bytes::from_static(b""))
717                                                .await
718                                                .ok();
719                                        });
720                                    }
721                                    continue;
722                                }
723                                Some(status) => {
724                                    debug!("received status message: {}", status);
725                                    continue;
726                                }
727                                None => {
728                                    trace!("received a message");
729                                    let jetstream_message = jetstream::message::Message {
730                                        message,
731                                        context: self.context.clone(),
732                                    };
733
734                                    let info = jetstream_message.info().map_err(|err| {
735                                        OrderedError::with_source(OrderedErrorKind::Other, err)
736                                    })?;
737                                    trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
738                                               self.consumer_sequence,
739                                               self.stream_sequence,
740                                               info.consumer_sequence,
741                                               info.stream_sequence);
742                                    if info.consumer_sequence
743                                        != self.consumer_sequence.load(Ordering::Relaxed) + 1
744                                    {
745                                        debug!(
746                                            "ordered consumer mismatch. current {}, info: {}",
747                                            self.consumer_sequence.load(Ordering::Relaxed),
748                                            info.consumer_sequence
749                                        );
750                                        self.subscriber = None;
751                                        self.consumer_sequence.store(0, Ordering::Relaxed);
752                                        continue;
753                                    }
754                                    self.stream_sequence
755                                        .store(info.stream_sequence, Ordering::Relaxed);
756                                    self.consumer_sequence
757                                        .store(info.consumer_sequence, Ordering::Relaxed);
758                                    return Poll::Ready(Some(Ok(jetstream_message)));
759                                }
760                            }
761                        }
762                        None => {
763                            return Poll::Ready(None);
764                        }
765                    },
766                    Poll::Pending => return Poll::Pending,
767                }
768            }
769        }
770    }
771}
772
/// Kinds of errors carried by an [`OrderedError`].
#[derive(Clone, Debug, PartialEq)]
pub enum OrderedErrorKind {
    /// An idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// The consumer is pull based.
    PullBasedConsumer,
    /// Recreating the consumer failed.
    Recreate,
    /// Any other failure.
    Other,
}

/// Renders a short, human readable description of the error kind.
impl std::fmt::Display for OrderedErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
            Self::ConsumerDeleted => write!(f, "consumer deleted"),
            Self::Other => write!(f, "error"),
            // The variant reports a pull based consumer, so the text names the pull
            // consumer, exactly like `MessagesErrorKind::PullBasedConsumer` does.
            Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
            Self::Recreate => write!(f, "consumer recreation failed"),
        }
    }
}
793
/// [`Error`] specialized with [`OrderedErrorKind`].
///
/// A [`MessagesError`] can be converted into this type through the `From`
/// implementation defined in this module.
794pub type OrderedError = Error<OrderedErrorKind>;
795
796impl From<MessagesError> for OrderedError {
797    fn from(err: MessagesError) -> Self {
798        match err.kind() {
799            MessagesErrorKind::MissingHeartbeat => {
800                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
801            }
802            MessagesErrorKind::ConsumerDeleted => {
803                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
804            }
805            MessagesErrorKind::PullBasedConsumer => {
806                OrderedError::new(OrderedErrorKind::PullBasedConsumer)
807            }
808            MessagesErrorKind::Other => OrderedError {
809                kind: OrderedErrorKind::Other,
810                source: err.source,
811            },
812        }
813    }
814}
815
/// Kinds of errors carried by a [`MessagesError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MessagesErrorKind {
    /// An idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// The consumer is pull based.
    PullBasedConsumer,
    /// Any other failure.
    Other,
}

/// Renders a short, human readable description of the error kind.
impl std::fmt::Display for MessagesErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the static text first, then emit it with a single write.
        let description = match self {
            Self::MissingHeartbeat => "missed idle heartbeat",
            Self::ConsumerDeleted => "consumer deleted",
            Self::PullBasedConsumer => "cannot use with pull consumer",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
834
/// [`Error`] specialized with [`MessagesErrorKind`].
835pub type MessagesError = Error<MessagesErrorKind>;
836
/// Kinds of errors carried by a [`ConsumerRecreateError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConsumerRecreateErrorKind {
    /// Getting the stream failed.
    GetStream,
    /// Resubscribing failed.
    Subscription,
    /// Creating the consumer failed.
    Recreate,
    /// The operation timed out.
    TimedOut,
}

/// Renders a short, human readable description of the error kind.
impl std::fmt::Display for ConsumerRecreateErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the static text first, then emit it with a single write.
        let description = match self {
            Self::GetStream => "error getting stream",
            Self::Subscription => "failed to resubscribe",
            Self::Recreate => "consumer creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
855
/// [`Error`] specialized with [`ConsumerRecreateErrorKind`].
///
/// Returned by the consumer recreation helpers defined below in this module.
856pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
857
858async fn recreate_consumer_and_subscription(
859    context: Context,
860    mut config: OrderedConfig,
861    stream_name: String,
862    sequence: u64,
863) -> Result<Subscriber, ConsumerRecreateError> {
864    let delivery_subject = context.client.new_inbox();
865    config.deliver_subject = delivery_subject;
866
867    let subscriber = context
868        .client
869        .subscribe(config.deliver_subject.clone())
870        .await
871        .map_err(|err| {
872            ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
873        })?;
874
875    recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
876    Ok(subscriber)
877}
878async fn recreate_ephemeral_consumer(
879    context: Context,
880    config: OrderedConfig,
881    stream_name: String,
882    sequence: u64,
883) -> Result<(), ConsumerRecreateError> {
884    let stream = tryhard::retry_fn(|| context.get_stream(stream_name.clone()))
885        .retries(u32::MAX)
886        .custom_backoff(backoff)
887        .await
888        .map_err(|err| {
889            ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
890        })?;
891
892    let deliver_policy = {
893        if sequence == 0 {
894            DeliverPolicy::All
895        } else {
896            DeliverPolicy::ByStartSequence {
897                start_sequence: sequence + 1,
898            }
899        }
900    };
901
902    tryhard::retry_fn(|| {
903        let config = config.clone();
904        tokio::time::timeout(
905            Duration::from_secs(5),
906            stream.create_consumer(jetstream::consumer::push::OrderedConfig {
907                deliver_policy,
908                ..config
909            }),
910        )
911    })
912    .retries(u32::MAX)
913    .custom_backoff(backoff)
914    .await
915    .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
916    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
917
918    Ok(())
919}