1use super::{
15 backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16 StreamError, StreamErrorKind,
17};
18
19#[cfg(feature = "server_2_11")]
20use super::PriorityPolicy;
21
22use crate::{
23 connection::State,
24 error::Error,
25 jetstream::{self, Context, Message},
26 StatusCode, Subscriber,
27};
28
29use bytes::Bytes;
30use futures_util::{future::BoxFuture, FutureExt};
31use portable_atomic::AtomicU64;
32use serde::{Deserialize, Serialize};
33#[cfg(feature = "server_2_10")]
34use std::collections::HashMap;
35use std::task::{self, Poll};
36use std::{
37 io::{self},
38 pin::Pin,
39 sync::Arc,
40};
41use std::{sync::atomic::Ordering, time::Duration};
42#[cfg(feature = "server_2_11")]
43use time::{serde::rfc3339, OffsetDateTime};
44use tracing::{debug, trace};
45
46const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
47
48impl Consumer<Config> {
49 pub async fn messages(&self) -> Result<Messages, StreamError> {
93 let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
94 let subscriber = if let Some(ref group) = self.info.config.deliver_group {
95 self.context
96 .client
97 .queue_subscribe(deliver_subject, group.to_owned())
98 .await
99 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
100 } else {
101 self.context
102 .client
103 .subscribe(deliver_subject)
104 .await
105 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
106 };
107
108 Ok(Messages {
109 context: self.context.clone(),
110 config: self.config.clone(),
111 subscriber,
112 heartbeat_sleep: None,
113 })
114 }
115}
116
117pub struct Messages {
118 context: Context,
119 subscriber: Subscriber,
120 config: Config,
121 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
122}
123
124impl futures_util::Stream for Messages {
125 type Item = Result<Message, MessagesError>;
126
127 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
128 if !self.config.idle_heartbeat.is_zero() {
129 let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
130 match self
131 .heartbeat_sleep
132 .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
133 .poll_unpin(cx)
134 {
135 Poll::Ready(_) => {
136 self.heartbeat_sleep = None;
137 return Poll::Ready(Some(Err(MessagesError::new(
138 MessagesErrorKind::MissingHeartbeat,
139 ))));
140 }
141 Poll::Pending => (),
142 }
143 }
144 loop {
145 match self.subscriber.receiver.poll_recv(cx) {
146 Poll::Ready(maybe_message) => {
147 self.heartbeat_sleep = None;
148 match maybe_message {
149 Some(message) => match message.status {
150 Some(StatusCode::IDLE_HEARTBEAT) => {
151 if let Some(subject) = message.reply {
152 let client = self.context.client.clone();
154 tokio::task::spawn(async move {
155 client
156 .publish(subject, Bytes::from_static(b""))
157 .await
158 .unwrap();
159 });
160 }
161
162 continue;
163 }
164 Some(_) => {
165 continue;
166 }
167 None => {
168 return Poll::Ready(Some(Ok(jetstream::Message {
169 context: self.context.clone(),
170 message,
171 })))
172 }
173 },
174 None => return Poll::Ready(None),
175 }
176 }
177 Poll::Pending => return Poll::Pending,
178 }
179 }
180 }
181}
182
183#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
187pub struct Config {
188 #[serde(default)]
190 pub deliver_subject: String,
191 #[serde(default, skip_serializing_if = "Option::is_none")]
207 pub durable_name: Option<String>,
208 #[serde(default, skip_serializing_if = "Option::is_none")]
211 pub name: Option<String>,
212 #[serde(default, skip_serializing_if = "Option::is_none")]
214 pub description: Option<String>,
215 #[serde(default, skip_serializing_if = "Option::is_none")]
216 pub deliver_group: Option<String>,
218 #[serde(flatten)]
220 pub deliver_policy: DeliverPolicy,
221 pub ack_policy: AckPolicy,
223 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
225 pub ack_wait: Duration,
226 #[serde(default, skip_serializing_if = "is_default")]
228 pub max_deliver: i64,
229 #[serde(default, skip_serializing_if = "is_default")]
231 pub filter_subject: String,
232 #[cfg(feature = "server_2_10")]
233 #[serde(default, skip_serializing_if = "is_default")]
235 pub filter_subjects: Vec<String>,
236 pub replay_policy: ReplayPolicy,
238 #[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
240 pub rate_limit: u64,
241 #[serde(
243 rename = "sample_freq",
244 with = "super::sample_freq_deser",
245 default,
246 skip_serializing_if = "is_default"
247 )]
248 pub sample_frequency: u8,
249 #[serde(default, skip_serializing_if = "is_default")]
251 pub max_waiting: i64,
252 #[serde(default, skip_serializing_if = "is_default")]
256 pub max_ack_pending: i64,
257 #[serde(default, skip_serializing_if = "is_default")]
259 pub headers_only: bool,
260 #[serde(default, skip_serializing_if = "is_default")]
262 pub flow_control: bool,
263 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
265 pub idle_heartbeat: Duration,
266 #[serde(default, skip_serializing_if = "is_default")]
268 pub num_replicas: usize,
269 #[serde(default, skip_serializing_if = "is_default")]
271 pub memory_storage: bool,
272 #[cfg(feature = "server_2_10")]
273 #[serde(default, skip_serializing_if = "is_default")]
275 pub metadata: HashMap<String, String>,
276 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
278 pub backoff: Vec<Duration>,
279 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
281 pub inactive_threshold: Duration,
282 #[cfg(feature = "server_2_11")]
284 #[serde(
285 default,
286 with = "rfc3339::option",
287 skip_serializing_if = "Option::is_none"
288 )]
289 pub pause_until: Option<OffsetDateTime>,
290}
291
292impl FromConsumer for Config {
293 fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
294 if config.deliver_subject.is_none() {
295 return Err(Box::new(io::Error::other(
296 "push consumer must have delivery subject",
297 )));
298 }
299
300 Ok(Config {
301 deliver_subject: config.deliver_subject.unwrap(),
302 durable_name: config.durable_name,
303 name: config.name,
304 description: config.description,
305 deliver_group: config.deliver_group,
306 deliver_policy: config.deliver_policy,
307 ack_policy: config.ack_policy,
308 ack_wait: config.ack_wait,
309 max_deliver: config.max_deliver,
310 filter_subject: config.filter_subject,
311 #[cfg(feature = "server_2_10")]
312 filter_subjects: config.filter_subjects,
313 replay_policy: config.replay_policy,
314 rate_limit: config.rate_limit,
315 sample_frequency: config.sample_frequency,
316 max_waiting: config.max_waiting,
317 max_ack_pending: config.max_ack_pending,
318 headers_only: config.headers_only,
319 flow_control: config.flow_control,
320 idle_heartbeat: config.idle_heartbeat,
321 num_replicas: config.num_replicas,
322 memory_storage: config.memory_storage,
323 #[cfg(feature = "server_2_10")]
324 metadata: config.metadata,
325 backoff: config.backoff,
326 inactive_threshold: config.inactive_threshold,
327 #[cfg(feature = "server_2_11")]
328 pause_until: config.pause_until,
329 })
330 }
331}
332
333impl IntoConsumerConfig for Config {
334 fn into_consumer_config(self) -> jetstream::consumer::Config {
335 jetstream::consumer::Config {
336 deliver_subject: Some(self.deliver_subject),
337 durable_name: self.durable_name,
338 name: self.name,
339 description: self.description,
340 deliver_group: self.deliver_group,
341 deliver_policy: self.deliver_policy,
342 ack_policy: self.ack_policy,
343 ack_wait: self.ack_wait,
344 max_deliver: self.max_deliver,
345 filter_subject: self.filter_subject,
346 #[cfg(feature = "server_2_10")]
347 filter_subjects: self.filter_subjects,
348 replay_policy: self.replay_policy,
349 rate_limit: self.rate_limit,
350 sample_frequency: self.sample_frequency,
351 max_waiting: self.max_waiting,
352 max_ack_pending: self.max_ack_pending,
353 headers_only: self.headers_only,
354 flow_control: self.flow_control,
355 idle_heartbeat: self.idle_heartbeat,
356 max_batch: 0,
357 max_bytes: 0,
358 max_expires: Duration::default(),
359 inactive_threshold: self.inactive_threshold,
360 num_replicas: self.num_replicas,
361 memory_storage: self.memory_storage,
362 #[cfg(feature = "server_2_10")]
363 metadata: self.metadata,
364 backoff: self.backoff,
365 #[cfg(feature = "server_2_11")]
366 priority_policy: PriorityPolicy::None,
367 #[cfg(feature = "server_2_11")]
368 priority_groups: Vec::new(),
369 #[cfg(feature = "server_2_11")]
370 pause_until: self.pause_until,
371 }
372 }
373}
374impl IntoConsumerConfig for &Config {
375 fn into_consumer_config(self) -> jetstream::consumer::Config {
376 self.clone().into_consumer_config()
377 }
378}
379fn is_default<T: Default + Eq>(t: &T) -> bool {
380 t == &T::default()
381}
382
383#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
387pub struct OrderedConfig {
388 #[serde(default)]
390 pub deliver_subject: String,
391 #[serde(default, skip_serializing_if = "Option::is_none")]
394 pub name: Option<String>,
395 #[serde(default, skip_serializing_if = "Option::is_none")]
397 pub description: Option<String>,
398 #[serde(default, skip_serializing_if = "is_default")]
399 pub filter_subject: String,
400 #[cfg(feature = "server_2_10")]
401 #[serde(default, skip_serializing_if = "is_default")]
403 pub filter_subjects: Vec<String>,
404 pub replay_policy: ReplayPolicy,
406 #[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
408 pub rate_limit: u64,
409 #[serde(
411 rename = "sample_freq",
412 with = "super::sample_freq_deser",
413 default,
414 skip_serializing_if = "is_default"
415 )]
416 pub sample_frequency: u8,
417 #[serde(default, skip_serializing_if = "is_default")]
419 pub headers_only: bool,
420 #[serde(flatten)]
422 pub deliver_policy: DeliverPolicy,
423 #[serde(default, skip_serializing_if = "is_default")]
425 pub max_waiting: i64,
426 #[cfg(feature = "server_2_10")]
427 #[serde(default, skip_serializing_if = "is_default")]
429 pub metadata: HashMap<String, String>,
430}
431
432impl FromConsumer for OrderedConfig {
433 fn try_from_consumer_config(
434 config: crate::jetstream::consumer::Config,
435 ) -> Result<Self, crate::Error>
436 where
437 Self: Sized,
438 {
439 if config.deliver_subject.is_none() {
440 return Err(Box::new(io::Error::other(
441 "push consumer must have delivery subject",
442 )));
443 }
444 Ok(OrderedConfig {
445 name: config.name,
446 deliver_subject: config.deliver_subject.unwrap(),
447 description: config.description,
448 filter_subject: config.filter_subject,
449 #[cfg(feature = "server_2_10")]
450 filter_subjects: config.filter_subjects,
451 replay_policy: config.replay_policy,
452 rate_limit: config.rate_limit,
453 sample_frequency: config.sample_frequency,
454 headers_only: config.headers_only,
455 deliver_policy: config.deliver_policy,
456 max_waiting: config.max_waiting,
457 #[cfg(feature = "server_2_10")]
458 metadata: config.metadata,
459 })
460 }
461}
462
463impl IntoConsumerConfig for OrderedConfig {
464 fn into_consumer_config(self) -> super::Config {
465 jetstream::consumer::Config {
466 deliver_subject: Some(self.deliver_subject),
467 durable_name: None,
468 name: self.name,
469 description: self.description,
470 deliver_group: None,
471 deliver_policy: self.deliver_policy,
472 ack_policy: AckPolicy::None,
473 ack_wait: Duration::default(),
474 max_deliver: 1,
475 filter_subject: self.filter_subject,
476 #[cfg(feature = "server_2_10")]
477 filter_subjects: self.filter_subjects,
478 replay_policy: self.replay_policy,
479 rate_limit: self.rate_limit,
480 sample_frequency: self.sample_frequency,
481 max_waiting: self.max_waiting,
482 max_ack_pending: 0,
483 headers_only: self.headers_only,
484 flow_control: true,
485 idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
486 max_batch: 0,
487 max_bytes: 0,
488 max_expires: Duration::default(),
489 inactive_threshold: Duration::from_secs(30),
490 num_replicas: 1,
491 memory_storage: true,
492 #[cfg(feature = "server_2_10")]
493 metadata: self.metadata,
494 backoff: Vec::new(),
495 #[cfg(feature = "server_2_11")]
496 priority_policy: PriorityPolicy::None,
497 #[cfg(feature = "server_2_11")]
498 priority_groups: Vec::new(),
499 #[cfg(feature = "server_2_11")]
500 pause_until: None,
501 }
502 }
503}
504
505impl Consumer<OrderedConfig> {
506 pub async fn messages(self) -> Result<Ordered, StreamError> {
507 let subscriber = self
508 .context
509 .client
510 .subscribe(self.info.config.deliver_subject.clone().unwrap())
511 .await
512 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
513
514 let last_sequence = Arc::new(AtomicU64::new(0));
515 let consumer_sequence = Arc::new(AtomicU64::new(0));
516
517 let state = self.context.client.state.clone();
518 Ok(Ordered {
519 context: self.context.clone(),
520 consumer: self,
521 subscriber: Some(subscriber),
522 subscriber_future: None,
523 stream_sequence: last_sequence,
524 consumer_sequence,
525 heartbeat_sleep: None,
526 state,
527 last_known_state: State::Connected,
528 state_change_future: None,
529 })
530 }
531}
532
533pub struct Ordered {
534 context: Context,
535 consumer: Consumer<OrderedConfig>,
536 subscriber: Option<Subscriber>,
537 subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
538 stream_sequence: Arc<AtomicU64>,
539 consumer_sequence: Arc<AtomicU64>,
540 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
541 state: tokio::sync::watch::Receiver<State>,
542 last_known_state: State,
543 state_change_future:
544 Option<BoxFuture<'static, Result<(), tokio::sync::watch::error::RecvError>>>,
545}
546
547impl futures_util::Stream for Ordered {
548 type Item = Result<Message, OrderedError>;
549
550 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
551 loop {
552 match &mut self.state_change_future {
554 None => {
555 if self.state.has_changed().unwrap_or(false) {
557 let current_state = self.state.borrow_and_update().clone();
558
559 use State::*;
561 match (&self.last_known_state, ¤t_state) {
562 (Connected, Disconnected | Pending) => {
563 debug!("Connection lost, marking subscriber as invalid");
564 self.subscriber = None;
565 self.heartbeat_sleep = None;
566 }
567 (Disconnected | Pending, Connected) => {
568 debug!("Connection restored, triggering consumer recreation");
569 self.subscriber = None;
570 self.consumer_sequence.store(0, Ordering::Relaxed);
571 self.heartbeat_sleep = None;
572 }
573 _ => {}
574 }
575
576 self.last_known_state = current_state;
577 } else {
578 let mut state = self.state.clone();
580 self.state_change_future =
581 Some(Box::pin(async move { state.changed().await }));
582 }
583 }
584 Some(fut) => match fut.poll_unpin(cx) {
585 Poll::Ready(Ok(())) => {
586 self.state_change_future = None;
587 continue; }
589 Poll::Ready(Err(_)) => {
590 return Poll::Ready(Some(Err(OrderedError::with_source(
591 OrderedErrorKind::Other,
592 "Connection state watcher dropped",
593 ))));
594 }
595 Poll::Pending => {}
596 },
597 }
598
599 if self.subscriber.is_none() {
603 if self.subscriber_future.is_none() {
605 trace!("Creating subscriber recreation future");
606 let context = self.context.clone();
607 let sequence = self.stream_sequence.clone();
608 let config = self.consumer.config.clone();
609 let stream_name = self.consumer.info.stream_name.clone();
610
611 self.subscriber_future = Some(Box::pin(async move {
612 tryhard::retry_fn(|| {
613 recreate_consumer_and_subscription(
614 context.clone(),
615 config.clone(),
616 stream_name.clone(),
617 sequence.load(Ordering::Relaxed),
618 )
619 })
620 .retries(u32::MAX)
621 .custom_backoff(backoff)
622 .await
623 .map_err(|err| {
624 ConsumerRecreateError::with_source(
625 ConsumerRecreateErrorKind::Recreate,
626 err,
627 )
628 })
629 }));
630 }
631
632 if let Some(fut) = &mut self.subscriber_future {
634 match fut.poll_unpin(cx) {
635 Poll::Ready(result) => {
636 self.subscriber_future = None;
637 self.consumer_sequence.store(0, Ordering::Relaxed);
638 self.subscriber = Some(result.map_err(|err| {
639 OrderedError::with_source(OrderedErrorKind::Recreate, err)
640 })?);
641 }
642 Poll::Pending => return Poll::Pending,
643 }
644 }
645 }
646 if let Some(subscriber) = self.subscriber.as_mut() {
648 match subscriber.receiver.poll_recv(cx) {
649 Poll::Ready(None) => return Poll::Ready(None),
650 Poll::Ready(Some(message)) => {
651 self.heartbeat_sleep = None;
652 match message.status {
653 Some(StatusCode::IDLE_HEARTBEAT) => {
654 debug!("received idle heartbeats");
655 if let Some(headers) = message.headers.as_ref() {
656 if let Some(sequence) =
657 headers.get_last(crate::header::NATS_LAST_CONSUMER)
658 {
659 let sequence: u64 =
660 sequence.as_str().parse().map_err(|err| {
661 OrderedError::with_source(
662 OrderedErrorKind::Other,
663 err,
664 )
665 })?;
666
667 let last_sequence =
668 self.consumer_sequence.load(Ordering::Relaxed);
669
670 if sequence != last_sequence {
671 debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
672 self.subscriber = None;
673 self.heartbeat_sleep = None;
674 }
675 }
676 }
677 if let Some(subject) = message.reply.clone() {
679 trace!("received flow control message");
680 let client = self.context.client.clone();
681 tokio::task::spawn(async move {
682 client.publish(subject, Bytes::from_static(b"")).await.ok();
683 });
684 }
685 continue;
686 }
687 Some(status) => {
688 debug!("received status message: {}", status);
689 continue;
690 }
691 None => {
692 trace!("received a message");
693 let jetstream_message = jetstream::message::Message {
694 message,
695 context: self.context.clone(),
696 };
697
698 let info = jetstream_message.info().map_err(|err| {
699 OrderedError::with_source(OrderedErrorKind::Other, err)
700 })?;
701 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
702 self.consumer_sequence,
703 self.stream_sequence,
704 info.consumer_sequence,
705 info.stream_sequence);
706 if info.consumer_sequence
707 != self.consumer_sequence.load(Ordering::Relaxed) + 1
708 {
709 debug!(
710 "ordered consumer mismatch. current {}, info: {}",
711 self.consumer_sequence.load(Ordering::Relaxed),
712 info.consumer_sequence
713 );
714 self.subscriber = None;
715 self.consumer_sequence.store(0, Ordering::Relaxed);
716 self.heartbeat_sleep = None;
717 continue;
718 }
719 self.stream_sequence
720 .store(info.stream_sequence, Ordering::Relaxed);
721 self.consumer_sequence
722 .store(info.consumer_sequence, Ordering::Relaxed);
723 return Poll::Ready(Some(Ok(jetstream_message)));
724 }
725 }
726 }
727 Poll::Pending => {
728 if self
742 .heartbeat_sleep
743 .get_or_insert_with(|| {
744 Box::pin(tokio::time::sleep(
745 ORDERED_IDLE_HEARTBEAT.saturating_mul(2),
746 ))
747 })
748 .poll_unpin(cx)
749 .is_ready()
750 {
751 self.heartbeat_sleep = None;
752 self.subscriber = None;
753 self.consumer_sequence.store(0, Ordering::Relaxed);
754 return Poll::Ready(Some(Err(OrderedError::new(
755 OrderedErrorKind::MissingHeartbeat,
756 ))));
757 }
758 return Poll::Pending;
759 }
760 }
761 }
762 }
763 }
764}
765
766#[derive(Clone, Debug, PartialEq)]
767pub enum OrderedErrorKind {
768 MissingHeartbeat,
769 ConsumerDeleted,
770 PullBasedConsumer,
771 Recreate,
772 Other,
773}
774
775impl std::fmt::Display for OrderedErrorKind {
776 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
777 match self {
778 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
779 Self::ConsumerDeleted => write!(f, "consumer deleted"),
780 Self::Other => write!(f, "error"),
781 Self::PullBasedConsumer => write!(f, "cannot use with push consumer"),
782 Self::Recreate => write!(f, "consumer recreation failed"),
783 }
784 }
785}
786
787pub type OrderedError = Error<OrderedErrorKind>;
788
789impl From<MessagesError> for OrderedError {
790 fn from(err: MessagesError) -> Self {
791 match err.kind() {
792 MessagesErrorKind::MissingHeartbeat => {
793 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
794 }
795 MessagesErrorKind::ConsumerDeleted => {
796 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
797 }
798 MessagesErrorKind::PullBasedConsumer => {
799 OrderedError::new(OrderedErrorKind::PullBasedConsumer)
800 }
801 MessagesErrorKind::Other => OrderedError {
802 kind: OrderedErrorKind::Other,
803 source: err.source,
804 },
805 }
806 }
807}
808
809#[derive(Clone, Copy, Debug, PartialEq)]
810pub enum MessagesErrorKind {
811 MissingHeartbeat,
812 ConsumerDeleted,
813 PullBasedConsumer,
814 Other,
815}
816
817impl std::fmt::Display for MessagesErrorKind {
818 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
819 match self {
820 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
821 Self::ConsumerDeleted => write!(f, "consumer deleted"),
822 Self::Other => write!(f, "error"),
823 Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
824 }
825 }
826}
827
828pub type MessagesError = Error<MessagesErrorKind>;
829
830#[derive(Clone, Copy, Debug, PartialEq)]
831pub enum ConsumerRecreateErrorKind {
832 GetStream,
833 Subscription,
834 Recreate,
835 TimedOut,
836}
837
838impl std::fmt::Display for ConsumerRecreateErrorKind {
839 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
840 match self {
841 Self::GetStream => write!(f, "error getting stream"),
842 Self::Recreate => write!(f, "consumer creation failed"),
843 Self::TimedOut => write!(f, "timed out"),
844 Self::Subscription => write!(f, "failed to resubscribe"),
845 }
846 }
847}
848
849pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
850
851async fn recreate_consumer_and_subscription(
852 context: Context,
853 mut config: OrderedConfig,
854 stream_name: String,
855 sequence: u64,
856) -> Result<Subscriber, ConsumerRecreateError> {
857 let delivery_subject = context.client.new_inbox();
858 config.deliver_subject = delivery_subject;
859
860 let subscriber = context
861 .client
862 .subscribe(config.deliver_subject.clone())
863 .await
864 .map_err(|err| {
865 ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
866 })?;
867
868 recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
869 Ok(subscriber)
870}
871async fn recreate_ephemeral_consumer(
872 context: Context,
873 config: OrderedConfig,
874 stream_name: String,
875 sequence: u64,
876) -> Result<(), ConsumerRecreateError> {
877 let deliver_policy = {
878 if sequence == 0 {
879 DeliverPolicy::All
880 } else {
881 DeliverPolicy::ByStartSequence {
882 start_sequence: sequence + 1,
883 }
884 }
885 };
886
887 let config = config.clone();
888 tokio::time::timeout(
889 Duration::from_secs(5),
890 context.create_consumer_on_stream(
891 jetstream::consumer::push::OrderedConfig {
892 deliver_policy,
893 ..config
894 },
895 stream_name.clone(),
896 ),
897 )
898 .await
899 .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
900 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
901
902 Ok(())
903}