async_nats/jetstream/consumer/
push.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use super::{
15    backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16    StreamError, StreamErrorKind,
17};
18
19#[cfg(feature = "server_2_11")]
20use super::PriorityPolicy;
21
22use crate::{
23    connection::State,
24    error::Error,
25    jetstream::{self, Context, Message},
26    StatusCode, Subscriber,
27};
28
29use bytes::Bytes;
30use futures_util::{future::BoxFuture, FutureExt};
31use portable_atomic::AtomicU64;
32use serde::{Deserialize, Serialize};
33#[cfg(feature = "server_2_10")]
34use std::collections::HashMap;
35use std::task::{self, Poll};
36use std::{
37    io::{self},
38    pin::Pin,
39    sync::Arc,
40};
41use std::{sync::atomic::Ordering, time::Duration};
42#[cfg(feature = "server_2_11")]
43use time::{serde::rfc3339, OffsetDateTime};
44use tracing::{debug, trace};
45
/// Idle heartbeat interval requested for ordered push consumers.
///
/// `OrderedConfig` always converts to a consumer configuration that uses this
/// value, and the `Ordered` stream arms its missing-heartbeat timer for twice
/// this interval.
const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
47
48impl Consumer<Config> {
49    /// Returns a stream of messages for Push Consumer.
50    ///
51    /// # Example
52    ///
53    /// ```no_run
54    /// # #[tokio::main]
55    /// # async fn mains() -> Result<(), async_nats::Error> {
56    /// use async_nats::jetstream::consumer::PushConsumer;
57    /// use futures_util::StreamExt;
58    /// use futures_util::TryStreamExt;
59    ///
60    /// let client = async_nats::connect("localhost:4222").await?;
61    /// let jetstream = async_nats::jetstream::new(client);
62    ///
63    /// let stream = jetstream
64    ///     .get_or_create_stream(async_nats::jetstream::stream::Config {
65    ///         name: "events".to_string(),
66    ///         max_messages: 10_000,
67    ///         ..Default::default()
68    ///     })
69    ///     .await?;
70    ///
71    /// jetstream.publish("events", "data".into()).await?;
72    ///
73    /// let consumer: PushConsumer = stream
74    ///     .get_or_create_consumer(
75    ///         "consumer",
76    ///         async_nats::jetstream::consumer::push::Config {
77    ///             durable_name: Some("consumer".to_string()),
78    ///             deliver_subject: "deliver".to_string(),
79    ///             ..Default::default()
80    ///         },
81    ///     )
82    ///     .await?;
83    ///
84    /// let mut messages = consumer.messages().await?.take(100);
85    /// while let Some(Ok(message)) = messages.next().await {
86    ///     println!("got message {:?}", message);
87    ///     message.ack().await?;
88    /// }
89    /// Ok(())
90    /// # }
91    /// ```
92    pub async fn messages(&self) -> Result<Messages, StreamError> {
93        let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
94        let subscriber = if let Some(ref group) = self.info.config.deliver_group {
95            self.context
96                .client
97                .queue_subscribe(deliver_subject, group.to_owned())
98                .await
99                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
100        } else {
101            self.context
102                .client
103                .subscribe(deliver_subject)
104                .await
105                .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
106        };
107
108        Ok(Messages {
109            context: self.context.clone(),
110            config: self.config.clone(),
111            subscriber,
112            heartbeat_sleep: None,
113        })
114    }
115}
116
/// Stream of messages delivered to a push consumer, created by
/// `Consumer::<Config>::messages`.
pub struct Messages {
    /// JetStream context; cloned into every yielded message and used to publish
    /// replies to heartbeat messages that carry a reply subject.
    context: Context,
    /// Subscription on the consumer's deliver subject that messages are read from.
    subscriber: Subscriber,
    /// Copy of the consumer configuration; its `idle_heartbeat` drives the
    /// missing-heartbeat timer.
    config: Config,
    /// Missing-heartbeat timer, created lazily for twice `idle_heartbeat` and
    /// cleared whenever the subscription yields something.
    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
}
123
124impl futures_util::Stream for Messages {
125    type Item = Result<Message, MessagesError>;
126
127    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
128        if !self.config.idle_heartbeat.is_zero() {
129            let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
130            match self
131                .heartbeat_sleep
132                .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
133                .poll_unpin(cx)
134            {
135                Poll::Ready(_) => {
136                    self.heartbeat_sleep = None;
137                    return Poll::Ready(Some(Err(MessagesError::new(
138                        MessagesErrorKind::MissingHeartbeat,
139                    ))));
140                }
141                Poll::Pending => (),
142            }
143        }
144        loop {
145            match self.subscriber.receiver.poll_recv(cx) {
146                Poll::Ready(maybe_message) => {
147                    self.heartbeat_sleep = None;
148                    match maybe_message {
149                        Some(message) => match message.status {
150                            Some(StatusCode::IDLE_HEARTBEAT) => {
151                                if let Some(subject) = message.reply {
152                                    // TODO store pending_publish as a future and return errors from it
153                                    let client = self.context.client.clone();
154                                    tokio::task::spawn(async move {
155                                        client
156                                            .publish(subject, Bytes::from_static(b""))
157                                            .await
158                                            .unwrap();
159                                    });
160                                }
161
162                                continue;
163                            }
164                            Some(_) => {
165                                continue;
166                            }
167                            None => {
168                                return Poll::Ready(Some(Ok(jetstream::Message {
169                                    context: self.context.clone(),
170                                    message,
171                                })))
172                            }
173                        },
174                        None => return Poll::Ready(None),
175                    }
176                }
177                Poll::Pending => return Poll::Pending,
178            }
179        }
180    }
181}
182
/// Configuration for push consumers. From a high level, the
/// `durable_name` and `deliver_subject` fields have a particularly
/// strong influence on the consumer's overall behavior.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// The delivery subject used by the push consumer.
    #[serde(default)]
    pub deliver_subject: String,
    /// Setting `durable_name` to `Some(...)` will cause this consumer
    /// to be "durable". This may be a good choice for workloads that
    /// benefit from the `JetStream` server or cluster remembering the
    /// progress of consumers for fault tolerance purposes. If a consumer
    /// crashes, the `JetStream` server or cluster will remember which
    /// messages the consumer acknowledged. When the consumer recovers,
    /// this information will allow the consumer to resume processing
    /// where it left off. If you're unsure, set this to `Some(...)`.
    ///
    /// Setting `durable_name` to `None` will cause this consumer to
    /// be "ephemeral". This may be a good choice for workloads where
    /// you don't need the `JetStream` server to remember the consumer's
    /// progress in the case of a crash, such as certain "high churn"
    /// workloads or workloads where a crashed instance is not required
    /// to recover.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub durable_name: Option<String>,
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    /// Deliver group to use. When set, `messages()` creates a queue
    /// subscription with this group instead of a plain subscription.
    pub deliver_group: Option<String>,
    /// Allows for a variety of options that determine how this consumer will receive messages.
    /// Flattened into the surrounding object when (de)serialized.
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// How messages should be acknowledged
    pub ack_policy: AckPolicy,
    /// How long to allow messages to remain un-acknowledged before attempting redelivery.
    /// Serialized as nanoseconds.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub ack_wait: Duration,
    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_deliver: i64,
    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
    /// Carried on the wire under the name `sample_freq`.
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    /// The maximum number of unacknowledged messages that may be
    /// in-flight before pausing sending additional messages to
    /// this consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_ack_pending: i64,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Enable flow control messages
    #[serde(default, skip_serializing_if = "is_default")]
    pub flow_control: bool,
    /// Enable idle heartbeat messages. When non-zero, the `Messages` stream
    /// yields a missing-heartbeat error if nothing arrives on the subscription
    /// within twice this duration. Serialized as nanoseconds.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub idle_heartbeat: Duration,
    /// Number of consumer replicas
    #[serde(default, skip_serializing_if = "is_default")]
    pub num_replicas: usize,
    /// Force consumer to use memory storage.
    #[serde(default, skip_serializing_if = "is_default")]
    pub memory_storage: bool,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Custom backoff for missed acknowledgments. Each entry is serialized as nanoseconds.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub backoff: Vec<Duration>,
    /// Threshold for consumer inactivity. Serialized as nanoseconds.
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub inactive_threshold: Duration,
    /// Create a consumer paused until provided deadline.
    /// Serialized as an RFC 3339 timestamp and omitted when `None`.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,
}
291
292impl FromConsumer for Config {
293    fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
294        if config.deliver_subject.is_none() {
295            return Err(Box::new(io::Error::other(
296                "push consumer must have delivery subject",
297            )));
298        }
299
300        Ok(Config {
301            deliver_subject: config.deliver_subject.unwrap(),
302            durable_name: config.durable_name,
303            name: config.name,
304            description: config.description,
305            deliver_group: config.deliver_group,
306            deliver_policy: config.deliver_policy,
307            ack_policy: config.ack_policy,
308            ack_wait: config.ack_wait,
309            max_deliver: config.max_deliver,
310            filter_subject: config.filter_subject,
311            #[cfg(feature = "server_2_10")]
312            filter_subjects: config.filter_subjects,
313            replay_policy: config.replay_policy,
314            rate_limit: config.rate_limit,
315            sample_frequency: config.sample_frequency,
316            max_waiting: config.max_waiting,
317            max_ack_pending: config.max_ack_pending,
318            headers_only: config.headers_only,
319            flow_control: config.flow_control,
320            idle_heartbeat: config.idle_heartbeat,
321            num_replicas: config.num_replicas,
322            memory_storage: config.memory_storage,
323            #[cfg(feature = "server_2_10")]
324            metadata: config.metadata,
325            backoff: config.backoff,
326            inactive_threshold: config.inactive_threshold,
327            #[cfg(feature = "server_2_11")]
328            pause_until: config.pause_until,
329        })
330    }
331}
332
333impl IntoConsumerConfig for Config {
334    fn into_consumer_config(self) -> jetstream::consumer::Config {
335        jetstream::consumer::Config {
336            deliver_subject: Some(self.deliver_subject),
337            durable_name: self.durable_name,
338            name: self.name,
339            description: self.description,
340            deliver_group: self.deliver_group,
341            deliver_policy: self.deliver_policy,
342            ack_policy: self.ack_policy,
343            ack_wait: self.ack_wait,
344            max_deliver: self.max_deliver,
345            filter_subject: self.filter_subject,
346            #[cfg(feature = "server_2_10")]
347            filter_subjects: self.filter_subjects,
348            replay_policy: self.replay_policy,
349            rate_limit: self.rate_limit,
350            sample_frequency: self.sample_frequency,
351            max_waiting: self.max_waiting,
352            max_ack_pending: self.max_ack_pending,
353            headers_only: self.headers_only,
354            flow_control: self.flow_control,
355            idle_heartbeat: self.idle_heartbeat,
356            max_batch: 0,
357            max_bytes: 0,
358            max_expires: Duration::default(),
359            inactive_threshold: self.inactive_threshold,
360            num_replicas: self.num_replicas,
361            memory_storage: self.memory_storage,
362            #[cfg(feature = "server_2_10")]
363            metadata: self.metadata,
364            backoff: self.backoff,
365            #[cfg(feature = "server_2_11")]
366            priority_policy: PriorityPolicy::None,
367            #[cfg(feature = "server_2_11")]
368            priority_groups: Vec::new(),
369            #[cfg(feature = "server_2_11")]
370            pause_until: self.pause_until,
371        }
372    }
373}
374impl IntoConsumerConfig for &Config {
375    fn into_consumer_config(self) -> jetstream::consumer::Config {
376        self.clone().into_consumer_config()
377    }
378}
/// Reports whether `t` equals its type's `Default` value.
///
/// Referenced by the `skip_serializing_if = "is_default"` serde attributes in
/// this file so that fields left at their default are not serialized.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let baseline = T::default();
    *t == baseline
}
382
/// Configuration for ordered push consumers.
///
/// Only the settings listed here are configurable. When converted into the
/// generic consumer configuration the remaining settings are fixed: no
/// durable name or deliver group, `AckPolicy::None`, flow control enabled,
/// an idle heartbeat of `ORDERED_IDLE_HEARTBEAT`, memory storage and a
/// single replica.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct OrderedConfig {
    /// The delivery subject used by the push consumer.
    #[serde(default)]
    pub deliver_subject: String,
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Subject filter for this consumer; passed through unchanged as the
    /// generic configuration's `filter_subject`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
    /// Carried on the wire under the name `sample_freq`.
    #[serde(
        rename = "sample_freq",
        with = "super::sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Allows for a variety of options that determine how this consumer will receive messages.
    /// Flattened into the surrounding object when (de)serialized.
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
}
431
432impl FromConsumer for OrderedConfig {
433    fn try_from_consumer_config(
434        config: crate::jetstream::consumer::Config,
435    ) -> Result<Self, crate::Error>
436    where
437        Self: Sized,
438    {
439        if config.deliver_subject.is_none() {
440            return Err(Box::new(io::Error::other(
441                "push consumer must have delivery subject",
442            )));
443        }
444        Ok(OrderedConfig {
445            name: config.name,
446            deliver_subject: config.deliver_subject.unwrap(),
447            description: config.description,
448            filter_subject: config.filter_subject,
449            #[cfg(feature = "server_2_10")]
450            filter_subjects: config.filter_subjects,
451            replay_policy: config.replay_policy,
452            rate_limit: config.rate_limit,
453            sample_frequency: config.sample_frequency,
454            headers_only: config.headers_only,
455            deliver_policy: config.deliver_policy,
456            max_waiting: config.max_waiting,
457            #[cfg(feature = "server_2_10")]
458            metadata: config.metadata,
459        })
460    }
461}
462
463impl IntoConsumerConfig for OrderedConfig {
464    fn into_consumer_config(self) -> super::Config {
465        jetstream::consumer::Config {
466            deliver_subject: Some(self.deliver_subject),
467            durable_name: None,
468            name: self.name,
469            description: self.description,
470            deliver_group: None,
471            deliver_policy: self.deliver_policy,
472            ack_policy: AckPolicy::None,
473            ack_wait: Duration::default(),
474            max_deliver: 1,
475            filter_subject: self.filter_subject,
476            #[cfg(feature = "server_2_10")]
477            filter_subjects: self.filter_subjects,
478            replay_policy: self.replay_policy,
479            rate_limit: self.rate_limit,
480            sample_frequency: self.sample_frequency,
481            max_waiting: self.max_waiting,
482            max_ack_pending: 0,
483            headers_only: self.headers_only,
484            flow_control: true,
485            idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
486            max_batch: 0,
487            max_bytes: 0,
488            max_expires: Duration::default(),
489            inactive_threshold: Duration::from_secs(30),
490            num_replicas: 1,
491            memory_storage: true,
492            #[cfg(feature = "server_2_10")]
493            metadata: self.metadata,
494            backoff: Vec::new(),
495            #[cfg(feature = "server_2_11")]
496            priority_policy: PriorityPolicy::None,
497            #[cfg(feature = "server_2_11")]
498            priority_groups: Vec::new(),
499            #[cfg(feature = "server_2_11")]
500            pause_until: None,
501        }
502    }
503}
504
505impl Consumer<OrderedConfig> {
506    pub async fn messages(self) -> Result<Ordered, StreamError> {
507        let subscriber = self
508            .context
509            .client
510            .subscribe(self.info.config.deliver_subject.clone().unwrap())
511            .await
512            .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
513
514        let last_sequence = Arc::new(AtomicU64::new(0));
515        let consumer_sequence = Arc::new(AtomicU64::new(0));
516
517        let state = self.context.client.state.clone();
518        Ok(Ordered {
519            context: self.context.clone(),
520            consumer: self,
521            subscriber: Some(subscriber),
522            subscriber_future: None,
523            stream_sequence: last_sequence,
524            consumer_sequence,
525            heartbeat_sleep: None,
526            state,
527            last_known_state: State::Connected,
528            state_change_future: None,
529        })
530    }
531}
532
/// Stream of messages for an ordered push consumer, created by
/// `Consumer::<OrderedConfig>::messages`.
///
/// The stream tracks stream and consumer sequences and drops its subscription
/// to trigger recreation when it observes a sequence mismatch or a connection
/// state transition.
pub struct Ordered {
    /// JetStream context; cloned into every yielded message and into the
    /// recreation future.
    context: Context,
    /// The consumer this stream was created from; its config and stream name
    /// are passed to the recreation future.
    consumer: Consumer<OrderedConfig>,
    /// Current subscription. `None` means it was invalidated and has to be
    /// recreated before messages can be polled again.
    subscriber: Option<Subscriber>,
    /// In-flight future recreating the consumer and its subscription, if any.
    subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
    /// Stream sequence of the last message yielded; read by the recreation future.
    stream_sequence: Arc<AtomicU64>,
    /// Consumer sequence of the last message yielded; reset to zero when the
    /// consumer is recreated.
    consumer_sequence: Arc<AtomicU64>,
    /// Missing-heartbeat timer, armed for twice `ORDERED_IDLE_HEARTBEAT` and
    /// cleared whenever the subscription yields a message.
    heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
    /// Watch receiver for the client's connection state.
    state: tokio::sync::watch::Receiver<State>,
    /// Last connection state this stream acted upon, used to detect transitions.
    last_known_state: State,
    /// Pending future that resolves when the connection state changes.
    state_change_future:
        Option<BoxFuture<'static, Result<(), tokio::sync::watch::error::RecvError>>>,
}
546
547impl futures_util::Stream for Ordered {
548    type Item = Result<Message, OrderedError>;
549
550    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
551        loop {
552            // Check heartbeat timer, but only if we have an active subscriber
553            // Don't check during recreation
554            if self.subscriber.is_some()
555                && self.subscriber_future.is_none()
556                && self
557                    .heartbeat_sleep
558                    .get_or_insert_with(|| {
559                        Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2)))
560                    })
561                    .poll_unpin(cx)
562                    .is_ready()
563            {
564                self.heartbeat_sleep = None;
565                return Poll::Ready(Some(Err(OrderedError::new(
566                    OrderedErrorKind::MissingHeartbeat,
567                ))));
568            }
569            // Poll for connection state changes
570            match &mut self.state_change_future {
571                None => {
572                    // Check if state has already changed
573                    if self.state.has_changed().unwrap_or(false) {
574                        let current_state = self.state.borrow_and_update().clone();
575
576                        // Handle state transitions
577                        use State::*;
578                        match (&self.last_known_state, &current_state) {
579                            (Connected, Disconnected | Pending) => {
580                                debug!("Connection lost, marking subscriber as invalid");
581                                self.subscriber = None;
582                                self.heartbeat_sleep = None;
583                            }
584                            (Disconnected | Pending, Connected) => {
585                                debug!("Connection restored, triggering consumer recreation");
586                                self.subscriber = None;
587                                self.consumer_sequence.store(0, Ordering::Relaxed);
588                                self.heartbeat_sleep = None;
589                            }
590                            _ => {}
591                        }
592
593                        self.last_known_state = current_state;
594                    } else {
595                        // Create a future to wait for state changes
596                        let mut state = self.state.clone();
597                        self.state_change_future =
598                            Some(Box::pin(async move { state.changed().await }));
599                    }
600                }
601                Some(fut) => match fut.poll_unpin(cx) {
602                    Poll::Ready(Ok(())) => {
603                        self.state_change_future = None;
604                        continue; // State changed, restart loop to check it
605                    }
606                    Poll::Ready(Err(_)) => {
607                        return Poll::Ready(Some(Err(OrderedError::with_source(
608                            OrderedErrorKind::Other,
609                            "Connection state watcher dropped",
610                        ))));
611                    }
612                    Poll::Pending => {}
613                },
614            }
615
616            // Handle messages after checking connection state
617
618            // Handle subscriber recreation if needed
619            if self.subscriber.is_none() {
620                // Ensure we have a recreation future
621                if self.subscriber_future.is_none() {
622                    trace!("Creating subscriber recreation future");
623                    let context = self.context.clone();
624                    let sequence = self.stream_sequence.clone();
625                    let config = self.consumer.config.clone();
626                    let stream_name = self.consumer.info.stream_name.clone();
627
628                    self.subscriber_future = Some(Box::pin(async move {
629                        tryhard::retry_fn(|| {
630                            recreate_consumer_and_subscription(
631                                context.clone(),
632                                config.clone(),
633                                stream_name.clone(),
634                                sequence.load(Ordering::Relaxed),
635                            )
636                        })
637                        .retries(u32::MAX)
638                        .custom_backoff(backoff)
639                        .await
640                        .map_err(|err| {
641                            ConsumerRecreateError::with_source(
642                                ConsumerRecreateErrorKind::Recreate,
643                                err,
644                            )
645                        })
646                    }));
647                }
648
649                // Poll the recreation future
650                if let Some(fut) = &mut self.subscriber_future {
651                    match fut.poll_unpin(cx) {
652                        Poll::Ready(result) => {
653                            self.subscriber_future = None;
654                            self.consumer_sequence.store(0, Ordering::Relaxed);
655                            self.subscriber = Some(result.map_err(|err| {
656                                OrderedError::with_source(OrderedErrorKind::Recreate, err)
657                            })?);
658                        }
659                        Poll::Pending => return Poll::Pending,
660                    }
661                }
662            }
663            // Poll for messages
664            if let Some(subscriber) = self.subscriber.as_mut() {
665                match subscriber.receiver.poll_recv(cx) {
666                    Poll::Ready(None) => return Poll::Ready(None),
667                    Poll::Ready(Some(message)) => {
668                        self.heartbeat_sleep = None;
669                        match message.status {
670                            Some(StatusCode::IDLE_HEARTBEAT) => {
671                                debug!("received idle heartbeats");
672                                if let Some(headers) = message.headers.as_ref() {
673                                    if let Some(sequence) =
674                                        headers.get_last(crate::header::NATS_LAST_CONSUMER)
675                                    {
676                                        let sequence: u64 =
677                                            sequence.as_str().parse().map_err(|err| {
678                                                OrderedError::with_source(
679                                                    OrderedErrorKind::Other,
680                                                    err,
681                                                )
682                                            })?;
683
684                                        let last_sequence =
685                                            self.consumer_sequence.load(Ordering::Relaxed);
686
687                                        if sequence != last_sequence {
688                                            debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
689                                            self.subscriber = None;
690                                            self.heartbeat_sleep = None;
691                                        }
692                                    }
693                                }
694                                // flow control.
695                                if let Some(subject) = message.reply.clone() {
696                                    trace!("received flow control message");
697                                    let client = self.context.client.clone();
698                                    tokio::task::spawn(async move {
699                                        client.publish(subject, Bytes::from_static(b"")).await.ok();
700                                    });
701                                }
702                                continue;
703                            }
704                            Some(status) => {
705                                debug!("received status message: {}", status);
706                                continue;
707                            }
708                            None => {
709                                trace!("received a message");
710                                let jetstream_message = jetstream::message::Message {
711                                    message,
712                                    context: self.context.clone(),
713                                };
714
715                                let info = jetstream_message.info().map_err(|err| {
716                                    OrderedError::with_source(OrderedErrorKind::Other, err)
717                                })?;
718                                trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
719                                               self.consumer_sequence,
720                                               self.stream_sequence,
721                                               info.consumer_sequence,
722                                               info.stream_sequence);
723                                if info.consumer_sequence
724                                    != self.consumer_sequence.load(Ordering::Relaxed) + 1
725                                {
726                                    debug!(
727                                        "ordered consumer mismatch. current {}, info: {}",
728                                        self.consumer_sequence.load(Ordering::Relaxed),
729                                        info.consumer_sequence
730                                    );
731                                    self.subscriber = None;
732                                    self.consumer_sequence.store(0, Ordering::Relaxed);
733                                    self.heartbeat_sleep = None;
734                                    continue;
735                                }
736                                self.stream_sequence
737                                    .store(info.stream_sequence, Ordering::Relaxed);
738                                self.consumer_sequence
739                                    .store(info.consumer_sequence, Ordering::Relaxed);
740                                return Poll::Ready(Some(Ok(jetstream_message)));
741                            }
742                        }
743                    }
744                    Poll::Pending => return Poll::Pending,
745                }
746            }
747        }
748    }
749}
750
/// Kinds of errors reported through [`OrderedError`].
#[derive(Clone, Debug, PartialEq)]
pub enum OrderedErrorKind {
    /// An expected idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// The operation cannot be used with a pull consumer.
    PullBasedConsumer,
    /// Recreating the consumer failed.
    Recreate,
    /// Any other error; details, if any, are carried by the error's source.
    Other,
}

