1use super::{
15 backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
16 StreamError, StreamErrorKind,
17};
18
19#[cfg(feature = "server_2_11")]
20use super::PriorityPolicy;
21
22use crate::{
23 connection::State,
24 error::Error,
25 jetstream::{self, Context, Message},
26 StatusCode, Subscriber,
27};
28
29use bytes::Bytes;
30use futures_util::{future::BoxFuture, FutureExt};
31use portable_atomic::AtomicU64;
32use serde::{Deserialize, Serialize};
33#[cfg(feature = "server_2_10")]
34use std::collections::HashMap;
35use std::task::{self, Poll};
36use std::{
37 io::{self},
38 pin::Pin,
39 sync::Arc,
40};
41use std::{sync::atomic::Ordering, time::Duration};
42#[cfg(feature = "server_2_11")]
43use time::{serde::rfc3339, OffsetDateTime};
44use tracing::{debug, trace};
45
46const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_secs(5);
47
48impl Consumer<Config> {
49 pub async fn messages(&self) -> Result<Messages, StreamError> {
93 let deliver_subject = self.info.config.deliver_subject.clone().unwrap();
94 let subscriber = if let Some(ref group) = self.info.config.deliver_group {
95 self.context
96 .client
97 .queue_subscribe(deliver_subject, group.to_owned())
98 .await
99 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
100 } else {
101 self.context
102 .client
103 .subscribe(deliver_subject)
104 .await
105 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?
106 };
107
108 Ok(Messages {
109 context: self.context.clone(),
110 config: self.config.clone(),
111 subscriber,
112 heartbeat_sleep: None,
113 })
114 }
115}
116
117pub struct Messages {
118 context: Context,
119 subscriber: Subscriber,
120 config: Config,
121 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
122}
123
124impl futures_util::Stream for Messages {
125 type Item = Result<Message, MessagesError>;
126
127 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
128 if !self.config.idle_heartbeat.is_zero() {
129 let heartbeat_sleep = self.config.idle_heartbeat.saturating_mul(2);
130 match self
131 .heartbeat_sleep
132 .get_or_insert_with(|| Box::pin(tokio::time::sleep(heartbeat_sleep)))
133 .poll_unpin(cx)
134 {
135 Poll::Ready(_) => {
136 self.heartbeat_sleep = None;
137 return Poll::Ready(Some(Err(MessagesError::new(
138 MessagesErrorKind::MissingHeartbeat,
139 ))));
140 }
141 Poll::Pending => (),
142 }
143 }
144 loop {
145 match self.subscriber.receiver.poll_recv(cx) {
146 Poll::Ready(maybe_message) => {
147 self.heartbeat_sleep = None;
148 match maybe_message {
149 Some(message) => match message.status {
150 Some(StatusCode::IDLE_HEARTBEAT) => {
151 if let Some(subject) = message.reply {
152 let client = self.context.client.clone();
154 tokio::task::spawn(async move {
155 client
156 .publish(subject, Bytes::from_static(b""))
157 .await
158 .unwrap();
159 });
160 }
161
162 continue;
163 }
164 Some(_) => {
165 continue;
166 }
167 None => {
168 return Poll::Ready(Some(Ok(jetstream::Message {
169 context: self.context.clone(),
170 message,
171 })))
172 }
173 },
174 None => return Poll::Ready(None),
175 }
176 }
177 Poll::Pending => return Poll::Pending,
178 }
179 }
180 }
181}
182
183#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
187pub struct Config {
188 #[serde(default)]
190 pub deliver_subject: String,
191 #[serde(default, skip_serializing_if = "Option::is_none")]
207 pub durable_name: Option<String>,
208 #[serde(default, skip_serializing_if = "Option::is_none")]
211 pub name: Option<String>,
212 #[serde(default, skip_serializing_if = "Option::is_none")]
214 pub description: Option<String>,
215 #[serde(default, skip_serializing_if = "Option::is_none")]
216 pub deliver_group: Option<String>,
218 #[serde(flatten)]
220 pub deliver_policy: DeliverPolicy,
221 pub ack_policy: AckPolicy,
223 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
225 pub ack_wait: Duration,
226 #[serde(default, skip_serializing_if = "is_default")]
228 pub max_deliver: i64,
229 #[serde(default, skip_serializing_if = "is_default")]
231 pub filter_subject: String,
232 #[cfg(feature = "server_2_10")]
233 #[serde(default, skip_serializing_if = "is_default")]
235 pub filter_subjects: Vec<String>,
236 pub replay_policy: ReplayPolicy,
238 #[serde(default, skip_serializing_if = "is_default")]
240 pub rate_limit: u64,
241 #[serde(
243 rename = "sample_freq",
244 with = "super::sample_freq_deser",
245 default,
246 skip_serializing_if = "is_default"
247 )]
248 pub sample_frequency: u8,
249 #[serde(default, skip_serializing_if = "is_default")]
251 pub max_waiting: i64,
252 #[serde(default, skip_serializing_if = "is_default")]
256 pub max_ack_pending: i64,
257 #[serde(default, skip_serializing_if = "is_default")]
259 pub headers_only: bool,
260 #[serde(default, skip_serializing_if = "is_default")]
262 pub flow_control: bool,
263 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
265 pub idle_heartbeat: Duration,
266 #[serde(default, skip_serializing_if = "is_default")]
268 pub num_replicas: usize,
269 #[serde(default, skip_serializing_if = "is_default")]
271 pub memory_storage: bool,
272 #[cfg(feature = "server_2_10")]
273 #[serde(default, skip_serializing_if = "is_default")]
275 pub metadata: HashMap<String, String>,
276 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
278 pub backoff: Vec<Duration>,
279 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
281 pub inactive_threshold: Duration,
282 #[cfg(feature = "server_2_11")]
284 #[serde(
285 default,
286 with = "rfc3339::option",
287 skip_serializing_if = "Option::is_none"
288 )]
289 pub pause_until: Option<OffsetDateTime>,
290}
291
292impl FromConsumer for Config {
293 fn try_from_consumer_config(config: super::Config) -> Result<Self, crate::Error> {
294 if config.deliver_subject.is_none() {
295 return Err(Box::new(io::Error::other(
296 "push consumer must have delivery subject",
297 )));
298 }
299
300 Ok(Config {
301 deliver_subject: config.deliver_subject.unwrap(),
302 durable_name: config.durable_name,
303 name: config.name,
304 description: config.description,
305 deliver_group: config.deliver_group,
306 deliver_policy: config.deliver_policy,
307 ack_policy: config.ack_policy,
308 ack_wait: config.ack_wait,
309 max_deliver: config.max_deliver,
310 filter_subject: config.filter_subject,
311 #[cfg(feature = "server_2_10")]
312 filter_subjects: config.filter_subjects,
313 replay_policy: config.replay_policy,
314 rate_limit: config.rate_limit,
315 sample_frequency: config.sample_frequency,
316 max_waiting: config.max_waiting,
317 max_ack_pending: config.max_ack_pending,
318 headers_only: config.headers_only,
319 flow_control: config.flow_control,
320 idle_heartbeat: config.idle_heartbeat,
321 num_replicas: config.num_replicas,
322 memory_storage: config.memory_storage,
323 #[cfg(feature = "server_2_10")]
324 metadata: config.metadata,
325 backoff: config.backoff,
326 inactive_threshold: config.inactive_threshold,
327 #[cfg(feature = "server_2_11")]
328 pause_until: config.pause_until,
329 })
330 }
331}
332
333impl IntoConsumerConfig for Config {
334 fn into_consumer_config(self) -> jetstream::consumer::Config {
335 jetstream::consumer::Config {
336 deliver_subject: Some(self.deliver_subject),
337 durable_name: self.durable_name,
338 name: self.name,
339 description: self.description,
340 deliver_group: self.deliver_group,
341 deliver_policy: self.deliver_policy,
342 ack_policy: self.ack_policy,
343 ack_wait: self.ack_wait,
344 max_deliver: self.max_deliver,
345 filter_subject: self.filter_subject,
346 #[cfg(feature = "server_2_10")]
347 filter_subjects: self.filter_subjects,
348 replay_policy: self.replay_policy,
349 rate_limit: self.rate_limit,
350 sample_frequency: self.sample_frequency,
351 max_waiting: self.max_waiting,
352 max_ack_pending: self.max_ack_pending,
353 headers_only: self.headers_only,
354 flow_control: self.flow_control,
355 idle_heartbeat: self.idle_heartbeat,
356 max_batch: 0,
357 max_bytes: 0,
358 max_expires: Duration::default(),
359 inactive_threshold: self.inactive_threshold,
360 num_replicas: self.num_replicas,
361 memory_storage: self.memory_storage,
362 #[cfg(feature = "server_2_10")]
363 metadata: self.metadata,
364 backoff: self.backoff,
365 #[cfg(feature = "server_2_11")]
366 priority_policy: PriorityPolicy::None,
367 #[cfg(feature = "server_2_11")]
368 priority_groups: Vec::new(),
369 #[cfg(feature = "server_2_11")]
370 pause_until: self.pause_until,
371 }
372 }
373}
374impl IntoConsumerConfig for &Config {
375 fn into_consumer_config(self) -> jetstream::consumer::Config {
376 self.clone().into_consumer_config()
377 }
378}
379fn is_default<T: Default + Eq>(t: &T) -> bool {
380 t == &T::default()
381}
382
383#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
387pub struct OrderedConfig {
388 #[serde(default)]
390 pub deliver_subject: String,
391 #[serde(default, skip_serializing_if = "Option::is_none")]
394 pub name: Option<String>,
395 #[serde(default, skip_serializing_if = "Option::is_none")]
397 pub description: Option<String>,
398 #[serde(default, skip_serializing_if = "is_default")]
399 pub filter_subject: String,
400 #[cfg(feature = "server_2_10")]
401 #[serde(default, skip_serializing_if = "is_default")]
403 pub filter_subjects: Vec<String>,
404 pub replay_policy: ReplayPolicy,
406 #[serde(default, skip_serializing_if = "is_default")]
408 pub rate_limit: u64,
409 #[serde(
411 rename = "sample_freq",
412 with = "super::sample_freq_deser",
413 default,
414 skip_serializing_if = "is_default"
415 )]
416 pub sample_frequency: u8,
417 #[serde(default, skip_serializing_if = "is_default")]
419 pub headers_only: bool,
420 #[serde(flatten)]
422 pub deliver_policy: DeliverPolicy,
423 #[serde(default, skip_serializing_if = "is_default")]
425 pub max_waiting: i64,
426 #[cfg(feature = "server_2_10")]
427 #[serde(default, skip_serializing_if = "is_default")]
429 pub metadata: HashMap<String, String>,
430}
431
432impl FromConsumer for OrderedConfig {
433 fn try_from_consumer_config(
434 config: crate::jetstream::consumer::Config,
435 ) -> Result<Self, crate::Error>
436 where
437 Self: Sized,
438 {
439 if config.deliver_subject.is_none() {
440 return Err(Box::new(io::Error::other(
441 "push consumer must have delivery subject",
442 )));
443 }
444 Ok(OrderedConfig {
445 name: config.name,
446 deliver_subject: config.deliver_subject.unwrap(),
447 description: config.description,
448 filter_subject: config.filter_subject,
449 #[cfg(feature = "server_2_10")]
450 filter_subjects: config.filter_subjects,
451 replay_policy: config.replay_policy,
452 rate_limit: config.rate_limit,
453 sample_frequency: config.sample_frequency,
454 headers_only: config.headers_only,
455 deliver_policy: config.deliver_policy,
456 max_waiting: config.max_waiting,
457 #[cfg(feature = "server_2_10")]
458 metadata: config.metadata,
459 })
460 }
461}
462
463impl IntoConsumerConfig for OrderedConfig {
464 fn into_consumer_config(self) -> super::Config {
465 jetstream::consumer::Config {
466 deliver_subject: Some(self.deliver_subject),
467 durable_name: None,
468 name: self.name,
469 description: self.description,
470 deliver_group: None,
471 deliver_policy: self.deliver_policy,
472 ack_policy: AckPolicy::None,
473 ack_wait: Duration::default(),
474 max_deliver: 1,
475 filter_subject: self.filter_subject,
476 #[cfg(feature = "server_2_10")]
477 filter_subjects: self.filter_subjects,
478 replay_policy: self.replay_policy,
479 rate_limit: self.rate_limit,
480 sample_frequency: self.sample_frequency,
481 max_waiting: self.max_waiting,
482 max_ack_pending: 0,
483 headers_only: self.headers_only,
484 flow_control: true,
485 idle_heartbeat: ORDERED_IDLE_HEARTBEAT,
486 max_batch: 0,
487 max_bytes: 0,
488 max_expires: Duration::default(),
489 inactive_threshold: Duration::from_secs(30),
490 num_replicas: 1,
491 memory_storage: true,
492 #[cfg(feature = "server_2_10")]
493 metadata: self.metadata,
494 backoff: Vec::new(),
495 #[cfg(feature = "server_2_11")]
496 priority_policy: PriorityPolicy::None,
497 #[cfg(feature = "server_2_11")]
498 priority_groups: Vec::new(),
499 #[cfg(feature = "server_2_11")]
500 pause_until: None,
501 }
502 }
503}
504
505impl Consumer<OrderedConfig> {
506 pub async fn messages(self) -> Result<Ordered, StreamError> {
507 let subscriber = self
508 .context
509 .client
510 .subscribe(self.info.config.deliver_subject.clone().unwrap())
511 .await
512 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
513
514 let last_sequence = Arc::new(AtomicU64::new(0));
515 let consumer_sequence = Arc::new(AtomicU64::new(0));
516
517 let state = self.context.client.state.clone();
518 Ok(Ordered {
519 context: self.context.clone(),
520 consumer: self,
521 subscriber: Some(subscriber),
522 subscriber_future: None,
523 stream_sequence: last_sequence,
524 consumer_sequence,
525 heartbeat_sleep: None,
526 state,
527 last_known_state: State::Connected,
528 state_change_future: None,
529 })
530 }
531}
532
533pub struct Ordered {
534 context: Context,
535 consumer: Consumer<OrderedConfig>,
536 subscriber: Option<Subscriber>,
537 subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
538 stream_sequence: Arc<AtomicU64>,
539 consumer_sequence: Arc<AtomicU64>,
540 heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
541 state: tokio::sync::watch::Receiver<State>,
542 last_known_state: State,
543 state_change_future:
544 Option<BoxFuture<'static, Result<(), tokio::sync::watch::error::RecvError>>>,
545}
546
547impl futures_util::Stream for Ordered {
548 type Item = Result<Message, OrderedError>;
549
550 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
551 loop {
552 if self.subscriber.is_some()
555 && self.subscriber_future.is_none()
556 && self
557 .heartbeat_sleep
558 .get_or_insert_with(|| {
559 Box::pin(tokio::time::sleep(ORDERED_IDLE_HEARTBEAT.saturating_mul(2)))
560 })
561 .poll_unpin(cx)
562 .is_ready()
563 {
564 self.heartbeat_sleep = None;
565 return Poll::Ready(Some(Err(OrderedError::new(
566 OrderedErrorKind::MissingHeartbeat,
567 ))));
568 }
569 match &mut self.state_change_future {
571 None => {
572 if self.state.has_changed().unwrap_or(false) {
574 let current_state = self.state.borrow_and_update().clone();
575
576 use State::*;
578 match (&self.last_known_state, ¤t_state) {
579 (Connected, Disconnected | Pending) => {
580 debug!("Connection lost, marking subscriber as invalid");
581 self.subscriber = None;
582 self.heartbeat_sleep = None;
583 }
584 (Disconnected | Pending, Connected) => {
585 debug!("Connection restored, triggering consumer recreation");
586 self.subscriber = None;
587 self.consumer_sequence.store(0, Ordering::Relaxed);
588 self.heartbeat_sleep = None;
589 }
590 _ => {}
591 }
592
593 self.last_known_state = current_state;
594 } else {
595 let mut state = self.state.clone();
597 self.state_change_future =
598 Some(Box::pin(async move { state.changed().await }));
599 }
600 }
601 Some(fut) => match fut.poll_unpin(cx) {
602 Poll::Ready(Ok(())) => {
603 self.state_change_future = None;
604 continue; }
606 Poll::Ready(Err(_)) => {
607 return Poll::Ready(Some(Err(OrderedError::with_source(
608 OrderedErrorKind::Other,
609 "Connection state watcher dropped",
610 ))));
611 }
612 Poll::Pending => {}
613 },
614 }
615
616 if self.subscriber.is_none() {
620 if self.subscriber_future.is_none() {
622 trace!("Creating subscriber recreation future");
623 let context = self.context.clone();
624 let sequence = self.stream_sequence.clone();
625 let config = self.consumer.config.clone();
626 let stream_name = self.consumer.info.stream_name.clone();
627
628 self.subscriber_future = Some(Box::pin(async move {
629 tryhard::retry_fn(|| {
630 recreate_consumer_and_subscription(
631 context.clone(),
632 config.clone(),
633 stream_name.clone(),
634 sequence.load(Ordering::Relaxed),
635 )
636 })
637 .retries(u32::MAX)
638 .custom_backoff(backoff)
639 .await
640 .map_err(|err| {
641 ConsumerRecreateError::with_source(
642 ConsumerRecreateErrorKind::Recreate,
643 err,
644 )
645 })
646 }));
647 }
648
649 if let Some(fut) = &mut self.subscriber_future {
651 match fut.poll_unpin(cx) {
652 Poll::Ready(result) => {
653 self.subscriber_future = None;
654 self.consumer_sequence.store(0, Ordering::Relaxed);
655 self.subscriber = Some(result.map_err(|err| {
656 OrderedError::with_source(OrderedErrorKind::Recreate, err)
657 })?);
658 }
659 Poll::Pending => return Poll::Pending,
660 }
661 }
662 }
663 if let Some(subscriber) = self.subscriber.as_mut() {
665 match subscriber.receiver.poll_recv(cx) {
666 Poll::Ready(None) => return Poll::Ready(None),
667 Poll::Ready(Some(message)) => {
668 self.heartbeat_sleep = None;
669 match message.status {
670 Some(StatusCode::IDLE_HEARTBEAT) => {
671 debug!("received idle heartbeats");
672 if let Some(headers) = message.headers.as_ref() {
673 if let Some(sequence) =
674 headers.get_last(crate::header::NATS_LAST_CONSUMER)
675 {
676 let sequence: u64 =
677 sequence.as_str().parse().map_err(|err| {
678 OrderedError::with_source(
679 OrderedErrorKind::Other,
680 err,
681 )
682 })?;
683
684 let last_sequence =
685 self.consumer_sequence.load(Ordering::Relaxed);
686
687 if sequence != last_sequence {
688 debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence);
689 self.subscriber = None;
690 self.heartbeat_sleep = None;
691 }
692 }
693 }
694 if let Some(subject) = message.reply.clone() {
696 trace!("received flow control message");
697 let client = self.context.client.clone();
698 tokio::task::spawn(async move {
699 client.publish(subject, Bytes::from_static(b"")).await.ok();
700 });
701 }
702 continue;
703 }
704 Some(status) => {
705 debug!("received status message: {}", status);
706 continue;
707 }
708 None => {
709 trace!("received a message");
710 let jetstream_message = jetstream::message::Message {
711 message,
712 context: self.context.clone(),
713 };
714
715 let info = jetstream_message.info().map_err(|err| {
716 OrderedError::with_source(OrderedErrorKind::Other, err)
717 })?;
718 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
719 self.consumer_sequence,
720 self.stream_sequence,
721 info.consumer_sequence,
722 info.stream_sequence);
723 if info.consumer_sequence
724 != self.consumer_sequence.load(Ordering::Relaxed) + 1
725 {
726 debug!(
727 "ordered consumer mismatch. current {}, info: {}",
728 self.consumer_sequence.load(Ordering::Relaxed),
729 info.consumer_sequence
730 );
731 self.subscriber = None;
732 self.consumer_sequence.store(0, Ordering::Relaxed);
733 self.heartbeat_sleep = None;
734 continue;
735 }
736 self.stream_sequence
737 .store(info.stream_sequence, Ordering::Relaxed);
738 self.consumer_sequence
739 .store(info.consumer_sequence, Ordering::Relaxed);
740 return Poll::Ready(Some(Ok(jetstream_message)));
741 }
742 }
743 }
744 Poll::Pending => return Poll::Pending,
745 }
746 }
747 }
748 }
749}
750
751#[derive(Clone, Debug, PartialEq)]
752pub enum OrderedErrorKind {
753 MissingHeartbeat,
754 ConsumerDeleted,
755 PullBasedConsumer,
756 Recreate,
757 Other,
758}
759
760impl std::fmt::Display for OrderedErrorKind {
761 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
762 match self {
763 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
764 Self::ConsumerDeleted => write!(f, "consumer deleted"),
765 Self::Other => write!(f, "error"),
766 Self::PullBasedConsumer => write!(f, "cannot use with push consumer"),
767 Self::Recreate => write!(f, "consumer recreation failed"),
768 }
769 }
770}
771
772pub type OrderedError = Error<OrderedErrorKind>;
773
774impl From<MessagesError> for OrderedError {
775 fn from(err: MessagesError) -> Self {
776 match err.kind() {
777 MessagesErrorKind::MissingHeartbeat => {
778 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
779 }
780 MessagesErrorKind::ConsumerDeleted => {
781 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
782 }
783 MessagesErrorKind::PullBasedConsumer => {
784 OrderedError::new(OrderedErrorKind::PullBasedConsumer)
785 }
786 MessagesErrorKind::Other => OrderedError {
787 kind: OrderedErrorKind::Other,
788 source: err.source,
789 },
790 }
791 }
792}
793
794#[derive(Clone, Copy, Debug, PartialEq)]
795pub enum MessagesErrorKind {
796 MissingHeartbeat,
797 ConsumerDeleted,
798 PullBasedConsumer,
799 Other,
800}
801
802impl std::fmt::Display for MessagesErrorKind {
803 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
804 match self {
805 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
806 Self::ConsumerDeleted => write!(f, "consumer deleted"),
807 Self::Other => write!(f, "error"),
808 Self::PullBasedConsumer => write!(f, "cannot use with pull consumer"),
809 }
810 }
811}
812
813pub type MessagesError = Error<MessagesErrorKind>;
814
815#[derive(Clone, Copy, Debug, PartialEq)]
816pub enum ConsumerRecreateErrorKind {
817 GetStream,
818 Subscription,
819 Recreate,
820 TimedOut,
821}
822
823impl std::fmt::Display for ConsumerRecreateErrorKind {
824 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
825 match self {
826 Self::GetStream => write!(f, "error getting stream"),
827 Self::Recreate => write!(f, "consumer creation failed"),
828 Self::TimedOut => write!(f, "timed out"),
829 Self::Subscription => write!(f, "failed to resubscribe"),
830 }
831 }
832}
833
834pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
835
836async fn recreate_consumer_and_subscription(
837 context: Context,
838 mut config: OrderedConfig,
839 stream_name: String,
840 sequence: u64,
841) -> Result<Subscriber, ConsumerRecreateError> {
842 let delivery_subject = context.client.new_inbox();
843 config.deliver_subject = delivery_subject;
844
845 let subscriber = context
846 .client
847 .subscribe(config.deliver_subject.clone())
848 .await
849 .map_err(|err| {
850 ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
851 })?;
852
853 recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
854 Ok(subscriber)
855}
856async fn recreate_ephemeral_consumer(
857 context: Context,
858 config: OrderedConfig,
859 stream_name: String,
860 sequence: u64,
861) -> Result<(), ConsumerRecreateError> {
862 let deliver_policy = {
863 if sequence == 0 {
864 DeliverPolicy::All
865 } else {
866 DeliverPolicy::ByStartSequence {
867 start_sequence: sequence + 1,
868 }
869 }
870 };
871
872 let config = config.clone();
873 tokio::time::timeout(
874 Duration::from_secs(5),
875 context.create_consumer_on_stream(
876 jetstream::consumer::push::OrderedConfig {
877 deliver_policy,
878 ..config
879 },
880 stream_name.clone(),
881 ),
882 )
883 .await
884 .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
885 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
886
887 Ok(())
888}