1use super::{
15 backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16 StreamError, StreamErrorKind,
17};
18
19#[cfg(feature = "server_2_11")]
20use super::PriorityPolicy;
21
22use crate::{
23 connection::State,
24 error::Error,
25 jetstream::{self, Context, Message},
26 StatusCode, Subscriber,
27};
28
29use bytes::Bytes;
30use futures::{future::BoxFuture, FutureExt};
31use portable_atomic::AtomicU64;
32use serde::{Deserialize, Serialize};
33#[cfg(feature = "server_2_10")]
34use std::collections::HashMap;
35use std::task::{self, Poll};
36use std::{
37 io::{self, ErrorKind},
38 pin::Pin,
39 sync::Arc,
40};
41use std::{sync::atomic::Ordering, time::Duration};
42#[cfg(feature = "server_2_11")]
43use time::{serde::rfc3339, OffsetDateTime};
44use tokio::{sync::oneshot::error::TryRecvError, task::JoinHandle};
45use tracing::{debug, trace};
46
47const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
48
49impl Consumer<Config> {
50 pub async fn messages(&self) -> Result<Messages, StreamError> {
94 let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
95 let subscriber = if let Some(ref group) = self.info.config.deliver_group {
96 self.context
97 .client
98 .queue_subscribe(deliver_subject, group.to_owned())
99 .await
100 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
101 } else {
102 self.context
103 .client
104 .subscribe(deliver_subject)
105 .await
106 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
107 };
108
109 Ok(Messages {
110 context: self.context.clone(),
111 config: self.config.clone(),
112 subscriber,
113 heartbeat_sleep: None,
114 })
115 }
116}
117
118pub struct Messages {
119 context: Context,
120 subscriber: Subscriber,
121 config: Config,
122 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
123}
124
125impl futures::Stream for Messages {
126 type Item = Result<Message, MessagesError>;
127
128 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
129 if !self.config.idle_heartbeat.is_zero() {
130 let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
131 match self
132 .heartbeat_sleep
133 .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
134 .poll_unpin(cx)
135 {
136 Poll::Ready(_) => {
137 self.heartbeat_sleep = None;
138 return Poll::Ready(Some(Err(MessagesError::new(
139 MessagesErrorKind::MissingHeartbeat,
140 ))));
141 }
142 Poll::Pending => (),
143 }
144 }
145 loop {
146 match self.subscriber.receiver.poll_recv(cx) {
147 Poll::Ready(maybe_message) => {
148 self.heartbeat_sleep = None;
149 match maybe_message {
150 Some(message) => match message.status {
151 Some(StatusCode::IDLE_HEARTBEAT) => {
152 if let Some(subject) = message.reply {
153 let client = self.context.client.clone();
155 tokio::task::spawn(async move {
156 client
157 .publish(subject, Bytes::from_static(b""))
158 .await
159 .unwrap();
160 });
161 }
162
163 continue;
164 }
165 Some(_) => {
166 continue;
167 }
168 None => {
169 return Poll::Ready(Some(Ok(jetstream::Message {
170 context: self.context.clone(),
171 message,
172 })))
173 }
174 },
175 None => return Poll::Ready(None),
176 }
177 }
178 Poll::Pending => return Poll::Pending,
179 }
180 }
181 }
182}
183
184#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
188pub struct Config {
189 #[serde(default)]
191 pub deliver_subject: String,
192 #[serde(default, skip_serializing_if = "Option::is_none")]
208 pub durable_name: Option<String>,
209 #[serde(default, skip_serializing_if = "Option::is_none")]
212 pub name: Option<String>,
213 #[serde(default, skip_serializing_if = "Option::is_none")]
215 pub description: Option<String>,
216 #[serde(default, skip_serializing_if = "Option::is_none")]
217 pub deliver_group: Option<String>,
219 #[serde(flatten)]
221 pub deliver_policy: DeliverPolicy,
222 pub ack_policy: AckPolicy,
224 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
226 pub ack_wait: Duration,
227 #[serde(default, skip_serializing_if = "is_default")]
229 pub max_deliver: i64,
230 #[serde(default, skip_serializing_if = "is_default")]
232 pub filter_subject: String,
233 #[cfg(feature = "server_2_10")]
234 #[serde(default, skip_serializing_if = "is_default")]
236 pub filter_subjects: Vec<String>,
237 pub replay_policy: ReplayPolicy,
239 #[serde(default, skip_serializing_if = "is_default")]
241 pub rate_limit: u64,
242 #[serde(
244 rename = "sample_freq",
245 with = "super::sample_freq_deser",
246 default,
247 skip_serializing_if = "is_default"
248 )]
249 pub sample_frequency: u8,
250 #[serde(default, skip_serializing_if = "is_default")]
252 pub max_waiting: i64,
253 #[serde(default, skip_serializing_if = "is_default")]
257 pub max_ack_pending: i64,
258 #[serde(default, skip_serializing_if = "is_default")]
260 pub headers_only: bool,
261 #[serde(default, skip_serializing_if = "is_default")]
263 pub flow_control: bool,
264 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
266 pub idle_heartbeat: Duration,
267 #[serde(default, skip_serializing_if = "is_default")]
269 pub num_replicas: usize,
270 #[serde(default, skip_serializing_if = "is_default")]
272 pub memory_storage: bool,
273 #[cfg(feature = "server_2_10")]
274 #[serde(default, skip_serializing_if = "is_default")]
276 pub metadata: HashMap<String, String>,
277 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
279 pub backoff: Vec<Duration>,
280 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
282 pub inactive_threshold: Duration,
283 #[cfg(feature = "server_2_11")]
285 #[serde(
286 default,
287 with = "rfc3339::option",
288 skip_serializing_if = "Option::is_none"
289 )]
290 pub pause_until: Option<OffsetDateTime>,
291}
292
293impl FromConsumer for Config {
294 fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
295 if config.deliver_subject.is_none() {
296 return Err(Box::new(io::Error::new(
297 ErrorKind::Other,
298 "push consumer must have delivery subject",
299 )));
300 }
301
302 Ok(Config {
303 deliver_subject: config.deliver_subject.unwrap(),
304 durable_name: config.durable_name,
305 name: config.name,
306 description: config.description,
307 deliver_group: config.deliver_group,
308 deliver_policy: config.deliver_policy,
309 ack_policy: config.ack_policy,
310 ack_wait: config.ack_wait,
311 max_deliver: config.max_deliver,
312 filter_subject: config.filter_subject,
313 #[cfg(feature = "server_2_10")]
314 filter_subjects: config.filter_subjects,
315 replay_policy: config.replay_policy,
316 rate_limit: config.rate_limit,
317 sample_frequency: config.sample_frequency,
318 max_waiting: config.max_waiting,
319 max_ack_pending: config.max_ack_pending,
320 headers_only: config.headers_only,
321 flow_control: config.flow_control,
322 idle_heartbeat: config.idle_heartbeat,
323 num_replicas: config.num_replicas,
324 memory_storage: config.memory_storage,
325 #[cfg(feature = "server_2_10")]
326 metadata: config.metadata,
327 backoff: config.backoff,
328 inactive_threshold: config.inactive_threshold,
329 #[cfg(feature = "server_2_11")]
330 pause_until: config.pause_until,
331 })
332 }
333}
334
335impl IntoConsumerConfig for Config {
336 fn into_consumer_config(self) -> jetstream::consumer::Config {
337 jetstream::consumer::Config {
338 deliver_subject: Some(self.deliver_subject),
339 durable_name: self.durable_name,
340 name: self.name,
341 description: self.description,
342 deliver_group: self.deliver_group,
343 deliver_policy: self.deliver_policy,
344 ack_policy: self.ack_policy,
345 ack_wait: self.ack_wait,
346 max_deliver: self.max_deliver,
347 filter_subject: self.filter_subject,
348 #[cfg(feature = "server_2_10")]
349 filter_subjects: self.filter_subjects,
350 replay_policy: self.replay_policy,
351 rate_limit: self.rate_limit,
352 sample_frequency: self.sample_frequency,
353 max_waiting: self.max_waiting,
354 max_ack_pending: self.max_ack_pending,
355 headers_only: self.headers_only,
356 flow_control: self.flow_control,
357 idle_heartbeat: self.idle_heartbeat,
358 max_batch: 0,
359 max_bytes: 0,
360 max_expires: Duration::default(),
361 inactive_threshold: self.inactive_threshold,
362 num_replicas: self.num_replicas,
363 memory_storage: self.memory_storage,
364 #[cfg(feature = "server_2_10")]
365 metadata: self.metadata,
366 backoff: self.backoff,
367 #[cfg(feature = "server_2_11")]
368 priority_policy: PriorityPolicy::None,
369 #[cfg(feature = "server_2_11")]
370 priority_groups: Vec::new(),
371 #[cfg(feature = "server_2_11")]
372 pause_until: self.pause_until,
373 }
374 }
375}
376impl IntoConsumerConfig for &Config {
377 fn into_consumer_config(self) -> jetstream::consumer::Config {
378 self.clone().into_consumer_config()
379 }
380}
381fn is_default<T: Default + Eq>(t: &T) -> bool {
382 t == &T::default()
383}
384
385#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
389pub struct OrderedConfig {
390 #[serde(default)]
392 pub deliver_subject: String,
393 #[serde(default, skip_serializing_if = "Option::is_none")]
396 pub name: Option<String>,
397 #[serde(default, skip_serializing_if = "Option::is_none")]
399 pub description: Option<String>,
400 #[serde(default, skip_serializing_if = "is_default")]
401 pub filter_subject: String,
402 #[cfg(feature = "server_2_10")]
403 #[serde(default, skip_serializing_if = "is_default")]
405 pub filter_subjects: Vec<String>,
406 pub replay_policy: ReplayPolicy,
408 #[serde(default, skip_serializing_if = "is_default")]
410 pub rate_limit: u64,
411 #[serde(
413 rename = "sample_freq",
414 with = "super::sample_freq_deser",
415 default,
416 skip_serializing_if = "is_default"
417 )]
418 pub sample_frequency: u8,
419 #[serde(default, skip_serializing_if = "is_default")]
421 pub headers_only: bool,
422 #[serde(flatten)]
424 pub deliver_policy: DeliverPolicy,
425 #[serde(default, skip_serializing_if = "is_default")]
427 pub max_waiting: i64,
428 #[cfg(feature = "server_2_10")]
429 #[serde(default, skip_serializing_if = "is_default")]
431 pub metadata: HashMap<String, String>,
432}
433
434impl FromConsumer for OrderedConfig {
435 fn try_from_consumer_config(
436 config: crate::jetstream::consumer::Config,
437 ) -> Result<Self, crate::Error>
438 where
439 Self: Sized,
440 {
441 if config.deliver_subject.is_none() {
442 return Err(Box::new(io::Error::new(
443 ErrorKind::Other,
444 "push consumer must have delivery subject",
445 )));
446 }
447 Ok(OrderedConfig {
448 name: config.name,
449 deliver_subject: config.deliver_subject.unwrap(),
450 description: config.description,
451 filter_subject: config.filter_subject,
452 #[cfg(feature = "server_2_10")]
453 filter_subjects: config.filter_subjects,
454 replay_policy: config.replay_policy,
455 rate_limit: config.rate_limit,
456 sample_frequency: config.sample_frequency,
457 headers_only: config.headers_only,
458 deliver_policy: config.deliver_policy,
459 max_waiting: config.max_waiting,
460 #[cfg(feature = "server_2_10")]
461 metadata: config.metadata,
462 })
463 }
464}
465
466impl IntoConsumerConfig for OrderedConfig {
467 fn into_consumer_config(self) -> super::Config {
468 jetstream::consumer::Config {
469 deliver_subject: Some(self.deliver_subject),
470 durable_name: None,
471 name: self.name,
472 description: self.description,
473 deliver_group: None,
474 deliver_policy: self.deliver_policy,
475 ack_policy: AckPolicy::None,
476 ack_wait: Duration::default(),
477 max_deliver: 1,
478 filter_subject: self.filter_subject,
479 #[cfg(feature = "server_2_10")]
480 filter_subjects: self.filter_subjects,
481 replay_policy: self.replay_policy,
482 rate_limit: self.rate_limit,
483 sample_frequency: self.sample_frequency,
484 max_waiting: self.max_waiting,
485 max_ack_pending: 0,
486 headers_only: self.headers_only,
487 flow_control: true,
488 idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
489 max_batch: 0,
490 max_bytes: 0,
491 max_expires: Duration::default(),
492 inactive_threshold: Duration::from_secs(30),
493 num_replicas: 1,
494 memory_storage: true,
495 #[cfg(feature = "server_2_10")]
496 metadata: self.metadata,
497 backoff: Vec::new(),
498 #[cfg(feature = "server_2_11")]
499 priority_policy: PriorityPolicy::None,
500 #[cfg(feature = "server_2_11")]
501 priority_groups: Vec::new(),
502 #[cfg(feature = "server_2_11")]
503 pause_until: None,
504 }
505 }
506}
507
508impl Consumer<OrderedConfig> {
509 pub async fn messages(self) -> Result<Ordered, StreamError> {
510 let subscriber = self
511 .context
512 .client
513 .subscribe(self.info.config.deliver_subject.clone().unwrap())
514 .await
515 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
516
517 let last_sequence = Arc::new(AtomicU64::new(0));
518 let consumer_sequence = Arc::new(AtomicU64::new(0));
519 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
520 let handle = tokio::task::spawn({
521 let stream_name = self.info.stream_name.clone();
522 let config = self.config.clone();
523 let mut context = self.context.clone();
524 let last_sequence = last_sequence.clone();
525 let consumer_sequence = consumer_sequence.clone();
526 let state = self.context.client.state.clone();
527 async move {
528 loop {
529 let current_state = state.borrow().to_owned();
530
531 context.client.state.changed().await.unwrap();
532 if state.borrow().to_owned() != State::Connected
534 || current_state == State::Connected
535 {
536 continue;
537 }
538 debug!("reconnected. trigger consumer recreation");
539
540 debug!(
541 "idle heartbeats expired. recreating consumer s: {}, {:?}",
542 stream_name, config
543 );
544 let consumer = tryhard::retry_fn(|| {
545 recreate_ephemeral_consumer(
546 context.clone(),
547 config.clone(),
548 stream_name.clone(),
549 last_sequence.load(Ordering::Relaxed),
550 )
551 })
552 .retries(u32::MAX)
553 .custom_backoff(backoff)
554 .await;
555 if let Err(err) = consumer {
556 shutdown_tx.send(err).unwrap();
557 break;
558 }
559 debug!("resetting consume sequence to 0");
560 consumer_sequence.store(0, Ordering::Relaxed);
561 }
562 }
563 });
564
565 Ok(Ordered {
566 context: self.context.clone(),
567 consumer: self,
568 subscriber: Some(subscriber),
569 subscriber_future: None,
570 stream_sequence: last_sequence,
571 consumer_sequence,
572 shutdown: shutdown_rx,
573 handle,
574 heartbeat_sleep: None,
575 })
576 }
577}
578
579pub struct Ordered {
580 context: Context,
581 consumer: Consumer<OrderedConfig>,
582 subscriber: Option<Subscriber>,
583 subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
584 stream_sequence: Arc<AtomicU64>,
585 consumer_sequence: Arc<AtomicU64>,
586 shutdown: tokio::sync::oneshot::Receiver<ConsumerRecreateError>,
587 handle: JoinHandle<()>,
588 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
589}
590
591impl Drop for Ordered {
592 fn drop(&mut self) {
593 self.handle.abort()
595 }
596}
597
598impl futures::Stream for Ordered {
599 type Item = Result<Message, OrderedError>;
600
601 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
602 match self
603 .heartbeat_sleep
604 .get_or_insert_with(|| {
605 Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2)))
606 })
607 .poll_unpin(cx)
608 {
609 Poll::Ready(_) => {
610 self.heartbeat_sleep = None;
611 return Poll::Ready(Some(Err(OrderedError::new(
612 OrderedErrorKind::MissingHeartbeat,
613 ))));
614 }
615 Poll::Pending => (),
616 }
617
618 loop {
619 match self.shutdown.try_recv() {
620 Ok(err) => {
621 return Poll::Ready(Some(Err(OrderedError::with_source(
622 OrderedErrorKind::Other,
623 err,
624 ))))
625 }
626 Err(TryRecvError::Closed) => {
627 return Poll::Ready(Some(Err(OrderedError::with_source(
628 OrderedErrorKind::Other,
629 "consumer task closed",
630 ))))
631 }
632 Err(TryRecvError::Empty) => {}
633 }
634 if self.subscriber.is_none() {
635 match self.subscriber_future.as_mut() {
636 None => {
637 trace!(
638 "subscriber and subscriber future are None. Recreating the consumer"
639 );
640 let context = self.context.clone();
641 let sequence = self.stream_sequence.clone();
642 let config = self.consumer.config.clone();
643 let stream_name = self.consumer.info.stream_name.clone();
644 let subscriber_future =
645 self.subscriber_future.insert(Box::pin(async move {
646 recreate_consumer_and_subscription(
647 context,
648 config,
649 stream_name,
650 sequence.load(Ordering::Relaxed),
651 )
652 .await
653 }));
654 match subscriber_future.as_mut().poll(cx) {
655 Poll::Ready(subscriber) => {
656 self.subscriber_future = None;
657 self.consumer_sequence.store(0, Ordering::Relaxed);
658 self.subscriber = Some(subscriber.map_err(|err| {
659 OrderedError::with_source(OrderedErrorKind::Recreate, err)
660 })?);
661 }
662 Poll::Pending => {
663 return Poll::Pending;
664 }
665 }
666 }
667 Some(subscriber) => match subscriber.as_mut().poll(cx) {
668 Poll::Ready(subscriber) => {
669 self.subscriber_future = None;
670 self.consumer_sequence.store(0, Ordering::Relaxed);
671 self.subscriber = Some(subscriber.map_err(|err| {
672 OrderedError::with_source(OrderedErrorKind::Recreate, err)
673 })?);
674 }
675 Poll::Pending => {
676 return Poll::Pending;
677 }
678 },
679 }
680 }
681 if let Some(subscriber) = self.subscriber.as_mut() {
682 match subscriber.receiver.poll_recv(cx) {
683 Poll::Ready(maybe_message) => match maybe_message {
684 Some(message) => {
685 self.heartbeat_sleep = None;
686 match message.status {
687 Some(StatusCode::IDLE_HEARTBEAT) => {
688 debug!("received idle heartbeats");
689 if let Some(headers) = message.headers.as_ref() {
690 if let Some(sequence) =
691 headers.get_last(crate::header::NATS_LAST_CONSUMER)
692 {
693 let sequence: u64 =
694 sequence.as_str().parse().map_err(|err| {
695 OrderedError::with_source(
696 OrderedErrorKind::Other,
697 err,
698 )
699 })?;
700
701 let last_sequence =
702 self.consumer_sequence.load(Ordering::Relaxed);
703
704 if sequence != last_sequence {
705 debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
706 self.subscriber = None;
707 }
708 }
709 }
710 if let Some(subject) = message.reply.clone() {
712 trace!("received flow control message");
713 let client = self.context.client.clone();
714 tokio::task::spawn(async move {
715 client
716 .publish(subject, Bytes::from_static(b""))
717 .await
718 .ok();
719 });
720 }
721 continue;
722 }
723 Some(status) => {
724 debug!("received status message: {}", status);
725 continue;
726 }
727 None => {
728 trace!("received a message");
729 let jetstream_message = jetstream::message::Message {
730 message,
731 context: self.context.clone(),
732 };
733
734 let info = jetstream_message.info().map_err(|err| {
735 OrderedError::with_source(OrderedErrorKind::Other, err)
736 })?;
737 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
738 self.consumer_sequence,
739 self.stream_sequence,
740 info.consumer_sequence,
741 info.stream_sequence);
742 if info.consumer_sequence
743 != self.consumer_sequence.load(Ordering::Relaxed) + 1
744 {
745 debug!(
746 "ordered consumer mismatch. current {}, info: {}",
747 self.consumer_sequence.load(Ordering::Relaxed),
748 info.consumer_sequence
749 );
750 self.subscriber = None;
751 self.consumer_sequence.store(0, Ordering::Relaxed);
752 continue;
753 }
754 self.stream_sequence
755 .store(info.stream_sequence, Ordering::Relaxed);
756 self.consumer_sequence
757 .store(info.consumer_sequence, Ordering::Relaxed);
758 return Poll::Ready(Some(Ok(jetstream_message)));
759 }
760 }
761 }
762 None => {
763 return Poll::Ready(None);
764 }
765 },
766 Poll::Pending => return Poll::Pending,
767 }
768 }
769 }
770 }
771}
772
773#[derive(Clone, Debug, PartialEq)]
774pub enum OrderedErrorKind {
775 MissingHeartbeat,
776 ConsumerDeleted,
777 PullBasedConsumer,
778 Recreate,
779 Other,
780}
781
782impl std::fmt::Display for OrderedErrorKind {
783 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
784 match self {
785 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
786 Self::ConsumerDeleted => write!(f, "consumer deleted"),
787 Self::Other => write!(f, "error"),
788 Self::PullBasedConsumer => write!(f, "cannot use with push consumer"),
789 Self::Recreate => write!(f, "consumer recreation failed"),
790 }
791 }
792}
793
794pub type OrderedError = Error<OrderedErrorKind>;
795
796impl From<MessagesError> for OrderedError {
797 fn from(err: MessagesError) -> Self {
798 match err.kind() {
799 MessagesErrorKind::MissingHeartbeat => {
800 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
801 }
802 MessagesErrorKind::ConsumerDeleted => {
803 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
804 }
805 MessagesErrorKind::PullBasedConsumer => {
806 OrderedError::new(OrderedErrorKind::PullBasedConsumer)
807 }
808 MessagesErrorKind::Other => OrderedError {
809 kind: OrderedErrorKind::Other,
810 source: err.source,
811 },
812 }
813 }
814}
815
816#[derive(Clone, Copy, Debug, PartialEq)]
817pub enum MessagesErrorKind {
818 MissingHeartbeat,
819 ConsumerDeleted,
820 PullBasedConsumer,
821 Other,
822}
823
824impl std::fmt::Display for MessagesErrorKind {
825 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826 match self {
827 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
828 Self::ConsumerDeleted => write!(f, "consumer deleted"),
829 Self::Other => write!(f, "error"),
830 Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
831 }
832 }
833}
834
835pub type MessagesError = Error<MessagesErrorKind>;
836
837#[derive(Clone, Copy, Debug, PartialEq)]
838pub enum ConsumerRecreateErrorKind {
839 GetStream,
840 Subscription,
841 Recreate,
842 TimedOut,
843}
844
845impl std::fmt::Display for ConsumerRecreateErrorKind {
846 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
847 match self {
848 Self::GetStream => write!(f, "error getting stream"),
849 Self::Recreate => write!(f, "consumer creation failed"),
850 Self::TimedOut => write!(f, "timed out"),
851 Self::Subscription => write!(f, "failed to resubscribe"),
852 }
853 }
854}
855
856pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
857
858async fn recreate_consumer_and_subscription(
859 context: Context,
860 mut config: OrderedConfig,
861 stream_name: String,
862 sequence: u64,
863) -> Result<Subscriber, ConsumerRecreateError> {
864 let delivery_subject = context.client.new_inbox();
865 config.deliver_subject = delivery_subject;
866
867 let subscriber = context
868 .client
869 .subscribe(config.deliver_subject.clone())
870 .await
871 .map_err(|err| {
872 ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
873 })?;
874
875 recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
876 Ok(subscriber)
877}
878async fn recreate_ephemeral_consumer(
879 context: Context,
880 config: OrderedConfig,
881 stream_name: String,
882 sequence: u64,
883) -> Result<(), ConsumerRecreateError> {
884 let stream = tryhard::retry_fn(|| context.get_stream(stream_name.clone()))
885 .retries(u32::MAX)
886 .custom_backoff(backoff)
887 .await
888 .map_err(|err| {
889 ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
890 })?;
891
892 let deliver_policy = {
893 if sequence == 0 {
894 DeliverPolicy::All
895 } else {
896 DeliverPolicy::ByStartSequence {
897 start_sequence: sequence + 1,
898 }
899 }
900 };
901
902 tryhard::retry_fn(|| {
903 let config = config.clone();
904 tokio::time::timeout(
905 Duration::from_secs(5),
906 stream.create_consumer(jetstream::consumer::push::OrderedConfig {
907 deliver_policy,
908 ..config
909 }),
910 )
911 })
912 .retries(u32::MAX)
913 .custom_backoff(backoff)
914 .await
915 .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
916 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
917
918 Ok(())
919}