impl std::fmt::Display for OrderedErrorKind {
    /// Writes a short, lowercase, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
            Self::ConsumerDeleted => write!(f, "consumer deleted"),
            Self::Other => write!(f, "error"),
            // Same wording as `MessagesErrorKind::PullBasedConsumer`: the variant
            // describes a pull-based consumer, so the text must say "pull".
            Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
            Self::Recreate => write!(f, "consumer recreation failed"),
        }
    }
}
771
/// Error type whose kind is an [`OrderedErrorKind`].
pub type OrderedError = Error<OrderedErrorKind>;
773
774impl From<MessagesError> for OrderedError {
775    fn from(err: MessagesError) -> Self {
776        match err.kind() {
777            MessagesErrorKind::MissingHeartbeat => {
778                OrderedError::new(OrderedErrorKind::MissingHeartbeat)
779            }
780            MessagesErrorKind::ConsumerDeleted => {
781                OrderedError::new(OrderedErrorKind::ConsumerDeleted)
782            }
783            MessagesErrorKind::PullBasedConsumer => {
784                OrderedError::new(OrderedErrorKind::PullBasedConsumer)
785            }
786            MessagesErrorKind::Other => OrderedError {
787                kind: OrderedErrorKind::Other,
788                source: err.source,
789            },
790        }
791    }
792}
793
/// Kinds of errors reported through [`MessagesError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MessagesErrorKind {
    /// An expected idle heartbeat was missed.
    MissingHeartbeat,
    /// The consumer was deleted.
    ConsumerDeleted,
    /// The operation cannot be used with a pull consumer.
    PullBasedConsumer,
    /// Any other error.
    Other,
}

