1use super::{
15 AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16 StreamError, StreamErrorKind,
17};
18use crate::{
19 connection::State,
20 error::Error,
21 jetstream::{self, Context, Message},
22 StatusCode, Subscriber,
23};
24
25use bytes::Bytes;
26use futures::{future::BoxFuture, FutureExt};
27use portable_atomic::AtomicU64;
28use serde::{Deserialize, Serialize};
29#[cfg(feature = "server_2_10")]
30use std::collections::HashMap;
31use std::task::{self, Poll};
32use std::{
33 io::{self, ErrorKind},
34 pin::Pin,
35 sync::Arc,
36};
37use std::{sync::atomic::Ordering, time::Duration};
38use tokio::{sync::oneshot::error::TryRecvError, task::JoinHandle};
39use tracing::{debug, trace};
40
41const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
42
43impl Consumer<Config> {
44 pub async fn messages(&self) -> Result<Messages, StreamError> {
88 let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
89 let subscriber = if let Some(ref group) = self.info.config.deliver_group {
90 self.context
91 .client
92 .queue_subscribe(deliver_subject, group.to_owned())
93 .await
94 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
95 } else {
96 self.context
97 .client
98 .subscribe(deliver_subject)
99 .await
100 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
101 };
102
103 Ok(Messages {
104 context: self.context.clone(),
105 config: self.config.clone(),
106 subscriber,
107 heartbeat_sleep: None,
108 })
109 }
110}
111
112pub struct Messages {
113 context: Context,
114 subscriber: Subscriber,
115 config: Config,
116 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
117}
118
119impl futures::Stream for Messages {
120 type Item = Result<Message, MessagesError>;
121
122 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
123 if !self.config.idle_heartbeat.is_zero() {
124 let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
125 match self
126 .heartbeat_sleep
127 .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
128 .poll_unpin(cx)
129 {
130 Poll::Ready(_) => {
131 self.heartbeat_sleep = None;
132 return Poll::Ready(Some(Err(MessagesError::new(
133 MessagesErrorKind::MissingHeartbeat,
134 ))));
135 }
136 Poll::Pending => (),
137 }
138 }
139 loop {
140 match self.subscriber.receiver.poll_recv(cx) {
141 Poll::Ready(maybe_message) => {
142 self.heartbeat_sleep = None;
143 match maybe_message {
144 Some(message) => match message.status {
145 Some(StatusCode::IDLE_HEARTBEAT) => {
146 if let Some(subject) = message.reply {
147 let client = self.context.client.clone();
149 tokio::task::spawn(async move {
150 client
151 .publish(subject, Bytes::from_static(b""))
152 .await
153 .unwrap();
154 });
155 }
156
157 continue;
158 }
159 Some(_) => {
160 continue;
161 }
162 None => {
163 return Poll::Ready(Some(Ok(jetstream::Message {
164 context: self.context.clone(),
165 message,
166 })))
167 }
168 },
169 None => return Poll::Ready(None),
170 }
171 }
172 Poll::Pending => return Poll::Pending,
173 }
174 }
175 }
176}
177
178#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
182pub struct Config {
183 #[serde(default)]
185 pub deliver_subject: String,
186 #[serde(default, skip_serializing_if = "Option::is_none")]
202 pub durable_name: Option<String>,
203 #[serde(default, skip_serializing_if = "Option::is_none")]
206 pub name: Option<String>,
207 #[serde(default, skip_serializing_if = "Option::is_none")]
209 pub description: Option<String>,
210 #[serde(default, skip_serializing_if = "Option::is_none")]
211 pub deliver_group: Option<String>,
213 #[serde(flatten)]
215 pub deliver_policy: DeliverPolicy,
216 pub ack_policy: AckPolicy,
218 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
220 pub ack_wait: Duration,
221 #[serde(default, skip_serializing_if = "is_default")]
223 pub max_deliver: i64,
224 #[serde(default, skip_serializing_if = "is_default")]
226 pub filter_subject: String,
227 #[cfg(feature = "server_2_10")]
228 #[serde(default, skip_serializing_if = "is_default")]
230 pub filter_subjects: Vec<String>,
231 pub replay_policy: ReplayPolicy,
233 #[serde(default, skip_serializing_if = "is_default")]
235 pub rate_limit: u64,
236 #[serde(default, skip_serializing_if = "is_default")]
238 pub sample_frequency: u8,
239 #[serde(default, skip_serializing_if = "is_default")]
241 pub max_waiting: i64,
242 #[serde(default, skip_serializing_if = "is_default")]
246 pub max_ack_pending: i64,
247 #[serde(default, skip_serializing_if = "is_default")]
249 pub headers_only: bool,
250 #[serde(default, skip_serializing_if = "is_default")]
252 pub flow_control: bool,
253 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
255 pub idle_heartbeat: Duration,
256 #[serde(default, skip_serializing_if = "is_default")]
258 pub num_replicas: usize,
259 #[serde(default, skip_serializing_if = "is_default")]
261 pub memory_storage: bool,
262 #[cfg(feature = "server_2_10")]
263 #[serde(default, skip_serializing_if = "is_default")]
265 pub metadata: HashMap<String, String>,
266 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
268 pub backoff: Vec<Duration>,
269 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
271 pub inactive_threshold: Duration,
272}
273
274impl FromConsumer for Config {
275 fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
276 if config.deliver_subject.is_none() {
277 return Err(Box::new(io::Error::new(
278 ErrorKind::Other,
279 "push consumer must have delivery subject",
280 )));
281 }
282
283 Ok(Config {
284 deliver_subject: config.deliver_subject.unwrap(),
285 durable_name: config.durable_name,
286 name: config.name,
287 description: config.description,
288 deliver_group: config.deliver_group,
289 deliver_policy: config.deliver_policy,
290 ack_policy: config.ack_policy,
291 ack_wait: config.ack_wait,
292 max_deliver: config.max_deliver,
293 filter_subject: config.filter_subject,
294 #[cfg(feature = "server_2_10")]
295 filter_subjects: config.filter_subjects,
296 replay_policy: config.replay_policy,
297 rate_limit: config.rate_limit,
298 sample_frequency: config.sample_frequency,
299 max_waiting: config.max_waiting,
300 max_ack_pending: config.max_ack_pending,
301 headers_only: config.headers_only,
302 flow_control: config.flow_control,
303 idle_heartbeat: config.idle_heartbeat,
304 num_replicas: config.num_replicas,
305 memory_storage: config.memory_storage,
306 #[cfg(feature = "server_2_10")]
307 metadata: config.metadata,
308 backoff: config.backoff,
309 inactive_threshold: config.inactive_threshold,
310 })
311 }
312}
313
314impl IntoConsumerConfig for Config {
315 fn into_consumer_config(self) -> jetstream::consumer::Config {
316 jetstream::consumer::Config {
317 deliver_subject: Some(self.deliver_subject),
318 durable_name: self.durable_name,
319 name: self.name,
320 description: self.description,
321 deliver_group: self.deliver_group,
322 deliver_policy: self.deliver_policy,
323 ack_policy: self.ack_policy,
324 ack_wait: self.ack_wait,
325 max_deliver: self.max_deliver,
326 filter_subject: self.filter_subject,
327 #[cfg(feature = "server_2_10")]
328 filter_subjects: self.filter_subjects,
329 replay_policy: self.replay_policy,
330 rate_limit: self.rate_limit,
331 sample_frequency: self.sample_frequency,
332 max_waiting: self.max_waiting,
333 max_ack_pending: self.max_ack_pending,
334 headers_only: self.headers_only,
335 flow_control: self.flow_control,
336 idle_heartbeat: self.idle_heartbeat,
337 max_batch: 0,
338 max_bytes: 0,
339 max_expires: Duration::default(),
340 inactive_threshold: self.inactive_threshold,
341 num_replicas: self.num_replicas,
342 memory_storage: self.memory_storage,
343 #[cfg(feature = "server_2_10")]
344 metadata: self.metadata,
345 backoff: self.backoff,
346 }
347 }
348}
349impl IntoConsumerConfig for &Config {
350 fn into_consumer_config(self) -> jetstream::consumer::Config {
351 self.clone().into_consumer_config()
352 }
353}
354fn is_default<T: Default + Eq>(t: &T) -> bool {
355 t == &T::default()
356}
357
358#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
362pub struct OrderedConfig {
363 #[serde(default)]
365 pub deliver_subject: String,
366 #[serde(default, skip_serializing_if = "Option::is_none")]
369 pub name: Option<String>,
370 #[serde(default, skip_serializing_if = "Option::is_none")]
372 pub description: Option<String>,
373 #[serde(default, skip_serializing_if = "is_default")]
374 pub filter_subject: String,
375 #[cfg(feature = "server_2_10")]
376 #[serde(default, skip_serializing_if = "is_default")]
378 pub filter_subjects: Vec<String>,
379 pub replay_policy: ReplayPolicy,
381 #[serde(default, skip_serializing_if = "is_default")]
383 pub rate_limit: u64,
384 #[serde(default, skip_serializing_if = "is_default")]
386 pub sample_frequency: u8,
387 #[serde(default, skip_serializing_if = "is_default")]
389 pub headers_only: bool,
390 #[serde(flatten)]
392 pub deliver_policy: DeliverPolicy,
393 #[serde(default, skip_serializing_if = "is_default")]
395 pub max_waiting: i64,
396 #[cfg(feature = "server_2_10")]
397 #[serde(default, skip_serializing_if = "is_default")]
399 pub metadata: HashMap<String, String>,
400}
401
402impl FromConsumer for OrderedConfig {
403 fn try_from_consumer_config(
404 config: crate::jetstream::consumer::Config,
405 ) -> Result<Self, crate::Error>
406 where
407 Self: Sized,
408 {
409 if config.deliver_subject.is_none() {
410 return Err(Box::new(io::Error::new(
411 ErrorKind::Other,
412 "push consumer must have delivery subject",
413 )));
414 }
415 Ok(OrderedConfig {
416 name: config.name,
417 deliver_subject: config.deliver_subject.unwrap(),
418 description: config.description,
419 filter_subject: config.filter_subject,
420 #[cfg(feature = "server_2_10")]
421 filter_subjects: config.filter_subjects,
422 replay_policy: config.replay_policy,
423 rate_limit: config.rate_limit,
424 sample_frequency: config.sample_frequency,
425 headers_only: config.headers_only,
426 deliver_policy: config.deliver_policy,
427 max_waiting: config.max_waiting,
428 #[cfg(feature = "server_2_10")]
429 metadata: config.metadata,
430 })
431 }
432}
433
434impl IntoConsumerConfig for OrderedConfig {
435 fn into_consumer_config(self) -> super::Config {
436 jetstream::consumer::Config {
437 deliver_subject: Some(self.deliver_subject),
438 durable_name: None,
439 name: self.name,
440 description: self.description,
441 deliver_group: None,
442 deliver_policy: self.deliver_policy,
443 ack_policy: AckPolicy::None,
444 ack_wait: Duration::default(),
445 max_deliver: 1,
446 filter_subject: self.filter_subject,
447 #[cfg(feature = "server_2_10")]
448 filter_subjects: self.filter_subjects,
449 replay_policy: self.replay_policy,
450 rate_limit: self.rate_limit,
451 sample_frequency: self.sample_frequency,
452 max_waiting: self.max_waiting,
453 max_ack_pending: 0,
454 headers_only: self.headers_only,
455 flow_control: true,
456 idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
457 max_batch: 0,
458 max_bytes: 0,
459 max_expires: Duration::default(),
460 inactive_threshold: Duration::from_secs(30),
461 num_replicas: 1,
462 memory_storage: true,
463 #[cfg(feature = "server_2_10")]
464 metadata: self.metadata,
465 backoff: Vec::new(),
466 }
467 }
468}
469
470impl Consumer<OrderedConfig> {
471 pub async fn messages<'a>(self) -> Result<Ordered, StreamError> {
472 let subscriber = self
473 .context
474 .client
475 .subscribe(self.info.config.deliver_subject.clone().unwrap())
476 .await
477 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
478
479 let last_sequence = Arc::new(AtomicU64::new(0));
480 let consumer_sequence = Arc::new(AtomicU64::new(0));
481 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
482 let handle = tokio::task::spawn({
483 let stream_name = self.info.stream_name.clone();
484 let config = self.config.clone();
485 let mut context = self.context.clone();
486 let last_sequence = last_sequence.clone();
487 let consumer_sequence = consumer_sequence.clone();
488 let state = self.context.client.state.clone();
489 async move {
490 loop {
491 let current_state = state.borrow().to_owned();
492
493 context.client.state.changed().await.unwrap();
494 if state.borrow().to_owned() != State::Connected
496 || current_state == State::Connected
497 {
498 continue;
499 }
500 debug!("reconnected. trigger consumer recreation");
501
502 debug!(
503 "idle heartbeats expired. recreating consumer s: {}, {:?}",
504 stream_name, config
505 );
506 let consumer = tryhard::retry_fn(|| {
507 recreate_ephemeral_consumer(
508 context.clone(),
509 config.clone(),
510 stream_name.clone(),
511 last_sequence.load(Ordering::Relaxed),
512 )
513 })
514 .retries(5)
515 .exponential_backoff(Duration::from_millis(500))
516 .await;
517 if let Err(err) = consumer {
518 shutdown_tx.send(err).unwrap();
519 break;
520 }
521 debug!("resetting consume sequence to 0");
522 consumer_sequence.store(0, Ordering::Relaxed);
523 }
524 }
525 });
526
527 Ok(Ordered {
528 context: self.context.clone(),
529 consumer: self,
530 subscriber: Some(subscriber),
531 subscriber_future: None,
532 stream_sequence: last_sequence,
533 consumer_sequence,
534 shutdown: shutdown_rx,
535 handle,
536 heartbeat_sleep: None,
537 })
538 }
539}
540
541pub struct Ordered {
542 context: Context,
543 consumer: Consumer<OrderedConfig>,
544 subscriber: Option<Subscriber>,
545 subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
546 stream_sequence: Arc<AtomicU64>,
547 consumer_sequence: Arc<AtomicU64>,
548 shutdown: tokio::sync::oneshot::Receiver<ConsumerRecreateError>,
549 handle: JoinHandle<()>,
550 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
551}
552
553impl Drop for Ordered {
554 fn drop(&mut self) {
555 self.handle.abort()
557 }
558}
559
560impl futures::Stream for Ordered {
561 type Item = Result<Message, OrderedError>;
562
563 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
564 match self
565 .heartbeat_sleep
566 .get_or_insert_with(|| {
567 Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2)))
568 })
569 .poll_unpin(cx)
570 {
571 Poll::Ready(_) => {
572 self.heartbeat_sleep = None;
573 return Poll::Ready(Some(Err(OrderedError::new(
574 OrderedErrorKind::MissingHeartbeat,
575 ))));
576 }
577 Poll::Pending => (),
578 }
579
580 loop {
581 match self.shutdown.try_recv() {
582 Ok(err) => {
583 return Poll::Ready(Some(Err(OrderedError::with_source(
584 OrderedErrorKind::Other,
585 err,
586 ))))
587 }
588 Err(TryRecvError::Closed) => {
589 return Poll::Ready(Some(Err(OrderedError::with_source(
590 OrderedErrorKind::Other,
591 "consumer task closed",
592 ))))
593 }
594 Err(TryRecvError::Empty) => {}
595 }
596 if self.subscriber.is_none() {
597 match self.subscriber_future.as_mut() {
598 None => {
599 trace!(
600 "subscriber and subscriber future are None. Recreating the consumer"
601 );
602 let context = self.context.clone();
603 let sequence = self.stream_sequence.clone();
604 let config = self.consumer.config.clone();
605 let stream_name = self.consumer.info.stream_name.clone();
606 let subscriber_future =
607 self.subscriber_future.insert(Box::pin(async move {
608 recreate_consumer_and_subscription(
609 context,
610 config,
611 stream_name,
612 sequence.load(Ordering::Relaxed),
613 )
614 .await
615 }));
616 match subscriber_future.as_mut().poll(cx) {
617 Poll::Ready(subscriber) => {
618 self.subscriber_future = None;
619 self.consumer_sequence.store(0, Ordering::Relaxed);
620 self.subscriber = Some(subscriber.map_err(|err| {
621 OrderedError::with_source(OrderedErrorKind::Recreate, err)
622 })?);
623 }
624 Poll::Pending => {
625 return Poll::Pending;
626 }
627 }
628 }
629 Some(subscriber) => match subscriber.as_mut().poll(cx) {
630 Poll::Ready(subscriber) => {
631 self.subscriber_future = None;
632 self.consumer_sequence.store(0, Ordering::Relaxed);
633 self.subscriber = Some(subscriber.map_err(|err| {
634 OrderedError::with_source(OrderedErrorKind::Recreate, err)
635 })?);
636 }
637 Poll::Pending => {
638 return Poll::Pending;
639 }
640 },
641 }
642 }
643 if let Some(subscriber) = self.subscriber.as_mut() {
644 match subscriber.receiver.poll_recv(cx) {
645 Poll::Ready(maybe_message) => match maybe_message {
646 Some(message) => {
647 self.heartbeat_sleep = None;
648 match message.status {
649 Some(StatusCode::IDLE_HEARTBEAT) => {
650 debug!("received idle heartbeats");
651 if let Some(headers) = message.headers.as_ref() {
652 if let Some(sequence) =
653 headers.get_last(crate::header::NATS_LAST_CONSUMER)
654 {
655 let sequence: u64 =
656 sequence.as_str().parse().map_err(|err| {
657 OrderedError::with_source(
658 OrderedErrorKind::Other,
659 err,
660 )
661 })?;
662
663 let last_sequence =
664 self.consumer_sequence.load(Ordering::Relaxed);
665
666 if sequence != last_sequence {
667 debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
668 self.subscriber = None;
669 }
670 }
671 }
672 if let Some(subject) = message.reply.clone() {
674 trace!("received flow control message");
675 let client = self.context.client.clone();
676 tokio::task::spawn(async move {
677 client
678 .publish(subject, Bytes::from_static(b""))
679 .await
680 .ok();
681 });
682 }
683 continue;
684 }
685 Some(status) => {
686 debug!("received status message: {}", status);
687 continue;
688 }
689 None => {
690 trace!("received a message");
691 let jetstream_message = jetstream::message::Message {
692 message,
693 context: self.context.clone(),
694 };
695
696 let info = jetstream_message.info().map_err(|err| {
697 OrderedError::with_source(OrderedErrorKind::Other, err)
698 })?;
699 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
700 self.consumer_sequence,
701 self.stream_sequence,
702 info.consumer_sequence,
703 info.stream_sequence);
704 if info.consumer_sequence
705 != self.consumer_sequence.load(Ordering::Relaxed) + 1
706 {
707 debug!(
708 "ordered consumer mismatch. current {}, info: {}",
709 self.consumer_sequence.load(Ordering::Relaxed),
710 info.consumer_sequence
711 );
712 self.subscriber = None;
713 self.consumer_sequence.store(0, Ordering::Relaxed);
714 continue;
715 }
716 self.stream_sequence
717 .store(info.stream_sequence, Ordering::Relaxed);
718 self.consumer_sequence
719 .store(info.consumer_sequence, Ordering::Relaxed);
720 return Poll::Ready(Some(Ok(jetstream_message)));
721 }
722 }
723 }
724 None => {
725 return Poll::Ready(None);
726 }
727 },
728 Poll::Pending => return Poll::Pending,
729 }
730 }
731 }
732 }
733}
734
735#[derive(Clone, Debug, PartialEq)]
736pub enum OrderedErrorKind {
737 MissingHeartbeat,
738 ConsumerDeleted,
739 PullBasedConsumer,
740 Recreate,
741 Other,
742}
743
744impl std::fmt::Display for OrderedErrorKind {
745 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746 match self {
747 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
748 Self::ConsumerDeleted => write!(f, "consumer deleted"),
749 Self::Other => write!(f, "error"),
750 Self::PullBasedConsumer => write!(f, "cannot use with push consumer"),
751 Self::Recreate => write!(f, "consumer recreation failed"),
752 }
753 }
754}
755
756pub type OrderedError = Error<OrderedErrorKind>;
757
758impl From<MessagesError> for OrderedError {
759 fn from(err: MessagesError) -> Self {
760 match err.kind() {
761 MessagesErrorKind::MissingHeartbeat => {
762 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
763 }
764 MessagesErrorKind::ConsumerDeleted => {
765 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
766 }
767 MessagesErrorKind::PullBasedConsumer => {
768 OrderedError::new(OrderedErrorKind::PullBasedConsumer)
769 }
770 MessagesErrorKind::Other => OrderedError {
771 kind: OrderedErrorKind::Other,
772 source: err.source,
773 },
774 }
775 }
776}
777
778#[derive(Clone, Copy, Debug, PartialEq)]
779pub enum MessagesErrorKind {
780 MissingHeartbeat,
781 ConsumerDeleted,
782 PullBasedConsumer,
783 Other,
784}
785
786impl std::fmt::Display for MessagesErrorKind {
787 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
788 match self {
789 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
790 Self::ConsumerDeleted => write!(f, "consumer deleted"),
791 Self::Other => write!(f, "error"),
792 Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
793 }
794 }
795}
796
797pub type MessagesError = Error<MessagesErrorKind>;
798
799#[derive(Clone, Copy, Debug, PartialEq)]
800pub enum ConsumerRecreateErrorKind {
801 GetStream,
802 Subscription,
803 Recreate,
804 TimedOut,
805}
806
807impl std::fmt::Display for ConsumerRecreateErrorKind {
808 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
809 match self {
810 Self::GetStream => write!(f, "error getting stream"),
811 Self::Recreate => write!(f, "consumer creation failed"),
812 Self::TimedOut => write!(f, "timed out"),
813 Self::Subscription => write!(f, "failed to resubscribe"),
814 }
815 }
816}
817
818pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
819
820async fn recreate_consumer_and_subscription(
821 context: Context,
822 mut config: OrderedConfig,
823 stream_name: String,
824 sequence: u64,
825) -> Result<Subscriber, ConsumerRecreateError> {
826 let delivery_subject = context.client.new_inbox();
827 config.deliver_subject = delivery_subject;
828
829 let subscriber = context
830 .client
831 .subscribe(config.deliver_subject.clone())
832 .await
833 .map_err(|err| {
834 ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
835 })?;
836
837 recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
838 Ok(subscriber)
839}
840async fn recreate_ephemeral_consumer(
841 context: Context,
842 config: OrderedConfig,
843 stream_name: String,
844 sequence: u64,
845) -> Result<(), ConsumerRecreateError> {
846 let strategy =
847 tryhard::RetryFutureConfig::new(5).exponential_backoff(Duration::from_millis(500));
848
849 let stream = tryhard::retry_fn(|| context.get_stream(stream_name.clone()))
850 .with_config(strategy)
851 .await
852 .map_err(|err| {
853 ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
854 })?;
855
856 let deliver_policy = {
857 if sequence == 0 {
858 DeliverPolicy::All
859 } else {
860 DeliverPolicy::ByStartSequence {
861 start_sequence: sequence + 1,
862 }
863 }
864 };
865
866 tryhard::retry_fn(|| {
867 let config = config.clone();
868 tokio::time::timeout(
869 Duration::from_secs(5),
870 stream.create_consumer(jetstream::consumer::push::OrderedConfig {
871 deliver_policy,
872 ..config
873 }),
874 )
875 })
876 .with_config(strategy)
877 .await
878 .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
879 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
880
881 Ok(())
882}