Skip to main content

async_nats/jetstream/consumer/
push.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use super::{
15    backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16    StreamError, StreamErrorKind,
17};
18
19#[cfg(feature = "server_2_11")]
20use super::PriorityPolicy;
21
22use crate::{
23    connection::State,
24    error::Error,
25    jetstream::{self, Context, Message},
26    StatusCode, Subscriber,
27};
28
29use bytes::Bytes;
30use futures_util::{future::BoxFuture, FutureExt};
31use portable_atomic::AtomicU64;
32use serde::{Deserialize, Serialize};
33#[cfg(feature = "server_2_10")]
34use std::collections::HashMap;
35use std::task::{self, Poll};
36use std::{
37    io::{self},
38    pin::Pin,
39    sync::Arc,
40};
41use std::{sync::atomic::Ordering, time::Duration};
42#[cfg(feature = "server_2_11")]
43use time::{serde::rfc3339, OffsetDateTime};
44use tracing::{debug, trace};
45
46const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
47
48impl Consumer<Config> {
49    /// Returns a stream of messages for Push Consumer.
50    ///
51    /// # Example
52    ///
53    /// ```no_run
54    /// # #[tokio::main]
55    /// # async fn mains() -> Result<(), async_nats::Error> {
56    /// use async_nats::jetstream::consumer::PushConsumer;
57    /// use futures_util::StreamExt;
58    /// use futures_util::TryStreamExt;
59    ///
60    /// let client = async_nats::connect("localhost:4222").await?;
61    /// let jetstream = async_nats::jetstream::new(client);
62    ///
63    /// let stream = jetstream
64    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
65    ///         name: "events".to_string(),
66    ///         max_messages: 10_000,
67    ///         ..Default::default()
68    ///     })
69    ///     .await?;
70    ///
71    /// jetstream.publish("events", "data".into()).await?;
72    ///
73    /// let consumer: PushConsumer = stream
74    ///     .get_or_create_consumer(
75    ///         "consumer",
76    ///         async_nats::jetstream::consumer::push::Config {
77    ///             durable_name: Some("consumer".to_string()),
78    ///             deliver_subject: "deliver".to_string(),
79    ///             ..Default::default()
80    ///         },
81    ///     )
82    ///     .await?;
83    ///
84    /// let mut messages = consumer.messages().await?.take(100);
85    /// while let Some(Ok(message)) = messages.next().await {
86    ///     println!("got message {:?}", message);
87    ///     message.ack().await?;
88    /// }
89    /// Ok(())
90    /// # }
91    /// ```
92    pub async fn messages(&self) -> Result<Messages, StreamError> {
93        let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
94        let subscriber = if let Some(ref group) = self.info.config.deliver_group {
95            self.context
96                .client
97                .queue_subscribe(deliver_subject, group.to_owned())
98                .await
99                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
100        } else {
101            self.context
102                .client
103                .subscribe(deliver_subject)
104                .await
105                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
106        };
107
108        Ok(Messages {
109            context: self.context.clone(),
110            config: self.config.clone(),
111            subscriber,
112            heartbeat_sleep: None,
113        })
114    }
115}
116
117pub struct Messages {
118    context: Context,
119    subscriber: Subscriber,
120    config: Config,
121    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
122}
123
124impl futures_util::Stream for Messages {
125    type Item = Result<Message, MessagesError>;
126
127    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
128        if !self.config.idle_heartbeat.is_zero() {
129            let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
130            match self
131                .heartbeat_sleep
132                .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
133                .poll_unpin(cx)
134            {
135                Poll::Ready(_) => {
136                    self.heartbeat_sleep = None;
137                    return Poll::Ready(Some(Err(MessagesError::new(
138                        MessagesErrorKind::MissingHeartbeat,
139                    ))));
140                }
141                Poll::Pending => (),
142            }
143        }
144        loop {
145            match self.subscriber.receiver.poll_recv(cx) {
146                Poll::Ready(maybe_message) => {
147                    self.heartbeat_sleep = None;
148                    match maybe_message {
149                        Some(message) => match message.status {
150                            Some(StatusCode::IDLE_HEARTBEAT) => {
151                                if let Some(subject) = message.reply {
152                                    // TODO store pending_publish as a future and return errors from it
153                                    let client = self.context.client.clone();
154                                    tokio::task::spawn(async move {
155                                        client
156                                            .publish(subject, Bytes::from_static(b""))
157                                            .await
158                                            .unwrap();
159                                    });
160                                }
161
162                                continue;
163                            }
164                            Some(_) => {
165                                continue;
166                            }
167                            None => {
168                                return Poll::Ready(Some(Ok(jetstream::Message {
169                                    context: self.context.clone(),
170                                    message,
171                                })))
172                            }
173                        },
174                        None => return Poll::Ready(None),
175                    }
176                }
177                Poll::Pending => return Poll::Pending,
178            }
179        }
180    }
181}
182
183/// Configuration for consumers. From a high level, the
184/// `durable_name` and `deliver_subject` fields have a particularly
185/// strong influence on the consumer's overall behavior.
186#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
187pub struct Config {
188    /// The delivery subject used by the push consumer.
189    #[serde(default)]
190    pub deliver_subject: String,
191    /// Setting `durable_name` to `Some(...)` will cause this consumer
192    /// to be "durable". This may be a good choice for workloads that
193    /// benefit from the `JetStream` server or cluster remembering the
194    /// progress of consumers for fault tolerance purposes. If a consumer
195    /// crashes, the `JetStream` server or cluster will remember which
196    /// messages the consumer acknowledged. When the consumer recovers,
197    /// this information will allow the consumer to resume processing
198    /// where it left off. If you're unsure, set this to `Some(...)`.
199    ///
200    /// Setting `durable_name` to `None` will cause this consumer to
201    /// be "ephemeral". This may be a good choice for workloads where
202    /// you don't need the `JetStream` server to remember the consumer's
203    /// progress in the case of a crash, such as certain "high churn"
204    /// workloads or workloads where a crashed instance is not required
205    /// to recover.
206    #[serde(default, skip_serializing_if = "Option::is_none")]
207    pub durable_name: Option<String>,
208    /// A name of the consumer. Can be specified for both durable and ephemeral
209    /// consumers.
210    #[serde(default, skip_serializing_if = "Option::is_none")]
211    pub name: Option<String>,
212    /// A short description of the purpose of this consumer.
213    #[serde(default, skip_serializing_if = "Option::is_none")]
214    pub description: Option<String>,
215    #[serde(default, skip_serializing_if = "Option::is_none")]
216    /// Deliver group to use.
217    pub deliver_group: Option<String>,
218    /// Allows for a variety of options that determine how this consumer will receive messages
219    #[serde(flatten)]
220    pub deliver_policy: DeliverPolicy,
221    /// How messages should be acknowledged
222    pub ack_policy: AckPolicy,
223    /// How long to allow messages to remain un-acknowledged before attempting redelivery
224    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
225    pub ack_wait: Duration,
226    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
227    #[serde(default, skip_serializing_if = "is_default")]
228    pub max_deliver: i64,
229    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
230    #[serde(default, skip_serializing_if = "is_default")]
231    pub filter_subject: String,
232    #[cfg(feature = "server_2_10")]
233    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
234    #[serde(default, skip_serializing_if = "is_default")]
235    pub filter_subjects: Vec<String>,
236    /// Whether messages are sent as quickly as possible or at the rate of receipt
237    pub replay_policy: ReplayPolicy,
238    /// The rate of message delivery in bits per second
239    #[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
240    pub rate_limit: u64,
241    /// What percentage of acknowledgments should be samples for observability, 0-100
242    #[serde(
243        rename = "sample_freq",
244        with = "super::sample_freq_deser",
245        default,
246        skip_serializing_if = "is_default"
247    )]
248    pub sample_frequency: u8,
249    /// The maximum number of waiting consumers.
250    #[serde(default, skip_serializing_if = "is_default")]
251    pub max_waiting: i64,
252    /// The maximum number of unacknowledged messages that may be
253    /// in-flight before pausing sending additional messages to
254    /// this consumer.
255    #[serde(default, skip_serializing_if = "is_default")]
256    pub max_ack_pending: i64,
257    /// Only deliver headers without payloads.
258    #[serde(default, skip_serializing_if = "is_default")]
259    pub headers_only: bool,
260    /// Enable flow control messages
261    #[serde(default, skip_serializing_if = "is_default")]
262    pub flow_control: bool,
263    /// Enable idle heartbeat messages
264    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
265    pub idle_heartbeat: Duration,
266    /// Number of consumer replicas
267    #[serde(default, skip_serializing_if = "is_default")]
268    pub num_replicas: usize,
269    /// Force consumer to use memory storage.
270    #[serde(default, skip_serializing_if = "is_default")]
271    pub memory_storage: bool,
272    #[cfg(feature = "server_2_10")]
273    /// Additional consumer metadata.
274    #[serde(default, skip_serializing_if = "is_default")]
275    pub metadata: HashMap<String, String>,
276    /// Custom backoff for missed acknowledgments.
277    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
278    pub backoff: Vec<Duration>,
279    /// Threshold for consumer inactivity
280    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
281    pub inactive_threshold: Duration,
282    /// Create a consumer paused until provided deadline.
283    #[cfg(feature = "server_2_11")]
284    #[serde(
285        default,
286        with = "rfc3339::option",
287        skip_serializing_if = "Option::is_none"
288    )]
289    pub pause_until: Option<OffsetDateTime>,
290}
291
292impl FromConsumer for Config {
293    fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
294        if config.deliver_subject.is_none() {
295            return Err(Box::new(io::Error::other(
296                "push consumer must have delivery subject",
297            )));
298        }
299
300        Ok(Config {
301            deliver_subject: config.deliver_subject.unwrap(),
302            durable_name: config.durable_name,
303            name: config.name,
304            description: config.description,
305            deliver_group: config.deliver_group,
306            deliver_policy: config.deliver_policy,
307            ack_policy: config.ack_policy,
308            ack_wait: config.ack_wait,
309            max_deliver: config.max_deliver,
310            filter_subject: config.filter_subject,
311            #[cfg(feature = "server_2_10")]
312            filter_subjects: config.filter_subjects,
313            replay_policy: config.replay_policy,
314            rate_limit: config.rate_limit,
315            sample_frequency: config.sample_frequency,
316            max_waiting: config.max_waiting,
317            max_ack_pending: config.max_ack_pending,
318            headers_only: config.headers_only,
319            flow_control: config.flow_control,
320            idle_heartbeat: config.idle_heartbeat,
321            num_replicas: config.num_replicas,
322            memory_storage: config.memory_storage,
323            #[cfg(feature = "server_2_10")]
324            metadata: config.metadata,
325            backoff: config.backoff,
326            inactive_threshold: config.inactive_threshold,
327            #[cfg(feature = "server_2_11")]
328            pause_until: config.pause_until,
329        })
330    }
331}
332
333impl IntoConsumerConfig for Config {
334    fn into_consumer_config(self) -> jetstream::consumer::Config {
335        jetstream::consumer::Config {
336            deliver_subject: Some(self.deliver_subject),
337            durable_name: self.durable_name,
338            name: self.name,
339            description: self.description,
340            deliver_group: self.deliver_group,
341            deliver_policy: self.deliver_policy,
342            ack_policy: self.ack_policy,
343            ack_wait: self.ack_wait,
344            max_deliver: self.max_deliver,
345            filter_subject: self.filter_subject,
346            #[cfg(feature = "server_2_10")]
347            filter_subjects: self.filter_subjects,
348            replay_policy: self.replay_policy,
349            rate_limit: self.rate_limit,
350            sample_frequency: self.sample_frequency,
351            max_waiting: self.max_waiting,
352            max_ack_pending: self.max_ack_pending,
353            headers_only: self.headers_only,
354            flow_control: self.flow_control,
355            idle_heartbeat: self.idle_heartbeat,
356            max_batch: 0,
357            max_bytes: 0,
358            max_expires: Duration::default(),
359            inactive_threshold: self.inactive_threshold,
360            num_replicas: self.num_replicas,
361            memory_storage: self.memory_storage,
362            #[cfg(feature = "server_2_10")]
363            metadata: self.metadata,
364            backoff: self.backoff,
365            #[cfg(feature = "server_2_11")]
366            priority_policy: PriorityPolicy::None,
367            #[cfg(feature = "server_2_11")]
368            priority_groups: Vec::new(),
369            #[cfg(feature = "server_2_11")]
370            pause_until: self.pause_until,
371        }
372    }
373}
374impl IntoConsumerConfig for &Config {
375    fn into_consumer_config(self) -> jetstream::consumer::Config {
376        self.clone().into_consumer_config()
377    }
378}
379fn is_default<T: Default + Eq>(t: &T) -> bool {
380    t == &T::default()
381}
382
383/// Configuration for consumers. From a high level, the
384/// `durable_name` and `deliver_subject` fields have a particularly
385/// strong influence on the consumer's overall behavior.
386#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
387pub struct OrderedConfig {
388    /// The delivery subject used by the push consumer.
389    #[serde(default)]
390    pub deliver_subject: String,
391    /// A name of the consumer. Can be specified for both durable and ephemeral
392    /// consumers.
393    #[serde(default, skip_serializing_if = "Option::is_none")]
394    pub name: Option<String>,
395    /// A short description of the purpose of this consumer.
396    #[serde(default, skip_serializing_if = "Option::is_none")]
397    pub description: Option<String>,
398    #[serde(default, skip_serializing_if = "is_default")]
399    pub filter_subject: String,
400    #[cfg(feature = "server_2_10")]
401    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
402    #[serde(default, skip_serializing_if = "is_default")]
403    pub filter_subjects: Vec<String>,
404    /// Whether messages are sent as quickly as possible or at the rate of receipt
405    pub replay_policy: ReplayPolicy,
406    /// The rate of message delivery in bits per second
407    #[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
408    pub rate_limit: u64,
409    /// What percentage of acknowledgments should be samples for observability, 0-100
410    #[serde(
411        rename = "sample_freq",
412        with = "super::sample_freq_deser",
413        default,
414        skip_serializing_if = "is_default"
415    )]
416    pub sample_frequency: u8,
417    /// Only deliver headers without payloads.
418    #[serde(default, skip_serializing_if = "is_default")]
419    pub headers_only: bool,
420    /// Allows for a variety of options that determine how this consumer will receive messages
421    #[serde(flatten)]
422    pub deliver_policy: DeliverPolicy,
423    /// The maximum number of waiting consumers.
424    #[serde(default, skip_serializing_if = "is_default")]
425    pub max_waiting: i64,
426    #[cfg(feature = "server_2_10")]
427    /// Additional consumer metadata.
428    #[serde(default, skip_serializing_if = "is_default")]
429    pub metadata: HashMap<String, String>,
430}
431
432impl FromConsumer for OrderedConfig {
433    fn try_from_consumer_config(
434        config: crate::jetstream::consumer::Config,
435    ) -> Result<Self, crate::Error>
436    where
437        Self: Sized,
438    {
439        if config.deliver_subject.is_none() {
440            return Err(Box::new(io::Error::other(
441                "push consumer must have delivery subject",
442            )));
443        }
444        Ok(OrderedConfig {
445            name: config.name,
446            deliver_subject: config.deliver_subject.unwrap(),
447            description: config.description,
448            filter_subject: config.filter_subject,
449            #[cfg(feature = "server_2_10")]
450            filter_subjects: config.filter_subjects,
451            replay_policy: config.replay_policy,
452            rate_limit: config.rate_limit,
453            sample_frequency: config.sample_frequency,
454            headers_only: config.headers_only,
455            deliver_policy: config.deliver_policy,
456            max_waiting: config.max_waiting,
457            #[cfg(feature = "server_2_10")]
458            metadata: config.metadata,
459        })
460    }
461}
462
463impl IntoConsumerConfig for OrderedConfig {
464    fn into_consumer_config(self) -> super::Config {
465        jetstream::consumer::Config {
466            deliver_subject: Some(self.deliver_subject),
467            durable_name: None,
468            name: self.name,
469            description: self.description,
470            deliver_group: None,
471            deliver_policy: self.deliver_policy,
472            ack_policy: AckPolicy::None,
473            ack_wait: Duration::default(),
474            max_deliver: 1,
475            filter_subject: self.filter_subject,
476            #[cfg(feature = "server_2_10")]
477            filter_subjects: self.filter_subjects,
478            replay_policy: self.replay_policy,
479            rate_limit: self.rate_limit,
480            sample_frequency: self.sample_frequency,
481            max_waiting: self.max_waiting,
482            max_ack_pending: 0,
483            headers_only: self.headers_only,
484            flow_control: true,
485            idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
486            max_batch: 0,
487            max_bytes: 0,
488            max_expires: Duration::default(),
489            inactive_threshold: Duration::from_secs(30),
490            num_replicas: 1,
491            memory_storage: true,
492            #[cfg(feature = "server_2_10")]
493            metadata: self.metadata,
494            backoff: Vec::new(),
495            #[cfg(feature = "server_2_11")]
496            priority_policy: PriorityPolicy::None,
497            #[cfg(feature = "server_2_11")]
498            priority_groups: Vec::new(),
499            #[cfg(feature = "server_2_11")]
500            pause_until: None,
501        }
502    }
503}
504
505impl Consumer<OrderedConfig> {
506    pub async fn messages(self) -> Result<Ordered, StreamError> {
507        let subscriber = self
508            .context
509            .client
510            .subscribe(self.info.config.deliver_subject.clone().unwrap())
511            .await
512            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
513
514        let last_sequence = Arc::new(AtomicU64::new(0));
515        let consumer_sequence = Arc::new(AtomicU64::new(0));
516
517        let state = self.context.client.state.clone();
518        Ok(Ordered {
519            context: self.context.clone(),
520            consumer: self,
521            subscriber: Some(subscriber),
522            subscriber_future: None,
523            stream_sequence: last_sequence,
524            consumer_sequence,
525            heartbeat_sleep: None,
526            state,
527            last_known_state: State::Connected,
528            state_change_future: None,
529        })
530    }
531}
532
533pub struct Ordered {
534    context: Context,
535    consumer: Consumer<OrderedConfig>,
536    subscriber: Option<Subscriber>,
537    subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
538    stream_sequence: Arc<AtomicU64>,
539    consumer_sequence: Arc<AtomicU64>,
540    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
541    state: tokio::sync::watch::Receiver<State>,
542    last_known_state: State,
543    state_change_future:
544        Option<BoxFuture<'static, Result<(), tokio::sync::watch::error::RecvError>>>,
545}
546
547impl futures_util::Stream for Ordered {
548    type Item = Result<Message, OrderedError>;
549
550    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
551        loop {
552            // Poll for connection state changes
553            match &mut self.state_change_future {
554                None => {
555                    // Check if state has already changed
556                    if self.state.has_changed().unwrap_or(false) {
557                        let current_state = self.state.borrow_and_update().clone();
558
559                        // Handle state transitions
560                        use State::*;
561                        match (&self.last_known_state, &current_state) {
562                            (Connected, Disconnected | Pending) => {
563                                debug!("Connection lost, marking subscriber as invalid");
564                                self.subscriber = None;
565                                self.heartbeat_sleep = None;
566                            }
567                            (Disconnected | Pending, Connected) => {
568                                debug!("Connection restored, triggering consumer recreation");
569                                self.subscriber = None;
570                                self.consumer_sequence.store(0, Ordering::Relaxed);
571                                self.heartbeat_sleep = None;
572                            }
573                            _ => {}
574                        }
575
576                        self.last_known_state = current_state;
577                    } else {
578                        // Create a future to wait for state changes
579                        let mut state = self.state.clone();
580                        self.state_change_future =
581                            Some(Box::pin(async move { state.changed().await }));
582                    }
583                }
584                Some(fut) => match fut.poll_unpin(cx) {
585                    Poll::Ready(Ok(())) => {
586                        self.state_change_future = None;
587                        continue; // State changed, restart loop to check it
588                    }
589                    Poll::Ready(Err(_)) => {
590                        return Poll::Ready(Some(Err(OrderedError::with_source(
591                            OrderedErrorKind::Other,
592                            "Connection state watcher dropped",
593                        ))));
594                    }
595                    Poll::Pending => {}
596                },
597            }
598
599            // Handle messages after checking connection state
600
601            // Handle subscriber recreation if needed
602            if self.subscriber.is_none() {
603                // Ensure we have a recreation future
604                if self.subscriber_future.is_none() {
605                    trace!("Creating subscriber recreation future");
606                    let context = self.context.clone();
607                    let sequence = self.stream_sequence.clone();
608                    let config = self.consumer.config.clone();
609                    let stream_name = self.consumer.info.stream_name.clone();
610
611                    self.subscriber_future = Some(Box::pin(async move {
612                        tryhard::retry_fn(|| {
613                            recreate_consumer_and_subscription(
614                                context.clone(),
615                                config.clone(),
616                                stream_name.clone(),
617                                sequence.load(Ordering::Relaxed),
618                            )
619                        })
620                        .retries(u32::MAX)
621                        .custom_backoff(backoff)
622                        .await
623                        .map_err(|err| {
624                            ConsumerRecreateError::with_source(
625                                ConsumerRecreateErrorKind::Recreate,
626                                err,
627                            )
628                        })
629                    }));
630                }
631
632                // Poll the recreation future
633                if let Some(fut) = &mut self.subscriber_future {
634                    match fut.poll_unpin(cx) {
635                        Poll::Ready(result) => {
636                            self.subscriber_future = None;
637                            self.consumer_sequence.store(0, Ordering::Relaxed);
638                            self.subscriber = Some(result.map_err(|err| {
639                                OrderedError::with_source(OrderedErrorKind::Recreate, err)
640                            })?);
641                        }
642                        Poll::Pending => return Poll::Pending,
643                    }
644                }
645            }
646            // Poll for messages
647            if let Some(subscriber) = self.subscriber.as_mut() {
648                match subscriber.receiver.poll_recv(cx) {
649                    Poll::Ready(None) => return Poll::Ready(None),
650                    Poll::Ready(Some(message)) => {
651                        self.heartbeat_sleep = None;
652                        match message.status {
653                            Some(StatusCode::IDLE_HEARTBEAT) => {
654                                debug!("received idle heartbeats");
655                                if let Some(headers) = message.headers.as_ref() {
656                                    if let Some(sequence) =
657                                        headers.get_last(crate::header::NATS_LAST_CONSUMER)
658                                    {
659                                        let sequence: u64 =
660                                            sequence.as_str().parse().map_err(|err| {
661                                                OrderedError::with_source(
662                                                    OrderedErrorKind::Other,
663                                                    err,
664                                                )
665                                            })?;
666
667                                        let last_sequence =
668                                            self.consumer_sequence.load(Ordering::Relaxed);
669
670                                        if sequence != last_sequence {
671                                            debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
672                                            self.subscriber = None;
673                                            self.heartbeat_sleep = None;
674                                        }
675                                    }
676                                }
677                                // flow control.
678                                if let Some(subject) = message.reply.clone() {
679                                    trace!("received flow control message");
680                                    let client = self.context.client.clone();
681                                    tokio::task::spawn(async move {
682                                        client.publish(subject, Bytes::from_static(b"")).await.ok();
683                                    });
684                                }
685                                continue;
686                            }
687                            Some(status) => {
688                                debug!("received status message: {}", status);
689                                continue;
690                            }
691                            None => {
692                                trace!("received a message");
693                                let jetstream_message = jetstream::message::Message {
694                                    message,
695                                    context: self.context.clone(),
696                                };
697
698                                let info = jetstream_message.info().map_err(|err| {
699                                    OrderedError::with_source(OrderedErrorKind::Other, err)
700                                })?;
701                                trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
702                                               self.consumer_sequence,
703                                               self.stream_sequence,
704                                               info.consumer_sequence,
705                                               info.stream_sequence);
706                                if info.consumer_sequence
707                                    != self.consumer_sequence.load(Ordering::Relaxed) + 1
708                                {
709                                    debug!(
710                                        "ordered consumer mismatch. current {}, info: {}",
711                                        self.consumer_sequence.load(Ordering::Relaxed),
712                                        info.consumer_sequence
713                                    );
714                                    self.subscriber = None;
715                                    self.consumer_sequence.store(0, Ordering::Relaxed);
716                                    self.heartbeat_sleep = None;
717                                    continue;
718                                }
719                                self.stream_sequence
720                                    .store(info.stream_sequence, Ordering::Relaxed);
721                                self.consumer_sequence
722                                    .store(info.consumer_sequence, Ordering::Relaxed);
723                                return Poll::Ready(Some(Ok(jetstream_message)));
724                            }
725                        }
726                    }
727                    Poll::Pending => {
728                        // The channel is drained. Only now consult the idle-heartbeat timer: a
729                        // live consumer keeps sending heartbeats well within the timeout, so an
730                        // empty channel past the deadline means the consumer is gone (e.g. the
731                        // server was restarted, or the consumer was deleted, while this stream
732                        // was idle). Recreate from the last delivered sequence instead of
733                        // erroring terminally and then hanging forever (#1596).
734                        //
735                        // Draining before this check is what makes it safe: if the application
736                        // merely paused while messages were still buffered, those are delivered
737                        // first (each resetting the timer), so a false timeout never discards
738                        // buffered messages. That matters because recreation resumes from the
739                        // last *delivered* sequence (AckPolicy::None), and on capped/discarding
740                        // streams the dropped messages might no longer be replayable.
741                        if self
742                            .heartbeat_sleep
743                            .get_or_insert_with(|| {
744                                Box::pin(tokio::time::sleep(
745                                    ORDERED_IDLE_HEARTBEAT.saturating_mul(2),
746                                ))
747                            })
748                            .poll_unpin(cx)
749                            .is_ready()
750                        {
751                            self.heartbeat_sleep = None;
752                            self.subscriber = None;
753                            self.consumer_sequence.store(0, Ordering::Relaxed);
754                            return Poll::Ready(Some(Err(OrderedError::new(
755                                OrderedErrorKind::MissingHeartbeat,
756                            ))));
757                        }
758                        return Poll::Pending;
759                    }
760                }
761            }
762        }
763    }
764}
765
766#[derive(Clone, Debug, PartialEq)]
767pub enum OrderedErrorKind {
768    MissingHeartbeat,
769    ConsumerDeleted,
770    PullBasedConsumer,
771    Recreate,
772    Other,
773}
774
775impl std::fmt::Display for OrderedErrorKind {
776    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
777        match self {
778            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
779            Self::ConsumerDeleted => write!(f, "consumer deleted"),
780            Self::Other => write!(f, "error"),
781            Self::PullBasedConsumer => write!(f, "cannot use with push consumer"),
782            Self::Recreate => write!(f, "consumer recreation failed"),
783        }
784    }
785}
786
787pub type OrderedError = Error<OrderedErrorKind>;
788
789impl From<MessagesError> for OrderedError {
790    fn from(err: MessagesError) -> Self {
791        match err.kind() {
792            MessagesErrorKind::MissingHeartbeat => {
793                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
794            }
795            MessagesErrorKind::ConsumerDeleted => {
796                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
797            }
798            MessagesErrorKind::PullBasedConsumer => {
799                OrderedError::new(OrderedErrorKind::PullBasedConsumer)
800            }
801            MessagesErrorKind::Other => OrderedError {
802                kind: OrderedErrorKind::Other,
803                source: err.source,
804            },
805        }
806    }
807}
808
809#[derive(Clone, Copy, Debug, PartialEq)]
810pub enum MessagesErrorKind {
811    MissingHeartbeat,
812    ConsumerDeleted,
813    PullBasedConsumer,
814    Other,
815}
816
817impl std::fmt::Display for MessagesErrorKind {
818    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
819        match self {
820            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
821            Self::ConsumerDeleted => write!(f, "consumer deleted"),
822            Self::Other => write!(f, "error"),
823            Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
824        }
825    }
826}
827
828pub type MessagesError = Error<MessagesErrorKind>;
829
830#[derive(Clone, Copy, Debug, PartialEq)]
831pub enum ConsumerRecreateErrorKind {
832    GetStream,
833    Subscription,
834    Recreate,
835    TimedOut,
836}
837
838impl std::fmt::Display for ConsumerRecreateErrorKind {
839    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
840        match self {
841            Self::GetStream => write!(f, "error getting stream"),
842            Self::Recreate => write!(f, "consumer creation failed"),
843            Self::TimedOut => write!(f, "timed out"),
844            Self::Subscription => write!(f, "failed to resubscribe"),
845        }
846    }
847}
848
849pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
850
851async fn recreate_consumer_and_subscription(
852    context: Context,
853    mut config: OrderedConfig,
854    stream_name: String,
855    sequence: u64,
856) -> Result<Subscriber, ConsumerRecreateError> {
857    let delivery_subject = context.client.new_inbox();
858    config.deliver_subject = delivery_subject;
859
860    let subscriber = context
861        .client
862        .subscribe(config.deliver_subject.clone())
863        .await
864        .map_err(|err| {
865            ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
866        })?;
867
868    recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
869    Ok(subscriber)
870}
871async fn recreate_ephemeral_consumer(
872    context: Context,
873    config: OrderedConfig,
874    stream_name: String,
875    sequence: u64,
876) -> Result<(), ConsumerRecreateError> {
877    let deliver_policy = {
878        if sequence == 0 {
879            DeliverPolicy::All
880        } else {
881            DeliverPolicy::ByStartSequence {
882                start_sequence: sequence + 1,
883            }
884        }
885    };
886
887    let config = config.clone();
888    tokio::time::timeout(
889        Duration::from_secs(5),
890        context.create_consumer_on_stream(
891            jetstream::consumer::push::OrderedConfig {
892                deliver_policy,
893                ..config
894            },
895            stream_name.clone(),
896        ),
897    )
898    .await
899    .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
900    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
901
902    Ok(())
903}