impl std::fmt::Display for MessagesErrorKind {
    /// Writes a short, lowercase, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match *self {
            Self::MissingHeartbeat => "missed idle heartbeat",
            Self::ConsumerDeleted => "consumer deleted",
            Self::PullBasedConsumer => "cannot use with pull consumer",
            Self::Other => "error",
        };
        f.write_str(description)
    }
}
812
/// Error type whose kind is a [`MessagesErrorKind`].
pub type MessagesError = Error<MessagesErrorKind>;
814
/// Kinds of errors reported through [`ConsumerRecreateError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConsumerRecreateErrorKind {
    /// Getting the stream failed.
    GetStream,
    /// Resubscribing failed.
    Subscription,
    /// Creating the consumer failed.
    Recreate,
    /// The operation timed out.
    TimedOut,
}

impl std::fmt::Display for ConsumerRecreateErrorKind {
    /// Writes a short, lowercase, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match *self {
            Self::GetStream => "error getting stream",
            Self::Subscription => "failed to resubscribe",
            Self::Recreate => "consumer creation failed",
            Self::TimedOut => "timed out",
        };
        f.write_str(description)
    }
}
833
/// Error type whose kind is a [`ConsumerRecreateErrorKind`].
pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
835
836async fn recreate_consumer_and_subscription(
837    context: Context,
838    mut config: OrderedConfig,
839    stream_name: String,
840    sequence: u64,
841) -> Result<Subscriber, ConsumerRecreateError> {
842    let delivery_subject = context.client.new_inbox();
843    config.deliver_subject = delivery_subject;
844
845    let subscriber = context
846        .client
847        .subscribe(config.deliver_subject.clone())
848        .await
849        .map_err(|err| {
850            ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
851        })?;
852
853    recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
854    Ok(subscriber)
855}
856async fn recreate_ephemeral_consumer(
857    context: Context,
858    config: OrderedConfig,
859    stream_name: String,
860    sequence: u64,
861) -> Result<(), ConsumerRecreateError> {
862    let deliver_policy = {
863        if sequence == 0 {
864            DeliverPolicy::All
865        } else {
866            DeliverPolicy::ByStartSequence {
867                start_sequence: sequence + 1,
868            }
869        }
870    };
871
872    let config = config.clone();
873    tokio::time::timeout(
874        Duration::from_secs(5),
875        context.create_consumer_on_stream(
876            jetstream::consumer::push::OrderedConfig {
877                deliver_policy,
878                ..config
879            },
880            stream_name.clone(),
881        ),
882    )
883    .await
884    .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
885    .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
886
887    Ok(())
888}