async_nats_wrpc/jetstream/consumer/pull.rs
1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16 future::{BoxFuture, Either},
17 FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_10")]
21use std::collections::HashMap;
22use std::{future, pin::Pin, task::Poll, time::Duration};
23use tokio::{task::JoinHandle, time::Sleep};
24
25use serde::{Deserialize, Serialize};
26use tracing::{debug, trace};
27
28use crate::{
29 connection::State,
30 error::Error,
31 jetstream::{self, Context},
32 StatusCode, SubscribeError, Subscriber,
33};
34
35use crate::subject::Subject;
36
37use super::{
38 AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
39 StreamError, StreamErrorKind,
40};
41use jetstream::consumer;
42
43impl Consumer<Config> {
    /// Returns a stream of messages for a Pull Consumer.
45 ///
46 /// # Example
47 ///
48 /// ```no_run
49 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
51 /// use futures::StreamExt;
52 /// use futures::TryStreamExt;
53 ///
54 /// let client = async_nats::connect("localhost:4222").await?;
55 /// let jetstream = async_nats::jetstream::new(client);
56 ///
57 /// let stream = jetstream
58 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
59 /// name: "events".to_string(),
60 /// max_messages: 10_000,
61 /// ..Default::default()
62 /// })
63 /// .await?;
64 ///
65 /// jetstream.publish("events", "data".into()).await?;
66 ///
67 /// let consumer = stream
68 /// .get_or_create_consumer(
69 /// "consumer",
70 /// async_nats::jetstream::consumer::pull::Config {
71 /// durable_name: Some("consumer".to_string()),
72 /// ..Default::default()
73 /// },
74 /// )
75 /// .await?;
76 ///
77 /// let mut messages = consumer.messages().await?.take(100);
78 /// while let Some(Ok(message)) = messages.next().await {
79 /// println!("got message {:?}", message);
80 /// message.ack().await?;
81 /// }
82 /// Ok(())
83 /// # }
84 /// ```
85 pub async fn messages(&self) -> Result<Stream, StreamError> {
86 Stream::stream(
87 BatchConfig {
88 batch: 200,
89 expires: Some(Duration::from_secs(30)),
90 no_wait: false,
91 max_bytes: 0,
92 idle_heartbeat: Duration::from_secs(15),
93 },
94 self,
95 )
96 .await
97 }
98
    /// Enables customization of the [Stream] by setting timeouts, heartbeats, and the maximum
    /// number of messages or bytes buffered.
101 ///
102 /// # Examples
103 ///
104 /// ```no_run
105 /// # #[tokio::main]
106 /// # async fn main() -> Result<(), async_nats::Error> {
107 /// use async_nats::jetstream::consumer::PullConsumer;
108 /// use futures::StreamExt;
109 /// let client = async_nats::connect("localhost:4222").await?;
110 /// let jetstream = async_nats::jetstream::new(client);
111 ///
112 /// let consumer: PullConsumer = jetstream
113 /// .get_stream("events")
114 /// .await?
115 /// .get_consumer("pull")
116 /// .await?;
117 ///
118 /// let mut messages = consumer
119 /// .stream()
120 /// .max_messages_per_batch(100)
121 /// .max_bytes_per_batch(1024)
122 /// .messages()
123 /// .await?;
124 ///
125 /// while let Some(message) = messages.next().await {
126 /// let message = message?;
127 /// println!("message: {:?}", message);
128 /// message.ack().await?;
129 /// }
130 /// # Ok(())
131 /// # }
132 /// ```
133 pub fn stream(&self) -> StreamBuilder<'_> {
134 StreamBuilder::new(self)
135 }
136
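    /// Publishes a single pull request built from `batch`, asking the server to deliver the
    /// requested messages to the `inbox` subject.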
137 pub(crate) async fn request_batch<I: Into<BatchConfig>>(
138 &self,
139 batch: I,
140 inbox: Subject,
141 ) -> Result<(), BatchRequestError> {
142 debug!("sending batch");
143 let subject = format!(
144 "{}.CONSUMER.MSG.NEXT.{}.{}",
145 self.context.prefix, self.info.stream_name, self.info.name
146 );
147
148 let payload = serde_json::to_vec(&batch.into())
149 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
150
151 self.context
152 .client
153 .publish_with_reply(subject, inbox, payload.into())
154 .await
155 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
156 debug!("batch request sent");
157 Ok(())
158 }
159
    /// Returns a batch of the specified number of messages; if there are fewer messages on the
    /// [Stream] than requested, all available messages are returned.
162 ///
163 /// # Example
164 ///
165 /// ```no_run
166 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
168 /// use futures::StreamExt;
169 /// use futures::TryStreamExt;
170 ///
171 /// let client = async_nats::connect("localhost:4222").await?;
172 /// let jetstream = async_nats::jetstream::new(client);
173 ///
174 /// let stream = jetstream
175 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
176 /// name: "events".to_string(),
177 /// max_messages: 10_000,
178 /// ..Default::default()
179 /// })
180 /// .await?;
181 ///
182 /// jetstream.publish("events", "data".into()).await?;
183 ///
184 /// let consumer = stream
185 /// .get_or_create_consumer(
186 /// "consumer",
187 /// async_nats::jetstream::consumer::pull::Config {
188 /// durable_name: Some("consumer".to_string()),
189 /// ..Default::default()
190 /// },
191 /// )
192 /// .await?;
193 ///
194 /// for _ in 0..100 {
195 /// jetstream.publish("events", "data".into()).await?;
196 /// }
197 ///
198 /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
199 /// // will finish after 100 messages, as that is the number of messages available on the
200 /// // stream.
201 /// while let Some(Ok(message)) = messages.next().await {
202 /// println!("got message {:?}", message);
203 /// message.ack().await?;
204 /// }
205 /// Ok(())
206 /// # }
207 /// ```
208 pub fn fetch(&self) -> FetchBuilder {
209 FetchBuilder::new(self)
210 }
211
    /// Returns a batch of the specified number of messages, unless the timeout expires first.
213 ///
214 /// # Example
215 ///
216 /// ```no_run
217 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
219 /// use futures::StreamExt;
220 /// use futures::TryStreamExt;
221 ///
222 /// let client = async_nats::connect("localhost:4222").await?;
223 /// let jetstream = async_nats::jetstream::new(client);
224 ///
225 /// let stream = jetstream
226 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
227 /// name: "events".to_string(),
228 /// max_messages: 10_000,
229 /// ..Default::default()
230 /// })
231 /// .await?;
232 ///
233 /// jetstream.publish("events", "data".into()).await?;
234 ///
235 /// let consumer = stream
236 /// .get_or_create_consumer(
237 /// "consumer",
238 /// async_nats::jetstream::consumer::pull::Config {
239 /// durable_name: Some("consumer".to_string()),
240 /// ..Default::default()
241 /// },
242 /// )
243 /// .await?;
244 ///
245 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
246 /// while let Some(Ok(message)) = messages.next().await {
247 /// println!("got message {:?}", message);
248 /// message.ack().await?;
249 /// }
250 /// Ok(())
251 /// # }
252 /// ```
253 pub fn batch(&self) -> BatchBuilder {
254 BatchBuilder::new(self)
255 }
256
    /// Returns a sequence of [Batches][Batch], allowing iteration over batches and then over the
    /// messages in those batches.
259 ///
260 /// # Example
261 ///
262 /// ```no_run
263 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
265 /// use futures::StreamExt;
266 /// use futures::TryStreamExt;
267 ///
268 /// let client = async_nats::connect("localhost:4222").await?;
269 /// let jetstream = async_nats::jetstream::new(client);
270 ///
271 /// let stream = jetstream
272 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
273 /// name: "events".to_string(),
274 /// max_messages: 10_000,
275 /// ..Default::default()
276 /// })
277 /// .await?;
278 ///
279 /// jetstream.publish("events", "data".into()).await?;
280 ///
281 /// let consumer = stream
282 /// .get_or_create_consumer(
283 /// "consumer",
284 /// async_nats::jetstream::consumer::pull::Config {
285 /// durable_name: Some("consumer".to_string()),
286 /// ..Default::default()
287 /// },
288 /// )
289 /// .await?;
290 ///
291 /// let mut iter = consumer.sequence(50).unwrap().take(10);
292 /// while let Ok(Some(mut batch)) = iter.try_next().await {
293 /// while let Ok(Some(message)) = batch.try_next().await {
294 /// println!("message received: {:?}", message);
295 /// }
296 /// }
297 /// Ok(())
298 /// # }
299 /// ```
300 pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
301 let context = self.context.clone();
302 let subject = format!(
303 "{}.CONSUMER.MSG.NEXT.{}.{}",
304 self.context.prefix, self.info.stream_name, self.info.name
305 );
306
307 let request = serde_json::to_vec(&BatchConfig {
308 batch,
309 expires: Some(Duration::from_secs(60)),
310 ..Default::default()
311 })
312 .map(Bytes::from)
313 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
314
315 Ok(Sequence {
316 context,
317 subject,
318 request,
319 pending_messages: batch,
320 next: None,
321 })
322 }
323}
324
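/// A single batch of messages awaited from a pull request, created by [FetchBuilder::messages]
/// or [BatchBuilder::messages].
///
/// Implements [futures::Stream] and terminates once the requested number of messages has been
/// received, the request expires, or the server reports that no more messages are available.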
325pub struct Batch {
326 pending_messages: usize,
327 subscriber: Subscriber,
328 context: Context,
329 timeout: Option<Pin<Box<Sleep>>>,
330 terminated: bool,
331}
332
333impl<'a> Batch {
334 async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
335 let inbox = Subject::from(consumer.context.client.new_inbox());
336 let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
337 consumer.request_batch(batch, inbox.clone()).await?;
338
339 let sleep = batch.expires.map(|expires| {
340 Box::pin(tokio::time::sleep(
341 expires.saturating_add(Duration::from_secs(5)),
342 ))
343 });
344
345 Ok(Batch {
346 pending_messages: batch.batch,
347 subscriber: subscription,
348 context: consumer.context.clone(),
349 terminated: false,
350 timeout: sleep,
351 })
352 }
353}
354
355impl futures::Stream for Batch {
356 type Item = Result<jetstream::Message, crate::Error>;
357
358 fn poll_next(
359 mut self: std::pin::Pin<&mut Self>,
360 cx: &mut std::task::Context<'_>,
361 ) -> std::task::Poll<Option<Self::Item>> {
362 if self.terminated {
363 return Poll::Ready(None);
364 }
365 if self.pending_messages == 0 {
366 self.terminated = true;
367 return Poll::Ready(None);
368 }
369 if let Some(sleep) = self.timeout.as_mut() {
370 match sleep.poll_unpin(cx) {
371 Poll::Ready(_) => {
372 debug!("batch timeout timer triggered");
                    // TODO(tp): Maybe we can be smarter here and, before timing out, check
                    // whether we consumed all the messages from the subscription buffer, in case
                    // the user is consuming messages slowly. Keep in mind that we time out here
                    // only if, for some reason, we missed the timeout from the server and a few
                    // seconds have passed since the expected timeout message.
378 self.terminated = true;
379 return Poll::Ready(None);
380 }
381 Poll::Pending => (),
382 }
383 }
384 match self.subscriber.receiver.poll_recv(cx) {
385 Poll::Ready(maybe_message) => match maybe_message {
386 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
387 StatusCode::TIMEOUT => {
388 debug!("received timeout. Iterator done");
389 self.terminated = true;
390 Poll::Ready(None)
391 }
392 StatusCode::IDLE_HEARTBEAT => {
393 debug!("received heartbeat");
394 Poll::Pending
395 }
                    // If this is the fetch variant, terminate when there are no more messages.
                    // We do not need to check whether this is fetch rather than batch,
                    // as only fetch sends back the `NO_MESSAGES` status.
399 StatusCode::NOT_FOUND => {
400 debug!("received `NO_MESSAGES`. Iterator done");
401 self.terminated = true;
402 Poll::Ready(None)
403 }
404 StatusCode::OK => {
405 debug!("received message");
406 self.pending_messages -= 1;
407 Poll::Ready(Some(Ok(jetstream::Message {
408 context: self.context.clone(),
409 message,
410 })))
411 }
412 status => {
413 debug!("received error");
414 self.terminated = true;
415 Poll::Ready(Some(Err(Box::new(std::io::Error::new(
416 std::io::ErrorKind::Other,
417 format!(
418 "error while processing messages from the stream: {}, {:?}",
419 status, message.description
420 ),
421 )))))
422 }
423 },
424 None => Poll::Ready(None),
425 },
426 std::task::Poll::Pending => std::task::Poll::Pending,
427 }
428 }
429}
430
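/// A sequence of [Batch]es, created by [Consumer::sequence].
///
/// Each item yielded by the iterator issues a new pull request; iterate the returned [Batch]
/// to receive the individual messages.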
431pub struct Sequence {
432 context: Context,
433 subject: String,
434 request: Bytes,
435 pending_messages: usize,
436 next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
437}
438
439impl futures::Stream for Sequence {
440 type Item = Result<Batch, MessagesError>;
441
442 fn poll_next(
443 mut self: std::pin::Pin<&mut Self>,
444 cx: &mut std::task::Context<'_>,
445 ) -> std::task::Poll<Option<Self::Item>> {
446 match self.next.as_mut() {
447 None => {
448 let context = self.context.clone();
449 let subject = self.subject.clone();
450 let request = self.request.clone();
451 let pending_messages = self.pending_messages;
452
453 let next = self.next.insert(Box::pin(async move {
454 let inbox = context.client.new_inbox();
455 let subscriber = context
456 .client
457 .subscribe(inbox.clone())
458 .await
459 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
460
461 context
462 .client
463 .publish_with_reply(subject, inbox, request)
464 .await
465 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
466
467 // TODO(tp): Add timeout config and defaults.
468 Ok(Batch {
469 pending_messages,
470 subscriber,
471 context,
472 terminated: false,
473 timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
474 })
475 }));
476
477 match next.as_mut().poll(cx) {
478 Poll::Ready(result) => {
479 self.next = None;
480 Poll::Ready(Some(result.map_err(|err| {
481 MessagesError::with_source(MessagesErrorKind::Pull, err)
482 })))
483 }
484 Poll::Pending => Poll::Pending,
485 }
486 }
487
488 Some(next) => match next.as_mut().poll(cx) {
489 Poll::Ready(result) => {
490 self.next = None;
491 Poll::Ready(Some(result.map_err(|err| {
492 MessagesError::with_source(MessagesErrorKind::Pull, err)
493 })))
494 }
495 Poll::Pending => Poll::Pending,
496 },
497 }
498 }
499}
500
501impl Consumer<OrderedConfig> {
    /// Returns a stream of messages for an Ordered Pull Consumer.
    ///
    /// An ordered consumer uses a single-replica, ephemeral consumer, regardless of the
    /// replication factor of the Stream. It does not use acks; instead, it tracks sequences and
    /// recreates itself whenever it detects a mismatch.
507 ///
508 /// # Example
509 ///
510 /// ```no_run
511 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
513 /// use futures::StreamExt;
514 /// use futures::TryStreamExt;
515 ///
516 /// let client = async_nats::connect("localhost:4222").await?;
517 /// let jetstream = async_nats::jetstream::new(client);
518 ///
519 /// let stream = jetstream
520 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
521 /// name: "events".to_string(),
522 /// max_messages: 10_000,
523 /// ..Default::default()
524 /// })
525 /// .await?;
526 ///
527 /// jetstream.publish("events", "data".into()).await?;
528 ///
529 /// let consumer = stream
530 /// .get_or_create_consumer(
531 /// "consumer",
532 /// async_nats::jetstream::consumer::pull::OrderedConfig {
533 /// name: Some("consumer".to_string()),
534 /// ..Default::default()
535 /// },
536 /// )
537 /// .await?;
538 ///
539 /// let mut messages = consumer.messages().await?.take(100);
540 /// while let Some(Ok(message)) = messages.next().await {
541 /// println!("got message {:?}", message);
542 /// }
543 /// Ok(())
544 /// # }
545 /// ```
546 pub async fn messages(self) -> Result<Ordered, StreamError> {
547 let config = Consumer {
548 config: self.config.clone().into(),
549 context: self.context.clone(),
550 info: self.info.clone(),
551 };
552 let stream = Stream::stream(
553 BatchConfig {
554 batch: 500,
555 expires: Some(Duration::from_secs(30)),
556 no_wait: false,
557 max_bytes: 0,
558 idle_heartbeat: Duration::from_secs(15),
559 },
560 &config,
561 )
562 .await?;
563
564 Ok(Ordered {
565 consumer_sequence: 0,
566 stream_sequence: 0,
567 missed_heartbeats: false,
568 create_stream: None,
569 context: self.context.clone(),
570 consumer_name: self
571 .config
572 .name
573 .clone()
574 .unwrap_or_else(|| self.context.client.new_inbox()),
575 consumer: self.config,
576 stream: Some(stream),
577 stream_name: self.info.stream_name.clone(),
578 })
579 }
580}
581
/// Configuration for an ordered pull consumer.
///
/// Ordered consumers are always ephemeral, single-replica consumers that do not use
/// acknowledgments; these properties are fixed and not part of this configuration.
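///
/// # Example
///
/// A minimal sketch of creating an ordered pull consumer with this configuration, assuming a
/// stream named "events" already exists:
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer = jetstream
///     .get_stream("events")
///     .await?
///     .get_or_create_consumer(
///         "consumer",
///         async_nats::jetstream::consumer::pull::OrderedConfig {
///             name: Some("consumer".to_string()),
///             ..Default::default()
///         },
///     )
///     .await?;
///
/// let mut messages = consumer.messages().await?.take(10);
/// while let Some(Ok(message)) = messages.next().await {
///     println!("got message {:?}", message);
/// }
/// # Ok(())
/// # }
/// ```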
585#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
586pub struct OrderedConfig {
587 /// A name of the consumer. Can be specified for both durable and ephemeral
588 /// consumers.
589 #[serde(default, skip_serializing_if = "Option::is_none")]
590 pub name: Option<String>,
591 /// A short description of the purpose of this consumer.
592 #[serde(default, skip_serializing_if = "Option::is_none")]
593 pub description: Option<String>,
594 #[serde(default, skip_serializing_if = "is_default")]
595 pub filter_subject: String,
596 #[cfg(feature = "server_2_10")]
597 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
598 #[serde(default, skip_serializing_if = "is_default")]
599 pub filter_subjects: Vec<String>,
600 /// Whether messages are sent as quickly as possible or at the rate of receipt
601 pub replay_policy: ReplayPolicy,
602 /// The rate of message delivery in bits per second
603 #[serde(default, skip_serializing_if = "is_default")]
604 pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
606 #[serde(default, skip_serializing_if = "is_default")]
607 pub sample_frequency: u8,
608 /// Only deliver headers without payloads.
609 #[serde(default, skip_serializing_if = "is_default")]
610 pub headers_only: bool,
611 /// Allows for a variety of options that determine how this consumer will receive messages
612 #[serde(flatten)]
613 pub deliver_policy: DeliverPolicy,
614 /// The maximum number of waiting consumers.
615 #[serde(default, skip_serializing_if = "is_default")]
616 pub max_waiting: i64,
617 #[cfg(feature = "server_2_10")]
618 // Additional consumer metadata.
619 #[serde(default, skip_serializing_if = "is_default")]
620 pub metadata: HashMap<String, String>,
    // Maximum number of messages that can be requested in a single Pull Request.
    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages]
    // and [stream].
    pub max_batch: i64,
    // Maximum number of bytes that can be requested in a single Pull Request.
    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages]
    // and [stream].
    pub max_bytes: i64,
    // Maximum expiry that can be set for a single Pull Request.
    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages]
    // and [stream].
632 pub max_expires: Duration,
633}
634
635impl From<OrderedConfig> for Config {
636 fn from(config: OrderedConfig) -> Self {
637 Config {
638 durable_name: None,
639 name: config.name,
640 description: config.description,
641 deliver_policy: config.deliver_policy,
642 ack_policy: AckPolicy::None,
643 ack_wait: Duration::default(),
644 max_deliver: 1,
645 filter_subject: config.filter_subject,
646 #[cfg(feature = "server_2_10")]
647 filter_subjects: config.filter_subjects,
648 replay_policy: config.replay_policy,
649 rate_limit: config.rate_limit,
650 sample_frequency: config.sample_frequency,
651 max_waiting: config.max_waiting,
652 max_ack_pending: 0,
653 headers_only: config.headers_only,
654 max_batch: config.max_batch,
655 max_bytes: config.max_bytes,
656 max_expires: config.max_expires,
657 inactive_threshold: Duration::from_secs(30),
658 num_replicas: 1,
659 memory_storage: true,
660 #[cfg(feature = "server_2_10")]
661 metadata: config.metadata,
662 backoff: Vec::new(),
663 }
664 }
665}
666
667impl FromConsumer for OrderedConfig {
668 fn try_from_consumer_config(
669 config: crate::jetstream::consumer::Config,
670 ) -> Result<Self, crate::Error>
671 where
672 Self: Sized,
673 {
674 Ok(OrderedConfig {
675 name: config.name,
676 description: config.description,
677 filter_subject: config.filter_subject,
678 #[cfg(feature = "server_2_10")]
679 filter_subjects: config.filter_subjects,
680 replay_policy: config.replay_policy,
681 rate_limit: config.rate_limit,
682 sample_frequency: config.sample_frequency,
683 headers_only: config.headers_only,
684 deliver_policy: config.deliver_policy,
685 max_waiting: config.max_waiting,
686 #[cfg(feature = "server_2_10")]
687 metadata: config.metadata,
688 max_batch: config.max_batch,
689 max_bytes: config.max_bytes,
690 max_expires: config.max_expires,
691 })
692 }
693}
694
695impl IntoConsumerConfig for OrderedConfig {
696 fn into_consumer_config(self) -> super::Config {
697 jetstream::consumer::Config {
698 deliver_subject: None,
699 durable_name: None,
700 name: self.name,
701 description: self.description,
702 deliver_group: None,
703 deliver_policy: self.deliver_policy,
704 ack_policy: AckPolicy::None,
705 ack_wait: Duration::default(),
706 max_deliver: 1,
707 filter_subject: self.filter_subject,
708 #[cfg(feature = "server_2_10")]
709 filter_subjects: self.filter_subjects,
710 replay_policy: self.replay_policy,
711 rate_limit: self.rate_limit,
712 sample_frequency: self.sample_frequency,
713 max_waiting: self.max_waiting,
714 max_ack_pending: 0,
715 headers_only: self.headers_only,
716 flow_control: false,
717 idle_heartbeat: Duration::default(),
718 max_batch: 0,
719 max_bytes: 0,
720 max_expires: Duration::default(),
721 inactive_threshold: Duration::from_secs(30),
722 num_replicas: 1,
723 memory_storage: true,
724 #[cfg(feature = "server_2_10")]
725 metadata: self.metadata,
726 backoff: Vec::new(),
727 }
728 }
729}
730
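/// A stream of messages from an ordered pull consumer, created by [Consumer::messages] on a
/// `Consumer<OrderedConfig>`.
///
/// It tracks stream and consumer sequences and recreates the underlying ephemeral consumer
/// whenever it detects a sequence mismatch, a deleted consumer, or two missed idle heartbeats.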
731pub struct Ordered {
732 context: Context,
733 stream_name: String,
734 consumer: OrderedConfig,
735 consumer_name: String,
736 stream: Option<Stream>,
737 create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
738 consumer_sequence: u64,
739 stream_sequence: u64,
740 missed_heartbeats: bool,
741}
742
743impl futures::Stream for Ordered {
744 type Item = Result<jetstream::Message, OrderedError>;
745
746 fn poll_next(
747 mut self: Pin<&mut Self>,
748 cx: &mut std::task::Context<'_>,
749 ) -> Poll<Option<Self::Item>> {
750 let mut recreate = false;
751 // Poll messages
752 if let Some(stream) = self.stream.as_mut() {
753 match stream.poll_next_unpin(cx) {
754 Poll::Ready(message) => match message {
755 Some(message) => match message {
756 Ok(message) => {
757 self.missed_heartbeats = false;
758 let info = message.info().map_err(|err| {
759 OrderedError::with_source(OrderedErrorKind::Other, err)
760 })?;
761 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
762 self.consumer_sequence,
763 self.stream_sequence,
764 info.consumer_sequence,
765 info.stream_sequence);
766 if info.consumer_sequence != self.consumer_sequence + 1 {
767 debug!(
768 "ordered consumer mismatch. current {}, info: {}",
769 self.consumer_sequence, info.consumer_sequence
770 );
771 recreate = true;
772 self.consumer_sequence = 0;
773 } else {
774 self.stream_sequence = info.stream_sequence;
775 self.consumer_sequence = info.consumer_sequence;
776 return Poll::Ready(Some(Ok(message)));
777 }
778 }
779 Err(err) => match err.kind() {
780 MessagesErrorKind::MissingHeartbeat => {
781 // If we have missed heartbeats set, it means this is a second
782 // missed heartbeat, so we need to recreate consumer.
783 if self.missed_heartbeats {
784 self.consumer_sequence = 0;
785 recreate = true;
786 } else {
787 self.missed_heartbeats = true;
788 }
789 }
790 MessagesErrorKind::ConsumerDeleted => {
791 recreate = true;
792 self.consumer_sequence = 0;
793 }
794 MessagesErrorKind::Pull
795 | MessagesErrorKind::PushBasedConsumer
796 | MessagesErrorKind::Other => {
797 return Poll::Ready(Some(Err(err.into())));
798 }
799 },
800 },
801 None => return Poll::Ready(None),
802 },
803 Poll::Pending => (),
804 }
805 }
806 // Recreate consumer if needed
807 if recreate {
808 self.stream = None;
809 self.create_stream = Some(Box::pin({
810 let context = self.context.clone();
811 let config = self.consumer.clone();
812 let stream_name = self.stream_name.clone();
813 let consumer_name = self.consumer_name.clone();
814 let sequence = self.stream_sequence;
815 async move {
816 tryhard::retry_fn(|| {
817 recreate_consumer_stream(
818 &context,
819 &config,
820 &stream_name,
821 &consumer_name,
822 sequence,
823 )
824 })
825 .retries(5)
826 .exponential_backoff(Duration::from_millis(500))
827 .await
828 }
829 }))
830 }
831 // check for recreation future
832 if let Some(result) = self.create_stream.as_mut() {
833 match result.poll_unpin(cx) {
834 Poll::Ready(result) => match result {
835 Ok(stream) => {
836 self.create_stream = None;
837 self.stream = Some(stream);
838 return self.poll_next(cx);
839 }
840 Err(err) => {
841 return Poll::Ready(Some(Err(OrderedError::with_source(
842 OrderedErrorKind::Recreate,
843 err,
844 ))))
845 }
846 },
847 Poll::Pending => (),
848 }
849 }
850 Poll::Pending
851 }
852}
853
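/// A continuous, self-refilling stream of messages for a pull consumer, created by
/// [Consumer::messages] or [StreamBuilder::messages].
///
/// A background task keeps issuing pull requests whenever the number of buffered messages or
/// bytes drops below half of the configured batch limits, so messages keep flowing without the
/// caller managing individual batches.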
854pub struct Stream {
855 pending_messages: usize,
856 pending_bytes: usize,
857 request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
858 request_tx: tokio::sync::watch::Sender<()>,
859 subscriber: Subscriber,
860 batch_config: BatchConfig,
861 context: Context,
862 pending_request: bool,
863 task_handle: JoinHandle<()>,
864 terminated: bool,
865 heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
866}
867
868impl Drop for Stream {
869 fn drop(&mut self) {
870 self.task_handle.abort();
871 }
872}
873
874impl Stream {
875 async fn stream(
876 batch_config: BatchConfig,
877 consumer: &Consumer<Config>,
878 ) -> Result<Stream, StreamError> {
879 let inbox = consumer.context.client.new_inbox();
880 let subscription = consumer
881 .context
882 .client
883 .subscribe(inbox.clone())
884 .await
885 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
886 let subject = format!(
887 "{}.CONSUMER.MSG.NEXT.{}.{}",
888 consumer.context.prefix, consumer.info.stream_name, consumer.info.name
889 );
890
891 let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
892 let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
893 let task_handle = tokio::task::spawn({
894 let batch = batch_config;
895 let consumer = consumer.clone();
896 let mut context = consumer.context.clone();
897 let inbox = inbox.clone();
898 async move {
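                // Background request loop: waits for a reconnect, a refill signal from
                // `poll_next`, or expiry of the previous pull request, then publishes a new
                // pull request and reports the outcome (and whether the pending counters
                // should be reset) back over `request_result_tx`.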
899 loop {
                    // This is just for the edge case of a missing response for some reason.
901 let expires = batch_config
902 .expires
903 .map(|expires| {
904 if expires.is_zero() {
905 Either::Left(future::pending())
906 } else {
907 Either::Right(tokio::time::sleep(
908 expires.saturating_add(Duration::from_secs(5)),
909 ))
910 }
911 })
912 .unwrap_or_else(|| Either::Left(future::pending()));
913 // Need to check previous state, as `changed` will always fire on first
914 // call.
915 let prev_state = context.client.state.borrow().to_owned();
916 let mut pending_reset = false;
917
918 tokio::select! {
919 _ = context.client.state.changed() => {
920 let state = context.client.state.borrow().to_owned();
921 if !(state == crate::connection::State::Connected
922 && prev_state != State::Connected) {
923 continue;
924 }
925 debug!("detected !Connected -> Connected state change");
926
927 match tryhard::retry_fn(|| consumer.fetch_info()).retries(5).exponential_backoff(Duration::from_millis(500)).await {
928 Ok(info) => {
929 if info.num_waiting == 0 {
930 pending_reset = true;
931 }
932 }
933 Err(err) => {
934 if let Err(err) = request_result_tx.send(Err(err)).await {
                                        debug!("failed to send request result: {}", err);
936 }
937 },
938 }
939 },
                        _ = request_rx.changed() => debug!("received request to send a pull request"),
941 _ = expires => {
942 pending_reset = true;
943 debug!("expired pull request")},
944 }
945
946 let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
947 let result = context
948 .client
949 .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
950 .await
951 .map(|_| pending_reset);
952 // TODO: add tracing instead of ignoring this.
953 request_result_tx
954 .send(result.map(|_| pending_reset).map_err(|err| {
955 crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
956 .into()
957 }))
958 .await
959 .unwrap();
                    trace!("result sent over tx");
961 }
962 }
963 });
964
965 Ok(Stream {
966 task_handle,
967 request_result_rx,
968 request_tx,
969 batch_config,
970 pending_messages: 0,
971 pending_bytes: 0,
972 subscriber: subscription,
973 context: consumer.context.clone(),
974 pending_request: false,
975 terminated: false,
976 heartbeat_timeout: None,
977 })
978 }
979}
980
981#[derive(Clone, Copy, Debug, PartialEq)]
982pub enum OrderedErrorKind {
983 MissingHeartbeat,
984 ConsumerDeleted,
985 Pull,
986 PushBasedConsumer,
987 Recreate,
988 Other,
989}
990
991impl std::fmt::Display for OrderedErrorKind {
992 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
993 match self {
994 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
995 Self::ConsumerDeleted => write!(f, "consumer deleted"),
996 Self::Pull => write!(f, "pull request failed"),
997 Self::Other => write!(f, "error"),
998 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
999 Self::Recreate => write!(f, "consumer recreation failed"),
1000 }
1001 }
1002}
1003
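/// Error returned by the [Ordered] stream of messages.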
1004pub type OrderedError = Error<OrderedErrorKind>;
1005
1006impl From<MessagesError> for OrderedError {
1007 fn from(err: MessagesError) -> Self {
1008 match err.kind() {
1009 MessagesErrorKind::MissingHeartbeat => {
1010 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1011 }
1012 MessagesErrorKind::ConsumerDeleted => {
1013 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1014 }
1015 MessagesErrorKind::Pull => OrderedError {
1016 kind: OrderedErrorKind::Pull,
1017 source: err.source,
1018 },
1019 MessagesErrorKind::PushBasedConsumer => {
1020 OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1021 }
1022 MessagesErrorKind::Other => OrderedError {
1023 kind: OrderedErrorKind::Other,
1024 source: err.source,
1025 },
1026 }
1027 }
1028}
1029
1030#[derive(Clone, Copy, Debug, PartialEq)]
1031pub enum MessagesErrorKind {
1032 MissingHeartbeat,
1033 ConsumerDeleted,
1034 Pull,
1035 PushBasedConsumer,
1036 Other,
1037}
1038
1039impl std::fmt::Display for MessagesErrorKind {
1040 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1041 match self {
1042 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1043 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1044 Self::Pull => write!(f, "pull request failed"),
1045 Self::Other => write!(f, "error"),
1046 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1047 }
1048 }
1049}
1050
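/// Error returned by the [Stream] of messages.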
1051pub type MessagesError = Error<MessagesErrorKind>;
1052
1053impl futures::Stream for Stream {
1054 type Item = Result<jetstream::Message, MessagesError>;
1055
1056 fn poll_next(
1057 mut self: std::pin::Pin<&mut Self>,
1058 cx: &mut std::task::Context<'_>,
1059 ) -> std::task::Poll<Option<Self::Item>> {
1060 if self.terminated {
1061 return Poll::Ready(None);
1062 }
1063
1064 if !self.batch_config.idle_heartbeat.is_zero() {
            trace!("checking idle heartbeats");
1066 let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1067 match self
1068 .heartbeat_timeout
1069 .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1070 .poll_unpin(cx)
1071 {
1072 Poll::Ready(_) => {
1073 self.heartbeat_timeout = None;
1074 return Poll::Ready(Some(Err(MessagesError::new(
1075 MessagesErrorKind::MissingHeartbeat,
1076 ))));
1077 }
1078 Poll::Pending => (),
1079 }
1080 }
1081
1082 loop {
1083 trace!("pending messages: {}", self.pending_messages);
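            // Refill heuristic: request another batch once fewer than half of the requested
            // messages (or bytes, when a byte limit is set) remain outstanding and no request
            // is already in flight.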
1084 if (self.pending_messages <= self.batch_config.batch / 2
1085 || (self.batch_config.max_bytes > 0
1086 && self.pending_bytes <= self.batch_config.max_bytes / 2))
1087 && !self.pending_request
1088 {
1089 debug!("pending messages reached threshold to send new fetch request");
1090 self.request_tx.send(()).unwrap();
1091 self.pending_request = true;
1092 }
1093
1094 match self.request_result_rx.poll_recv(cx) {
1095 Poll::Ready(resp) => match resp {
1096 Some(resp) => match resp {
1097 Ok(reset) => {
1098 trace!("request response: {:?}", reset);
1099 debug!("request sent, setting pending messages");
1100 if reset {
1101 self.pending_messages = self.batch_config.batch;
1102 self.pending_bytes = self.batch_config.max_bytes;
1103 } else {
1104 self.pending_messages += self.batch_config.batch;
1105 self.pending_bytes += self.batch_config.max_bytes;
1106 }
1107 self.pending_request = false;
1108 continue;
1109 }
1110 Err(err) => {
1111 return Poll::Ready(Some(Err(MessagesError::with_source(
1112 MessagesErrorKind::Pull,
1113 err,
1114 ))))
1115 }
1116 },
1117 None => return Poll::Ready(None),
1118 },
1119 Poll::Pending => {
1120 trace!("pending result");
1121 }
1122 }
1123
1124 trace!("polling subscriber");
1125 match self.subscriber.receiver.poll_recv(cx) {
1126 Poll::Ready(maybe_message) => {
1127 self.heartbeat_timeout = None;
1128 match maybe_message {
1129 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1130 StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1131 debug!("received status message: {:?}", message);
                                // If the consumer has been deleted, error and shut down the iterator.
1133 if message.description.as_deref() == Some("Consumer Deleted") {
1134 self.terminated = true;
1135 return Poll::Ready(Some(Err(MessagesError::new(
1136 MessagesErrorKind::ConsumerDeleted,
1137 ))));
1138 }
                                // If the consumer is not pull-based, error and shut down the iterator.
1140 if message.description.as_deref() == Some("Consumer is push based")
1141 {
1142 self.terminated = true;
1143 return Poll::Ready(Some(Err(MessagesError::new(
1144 MessagesErrorKind::PushBasedConsumer,
1145 ))));
1146 }
1147
1148 // Do accounting for messages left after terminated/completed pull request.
1149 let pending_messages = message
1150 .headers
1151 .as_ref()
1152 .and_then(|headers| headers.get("Nats-Pending-Messages"))
1153 .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1154 .map_err(|err| {
1155 MessagesError::with_source(MessagesErrorKind::Other, err)
1156 })?;
1157
1158 let pending_bytes = message
1159 .headers
1160 .as_ref()
1161 .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1162 .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1163 .map_err(|err| {
1164 MessagesError::with_source(MessagesErrorKind::Other, err)
1165 })?;
1166
1167 debug!(
1168 "timeout reached. remaining messages: {}, bytes {}",
1169 pending_messages, pending_bytes
1170 );
1171 self.pending_messages =
1172 self.pending_messages.saturating_sub(pending_messages);
1173 trace!("message bytes len: {}", pending_bytes);
1174 self.pending_bytes =
1175 self.pending_bytes.saturating_sub(pending_bytes);
1176 continue;
1177 }
                            // Idle Heartbeat means we have no messages, but the consumer is fine.
1179 StatusCode::IDLE_HEARTBEAT => {
1180 debug!("received idle heartbeat");
1181 continue;
1182 }
                            // We got a message from the stream.
1184 StatusCode::OK => {
1185 trace!("message received");
1186 self.pending_messages = self.pending_messages.saturating_sub(1);
1187 self.pending_bytes =
1188 self.pending_bytes.saturating_sub(message.length);
1189 return Poll::Ready(Some(Ok(jetstream::Message {
1190 context: self.context.clone(),
1191 message,
1192 })));
1193 }
1194 status => {
1195 debug!("received unknown message: {:?}", message);
1196 return Poll::Ready(Some(Err(MessagesError::with_source(
1197 MessagesErrorKind::Other,
1198 format!(
1199 "error while processing messages from the stream: {}, {:?}",
1200 status, message.description
1201 ),
1202 ))));
1203 }
1204 },
1205 None => return Poll::Ready(None),
1206 }
1207 }
1208 Poll::Pending => {
1209 debug!("subscriber still pending");
1210 return std::task::Poll::Pending;
1211 }
1212 }
1213 }
1214 }
1215}
1216
/// Used for building the configuration of a [Stream]. Created by [Consumer::stream] on a [Consumer].
1218///
1219/// # Examples
1220///
1221/// ```no_run
1222/// # #[tokio::main]
1223/// # async fn main() -> Result<(), async_nats::Error> {
1224/// use futures::StreamExt;
1225/// use async_nats::jetstream::consumer::PullConsumer;
1226/// let client = async_nats::connect("localhost:4222").await?;
1227/// let jetstream = async_nats::jetstream::new(client);
1228///
1229/// let consumer: PullConsumer = jetstream
1230/// .get_stream("events").await?
1231/// .get_consumer("pull").await?;
1232///
1233/// let mut messages = consumer.stream()
1234/// .max_messages_per_batch(100)
1235/// .max_bytes_per_batch(1024)
1236/// .messages().await?;
1237///
1238/// while let Some(message) = messages.next().await {
1239/// let message = message?;
1240/// println!("message: {:?}", message);
1241/// message.ack().await?;
1242/// }
1243/// # Ok(())
/// # }
/// ```
1245pub struct StreamBuilder<'a> {
1246 batch: usize,
1247 max_bytes: usize,
1248 heartbeat: Duration,
1249 expires: Duration,
1250 consumer: &'a Consumer<Config>,
1251}
1252
1253impl<'a> StreamBuilder<'a> {
1254 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1255 StreamBuilder {
1256 consumer,
1257 batch: 200,
1258 max_bytes: 0,
1259 expires: Duration::from_secs(30),
1260 heartbeat: Duration::default(),
1261 }
1262 }
1263
    /// Sets the maximum number of bytes that can be buffered on the Client while processing
    /// already received messages.
    /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much more slowly than they arrive.
1268 ///
1269 /// Default values should provide reasonable balance between performance and memory usage.
1270 ///
1271 /// # Examples
1272 ///
1273 /// ```no_run
1274 /// # #[tokio::main]
1275 /// # async fn main() -> Result<(), async_nats::Error> {
1276 /// use async_nats::jetstream::consumer::PullConsumer;
1277 /// use futures::StreamExt;
1278 /// let client = async_nats::connect("localhost:4222").await?;
1279 /// let jetstream = async_nats::jetstream::new(client);
1280 ///
1281 /// let consumer: PullConsumer = jetstream
1282 /// .get_stream("events")
1283 /// .await?
1284 /// .get_consumer("pull")
1285 /// .await?;
1286 ///
1287 /// let mut messages = consumer
1288 /// .stream()
1289 /// .max_bytes_per_batch(1024)
1290 /// .messages()
1291 /// .await?;
1292 ///
1293 /// while let Some(message) = messages.next().await {
1294 /// let message = message?;
1295 /// println!("message: {:?}", message);
1296 /// message.ack().await?;
1297 /// }
1298 /// # Ok(())
1299 /// # }
1300 /// ```
1301 pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1302 self.max_bytes = max_bytes;
1303 self
1304 }
1305
    /// Sets the maximum number of messages that can be buffered on the Client while processing
    /// already received messages.
    /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much more slowly than they arrive.
1310 ///
1311 /// Default values should provide reasonable balance between performance and memory usage.
1312 ///
1313 /// # Examples
1314 ///
1315 /// ```no_run
1316 /// # #[tokio::main]
1317 /// # async fn main() -> Result<(), async_nats::Error> {
1318 /// use async_nats::jetstream::consumer::PullConsumer;
1319 /// use futures::StreamExt;
1320 /// let client = async_nats::connect("localhost:4222").await?;
1321 /// let jetstream = async_nats::jetstream::new(client);
1322 ///
1323 /// let consumer: PullConsumer = jetstream
1324 /// .get_stream("events")
1325 /// .await?
1326 /// .get_consumer("pull")
1327 /// .await?;
1328 ///
1329 /// let mut messages = consumer
1330 /// .stream()
1331 /// .max_messages_per_batch(100)
1332 /// .messages()
1333 /// .await?;
1334 ///
1335 /// while let Some(message) = messages.next().await {
1336 /// let message = message?;
1337 /// println!("message: {:?}", message);
1338 /// message.ack().await?;
1339 /// }
1340 /// # Ok(())
1341 /// # }
1342 /// ```
1343 pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1344 self.batch = batch;
1345 self
1346 }
1347
    /// Sets the idle heartbeat, which will be sent by the server when there are no new messages
    /// pending for a given [Consumer].
1350 ///
1351 /// # Examples
1352 ///
1353 /// ```no_run
1354 /// # #[tokio::main]
1355 /// # async fn main() -> Result<(), async_nats::Error> {
1356 /// use async_nats::jetstream::consumer::PullConsumer;
1357 /// use futures::StreamExt;
1358 /// let client = async_nats::connect("localhost:4222").await?;
1359 /// let jetstream = async_nats::jetstream::new(client);
1360 ///
1361 /// let consumer: PullConsumer = jetstream
1362 /// .get_stream("events")
1363 /// .await?
1364 /// .get_consumer("pull")
1365 /// .await?;
1366 ///
1367 /// let mut messages = consumer
1368 /// .stream()
1369 /// .heartbeat(std::time::Duration::from_secs(10))
1370 /// .messages()
1371 /// .await?;
1372 ///
1373 /// while let Some(message) = messages.next().await {
1374 /// let message = message?;
1375 /// println!("message: {:?}", message);
1376 /// message.ack().await?;
1377 /// }
1378 /// # Ok(())
1379 /// # }
1380 /// ```
1381 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1382 self.heartbeat = heartbeat;
1383 self
1384 }
1385
    /// Low-level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1389 ///
1390 /// # Examples
1391 ///
1392 /// ```no_run
1393 /// # #[tokio::main]
1394 /// # async fn main() -> Result<(), async_nats::Error> {
1395 /// use async_nats::jetstream::consumer::PullConsumer;
1396 /// use futures::StreamExt;
1397 /// let client = async_nats::connect("localhost:4222").await?;
1398 /// let jetstream = async_nats::jetstream::new(client);
1399 ///
1400 /// let consumer: PullConsumer = jetstream
1401 /// .get_stream("events")
1402 /// .await?
1403 /// .get_consumer("pull")
1404 /// .await?;
1405 ///
1406 /// let mut messages = consumer
1407 /// .stream()
1408 /// .expires(std::time::Duration::from_secs(30))
1409 /// .messages()
1410 /// .await?;
1411 ///
1412 /// while let Some(message) = messages.next().await {
1413 /// let message = message?;
1414 /// println!("message: {:?}", message);
1415 /// message.ack().await?;
1416 /// }
1417 /// # Ok(())
1418 /// # }
1419 /// ```
1420 pub fn expires(mut self, expires: Duration) -> Self {
1421 self.expires = expires;
1422 self
1423 }
1424
    /// Creates the actual [Stream] with the provided configuration.
1426 ///
1427 /// # Examples
1428 ///
1429 /// ```no_run
1430 /// # #[tokio::main]
1431 /// # async fn main() -> Result<(), async_nats::Error> {
1432 /// use async_nats::jetstream::consumer::PullConsumer;
1433 /// use futures::StreamExt;
1434 /// let client = async_nats::connect("localhost:4222").await?;
1435 /// let jetstream = async_nats::jetstream::new(client);
1436 ///
1437 /// let consumer: PullConsumer = jetstream
1438 /// .get_stream("events")
1439 /// .await?
1440 /// .get_consumer("pull")
1441 /// .await?;
1442 ///
1443 /// let mut messages = consumer
1444 /// .stream()
1445 /// .max_messages_per_batch(100)
1446 /// .messages()
1447 /// .await?;
1448 ///
1449 /// while let Some(message) = messages.next().await {
1450 /// let message = message?;
1451 /// println!("message: {:?}", message);
1452 /// message.ack().await?;
1453 /// }
1454 /// # Ok(())
1455 /// # }
1456 /// ```
1457 pub async fn messages(self) -> Result<Stream, StreamError> {
1458 Stream::stream(
1459 BatchConfig {
1460 batch: self.batch,
1461 expires: Some(self.expires),
1462 no_wait: false,
1463 max_bytes: self.max_bytes,
1464 idle_heartbeat: self.heartbeat,
1465 },
1466 self.consumer,
1467 )
1468 .await
1469 }
1470}
1471
/// Used for building the configuration of a [Batch] with `fetch()` semantics. Created by [Consumer::fetch] on a [Consumer].
1473///
1474/// # Examples
1475///
1476/// ```no_run
1477/// # #[tokio::main]
1478/// # async fn main() -> Result<(), async_nats::Error> {
1479/// use async_nats::jetstream::consumer::PullConsumer;
1480/// use futures::StreamExt;
1481/// let client = async_nats::connect("localhost:4222").await?;
1482/// let jetstream = async_nats::jetstream::new(client);
1483///
1484/// let consumer: PullConsumer = jetstream
1485/// .get_stream("events")
1486/// .await?
1487/// .get_consumer("pull")
1488/// .await?;
1489///
1490/// let mut messages = consumer
1491/// .fetch()
1492/// .max_messages(100)
1493/// .max_bytes(1024)
1494/// .messages()
1495/// .await?;
1496///
1497/// while let Some(message) = messages.next().await {
1498/// let message = message?;
1499/// println!("message: {:?}", message);
1500/// message.ack().await?;
1501/// }
1502/// # Ok(())
1503/// # }
1504/// ```
1505pub struct FetchBuilder<'a> {
1506 batch: usize,
1507 max_bytes: usize,
1508 heartbeat: Duration,
1509 expires: Option<Duration>,
1510 consumer: &'a Consumer<Config>,
1511}
1512
1513impl<'a> FetchBuilder<'a> {
1514 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1515 FetchBuilder {
1516 consumer,
1517 batch: 200,
1518 max_bytes: 0,
1519 expires: None,
1520 heartbeat: Duration::default(),
1521 }
1522 }
1523
    /// Sets the maximum number of bytes that can be buffered on the Client while processing
    /// already received messages.
    /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much more slowly than they arrive.
1528 ///
1529 /// Default values should provide reasonable balance between performance and memory usage.
1530 ///
1531 /// # Examples
1532 ///
1533 /// ```no_run
1534 /// # #[tokio::main]
1535 /// # async fn main() -> Result<(), async_nats::Error> {
1536 /// use futures::StreamExt;
1537 /// let client = async_nats::connect("localhost:4222").await?;
1538 /// let jetstream = async_nats::jetstream::new(client);
1539 ///
1540 /// let consumer = jetstream
1541 /// .get_stream("events")
1542 /// .await?
1543 /// .get_consumer("pull")
1544 /// .await?;
1545 ///
1546 /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1547 ///
1548 /// while let Some(message) = messages.next().await {
1549 /// let message = message?;
1550 /// println!("message: {:?}", message);
1551 /// message.ack().await?;
1552 /// }
1553 /// # Ok(())
1554 /// # }
1555 /// ```
1556 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1557 self.max_bytes = max_bytes;
1558 self
1559 }
1560
    /// Sets the maximum number of messages that can be buffered on the Client while processing
    /// already received messages.
    /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much more slowly than they arrive.
1565 ///
1566 /// Default values should provide reasonable balance between performance and memory usage.
1567 ///
1568 /// # Examples
1569 ///
1570 /// ```no_run
1571 /// # #[tokio::main]
1572 /// # async fn main() -> Result<(), async_nats::Error> {
1573 /// use futures::StreamExt;
1574 /// let client = async_nats::connect("localhost:4222").await?;
1575 /// let jetstream = async_nats::jetstream::new(client);
1576 ///
1577 /// let consumer = jetstream
1578 /// .get_stream("events")
1579 /// .await?
1580 /// .get_consumer("pull")
1581 /// .await?;
1582 ///
1583 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1584 ///
1585 /// while let Some(message) = messages.next().await {
1586 /// let message = message?;
1587 /// println!("message: {:?}", message);
1588 /// message.ack().await?;
1589 /// }
1590 /// # Ok(())
1591 /// # }
1592 /// ```
1593 pub fn max_messages(mut self, batch: usize) -> Self {
1594 self.batch = batch;
1595 self
1596 }
1597
    /// Sets the idle heartbeat, which will be sent by the server when there are no new messages
    /// pending for a given [Consumer].
1600 ///
1601 /// # Examples
1602 ///
1603 /// ```no_run
1604 /// # #[tokio::main]
1605 /// # async fn main() -> Result<(), async_nats::Error> {
1606 /// use async_nats::jetstream::consumer::PullConsumer;
1607 /// use futures::StreamExt;
1608 /// let client = async_nats::connect("localhost:4222").await?;
1609 /// let jetstream = async_nats::jetstream::new(client);
1610 ///
1611 /// let consumer = jetstream
1612 /// .get_stream("events")
1613 /// .await?
1614 /// .get_consumer("pull")
1615 /// .await?;
1616 ///
1617 /// let mut messages = consumer
1618 /// .fetch()
1619 /// .heartbeat(std::time::Duration::from_secs(10))
1620 /// .messages()
1621 /// .await?;
1622 ///
1623 /// while let Some(message) = messages.next().await {
1624 /// let message = message?;
1625 /// println!("message: {:?}", message);
1626 /// message.ack().await?;
1627 /// }
1628 /// # Ok(())
1629 /// # }
1630 /// ```
1631 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1632 self.heartbeat = heartbeat;
1633 self
1634 }
1635
    /// Low-level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1639 ///
1640 /// # Examples
1641 ///
1642 /// ```no_run
1643 /// # #[tokio::main]
1644 /// # async fn main() -> Result<(), async_nats::Error> {
1645 /// use async_nats::jetstream::consumer::PullConsumer;
1646 /// use futures::StreamExt;
1647 ///
1648 /// let client = async_nats::connect("localhost:4222").await?;
1649 /// let jetstream = async_nats::jetstream::new(client);
1650 ///
1651 /// let consumer: PullConsumer = jetstream
1652 /// .get_stream("events")
1653 /// .await?
1654 /// .get_consumer("pull")
1655 /// .await?;
1656 ///
1657 /// let mut messages = consumer
1658 /// .fetch()
1659 /// .expires(std::time::Duration::from_secs(30))
1660 /// .messages()
1661 /// .await?;
1662 ///
1663 /// while let Some(message) = messages.next().await {
1664 /// let message = message?;
1665 /// println!("message: {:?}", message);
1666 /// message.ack().await?;
1667 /// }
1668 /// # Ok(())
1669 /// # }
1670 /// ```
1671 pub fn expires(mut self, expires: Duration) -> Self {
1672 self.expires = Some(expires);
1673 self
1674 }
1675
    /// Creates the actual [Batch] with the provided configuration.
1677 ///
1678 /// # Examples
1679 ///
1680 /// ```no_run
1681 /// # #[tokio::main]
1682 /// # async fn main() -> Result<(), async_nats::Error> {
1683 /// use async_nats::jetstream::consumer::PullConsumer;
1684 /// use futures::StreamExt;
1685 /// let client = async_nats::connect("localhost:4222").await?;
1686 /// let jetstream = async_nats::jetstream::new(client);
1687 ///
1688 /// let consumer: PullConsumer = jetstream
1689 /// .get_stream("events")
1690 /// .await?
1691 /// .get_consumer("pull")
1692 /// .await?;
1693 ///
1694 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1695 ///
1696 /// while let Some(message) = messages.next().await {
1697 /// let message = message?;
1698 /// println!("message: {:?}", message);
1699 /// message.ack().await?;
1700 /// }
1701 /// # Ok(())
1702 /// # }
1703 /// ```
1704 pub async fn messages(self) -> Result<Batch, BatchError> {
1705 Batch::batch(
1706 BatchConfig {
1707 batch: self.batch,
1708 expires: self.expires,
1709 no_wait: true,
1710 max_bytes: self.max_bytes,
1711 idle_heartbeat: self.heartbeat,
1712 },
1713 self.consumer,
1714 )
1715 .await
1716 }
1717}
1718
/// Used for building the configuration of a [Batch]. Created by [Consumer::batch] on a [Consumer].
1720///
1721/// # Examples
1722///
1723/// ```no_run
1724/// # #[tokio::main]
1725/// # async fn main() -> Result<(), async_nats::Error> {
1726/// use async_nats::jetstream::consumer::PullConsumer;
1727/// use futures::StreamExt;
1728/// let client = async_nats::connect("localhost:4222").await?;
1729/// let jetstream = async_nats::jetstream::new(client);
1730///
1731/// let consumer: PullConsumer = jetstream
1732/// .get_stream("events")
1733/// .await?
1734/// .get_consumer("pull")
1735/// .await?;
1736///
1737/// let mut messages = consumer
1738/// .batch()
1739/// .max_messages(100)
1740/// .max_bytes(1024)
1741/// .messages()
1742/// .await?;
1743///
1744/// while let Some(message) = messages.next().await {
1745/// let message = message?;
1746/// println!("message: {:?}", message);
1747/// message.ack().await?;
1748/// }
1749/// # Ok(())
1750/// # }
1751/// ```
1752pub struct BatchBuilder<'a> {
1753 batch: usize,
1754 max_bytes: usize,
1755 heartbeat: Duration,
1756 expires: Duration,
1757 consumer: &'a Consumer<Config>,
1758}
1759
1760impl<'a> BatchBuilder<'a> {
1761 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1762 BatchBuilder {
1763 consumer,
1764 batch: 200,
1765 max_bytes: 0,
1766 expires: Duration::ZERO,
1767 heartbeat: Duration::default(),
1768 }
1769 }
1770
    /// Sets the maximum number of bytes that can be buffered on the Client while processing
    /// already received messages.
    /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much more slowly than they arrive.
1775 ///
1776 /// Default values should provide reasonable balance between performance and memory usage.
1777 ///
1778 /// # Examples
1779 ///
1780 /// ```no_run
1781 /// # #[tokio::main]
1782 /// # async fn main() -> Result<(), async_nats::Error> {
1783 /// use async_nats::jetstream::consumer::PullConsumer;
1784 /// use futures::StreamExt;
1785 /// let client = async_nats::connect("localhost:4222").await?;
1786 /// let jetstream = async_nats::jetstream::new(client);
1787 ///
1788 /// let consumer: PullConsumer = jetstream
1789 /// .get_stream("events")
1790 /// .await?
1791 /// .get_consumer("pull")
1792 /// .await?;
1793 ///
1794 /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
1795 ///
1796 /// while let Some(message) = messages.next().await {
1797 /// let message = message?;
1798 /// println!("message: {:?}", message);
1799 /// message.ack().await?;
1800 /// }
1801 /// # Ok(())
1802 /// # }
1803 /// ```
1804 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1805 self.max_bytes = max_bytes;
1806 self
1807 }
1808
    /// Sets the maximum number of messages that can be buffered on the Client while processing
    /// already received messages.
    /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much more slowly than they arrive.
1813 ///
1814 /// Default values should provide reasonable balance between performance and memory usage.
1815 ///
1816 /// # Examples
1817 ///
1818 /// ```no_run
1819 /// # #[tokio::main]
1820 /// # async fn main() -> Result<(), async_nats::Error> {
1821 /// use async_nats::jetstream::consumer::PullConsumer;
1822 /// use futures::StreamExt;
1823 /// let client = async_nats::connect("localhost:4222").await?;
1824 /// let jetstream = async_nats::jetstream::new(client);
1825 ///
1826 /// let consumer: PullConsumer = jetstream
1827 /// .get_stream("events")
1828 /// .await?
1829 /// .get_consumer("pull")
1830 /// .await?;
1831 ///
1832 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
1833 ///
1834 /// while let Some(message) = messages.next().await {
1835 /// let message = message?;
1836 /// println!("message: {:?}", message);
1837 /// message.ack().await?;
1838 /// }
1839 /// # Ok(())
1840 /// # }
1841 /// ```
1842 pub fn max_messages(mut self, batch: usize) -> Self {
1843 self.batch = batch;
1844 self
1845 }
1846
    /// Sets the idle heartbeat, which will be sent by the server when there are no new messages
    /// pending for a given [Consumer].
1849 ///
1850 /// # Examples
1851 ///
1852 /// ```no_run
1853 /// # #[tokio::main]
1854 /// # async fn main() -> Result<(), async_nats::Error> {
1855 /// use async_nats::jetstream::consumer::PullConsumer;
1856 /// use futures::StreamExt;
1857 /// let client = async_nats::connect("localhost:4222").await?;
1858 /// let jetstream = async_nats::jetstream::new(client);
1859 ///
1860 /// let consumer: PullConsumer = jetstream
1861 /// .get_stream("events")
1862 /// .await?
1863 /// .get_consumer("pull")
1864 /// .await?;
1865 ///
1866 /// let mut messages = consumer
1867 /// .batch()
1868 /// .heartbeat(std::time::Duration::from_secs(10))
1869 /// .messages()
1870 /// .await?;
1871 ///
1872 /// while let Some(message) = messages.next().await {
1873 /// let message = message?;
1874 /// println!("message: {:?}", message);
1875 /// message.ack().await?;
1876 /// }
1877 /// # Ok(())
1878 /// # }
1879 /// ```
1880 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1881 self.heartbeat = heartbeat;
1882 self
1883 }
1884
1885 /// Low-level API that does not need tweaking for most use cases.
1886 /// Sets how long each batch request waits for the whole batch of messages before timing
1887 /// out on the given [Consumer].
1888 ///
1889 /// # Examples
1890 ///
1891 /// ```no_run
1892 /// # #[tokio::main]
1893 /// # async fn main() -> Result<(), async_nats::Error> {
1894 /// use async_nats::jetstream::consumer::PullConsumer;
1895 /// use futures::StreamExt;
1896 /// let client = async_nats::connect("localhost:4222").await?;
1897 /// let jetstream = async_nats::jetstream::new(client);
1898 ///
1899 /// let consumer: PullConsumer = jetstream
1900 /// .get_stream("events")
1901 /// .await?
1902 /// .get_consumer("pull")
1903 /// .await?;
1904 ///
1905 /// let mut messages = consumer
1906 /// .batch()
1907 /// .expires(std::time::Duration::from_secs(30))
1908 /// .messages()
1909 /// .await?;
1910 ///
1911 /// while let Some(message) = messages.next().await {
1912 /// let message = message?;
1913 /// println!("message: {:?}", message);
1914 /// message.ack().await?;
1915 /// }
1916 /// # Ok(())
1917 /// # }
1918 /// ```
1919 pub fn expires(mut self, expires: Duration) -> Self {
1920 self.expires = expires;
1921 self
1922 }
1923
1924 /// Creates the actual [Batch] with the provided configuration.
1925 ///
1926 /// # Examples
1927 ///
1928 /// ```no_run
1929 /// # #[tokio::main]
1930 /// # async fn main() -> Result<(), async_nats::Error> {
1931 /// use async_nats::jetstream::consumer::PullConsumer;
1932 /// use futures::StreamExt;
1933 /// let client = async_nats::connect("localhost:4222").await?;
1934 /// let jetstream = async_nats::jetstream::new(client);
1935 ///
1936 /// let consumer: PullConsumer = jetstream
1937 /// .get_stream("events")
1938 /// .await?
1939 /// .get_consumer("pull")
1940 /// .await?;
1941 ///
1942 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
1943 ///
1944 /// while let Some(message) = messages.next().await {
1945 /// let message = message?;
1946 /// println!("message: {:?}", message);
1947 /// message.ack().await?;
1948 /// }
1949 /// # Ok(())
1950 /// # }
1951 /// ```
1952 pub async fn messages(self) -> Result<Batch, BatchError> {
1953 Batch::batch(
1954 BatchConfig {
1955 batch: self.batch,
1956 expires: Some(self.expires),
1957 no_wait: false,
1958 max_bytes: self.max_bytes,
1959 idle_heartbeat: self.heartbeat,
1960 },
1961 self.consumer,
1962 )
1963 .await
1964 }
1965}
1966
1967/// Used for the next pull request for a Pull Consumer.
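///
/// # Example
///
/// A minimal sketch of a typical next-request configuration; the values shown here are
/// illustrative, not recommended defaults.
///
/// ```no_run
/// use std::time::Duration;
///
/// use async_nats::jetstream::consumer::pull::BatchConfig;
///
/// let request = BatchConfig {
///     batch: 100,
///     expires: Some(Duration::from_secs(30)),
///     idle_heartbeat: Duration::from_secs(15),
///     // `no_wait` and `max_bytes` keep their defaults (`false` and `0`).
///     ..Default::default()
/// };
/// # let _ = request;
/// ```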
1968#[derive(Debug, Default, Serialize, Clone, Copy, PartialEq, Eq)]
1969pub struct BatchConfig {
1970 /// The number of messages that are being requested to be delivered.
1971 pub batch: usize,
1972 /// The optional amount of time that the server will keep this request pending before
1973 /// forgetting about it, serialized as nanoseconds on the wire.
1974 #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
1975 pub expires: Option<Duration>,
1976 /// This optionally causes the server not to store this pending request at all. When there are
1977 /// no messages to deliver, the server sends an empty message with a 404 status header, which
1978 /// lets you detect, for example, that you have reached the end of the stream. A 409 status is
1979 /// returned if the Consumer has reached its MaxAckPending limit.
1980 #[serde(skip_serializing_if = "is_default")]
1981 pub no_wait: bool,
1982
1983 /// Sets the maximum total number of bytes for the batch. This works together with `batch`:
1984 /// whichever limit is reached first completes the batch.
1985 pub max_bytes: usize,
1986
1987 /// Setting this to a value other than zero will cause the server to send `100 Idle Heartbeat`
1988 /// status messages to the client when there are no new messages to deliver.
1989 #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
1990 pub idle_heartbeat: Duration,
1991}
1992
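/// Returns `true` when a value equals its type's default; used with serde's
/// `skip_serializing_if` throughout this module to omit default-valued fields.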
1993fn is_default<T: Default + Eq>(t: &T) -> bool {
1994 t == &T::default()
1995}
1996
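/// Configuration for a pull [Consumer].
///
/// # Example
///
/// A minimal sketch of creating a filtered pull consumer on an existing stream; the stream
/// name, consumer name, and limits below are illustrative, not recommendations.
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer = jetstream
///     // Assumes a stream named "events" already exists.
///     .get_stream("events")
///     .await?
///     .create_consumer(async_nats::jetstream::consumer::pull::Config {
///         durable_name: Some("processor".to_string()),
///         filter_subject: "events.orders".to_string(),
///         max_deliver: 5,
///         ..Default::default()
///     })
///     .await?;
/// # let _ = consumer;
/// # Ok(())
/// # }
/// ```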
1997#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
1998pub struct Config {
1999 /// Setting `durable_name` to `Some(...)` will cause this consumer
2000 /// to be "durable". This may be a good choice for workloads that
2001 /// benefit from the `JetStream` server or cluster remembering the
2002 /// progress of consumers for fault tolerance purposes. If a consumer
2003 /// crashes, the `JetStream` server or cluster will remember which
2004 /// messages the consumer acknowledged. When the consumer recovers,
2005 /// this information will allow the consumer to resume processing
2006 /// where it left off. If you're unsure, set this to `Some(...)`.
2007 ///
2008 /// Setting `durable_name` to `None` will cause this consumer to
2009 /// be "ephemeral". This may be a good choice for workloads where
2010 /// you don't need the `JetStream` server to remember the consumer's
2011 /// progress in the case of a crash, such as certain "high churn"
2012 /// workloads or workloads where a crashed instance is not required
2013 /// to recover.
2014 #[serde(default, skip_serializing_if = "Option::is_none")]
2015 pub durable_name: Option<String>,
2016 /// A name of the consumer. Can be specified for both durable and ephemeral
2017 /// consumers.
2018 #[serde(default, skip_serializing_if = "Option::is_none")]
2019 pub name: Option<String>,
2020 /// A short description of the purpose of this consumer.
2021 #[serde(default, skip_serializing_if = "Option::is_none")]
2022 pub description: Option<String>,
2023 /// Allows for a variety of options that determine how this consumer will receive messages
2024 #[serde(flatten)]
2025 pub deliver_policy: DeliverPolicy,
2026 /// How messages should be acknowledged
2027 pub ack_policy: AckPolicy,
2028 /// How long to allow messages to remain unacknowledged before attempting redelivery
2029 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2030 pub ack_wait: Duration,
2031 /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2032 #[serde(default, skip_serializing_if = "is_default")]
2033 pub max_deliver: i64,
2034 /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2035 #[serde(default, skip_serializing_if = "is_default")]
2036 pub filter_subject: String,
2037 #[cfg(feature = "server_2_10")]
2038 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2039 #[serde(default, skip_serializing_if = "is_default")]
2040 pub filter_subjects: Vec<String>,
2041 /// Whether messages are sent as quickly as possible or at the rate of receipt
2042 pub replay_policy: ReplayPolicy,
2043 /// The rate of message delivery in bits per second
2044 #[serde(default, skip_serializing_if = "is_default")]
2045 pub rate_limit: u64,
2046 /// What percentage of acknowledgments should be sampled for observability, 0-100
2047 #[serde(default, skip_serializing_if = "is_default")]
2048 pub sample_frequency: u8,
2049 /// The maximum number of outstanding pull requests that may be waiting on this consumer.
2050 #[serde(default, skip_serializing_if = "is_default")]
2051 pub max_waiting: i64,
2052 /// The maximum number of unacknowledged messages that may be
2053 /// in-flight before pausing sending additional messages to
2054 /// this consumer.
2055 #[serde(default, skip_serializing_if = "is_default")]
2056 pub max_ack_pending: i64,
2057 /// Only deliver headers without payloads.
2058 #[serde(default, skip_serializing_if = "is_default")]
2059 pub headers_only: bool,
2060 /// Maximum size of a request batch
2061 #[serde(default, skip_serializing_if = "is_default")]
2062 pub max_batch: i64,
2063 /// Maximum value of request max_bytes
2064 #[serde(default, skip_serializing_if = "is_default")]
2065 pub max_bytes: i64,
2066 /// Maximum value for request expiration
2067 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2068 pub max_expires: Duration,
2069 /// Threshold for consumer inactivity
2070 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2071 pub inactive_threshold: Duration,
2072 /// Number of consumer replicas
2073 #[serde(default, skip_serializing_if = "is_default")]
2074 pub num_replicas: usize,
2075 /// Force consumer to use memory storage.
2076 #[serde(default, skip_serializing_if = "is_default")]
2077 pub memory_storage: bool,
2078 #[cfg(feature = "server_2_10")]
2079 /// Additional consumer metadata.
2080 #[serde(default, skip_serializing_if = "is_default")]
2081 pub metadata: HashMap<String, String>,
2082 /// Custom backoff for missed acknowledgments.
2083 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2084 pub backoff: Vec<Duration>,
2085}
2086
2087impl IntoConsumerConfig for &Config {
2088 fn into_consumer_config(self) -> consumer::Config {
2089 self.clone().into_consumer_config()
2090 }
2091}
2092
2093impl IntoConsumerConfig for Config {
2094 fn into_consumer_config(self) -> consumer::Config {
2095 jetstream::consumer::Config {
2096 deliver_subject: None,
2097 name: self.name,
2098 durable_name: self.durable_name,
2099 description: self.description,
2100 deliver_group: None,
2101 deliver_policy: self.deliver_policy,
2102 ack_policy: self.ack_policy,
2103 ack_wait: self.ack_wait,
2104 max_deliver: self.max_deliver,
2105 filter_subject: self.filter_subject,
2106 #[cfg(feature = "server_2_10")]
2107 filter_subjects: self.filter_subjects,
2108 replay_policy: self.replay_policy,
2109 rate_limit: self.rate_limit,
2110 sample_frequency: self.sample_frequency,
2111 max_waiting: self.max_waiting,
2112 max_ack_pending: self.max_ack_pending,
2113 headers_only: self.headers_only,
2114 flow_control: false,
2115 idle_heartbeat: Duration::default(),
2116 max_batch: self.max_batch,
2117 max_bytes: self.max_bytes,
2118 max_expires: self.max_expires,
2119 inactive_threshold: self.inactive_threshold,
2120 num_replicas: self.num_replicas,
2121 memory_storage: self.memory_storage,
2122 #[cfg(feature = "server_2_10")]
2123 metadata: self.metadata,
2124 backoff: self.backoff,
2125 }
2126 }
2127}
2128impl FromConsumer for Config {
2129 fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2130 if config.deliver_subject.is_some() {
2131 return Err(Box::new(std::io::Error::new(
2132 std::io::ErrorKind::Other,
2133 "pull consumer cannot have delivery subject",
2134 )));
2135 }
2136 Ok(Config {
2137 durable_name: config.durable_name,
2138 name: config.name,
2139 description: config.description,
2140 deliver_policy: config.deliver_policy,
2141 ack_policy: config.ack_policy,
2142 ack_wait: config.ack_wait,
2143 max_deliver: config.max_deliver,
2144 filter_subject: config.filter_subject,
2145 #[cfg(feature = "server_2_10")]
2146 filter_subjects: config.filter_subjects,
2147 replay_policy: config.replay_policy,
2148 rate_limit: config.rate_limit,
2149 sample_frequency: config.sample_frequency,
2150 max_waiting: config.max_waiting,
2151 max_ack_pending: config.max_ack_pending,
2152 headers_only: config.headers_only,
2153 max_batch: config.max_batch,
2154 max_bytes: config.max_bytes,
2155 max_expires: config.max_expires,
2156 inactive_threshold: config.inactive_threshold,
2157 num_replicas: config.num_replicas,
2158 memory_storage: config.memory_storage,
2159 #[cfg(feature = "server_2_10")]
2160 metadata: config.metadata,
2161 backoff: config.backoff,
2162 })
2163 }
2164}
2165
2166#[derive(Clone, Copy, Debug, PartialEq)]
2167pub enum BatchRequestErrorKind {
2168 Publish,
2169 Flush,
2170 Serialize,
2171}
2172
2173impl std::fmt::Display for BatchRequestErrorKind {
2174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2175 match self {
2176 Self::Publish => write!(f, "publish failed"),
2177 Self::Flush => write!(f, "flush failed"),
2178 Self::Serialize => write!(f, "serialize failed"),
2179 }
2180 }
2181}
2182
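/// Error returned when publishing the next pull request to the server fails.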
2183pub type BatchRequestError = Error<BatchRequestErrorKind>;
2184
2185#[derive(Clone, Copy, Debug, PartialEq)]
2186pub enum BatchErrorKind {
2187 Subscribe,
2188 Pull,
2189 Flush,
2190 Serialize,
2191}
2192
2193impl std::fmt::Display for BatchErrorKind {
2194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2195 match self {
2196 Self::Pull => write!(f, "pull request failed"),
2197 Self::Flush => write!(f, "flush failed"),
2198 Self::Serialize => write!(f, "serialize failed"),
2199 Self::Subscribe => write!(f, "subscribe failed"),
2200 }
2201 }
2202}
2203
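/// Error returned when a [Batch] of messages cannot be requested or subscribed to.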
2204pub type BatchError = Error<BatchErrorKind>;
2205
2206impl From<SubscribeError> for BatchError {
2207 fn from(err: SubscribeError) -> Self {
2208 BatchError::with_source(BatchErrorKind::Subscribe, err)
2209 }
2210}
2211
2212impl From<BatchRequestError> for BatchError {
2213 fn from(err: BatchRequestError) -> Self {
2214 BatchError::with_source(BatchErrorKind::Pull, err)
2215 }
2216}
2217
2218#[derive(Clone, Copy, Debug, PartialEq)]
2219pub enum ConsumerRecreateErrorKind {
2220 GetStream,
2221 Recreate,
2222 TimedOut,
2223}
2224
2225impl std::fmt::Display for ConsumerRecreateErrorKind {
2226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2227 match self {
2228 Self::GetStream => write!(f, "error getting stream"),
2229 Self::Recreate => write!(f, "consumer creation failed"),
2230 Self::TimedOut => write!(f, "timed out"),
2231 }
2232 }
2233}
2234
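/// Error returned when an ordered consumer cannot be recreated after a failure.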
2235pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2236
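/// Deletes the current ordered consumer and recreates it so that delivery resumes just after
/// the last received stream `sequence`, returning a fresh message [Stream] for it.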
2237async fn recreate_consumer_stream(
2238 context: &Context,
2239 config: &OrderedConfig,
2240 stream_name: &str,
2241 consumer_name: &str,
2242 sequence: u64,
2243) -> Result<Stream, ConsumerRecreateError> {
2244 let span = tracing::span!(
2245 tracing::Level::DEBUG,
2246 "recreate_ordered_consumer",
2247 stream_name = stream_name,
2248 consumer_name = consumer_name,
2249 sequence = sequence
2250 );
2251 let _span_handle = span.enter();
2252 let config = config.to_owned();
2253 trace!("delete old consumer before creating new one");
2254
2255 tokio::time::timeout(
2256 Duration::from_secs(5),
2257 context.delete_consumer_from_stream(consumer_name, stream_name),
2258 )
2259 .await
2260 .ok();
2261
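    // A `sequence` of 0 means no message has been received yet, so deliver everything;
    // otherwise resume just after the last received stream sequence.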
2262 let deliver_policy = {
2263 if sequence == 0 {
2264 DeliverPolicy::All
2265 } else {
2266 DeliverPolicy::ByStartSequence {
2267 start_sequence: sequence + 1,
2268 }
2269 }
2270 };
2271 trace!("create the new ordered consumer for sequence {}", sequence);
2272 let consumer = tokio::time::timeout(
2273 Duration::from_secs(5),
2274 context.create_consumer_on_stream(
2275 jetstream::consumer::pull::OrderedConfig {
2276 deliver_policy,
2277 ..config.clone()
2278 },
2279 stream_name,
2280 ),
2281 )
2282 .await
2283 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2284 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2285
2286 let consumer = Consumer {
2287 config: config.clone().into(),
2288 context: context.clone(),
2289 info: consumer.info,
2290 };
2291
2292 trace!("create iterator");
2293 let stream = tokio::time::timeout(
2294 Duration::from_secs(5),
2295 Stream::stream(
2296 BatchConfig {
2297 batch: 500,
2298 expires: Some(Duration::from_secs(30)),
2299 no_wait: false,
2300 max_bytes: 0,
2301 idle_heartbeat: Duration::from_secs(15),
2302 },
2303 &consumer,
2304 ),
2305 )
2306 .await
2307 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2308 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2309 trace!("recreated consumer");
2310 stream
2311}