async_nats/jetstream/consumer/pull.rs
1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures_util::{
16 future::{BoxFuture, Either},
17 FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_11")]
21use time::{serde::rfc3339, OffsetDateTime};
22
23#[cfg(feature = "server_2_10")]
24use std::collections::HashMap;
25use std::{future, pin::Pin, task::Poll, time::Duration};
26use tokio::{task::JoinHandle, time::Sleep};
27
28use serde::{Deserialize, Serialize};
29use tracing::{debug, trace};
30
31use crate::{
32 connection::State,
33 error::Error,
34 jetstream::{self, Context},
35 StatusCode, SubscribeError, Subscriber,
36};
37
38use crate::subject::Subject;
39
40#[cfg(feature = "server_2_11")]
41use super::PriorityPolicy;
42
43use super::{
44 backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
45 StreamError, StreamErrorKind,
46};
47use jetstream::consumer;
48
49impl Consumer<Config> {
    /// Returns a stream of messages for a Pull Consumer.
51 ///
52 /// # Example
53 ///
54 /// ```no_run
55 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
57 /// use futures_util::StreamExt;
58 /// use futures_util::TryStreamExt;
59 ///
60 /// let client = async_nats::connect("localhost:4222").await?;
61 /// let jetstream = async_nats::jetstream::new(client);
62 ///
63 /// let stream = jetstream
64 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
65 /// name: "events".to_string(),
66 /// max_messages: 10_000,
67 /// ..Default::default()
68 /// })
69 /// .await?;
70 ///
71 /// jetstream.publish("events", "data".into()).await?;
72 ///
73 /// let consumer = stream
74 /// .get_or_create_consumer(
75 /// "consumer",
76 /// async_nats::jetstream::consumer::pull::Config {
77 /// durable_name: Some("consumer".to_string()),
78 /// ..Default::default()
79 /// },
80 /// )
81 /// .await?;
82 ///
83 /// let mut messages = consumer.messages().await?.take(100);
84 /// while let Some(Ok(message)) = messages.next().await {
85 /// println!("got message {:?}", message);
86 /// message.ack().await?;
87 /// }
88 /// Ok(())
89 /// # }
90 /// ```
91 pub async fn messages(&self) -> Result<Stream, StreamError> {
92 Stream::stream(
93 BatchConfig {
94 batch: 200,
95 expires: Some(Duration::from_secs(30)),
96 no_wait: false,
97 max_bytes: 0,
98 idle_heartbeat: Duration::from_secs(15),
99 min_pending: None,
100 min_ack_pending: None,
101 group: None,
102 #[cfg(feature = "server_2_12")]
103 priority: None,
104 },
105 self,
106 )
107 .await
108 }
109
    /// Enables customization of the [Stream] by setting timeouts, heartbeats, and the maximum
    /// number of messages or bytes buffered.
112 ///
113 /// # Examples
114 ///
115 /// ```no_run
116 /// # #[tokio::main]
117 /// # async fn main() -> Result<(), async_nats::Error> {
118 /// use async_nats::jetstream::consumer::PullConsumer;
119 /// use futures_util::StreamExt;
120 /// let client = async_nats::connect("localhost:4222").await?;
121 /// let jetstream = async_nats::jetstream::new(client);
122 ///
123 /// let consumer: PullConsumer = jetstream
124 /// .get_stream("events")
125 /// .await?
126 /// .get_consumer("pull")
127 /// .await?;
128 ///
129 /// let mut messages = consumer
130 /// .stream()
131 /// .max_messages_per_batch(100)
132 /// .max_bytes_per_batch(1024)
133 /// .messages()
134 /// .await?;
135 ///
136 /// while let Some(message) = messages.next().await {
137 /// let message = message?;
138 /// println!("message: {:?}", message);
139 /// message.ack().await?;
140 /// }
141 /// # Ok(())
142 /// # }
143 /// ```
144 pub fn stream(&self) -> StreamBuilder<'_> {
145 StreamBuilder::new(self)
146 }
147
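    /// Low-level API: sends a single pull request for this consumer, asking the server to deliver
    /// up to the requested batch of messages to the provided `inbox` subject. The caller is
    /// responsible for subscribing to that inbox; the higher-level `messages()`, `fetch()` and
    /// `batch()` helpers do this automatically and are usually what you want.
    ///
    /// # Example
    ///
    /// A rough sketch of manual usage; the subject names and the plain subscription handling here
    /// are illustrative only.
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer::{pull::BatchConfig, PullConsumer};
    /// use futures_util::StreamExt;
    ///
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client.clone());
    ///
    /// let consumer: PullConsumer = jetstream
    ///     .get_stream("events")
    ///     .await?
    ///     .get_consumer("pull")
    ///     .await?;
    ///
    /// // Subscribe to an inbox first, then ask the server to deliver a batch of messages there.
    /// let inbox = client.new_inbox();
    /// let mut responses = client.subscribe(inbox.clone()).await?;
    /// consumer
    ///     .request_batch(
    ///         BatchConfig {
    ///             batch: 10,
    ///             ..Default::default()
    ///         },
    ///         inbox.into(),
    ///     )
    ///     .await?;
    ///
    /// // Raw messages (including status messages) arrive on the inbox subscription.
    /// while let Some(message) = responses.next().await {
    ///     println!("received: {:?}", message);
    /// }
    /// # Ok(())
    /// # }
    /// ```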
148 pub async fn request_batch<I: Into<BatchConfig>>(
149 &self,
150 batch: I,
151 inbox: Subject,
152 ) -> Result<(), BatchRequestError> {
153 debug!("sending batch");
154 let subject = format!(
155 "{}.CONSUMER.MSG.NEXT.{}.{}",
156 self.context.prefix, self.info.stream_name, self.info.name
157 );
158
159 let payload = serde_json::to_vec(&batch.into())
160 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
161
162 self.context
163 .client
164 .publish_with_reply(subject, inbox, payload.into())
165 .await
166 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
167 debug!("batch request sent");
168 Ok(())
169 }
170
    /// Returns a batch of the specified number of messages, or, if fewer messages are available
    /// on the [Stream] than requested, all of the available messages.
173 ///
174 /// # Example
175 ///
176 /// ```no_run
177 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
179 /// use futures_util::StreamExt;
180 /// use futures_util::TryStreamExt;
181 ///
182 /// let client = async_nats::connect("localhost:4222").await?;
183 /// let jetstream = async_nats::jetstream::new(client);
184 ///
185 /// let stream = jetstream
186 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
187 /// name: "events".to_string(),
188 /// max_messages: 10_000,
189 /// ..Default::default()
190 /// })
191 /// .await?;
192 ///
193 /// jetstream.publish("events", "data".into()).await?;
194 ///
195 /// let consumer = stream
196 /// .get_or_create_consumer(
197 /// "consumer",
198 /// async_nats::jetstream::consumer::pull::Config {
199 /// durable_name: Some("consumer".to_string()),
200 /// ..Default::default()
201 /// },
202 /// )
203 /// .await?;
204 ///
205 /// for _ in 0..100 {
206 /// jetstream.publish("events", "data".into()).await?;
207 /// }
208 ///
209 /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
210 /// // will finish after 100 messages, as that is the number of messages available on the
211 /// // stream.
212 /// while let Some(Ok(message)) = messages.next().await {
213 /// println!("got message {:?}", message);
214 /// message.ack().await?;
215 /// }
216 /// Ok(())
217 /// # }
218 /// ```
219 pub fn fetch(&self) -> FetchBuilder<'_> {
220 FetchBuilder::new(self)
221 }
222
    /// Returns a batch of the specified number of messages, unless the request times out first.
224 ///
225 /// # Example
226 ///
227 /// ```no_run
228 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
230 /// use futures_util::StreamExt;
231 /// use futures_util::TryStreamExt;
232 ///
233 /// let client = async_nats::connect("localhost:4222").await?;
234 /// let jetstream = async_nats::jetstream::new(client);
235 ///
236 /// let stream = jetstream
237 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
238 /// name: "events".to_string(),
239 /// max_messages: 10_000,
240 /// ..Default::default()
241 /// })
242 /// .await?;
243 ///
244 /// jetstream.publish("events", "data".into()).await?;
245 ///
246 /// let consumer = stream
247 /// .get_or_create_consumer(
248 /// "consumer",
249 /// async_nats::jetstream::consumer::pull::Config {
250 /// durable_name: Some("consumer".to_string()),
251 /// ..Default::default()
252 /// },
253 /// )
254 /// .await?;
255 ///
256 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
257 /// while let Some(Ok(message)) = messages.next().await {
258 /// println!("got message {:?}", message);
259 /// message.ack().await?;
260 /// }
261 /// Ok(())
262 /// # }
263 /// ```
264 pub fn batch(&self) -> BatchBuilder<'_> {
265 BatchBuilder::new(self)
266 }
267
    /// Returns a sequence of [Batches][Batch], allowing iteration over batches and then over the
    /// messages in each batch.
270 ///
271 /// # Example
272 ///
273 /// ```no_run
274 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
276 /// use futures_util::StreamExt;
277 /// use futures_util::TryStreamExt;
278 ///
279 /// let client = async_nats::connect("localhost:4222").await?;
280 /// let jetstream = async_nats::jetstream::new(client);
281 ///
282 /// let stream = jetstream
283 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
284 /// name: "events".to_string(),
285 /// max_messages: 10_000,
286 /// ..Default::default()
287 /// })
288 /// .await?;
289 ///
290 /// jetstream.publish("events", "data".into()).await?;
291 ///
292 /// let consumer = stream
293 /// .get_or_create_consumer(
294 /// "consumer",
295 /// async_nats::jetstream::consumer::pull::Config {
296 /// durable_name: Some("consumer".to_string()),
297 /// ..Default::default()
298 /// },
299 /// )
300 /// .await?;
301 ///
302 /// let mut iter = consumer.sequence(50).unwrap().take(10);
303 /// while let Ok(Some(mut batch)) = iter.try_next().await {
304 /// while let Ok(Some(message)) = batch.try_next().await {
305 /// println!("message received: {:?}", message);
306 /// }
307 /// }
308 /// Ok(())
309 /// # }
310 /// ```
311 pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
312 let context = self.context.clone();
313 let subject = format!(
314 "{}.CONSUMER.MSG.NEXT.{}.{}",
315 self.context.prefix, self.info.stream_name, self.info.name
316 );
317
318 let request = serde_json::to_vec(&BatchConfig {
319 batch,
320 expires: Some(Duration::from_secs(60)),
321 ..Default::default()
322 })
323 .map(Bytes::from)
324 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
325
326 Ok(Sequence {
327 context,
328 subject,
329 request,
330 pending_messages: batch,
331 next: None,
332 })
333 }
334}
335
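/// A single batch of messages, created by `Consumer::batch()`, `Consumer::fetch()` or [Sequence].
/// It implements `futures_util::Stream` and terminates once the requested number of messages has
/// been received, the pull request expires, or the server reports that no more messages are
/// available.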
336pub struct Batch {
337 pending_messages: usize,
338 subscriber: Subscriber,
339 context: Context,
340 timeout: Option<Pin<Box<Sleep>>>,
341 terminated: bool,
342}
343
344impl Batch {
345 async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
346 let inbox = Subject::from(consumer.context.client.new_inbox());
347 let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
348 consumer.request_batch(batch.clone(), inbox.clone()).await?;
349
350 let sleep = batch.expires.map(|expires| {
351 Box::pin(tokio::time::sleep(
352 expires.saturating_add(Duration::from_secs(5)),
353 ))
354 });
355
356 Ok(Batch {
357 pending_messages: batch.batch,
358 subscriber: subscription,
359 context: consumer.context.clone(),
360 terminated: false,
361 timeout: sleep,
362 })
363 }
364}
365
366impl futures_util::Stream for Batch {
367 type Item = Result<jetstream::Message, crate::Error>;
368
369 fn poll_next(
370 mut self: std::pin::Pin<&mut Self>,
371 cx: &mut std::task::Context<'_>,
372 ) -> std::task::Poll<Option<Self::Item>> {
373 if self.terminated {
374 return Poll::Ready(None);
375 }
376 if self.pending_messages == 0 {
377 self.terminated = true;
378 return Poll::Ready(None);
379 }
380 if let Some(sleep) = self.timeout.as_mut() {
381 match sleep.poll_unpin(cx) {
382 Poll::Ready(_) => {
383 debug!("batch timeout timer triggered");
                    // TODO(tp): Maybe we can be smarter here and, before timing out, check whether
                    // we have consumed all messages from the subscription buffer, in case the user
                    // is consuming messages slowly. Keep in mind that we only time out here if, for
                    // some reason, we missed the timeout message from the server and a few seconds
                    // have passed since the expected timeout.
389 self.terminated = true;
390 return Poll::Ready(None);
391 }
392 Poll::Pending => (),
393 }
394 }
395 match self.subscriber.receiver.poll_recv(cx) {
396 Poll::Ready(maybe_message) => match maybe_message {
397 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
398 StatusCode::TIMEOUT => {
399 debug!("received timeout. Iterator done");
400 self.terminated = true;
401 Poll::Ready(None)
402 }
                    StatusCode::IDLE_HEARTBEAT => {
                        debug!("received heartbeat");
                        // `poll_recv` returned `Ready`, so no waker is currently registered with
                        // the receiver; wake the task before returning `Pending` so that the next
                        // message is polled promptly instead of waiting for the timeout.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    // If this is the fetch variant, terminate when there are no more messages.
                    // We do not need to check whether this is a fetch or a batch, as only fetch
                    // sends back the `NO_MESSAGES` status.
410 StatusCode::NOT_FOUND => {
411 debug!("received `NO_MESSAGES`. Iterator done");
412 self.terminated = true;
413 Poll::Ready(None)
414 }
415 StatusCode::OK => {
416 debug!("received message");
417 self.pending_messages -= 1;
418 Poll::Ready(Some(Ok(jetstream::Message {
419 context: self.context.clone(),
420 message,
421 })))
422 }
423 status => {
424 debug!("received error");
425 self.terminated = true;
426 Poll::Ready(Some(Err(Box::new(std::io::Error::other(format!(
427 "error while processing messages from the stream: {}, {:?}",
428 status, message.description
429 ))))))
430 }
431 },
432 None => Poll::Ready(None),
433 },
434 std::task::Poll::Pending => std::task::Poll::Pending,
435 }
436 }
437}
438
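/// A stream of [Batch]es, created by `Consumer::sequence()`. Each item issues a new pull request
/// of the configured size; iterate over the yielded [Batch] to receive its individual messages.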
439pub struct Sequence {
440 context: Context,
441 subject: String,
442 request: Bytes,
443 pending_messages: usize,
444 next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
445}
446
447impl futures_util::Stream for Sequence {
448 type Item = Result<Batch, MessagesError>;
449
450 fn poll_next(
451 mut self: std::pin::Pin<&mut Self>,
452 cx: &mut std::task::Context<'_>,
453 ) -> std::task::Poll<Option<Self::Item>> {
454 match self.next.as_mut() {
455 None => {
456 let context = self.context.clone();
457 let subject = self.subject.clone();
458 let request = self.request.clone();
459 let pending_messages = self.pending_messages;
460
461 let next = self.next.insert(Box::pin(async move {
462 let inbox = context.client.new_inbox();
463 let subscriber = context
464 .client
465 .subscribe(inbox.clone())
466 .await
467 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
468
469 context
470 .client
471 .publish_with_reply(subject, inbox, request)
472 .await
473 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
474
475 // TODO(tp): Add timeout config and defaults.
476 Ok(Batch {
477 pending_messages,
478 subscriber,
479 context,
480 terminated: false,
481 timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
482 })
483 }));
484
485 match next.as_mut().poll(cx) {
486 Poll::Ready(result) => {
487 self.next = None;
488 Poll::Ready(Some(result.map_err(|err| {
489 MessagesError::with_source(MessagesErrorKind::Pull, err)
490 })))
491 }
492 Poll::Pending => Poll::Pending,
493 }
494 }
495
496 Some(next) => match next.as_mut().poll(cx) {
497 Poll::Ready(result) => {
498 self.next = None;
499 Poll::Ready(Some(result.map_err(|err| {
500 MessagesError::with_source(MessagesErrorKind::Pull, err)
501 })))
502 }
503 Poll::Pending => Poll::Pending,
504 },
505 }
506 }
507}
508
509impl Consumer<OrderedConfig> {
    /// Returns a stream of messages for an Ordered Pull Consumer.
511 ///
    /// Ordered consumers use a single-replica, ephemeral consumer regardless of the replication
    /// factor of the Stream. They do not use acks; instead they track sequences and recreate
    /// themselves whenever they detect a mismatch.
515 ///
516 /// # Example
517 ///
518 /// ```no_run
519 /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
521 /// use futures_util::StreamExt;
522 /// use futures_util::TryStreamExt;
523 ///
524 /// let client = async_nats::connect("localhost:4222").await?;
525 /// let jetstream = async_nats::jetstream::new(client);
526 ///
527 /// let stream = jetstream
528 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
529 /// name: "events".to_string(),
530 /// max_messages: 10_000,
531 /// ..Default::default()
532 /// })
533 /// .await?;
534 ///
535 /// jetstream.publish("events", "data".into()).await?;
536 ///
537 /// let consumer = stream
538 /// .get_or_create_consumer(
539 /// "consumer",
540 /// async_nats::jetstream::consumer::pull::OrderedConfig {
541 /// name: Some("consumer".to_string()),
542 /// ..Default::default()
543 /// },
544 /// )
545 /// .await?;
546 ///
547 /// let mut messages = consumer.messages().await?.take(100);
548 /// while let Some(Ok(message)) = messages.next().await {
549 /// println!("got message {:?}", message);
550 /// }
551 /// Ok(())
552 /// # }
553 /// ```
554 pub async fn messages(self) -> Result<Ordered, StreamError> {
555 let config = Consumer {
556 config: self.config.clone().into(),
557 context: self.context.clone(),
558 info: self.info.clone(),
559 };
560 let stream = Stream::stream(
561 BatchConfig {
562 batch: 500,
563 expires: Some(Duration::from_secs(30)),
564 no_wait: false,
565 max_bytes: 0,
566 idle_heartbeat: Duration::from_secs(15),
567 min_pending: None,
568 min_ack_pending: None,
569 group: None,
570 #[cfg(feature = "server_2_12")]
571 priority: None,
572 },
573 &config,
574 )
575 .await?;
576
577 Ok(Ordered {
578 consumer_sequence: 0,
579 stream_sequence: 0,
580 missed_heartbeats: false,
581 create_stream: None,
582 context: self.context.clone(),
583 consumer_name: self
584 .config
585 .name
586 .clone()
587 .unwrap_or_else(|| self.context.client.new_inbox()),
588 consumer: self.config,
589 stream: Some(stream),
590 stream_name: self.info.stream_name.clone(),
591 })
592 }
593}
594
/// Configuration for an ordered pull consumer. Ordered consumers are ephemeral, single-replica,
/// and do not use acknowledgments, so durability and acknowledgment-related options are not
/// exposed here.
598#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
599pub struct OrderedConfig {
600 /// A name of the consumer. Can be specified for both durable and ephemeral
601 /// consumers.
602 #[serde(default, skip_serializing_if = "Option::is_none")]
603 pub name: Option<String>,
604 /// A short description of the purpose of this consumer.
605 #[serde(default, skip_serializing_if = "Option::is_none")]
606 pub description: Option<String>,
607 #[serde(default, skip_serializing_if = "is_default")]
608 pub filter_subject: String,
609 #[cfg(feature = "server_2_10")]
610 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
611 #[serde(default, skip_serializing_if = "is_default")]
612 pub filter_subjects: Vec<String>,
613 /// Whether messages are sent as quickly as possible or at the rate of receipt
614 pub replay_policy: ReplayPolicy,
615 /// The rate of message delivery in bits per second
616 #[serde(default, skip_serializing_if = "is_default")]
617 pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100.
619 #[serde(
620 rename = "sample_freq",
621 with = "super::sample_freq_deser",
622 default,
623 skip_serializing_if = "is_default"
624 )]
625 pub sample_frequency: u8,
626 /// Only deliver headers without payloads.
627 #[serde(default, skip_serializing_if = "is_default")]
628 pub headers_only: bool,
629 /// Allows for a variety of options that determine how this consumer will receive messages
630 #[serde(flatten)]
631 pub deliver_policy: DeliverPolicy,
632 /// The maximum number of waiting consumers.
633 #[serde(default, skip_serializing_if = "is_default")]
634 pub max_waiting: i64,
635 #[cfg(feature = "server_2_10")]
636 // Additional consumer metadata.
637 #[serde(default, skip_serializing_if = "is_default")]
638 pub metadata: HashMap<String, String>,
    // Maximum number of messages that can be requested in a single pull request.
    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
    // [stream].
    pub max_batch: i64,
    // Maximum number of bytes that can be requested in a single pull request.
    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
    // [stream].
    pub max_bytes: i64,
    // Maximum expiry that can be set for a single pull request.
    // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
    // [stream].
    pub max_expires: Duration,
651}
652
653impl From<OrderedConfig> for Config {
654 fn from(config: OrderedConfig) -> Self {
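        // Ordered consumers are always ephemeral, single-replica, memory-backed and do not use
        // acknowledgments, so these fields are fixed here rather than taken from `OrderedConfig`.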
655 Config {
656 durable_name: None,
657 name: config.name,
658 description: config.description,
659 deliver_policy: config.deliver_policy,
660 ack_policy: AckPolicy::None,
661 ack_wait: Duration::default(),
662 max_deliver: 1,
663 filter_subject: config.filter_subject,
664 #[cfg(feature = "server_2_10")]
665 filter_subjects: config.filter_subjects,
666 replay_policy: config.replay_policy,
667 rate_limit: config.rate_limit,
668 sample_frequency: config.sample_frequency,
669 max_waiting: config.max_waiting,
670 max_ack_pending: 0,
671 headers_only: config.headers_only,
672 max_batch: config.max_batch,
673 max_bytes: config.max_bytes,
674 max_expires: config.max_expires,
675 inactive_threshold: Duration::from_secs(30),
676 num_replicas: 1,
677 memory_storage: true,
678 #[cfg(feature = "server_2_10")]
679 metadata: config.metadata,
680 backoff: Vec::new(),
681 #[cfg(feature = "server_2_11")]
682 priority_policy: PriorityPolicy::None,
683 #[cfg(feature = "server_2_11")]
684 priority_groups: Vec::new(),
685 #[cfg(feature = "server_2_11")]
686 pause_until: None,
687 }
688 }
689}
690
691impl FromConsumer for OrderedConfig {
692 fn try_from_consumer_config(
693 config: crate::jetstream::consumer::Config,
694 ) -> Result<Self, crate::Error>
695 where
696 Self: Sized,
697 {
698 Ok(OrderedConfig {
699 name: config.name,
700 description: config.description,
701 filter_subject: config.filter_subject,
702 #[cfg(feature = "server_2_10")]
703 filter_subjects: config.filter_subjects,
704 replay_policy: config.replay_policy,
705 rate_limit: config.rate_limit,
706 sample_frequency: config.sample_frequency,
707 headers_only: config.headers_only,
708 deliver_policy: config.deliver_policy,
709 max_waiting: config.max_waiting,
710 #[cfg(feature = "server_2_10")]
711 metadata: config.metadata,
712 max_batch: config.max_batch,
713 max_bytes: config.max_bytes,
714 max_expires: config.max_expires,
715 })
716 }
717}
718
719impl IntoConsumerConfig for OrderedConfig {
720 fn into_consumer_config(self) -> super::Config {
721 jetstream::consumer::Config {
722 deliver_subject: None,
723 durable_name: None,
724 name: self.name,
725 description: self.description,
726 deliver_group: None,
727 deliver_policy: self.deliver_policy,
728 ack_policy: AckPolicy::None,
729 ack_wait: Duration::default(),
730 max_deliver: 1,
731 filter_subject: self.filter_subject,
732 #[cfg(feature = "server_2_10")]
733 filter_subjects: self.filter_subjects,
734 replay_policy: self.replay_policy,
735 rate_limit: self.rate_limit,
736 sample_frequency: self.sample_frequency,
737 max_waiting: self.max_waiting,
738 max_ack_pending: 0,
739 headers_only: self.headers_only,
740 flow_control: false,
741 idle_heartbeat: Duration::default(),
742 max_batch: 0,
743 max_bytes: 0,
744 max_expires: Duration::default(),
745 inactive_threshold: Duration::from_secs(30),
746 num_replicas: 1,
747 memory_storage: true,
748 #[cfg(feature = "server_2_10")]
749 metadata: self.metadata,
750 backoff: Vec::new(),
751 #[cfg(feature = "server_2_11")]
752 priority_policy: PriorityPolicy::None,
753 #[cfg(feature = "server_2_11")]
754 priority_groups: Vec::new(),
755 #[cfg(feature = "server_2_11")]
756 pause_until: None,
757 }
758 }
759}
760
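/// A stream of messages from an ordered pull consumer, created by calling `messages()` on a
/// `Consumer<OrderedConfig>`. It tracks consumer and stream sequences and transparently recreates
/// the underlying ephemeral consumer whenever it detects a sequence mismatch or repeated missed
/// idle heartbeats.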
761pub struct Ordered {
762 context: Context,
763 stream_name: String,
764 consumer: OrderedConfig,
765 consumer_name: String,
766 stream: Option<Stream>,
767 create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
768 consumer_sequence: u64,
769 stream_sequence: u64,
770 missed_heartbeats: bool,
771}
772
773impl futures_util::Stream for Ordered {
774 type Item = Result<jetstream::Message, OrderedError>;
775
776 fn poll_next(
777 mut self: Pin<&mut Self>,
778 cx: &mut std::task::Context<'_>,
779 ) -> Poll<Option<Self::Item>> {
780 let mut recreate = false;
781 // Poll messages
782 if let Some(stream) = self.stream.as_mut() {
783 match stream.poll_next_unpin(cx) {
784 Poll::Ready(message) => match message {
785 Some(message) => match message {
786 Ok(message) => {
787 self.missed_heartbeats = false;
788 let info = message.info().map_err(|err| {
789 OrderedError::with_source(OrderedErrorKind::Other, err)
790 })?;
791 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
792 self.consumer_sequence,
793 self.stream_sequence,
794 info.consumer_sequence,
795 info.stream_sequence);
796 if info.consumer_sequence != self.consumer_sequence + 1 {
797 debug!(
798 "ordered consumer mismatch. current {}, info: {}",
799 self.consumer_sequence, info.consumer_sequence
800 );
801 recreate = true;
802 self.consumer_sequence = 0;
803 } else {
804 self.stream_sequence = info.stream_sequence;
805 self.consumer_sequence = info.consumer_sequence;
806 return Poll::Ready(Some(Ok(message)));
807 }
808 }
809 Err(err) => match err.kind() {
810 MessagesErrorKind::MissingHeartbeat => {
811 // If we have missed heartbeats set, it means this is a second
812 // missed heartbeat, so we need to recreate consumer.
813 if self.missed_heartbeats {
814 self.consumer_sequence = 0;
815 recreate = true;
816 } else {
817 self.missed_heartbeats = true;
818 }
819 }
820 MessagesErrorKind::ConsumerDeleted
821 | MessagesErrorKind::NoResponders => {
822 recreate = true;
823 self.consumer_sequence = 0;
824 }
825 MessagesErrorKind::Pull
826 | MessagesErrorKind::PushBasedConsumer
827 | MessagesErrorKind::Other => {
828 return Poll::Ready(Some(Err(err.into())));
829 }
830 },
831 },
832 None => return Poll::Ready(None),
833 },
834 Poll::Pending => (),
835 }
836 }
837 // Recreate consumer if needed
838 if recreate {
839 self.stream = None;
840 self.create_stream = Some(Box::pin({
841 let context = self.context.clone();
842 let config = self.consumer.clone();
843 let stream_name = self.stream_name.clone();
844 let consumer_name = self.consumer_name.clone();
845 let sequence = self.stream_sequence;
846 async move {
847 tryhard::retry_fn(|| {
848 recreate_consumer_stream(
849 &context,
850 &config,
851 &stream_name,
852 &consumer_name,
853 sequence,
854 )
855 })
856 .retries(u32::MAX)
857 .custom_backoff(backoff)
858 .await
859 }
860 }))
861 }
862 // check for recreation future
863 if let Some(result) = self.create_stream.as_mut() {
864 match result.poll_unpin(cx) {
865 Poll::Ready(result) => match result {
866 Ok(stream) => {
867 self.create_stream = None;
868 self.stream = Some(stream);
869 return self.poll_next(cx);
870 }
871 Err(err) => {
872 return Poll::Ready(Some(Err(OrderedError::with_source(
873 OrderedErrorKind::Recreate,
874 err,
875 ))))
876 }
877 },
878 Poll::Pending => (),
879 }
880 }
881 Poll::Pending
882 }
883}
884
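/// A continuous stream of messages for a pull consumer, created by `Consumer::messages()` or
/// `StreamBuilder::messages()`. A background task keeps issuing pull requests whenever the number
/// of locally buffered messages (or bytes) drops below half of the configured batch size.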
885pub struct Stream {
886 pending_messages: usize,
887 pending_bytes: usize,
888 request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
889 request_tx: tokio::sync::watch::Sender<()>,
890 subscriber: Subscriber,
891 batch_config: BatchConfig,
892 context: Context,
893 pending_request: bool,
894 task_handle: JoinHandle<()>,
895 terminated: bool,
896 heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
897 started: Option<tokio::sync::oneshot::Sender<()>>,
898}
899
900impl Drop for Stream {
901 fn drop(&mut self) {
902 self.task_handle.abort();
903 }
904}
905
906impl Stream {
907 async fn stream(
908 batch_config: BatchConfig,
909 consumer: &Consumer<Config>,
910 ) -> Result<Stream, StreamError> {
911 let inbox = consumer.context.client.new_inbox();
912 let subscription = consumer
913 .context
914 .client
915 .subscribe(inbox.clone())
916 .await
917 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
918 let subject = format!(
919 "{}.CONSUMER.MSG.NEXT.{}.{}",
920 consumer.context.prefix, consumer.info.stream_name, consumer.info.name
921 );
922
923 let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
924 let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
925 let (started_tx, started_rx) = tokio::sync::oneshot::channel();
926 let task_handle = tokio::task::spawn({
927 let batch = batch_config.clone();
928 let consumer = consumer.clone();
929 let mut context = consumer.context.clone();
930 let inbox = inbox.clone();
931 async move {
932 started_rx.await.ok();
933 loop {
                    // This is just a safeguard for the edge case where, for some reason, the
                    // server response goes missing.
                    let expires = batch
                        .expires
937 .map(|expires| {
938 if expires.is_zero() {
939 Either::Left(future::pending())
940 } else {
941 Either::Right(tokio::time::sleep(
942 expires.saturating_add(Duration::from_secs(5)),
943 ))
944 }
945 })
946 .unwrap_or_else(|| Either::Left(future::pending()));
947 // Need to check previous state, as `changed` will always fire on first
948 // call.
949 let prev_state = context.client.state.borrow().to_owned();
950 let mut pending_reset = false;
951
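                    // Wake up when (a) the connection transitions back into the Connected state,
                    // (b) the consuming side signals that it wants another pull request, or
                    // (c) the previous pull request should have expired; in each case a new pull
                    // request is published below.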
952 tokio::select! {
953 _ = context.client.state.changed() => {
954 let state = context.client.state.borrow().to_owned();
955 if !(state == crate::connection::State::Connected
956 && prev_state != State::Connected) {
957 continue;
958 }
959 debug!("detected !Connected -> Connected state change");
960
961 match tryhard::retry_fn(|| consumer.get_info())
962 .retries(5).custom_backoff(backoff).await
963 .map_err(|err| crate::RequestError::with_source(crate::RequestErrorKind::Other, err).into()) {
964 Ok(info) => {
965 if info.num_waiting == 0 {
966 pending_reset = true;
967 }
968 }
969 Err(err) => {
970 if let Err(err) = request_result_tx.send(Err(err)).await {
                                        debug!("failed to send request result: {}", err);
972 }
973 },
974 }
975 },
                        _ = request_rx.changed() => debug!("task received a request to send a new pull request"),
977 _ = expires => {
978 pending_reset = true;
979 debug!("expired pull request")},
980 }
981
982 let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
                    let result = context
                        .client
                        .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
                        .await;
988 // TODO: add tracing instead of ignoring this.
989 request_result_tx
990 .send(result.map(|_| pending_reset).map_err(|err| {
991 crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
992 .into()
993 }))
994 .await
995 .ok();
996 trace!("result send over tx");
997 }
998 }
999 });
1000
1001 Ok(Stream {
1002 task_handle,
1003 request_result_rx,
1004 request_tx,
1005 batch_config,
1006 pending_messages: 0,
1007 pending_bytes: 0,
1008 subscriber: subscription,
1009 context: consumer.context.clone(),
1010 pending_request: false,
1011 terminated: false,
1012 heartbeat_timeout: None,
1013 started: Some(started_tx),
1014 })
1015 }
1016}
1017
1018#[derive(Clone, Copy, Debug, PartialEq)]
1019pub enum OrderedErrorKind {
1020 MissingHeartbeat,
1021 ConsumerDeleted,
1022 Pull,
1023 PushBasedConsumer,
1024 Recreate,
1025 NoResponders,
1026 Other,
1027}
1028
1029impl std::fmt::Display for OrderedErrorKind {
1030 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1031 match self {
1032 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1033 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1034 Self::Pull => write!(f, "pull request failed"),
1035 Self::Other => write!(f, "error"),
1036 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1037 Self::Recreate => write!(f, "consumer recreation failed"),
1038 Self::NoResponders => write!(f, "no responders"),
1039 }
1040 }
1041}
1042
1043pub type OrderedError = Error<OrderedErrorKind>;
1044
1045impl From<MessagesError> for OrderedError {
1046 fn from(err: MessagesError) -> Self {
1047 match err.kind() {
1048 MessagesErrorKind::MissingHeartbeat => {
1049 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1050 }
1051 MessagesErrorKind::ConsumerDeleted => {
1052 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1053 }
1054 MessagesErrorKind::Pull => OrderedError {
1055 kind: OrderedErrorKind::Pull,
1056 source: err.source,
1057 },
1058 MessagesErrorKind::PushBasedConsumer => {
1059 OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1060 }
1061 MessagesErrorKind::Other => OrderedError {
1062 kind: OrderedErrorKind::Other,
1063 source: err.source,
1064 },
1065 MessagesErrorKind::NoResponders => OrderedError::new(OrderedErrorKind::NoResponders),
1066 }
1067 }
1068}
1069
1070#[derive(Clone, Copy, Debug, PartialEq)]
1071pub enum MessagesErrorKind {
1072 MissingHeartbeat,
1073 ConsumerDeleted,
1074 Pull,
1075 PushBasedConsumer,
1076 NoResponders,
1077 Other,
1078}
1079
1080impl std::fmt::Display for MessagesErrorKind {
1081 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1082 match self {
1083 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1084 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1085 Self::Pull => write!(f, "pull request failed"),
1086 Self::Other => write!(f, "error"),
1087 Self::NoResponders => write!(f, "no responders"),
1088 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1089 }
1090 }
1091}
1092
1093pub type MessagesError = Error<MessagesErrorKind>;
1094
1095impl futures_util::Stream for Stream {
1096 type Item = Result<jetstream::Message, MessagesError>;
1097
1098 fn poll_next(
1099 mut self: std::pin::Pin<&mut Self>,
1100 cx: &mut std::task::Context<'_>,
1101 ) -> std::task::Poll<Option<Self::Item>> {
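        // The background pulling task waits for this one-shot signal, so no pull requests are
        // sent to the server until the stream is polled for the first time.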
1102 if let Some(started) = self.started.take() {
1103 trace!("stream started, sending started signal");
1104 if started.send(()).is_err() {
1105 debug!("failed to send started signal");
1106 }
1107 }
1108 if self.terminated {
1109 return Poll::Ready(None);
1110 }
1111
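        // Idle-heartbeat accounting: if idle heartbeats are enabled, treat silence longer than
        // twice the heartbeat interval as a missed heartbeat and surface it as an error, so that
        // callers (such as the ordered consumer) can react to it.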
1112 if !self.batch_config.idle_heartbeat.is_zero() {
            trace!("checking idle heartbeats");
1114 let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1115 match self
1116 .heartbeat_timeout
1117 .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1118 .poll_unpin(cx)
1119 {
1120 Poll::Ready(_) => {
1121 self.heartbeat_timeout = None;
1122 return Poll::Ready(Some(Err(MessagesError::new(
1123 MessagesErrorKind::MissingHeartbeat,
1124 ))));
1125 }
1126 Poll::Pending => (),
1127 }
1128 }
1129
1130 loop {
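            // Refill: when fewer than half of the requested messages (or bytes) remain pending and
            // no pull request is currently in flight, signal the background task to send another
            // pull request.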
1131 trace!("pending messages: {}", self.pending_messages);
1132 if (self.pending_messages <= self.batch_config.batch / 2
1133 || (self.batch_config.max_bytes > 0
1134 && self.pending_bytes <= self.batch_config.max_bytes / 2))
1135 && !self.pending_request
1136 {
1137 debug!("pending messages reached threshold to send new fetch request");
1138 self.request_tx.send(()).ok();
1139 self.pending_request = true;
1140 }
1141
1142 match self.request_result_rx.poll_recv(cx) {
1143 Poll::Ready(resp) => match resp {
1144 Some(resp) => match resp {
1145 Ok(reset) => {
1146 trace!("request response: {:?}", reset);
1147 debug!("request sent, setting pending messages");
1148 if reset {
1149 self.pending_messages = self.batch_config.batch;
1150 self.pending_bytes = self.batch_config.max_bytes;
1151 } else {
1152 self.pending_messages += self.batch_config.batch;
1153 self.pending_bytes += self.batch_config.max_bytes;
1154 }
1155 self.pending_request = false;
1156 continue;
1157 }
1158 Err(err) => {
1159 return Poll::Ready(Some(Err(MessagesError::with_source(
1160 MessagesErrorKind::Pull,
1161 err,
1162 ))))
1163 }
1164 },
1165 None => return Poll::Ready(None),
1166 },
1167 Poll::Pending => {
1168 trace!("pending result");
1169 }
1170 }
1171
1172 trace!("polling subscriber");
1173 match self.subscriber.receiver.poll_recv(cx) {
1174 Poll::Ready(maybe_message) => {
1175 self.heartbeat_timeout = None;
1176 match maybe_message {
1177 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1178 StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1179 debug!("received status message: {:?}", message);
1180 // If consumer has been deleted, error and shutdown the iterator.
1181 if message.description.as_deref() == Some("Consumer Deleted") {
1182 self.terminated = true;
1183 return Poll::Ready(Some(Err(MessagesError::new(
1184 MessagesErrorKind::ConsumerDeleted,
1185 ))));
1186 }
1187 // If consumer is not pull based, error and shutdown the iterator.
1188 if message.description.as_deref() == Some("Consumer is push based")
1189 {
1190 self.terminated = true;
1191 return Poll::Ready(Some(Err(MessagesError::new(
1192 MessagesErrorKind::PushBasedConsumer,
1193 ))));
1194 }
1195
1196 // Do accounting for messages left after terminated/completed pull request.
1197 let pending_messages = message
1198 .headers
1199 .as_ref()
1200 .and_then(|headers| headers.get("Nats-Pending-Messages"))
1201 .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1202 .map_err(|err| {
1203 MessagesError::with_source(MessagesErrorKind::Other, err)
1204 })?;
1205
1206 let pending_bytes = message
1207 .headers
1208 .as_ref()
1209 .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1210 .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1211 .map_err(|err| {
1212 MessagesError::with_source(MessagesErrorKind::Other, err)
1213 })?;
1214
1215 debug!(
1216 "timeout reached. remaining messages: {}, bytes {}",
1217 pending_messages, pending_bytes
1218 );
1219 self.pending_messages =
1220 self.pending_messages.saturating_sub(pending_messages);
1221 trace!("message bytes len: {}", pending_bytes);
1222 self.pending_bytes =
1223 self.pending_bytes.saturating_sub(pending_bytes);
1224 continue;
1225 }
                            // Idle Heartbeat means there are no new messages, but the consumer is fine.
1227 StatusCode::IDLE_HEARTBEAT => {
1228 debug!("received idle heartbeat");
1229 continue;
1230 }
                            // We got a message from the stream.
1232 StatusCode::OK => {
1233 trace!("message received");
1234 self.pending_messages = self.pending_messages.saturating_sub(1);
1235 self.pending_bytes =
1236 self.pending_bytes.saturating_sub(message.length);
1237 return Poll::Ready(Some(Ok(jetstream::Message {
1238 context: self.context.clone(),
1239 message,
1240 })));
1241 }
1242 StatusCode::NO_RESPONDERS => {
1243 debug!("received no responders");
1244 return Poll::Ready(Some(Err(MessagesError::new(
1245 MessagesErrorKind::NoResponders,
1246 ))));
1247 }
1248 status => {
1249 debug!("received unknown message: {:?}", message);
1250 return Poll::Ready(Some(Err(MessagesError::with_source(
1251 MessagesErrorKind::Other,
1252 format!(
1253 "error while processing messages from the stream: {}, {:?}",
1254 status, message.description
1255 ),
1256 ))));
1257 }
1258 },
1259 None => return Poll::Ready(None),
1260 }
1261 }
1262 Poll::Pending => {
1263 debug!("subscriber still pending");
1264 return std::task::Poll::Pending;
1265 }
1266 }
1267 }
1268 }
1269}
1270
/// Used for building the configuration of a [Stream]. Created by [Consumer::stream] on a [Consumer].
1272///
1273/// # Examples
1274///
1275/// ```no_run
1276/// # #[tokio::main]
1277/// # async fn main() -> Result<(), async_nats::Error> {
1278/// use futures_util::StreamExt;
1279/// use async_nats::jetstream::consumer::PullConsumer;
1280/// let client = async_nats::connect("localhost:4222").await?;
1281/// let jetstream = async_nats::jetstream::new(client);
1282///
1283/// let consumer: PullConsumer = jetstream
1284/// .get_stream("events").await?
1285/// .get_consumer("pull").await?;
1286///
1287/// let mut messages = consumer.stream()
1288/// .max_messages_per_batch(100)
1289/// .max_bytes_per_batch(1024)
1290/// .messages().await?;
1291///
1292/// while let Some(message) = messages.next().await {
1293/// let message = message?;
1294/// println!("message: {:?}", message);
1295/// message.ack().await?;
1296/// }
1297/// # Ok(())
/// # }
/// ```
1299pub struct StreamBuilder<'a> {
1300 batch: usize,
1301 max_bytes: usize,
1302 heartbeat: Duration,
1303 expires: Duration,
1304 group: Option<String>,
1305 min_pending: Option<usize>,
1306 min_ack_pending: Option<usize>,
1307 #[cfg(feature = "server_2_12")]
1308 priority: Option<usize>,
1309 consumer: &'a Consumer<Config>,
1310}
1311
1312impl<'a> StreamBuilder<'a> {
1313 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1314 StreamBuilder {
1315 consumer,
1316 batch: 200,
1317 max_bytes: 0,
1318 expires: Duration::from_secs(30),
1319 heartbeat: Duration::default(),
1320 group: None,
1321 min_pending: None,
1322 min_ack_pending: None,
1323 #[cfg(feature = "server_2_12")]
1324 priority: None,
1325 }
1326 }
1327
1328 /// Sets max bytes that can be buffered on the Client while processing already received
1329 /// messages.
1330 /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much slower than they arrive.
1332 ///
1333 /// Default values should provide reasonable balance between performance and memory usage.
1334 ///
1335 /// # Examples
1336 ///
1337 /// ```no_run
1338 /// # #[tokio::main]
1339 /// # async fn main() -> Result<(), async_nats::Error> {
1340 /// use async_nats::jetstream::consumer::PullConsumer;
1341 /// use futures_util::StreamExt;
1342 /// let client = async_nats::connect("localhost:4222").await?;
1343 /// let jetstream = async_nats::jetstream::new(client);
1344 ///
1345 /// let consumer: PullConsumer = jetstream
1346 /// .get_stream("events")
1347 /// .await?
1348 /// .get_consumer("pull")
1349 /// .await?;
1350 ///
1351 /// let mut messages = consumer
1352 /// .stream()
1353 /// .max_bytes_per_batch(1024)
1354 /// .messages()
1355 /// .await?;
1356 ///
1357 /// while let Some(message) = messages.next().await {
1358 /// let message = message?;
1359 /// println!("message: {:?}", message);
1360 /// message.ack().await?;
1361 /// }
1362 /// # Ok(())
1363 /// # }
1364 /// ```
1365 pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1366 self.max_bytes = max_bytes;
1367 self
1368 }
1369
1370 /// Sets max number of messages that can be buffered on the Client while processing already received
1371 /// messages.
1372 /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much slower than they arrive.
1374 ///
1375 /// Default values should provide reasonable balance between performance and memory usage.
1376 ///
1377 /// # Examples
1378 ///
1379 /// ```no_run
1380 /// # #[tokio::main]
1381 /// # async fn main() -> Result<(), async_nats::Error> {
1382 /// use async_nats::jetstream::consumer::PullConsumer;
1383 /// use futures_util::StreamExt;
1384 /// let client = async_nats::connect("localhost:4222").await?;
1385 /// let jetstream = async_nats::jetstream::new(client);
1386 ///
1387 /// let consumer: PullConsumer = jetstream
1388 /// .get_stream("events")
1389 /// .await?
1390 /// .get_consumer("pull")
1391 /// .await?;
1392 ///
1393 /// let mut messages = consumer
1394 /// .stream()
1395 /// .max_messages_per_batch(100)
1396 /// .messages()
1397 /// .await?;
1398 ///
1399 /// while let Some(message) = messages.next().await {
1400 /// let message = message?;
1401 /// println!("message: {:?}", message);
1402 /// message.ack().await?;
1403 /// }
1404 /// # Ok(())
1405 /// # }
1406 /// ```
1407 pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1408 self.batch = batch;
1409 self
1410 }
1411
    /// Sets the idle heartbeat, which the server will send when there are no new messages pending
    /// for the given [Consumer].
1414 ///
1415 /// # Examples
1416 ///
1417 /// ```no_run
1418 /// # #[tokio::main]
1419 /// # async fn main() -> Result<(), async_nats::Error> {
1420 /// use async_nats::jetstream::consumer::PullConsumer;
1421 /// use futures_util::StreamExt;
1422 /// let client = async_nats::connect("localhost:4222").await?;
1423 /// let jetstream = async_nats::jetstream::new(client);
1424 ///
1425 /// let consumer: PullConsumer = jetstream
1426 /// .get_stream("events")
1427 /// .await?
1428 /// .get_consumer("pull")
1429 /// .await?;
1430 ///
1431 /// let mut messages = consumer
1432 /// .stream()
1433 /// .heartbeat(std::time::Duration::from_secs(10))
1434 /// .messages()
1435 /// .await?;
1436 ///
1437 /// while let Some(message) = messages.next().await {
1438 /// let message = message?;
1439 /// println!("message: {:?}", message);
1440 /// message.ack().await?;
1441 /// }
1442 /// # Ok(())
1443 /// # }
1444 /// ```
1445 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1446 self.heartbeat = heartbeat;
1447 self
1448 }
1449
    /// Low-level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1453 ///
1454 /// # Examples
1455 ///
1456 /// ```no_run
1457 /// # #[tokio::main]
1458 /// # async fn main() -> Result<(), async_nats::Error> {
1459 /// use async_nats::jetstream::consumer::PullConsumer;
1460 /// use futures_util::StreamExt;
1461 /// let client = async_nats::connect("localhost:4222").await?;
1462 /// let jetstream = async_nats::jetstream::new(client);
1463 ///
1464 /// let consumer: PullConsumer = jetstream
1465 /// .get_stream("events")
1466 /// .await?
1467 /// .get_consumer("pull")
1468 /// .await?;
1469 ///
1470 /// let mut messages = consumer
1471 /// .stream()
1472 /// .expires(std::time::Duration::from_secs(30))
1473 /// .messages()
1474 /// .await?;
1475 ///
1476 /// while let Some(message) = messages.next().await {
1477 /// let message = message?;
1478 /// println!("message: {:?}", message);
1479 /// message.ack().await?;
1480 /// }
1481 /// # Ok(())
1482 /// # }
1483 /// ```
1484 pub fn expires(mut self, expires: Duration) -> Self {
1485 self.expires = expires;
1486 self
1487 }
1488
    /// Sets the overflow threshold: the minimum number of pending messages required on the
    /// [Consumer] before this stream starts receiving messages.
    /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and [PriorityPolicy::Overflow] set.
1492 ///
1493 /// # Examples
1494 ///
1495 /// ```no_run
1496 /// # #[tokio::main]
1497 /// # async fn main() -> Result<(), async_nats::Error> {
1498 /// use async_nats::jetstream::consumer::PullConsumer;
1499 /// use futures_util::StreamExt;
1500 /// let client = async_nats::connect("localhost:4222").await?;
1501 /// let jetstream = async_nats::jetstream::new(client);
1502 ///
1503 /// let consumer: PullConsumer = jetstream
1504 /// .get_stream("events")
1505 /// .await?
1506 /// .get_consumer("pull")
1507 /// .await?;
1508 ///
1509 /// let mut messages = consumer
1510 /// .stream()
1511 /// .expires(std::time::Duration::from_secs(30))
1512 /// .group("A")
1513 /// .min_pending(100)
1514 /// .messages()
1515 /// .await?;
1516 ///
1517 /// while let Some(message) = messages.next().await {
1518 /// let message = message?;
1519 /// println!("message: {:?}", message);
1520 /// message.ack().await?;
1521 /// }
1522 /// # Ok(())
1523 /// # }
1524 /// ```
1525 pub fn min_pending(mut self, min_pending: usize) -> Self {
1526 self.min_pending = Some(min_pending);
1527 self
1528 }
1529
1530 /// Sets the priority at which this stream will get messages. If there are any requests with
    /// a lower priority number, this stream will not get messages until those are satisfied.
1532 #[cfg(feature = "server_2_12")]
1533 pub fn priority(mut self, priority: usize) -> Self {
1534 self.priority = Some(priority);
1535 self
1536 }
1537
    /// Sets the overflow threshold: the minimum number of pending acknowledgments required on the
    /// [Consumer] before this stream starts receiving messages.
    /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and [PriorityPolicy::Overflow] set.
1541 ///
1542 /// # Examples
1543 ///
1544 /// ```no_run
1545 /// # #[tokio::main]
1546 /// # async fn main() -> Result<(), async_nats::Error> {
1547 /// use async_nats::jetstream::consumer::PullConsumer;
1548 /// use futures_util::StreamExt;
1549 /// let client = async_nats::connect("localhost:4222").await?;
1550 /// let jetstream = async_nats::jetstream::new(client);
1551 ///
1552 /// let consumer: PullConsumer = jetstream
1553 /// .get_stream("events")
1554 /// .await?
1555 /// .get_consumer("pull")
1556 /// .await?;
1557 ///
1558 /// let mut messages = consumer
1559 /// .stream()
1560 /// .expires(std::time::Duration::from_secs(30))
1561 /// .group("A")
1562 /// .min_ack_pending(100)
1563 /// .messages()
1564 /// .await?;
1565 ///
1566 /// while let Some(message) = messages.next().await {
1567 /// let message = message?;
1568 /// println!("message: {:?}", message);
1569 /// message.ack().await?;
1570 /// }
1571 /// # Ok(())
1572 /// # }
1573 /// ```
1574 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1575 self.min_ack_pending = Some(min_ack_pending);
1576 self
1577 }
1578
    /// Sets the priority group to use when the [Consumer] is configured with [Config::priority_groups].
1580 ///
1581 /// # Examples
1582 ///
1583 /// ```no_run
1584 /// # #[tokio::main]
1585 /// # async fn main() -> Result<(), async_nats::Error> {
1586 /// use async_nats::jetstream::consumer::PullConsumer;
1587 /// use futures_util::StreamExt;
1588 /// let client = async_nats::connect("localhost:4222").await?;
1589 /// let jetstream = async_nats::jetstream::new(client);
1590 ///
1591 /// let consumer: PullConsumer = jetstream
1592 /// .get_stream("events")
1593 /// .await?
1594 /// .get_consumer("pull")
1595 /// .await?;
1596 ///
1597 /// let mut messages = consumer
1598 /// .stream()
1599 /// .expires(std::time::Duration::from_secs(30))
1600 /// .group("A")
1601 /// .min_ack_pending(100)
1602 /// .messages()
1603 /// .await?;
1604 ///
1605 /// while let Some(message) = messages.next().await {
1606 /// let message = message?;
1607 /// println!("message: {:?}", message);
1608 /// message.ack().await?;
1609 /// }
1610 /// # Ok(())
1611 /// # }
1612 /// ```
1613 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1614 self.group = Some(group.into());
1615 self
1616 }
1617
    /// Creates the actual [Stream] with the provided configuration.
1619 ///
1620 /// # Examples
1621 ///
1622 /// ```no_run
1623 /// # #[tokio::main]
1624 /// # async fn main() -> Result<(), async_nats::Error> {
1625 /// use async_nats::jetstream::consumer::PullConsumer;
1626 /// use futures_util::StreamExt;
1627 /// let client = async_nats::connect("localhost:4222").await?;
1628 /// let jetstream = async_nats::jetstream::new(client);
1629 ///
1630 /// let consumer: PullConsumer = jetstream
1631 /// .get_stream("events")
1632 /// .await?
1633 /// .get_consumer("pull")
1634 /// .await?;
1635 ///
1636 /// let mut messages = consumer
1637 /// .stream()
1638 /// .max_messages_per_batch(100)
1639 /// .messages()
1640 /// .await?;
1641 ///
1642 /// while let Some(message) = messages.next().await {
1643 /// let message = message?;
1644 /// println!("message: {:?}", message);
1645 /// message.ack().await?;
1646 /// }
1647 /// # Ok(())
1648 /// # }
1649 /// ```
1650 pub async fn messages(self) -> Result<Stream, StreamError> {
1651 Stream::stream(
1652 BatchConfig {
1653 batch: self.batch,
1654 expires: Some(self.expires),
1655 no_wait: false,
1656 max_bytes: self.max_bytes,
1657 idle_heartbeat: self.heartbeat,
1658 min_pending: self.min_pending,
1659 group: self.group,
1660 min_ack_pending: self.min_ack_pending,
1661 #[cfg(feature = "server_2_12")]
1662 priority: self.priority,
1663 },
1664 self.consumer,
1665 )
1666 .await
1667 }
1668}
1669
/// Used for building the configuration of a [Batch] with `fetch()` semantics. Created by [Consumer::fetch] on a [Consumer].
1671///
1672/// # Examples
1673///
1674/// ```no_run
1675/// # #[tokio::main]
1676/// # async fn main() -> Result<(), async_nats::Error> {
1677/// use async_nats::jetstream::consumer::PullConsumer;
1678/// use futures_util::StreamExt;
1679/// let client = async_nats::connect("localhost:4222").await?;
1680/// let jetstream = async_nats::jetstream::new(client);
1681///
1682/// let consumer: PullConsumer = jetstream
1683/// .get_stream("events")
1684/// .await?
1685/// .get_consumer("pull")
1686/// .await?;
1687///
1688/// let mut messages = consumer
1689/// .fetch()
1690/// .max_messages(100)
1691/// .max_bytes(1024)
1692/// .messages()
1693/// .await?;
1694///
1695/// while let Some(message) = messages.next().await {
1696/// let message = message?;
1697/// println!("message: {:?}", message);
1698/// message.ack().await?;
1699/// }
1700/// # Ok(())
1701/// # }
1702/// ```
1703pub struct FetchBuilder<'a> {
1704 batch: usize,
1705 max_bytes: usize,
1706 heartbeat: Duration,
1707 expires: Option<Duration>,
1708 min_pending: Option<usize>,
1709 min_ack_pending: Option<usize>,
    group: Option<String>,
    #[cfg(feature = "server_2_12")]
    priority: Option<usize>,
1711 consumer: &'a Consumer<Config>,
1712}
1713
1714impl<'a> FetchBuilder<'a> {
1715 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1716 FetchBuilder {
1717 consumer,
1718 batch: 200,
1719 max_bytes: 0,
1720 expires: None,
1721 min_pending: None,
1722 min_ack_pending: None,
            group: None,
            #[cfg(feature = "server_2_12")]
            priority: None,
1724 heartbeat: Duration::default(),
1725 }
1726 }
1727
1728 /// Sets max bytes that can be buffered on the Client while processing already received
1729 /// messages.
1730 /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much slower than they arrive.
1732 ///
1733 /// Default values should provide reasonable balance between performance and memory usage.
1734 ///
1735 /// # Examples
1736 ///
1737 /// ```no_run
1738 /// # #[tokio::main]
1739 /// # async fn main() -> Result<(), async_nats::Error> {
1740 /// use futures_util::StreamExt;
1741 /// let client = async_nats::connect("localhost:4222").await?;
1742 /// let jetstream = async_nats::jetstream::new(client);
1743 ///
1744 /// let consumer = jetstream
1745 /// .get_stream("events")
1746 /// .await?
1747 /// .get_consumer("pull")
1748 /// .await?;
1749 ///
1750 /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1751 ///
1752 /// while let Some(message) = messages.next().await {
1753 /// let message = message?;
1754 /// println!("message: {:?}", message);
1755 /// message.ack().await?;
1756 /// }
1757 /// # Ok(())
1758 /// # }
1759 /// ```
1760 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1761 self.max_bytes = max_bytes;
1762 self
1763 }
1764
1765 /// Sets max number of messages that can be buffered on the Client while processing already received
1766 /// messages.
1767 /// Higher values will yield better performance, but also potentially increase memory usage if
    /// the application is acknowledging messages much slower than they arrive.
1769 ///
1770 /// Default values should provide reasonable balance between performance and memory usage.
1771 ///
1772 /// # Examples
1773 ///
1774 /// ```no_run
1775 /// # #[tokio::main]
1776 /// # async fn main() -> Result<(), async_nats::Error> {
1777 /// use futures_util::StreamExt;
1778 /// let client = async_nats::connect("localhost:4222").await?;
1779 /// let jetstream = async_nats::jetstream::new(client);
1780 ///
1781 /// let consumer = jetstream
1782 /// .get_stream("events")
1783 /// .await?
1784 /// .get_consumer("pull")
1785 /// .await?;
1786 ///
1787 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1788 ///
1789 /// while let Some(message) = messages.next().await {
1790 /// let message = message?;
1791 /// println!("message: {:?}", message);
1792 /// message.ack().await?;
1793 /// }
1794 /// # Ok(())
1795 /// # }
1796 /// ```
1797 pub fn max_messages(mut self, batch: usize) -> Self {
1798 self.batch = batch;
1799 self
1800 }
1801
    /// Sets the idle heartbeat, which the server will send when there are no new messages pending
    /// for the given [Consumer].
1804 ///
1805 /// # Examples
1806 ///
1807 /// ```no_run
1808 /// # #[tokio::main]
1809 /// # async fn main() -> Result<(), async_nats::Error> {
1810 /// use async_nats::jetstream::consumer::PullConsumer;
1811 /// use futures_util::StreamExt;
1812 /// let client = async_nats::connect("localhost:4222").await?;
1813 /// let jetstream = async_nats::jetstream::new(client);
1814 ///
1815 /// let consumer = jetstream
1816 /// .get_stream("events")
1817 /// .await?
1818 /// .get_consumer("pull")
1819 /// .await?;
1820 ///
1821 /// let mut messages = consumer
1822 /// .fetch()
1823 /// .heartbeat(std::time::Duration::from_secs(10))
1824 /// .messages()
1825 /// .await?;
1826 ///
1827 /// while let Some(message) = messages.next().await {
1828 /// let message = message?;
1829 /// println!("message: {:?}", message);
1830 /// message.ack().await?;
1831 /// }
1832 /// # Ok(())
1833 /// # }
1834 /// ```
1835 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1836 self.heartbeat = heartbeat;
1837 self
1838 }
1839
    /// Low-level API that does not need tweaking for most use cases.
    /// Sets how long each batch request waits for the whole batch of messages before timing out.
1843 ///
1844 /// # Examples
1845 ///
1846 /// ```no_run
1847 /// # #[tokio::main]
1848 /// # async fn main() -> Result<(), async_nats::Error> {
1849 /// use async_nats::jetstream::consumer::PullConsumer;
1850 /// use futures_util::StreamExt;
1851 ///
1852 /// let client = async_nats::connect("localhost:4222").await?;
1853 /// let jetstream = async_nats::jetstream::new(client);
1854 ///
1855 /// let consumer: PullConsumer = jetstream
1856 /// .get_stream("events")
1857 /// .await?
1858 /// .get_consumer("pull")
1859 /// .await?;
1860 ///
1861 /// let mut messages = consumer
1862 /// .fetch()
1863 /// .expires(std::time::Duration::from_secs(30))
1864 /// .messages()
1865 /// .await?;
1866 ///
1867 /// while let Some(message) = messages.next().await {
1868 /// let message = message?;
1869 /// println!("message: {:?}", message);
1870 /// message.ack().await?;
1871 /// }
1872 /// # Ok(())
1873 /// # }
1874 /// ```
1875 pub fn expires(mut self, expires: Duration) -> Self {
1876 self.expires = Some(expires);
1877 self
1878 }
1879
1880 /// Sets the overflow threshold: the minimum number of pending messages required before this
1881 /// stream starts receiving messages.
1882 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
1883 /// [PriorityPolicy::Overflow] set.
1884 ///
1885 /// # Examples
1886 ///
1887 /// ```no_run
1888 /// # #[tokio::main]
1889 /// # async fn main() -> Result<(), async_nats::Error> {
1890 /// use async_nats::jetstream::consumer::PullConsumer;
1891 /// use futures_util::StreamExt;
1892 ///
1893 /// let client = async_nats::connect("localhost:4222").await?;
1894 /// let jetstream = async_nats::jetstream::new(client);
1895 ///
1896 /// let consumer: PullConsumer = jetstream
1897 /// .get_stream("events")
1898 /// .await?
1899 /// .get_consumer("pull")
1900 /// .await?;
1901 ///
1902 /// let mut messages = consumer
1903 /// .fetch()
1904 /// .expires(std::time::Duration::from_secs(30))
1905 /// .group("A")
1906 /// .min_pending(100)
1907 /// .messages()
1908 /// .await?;
1909 ///
1910 /// while let Some(message) = messages.next().await {
1911 /// let message = message?;
1912 /// println!("message: {:?}", message);
1913 /// message.ack().await?;
1914 /// }
1915 /// # Ok(())
1916 /// # }
1917 /// ```
1918 pub fn min_pending(mut self, min_pending: usize) -> Self {
1919 self.min_pending = Some(min_pending);
1920 self
1921 }
1922
1923 /// Sets the priority at which this stream will get messages. If there are any requests with a
1924 /// lower priority number, this stream will not get messages until those are satisfied.
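///
/// # Examples
///
/// A minimal sketch of requesting messages at a given priority. It assumes an existing
/// "events" stream and a "pull" consumer with priority groups configured (here group "A");
/// adjust the names to match your setup.
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer::PullConsumer;
/// use futures_util::StreamExt;
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer: PullConsumer = jetstream
///     .get_stream("events")
///     .await?
///     .get_consumer("pull")
///     .await?;
///
/// let mut messages = consumer
///     .fetch()
///     .group("A")
///     .priority(1)
///     .messages()
///     .await?;
///
/// while let Some(message) = messages.next().await {
///     let message = message?;
///     println!("message: {:?}", message);
///     message.ack().await?;
/// }
/// # Ok(())
/// # }
/// ```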
1925 #[cfg(feature = "server_2_12")]
1926 pub fn priority(mut self, priority: usize) -> Self {
1927 self.priority = Some(priority);
1928 self
1929 }
1930
1931 /// Sets the overflow threshold: the minimum number of pending acknowledgments required before
1932 /// this stream starts receiving messages.
1933 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
1934 /// [PriorityPolicy::Overflow] set.
1935 ///
1936 /// # Examples
1937 ///
1938 /// ```no_run
1939 /// # #[tokio::main]
1940 /// # async fn main() -> Result<(), async_nats::Error> {
1941 /// use async_nats::jetstream::consumer::PullConsumer;
1942 /// use futures_util::StreamExt;
1943 ///
1944 /// let client = async_nats::connect("localhost:4222").await?;
1945 /// let jetstream = async_nats::jetstream::new(client);
1946 ///
1947 /// let consumer: PullConsumer = jetstream
1948 /// .get_stream("events")
1949 /// .await?
1950 /// .get_consumer("pull")
1951 /// .await?;
1952 ///
1953 /// let mut messages = consumer
1954 /// .fetch()
1955 /// .expires(std::time::Duration::from_secs(30))
1956 /// .group("A")
1957 /// .min_ack_pending(100)
1958 /// .messages()
1959 /// .await?;
1960 ///
1961 /// while let Some(message) = messages.next().await {
1962 /// let message = message?;
1963 /// println!("message: {:?}", message);
1964 /// message.ack().await?;
1965 /// }
1966 /// # Ok(())
1967 /// # }
1968 /// ```
1969 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1970 self.min_ack_pending = Some(min_ack_pending);
1971 self
1972 }
1973
1974 /// Sets the group when using a [Consumer] with [PriorityPolicy].
1975 ///
1976 /// # Examples
1977 ///
1978 /// ```no_run
1979 /// # #[tokio::main]
1980 /// # async fn main() -> Result<(), async_nats::Error> {
1981 /// use async_nats::jetstream::consumer::PullConsumer;
1982 /// use futures_util::StreamExt;
1983 ///
1984 /// let client = async_nats::connect("localhost:4222").await?;
1985 /// let jetstream = async_nats::jetstream::new(client);
1986 ///
1987 /// let consumer: PullConsumer = jetstream
1988 /// .get_stream("events")
1989 /// .await?
1990 /// .get_consumer("pull")
1991 /// .await?;
1992 ///
1993 /// let mut messages = consumer
1994 /// .fetch()
1995 /// .expires(std::time::Duration::from_secs(30))
1996 /// .group("A")
1997 /// .min_ack_pending(100)
1998 /// .messages()
1999 /// .await?;
2000 ///
2001 /// while let Some(message) = messages.next().await {
2002 /// let message = message?;
2003 /// println!("message: {:?}", message);
2004 /// message.ack().await?;
2005 /// }
2006 /// # Ok(())
2007 /// # }
2008 /// ```
2009 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2010 self.group = Some(group.into());
2011 self
2012 }
2013
2014 /// Creates the actual [Batch] with the provided configuration.
2015 ///
2016 /// # Examples
2017 ///
2018 /// ```no_run
2019 /// # #[tokio::main]
2020 /// # async fn main() -> Result<(), async_nats::Error> {
2021 /// use async_nats::jetstream::consumer::PullConsumer;
2022 /// use futures_util::StreamExt;
2023 /// let client = async_nats::connect("localhost:4222").await?;
2024 /// let jetstream = async_nats::jetstream::new(client);
2025 ///
2026 /// let consumer: PullConsumer = jetstream
2027 /// .get_stream("events")
2028 /// .await?
2029 /// .get_consumer("pull")
2030 /// .await?;
2031 ///
2032 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
2033 ///
2034 /// while let Some(message) = messages.next().await {
2035 /// let message = message?;
2036 /// println!("message: {:?}", message);
2037 /// message.ack().await?;
2038 /// }
2039 /// # Ok(())
2040 /// # }
2041 /// ```
2042 pub async fn messages(self) -> Result<Batch, BatchError> {
2043 Batch::batch(
2044 BatchConfig {
2045 batch: self.batch,
2046 expires: self.expires,
2047 no_wait: true,
2048 max_bytes: self.max_bytes,
2049 idle_heartbeat: self.heartbeat,
2050 min_pending: self.min_pending,
2051 min_ack_pending: self.min_ack_pending,
2052 group: self.group,
2053 #[cfg(feature = "server_2_12")]
2054 priority: self.priority,
2055 },
2056 self.consumer,
2057 )
2058 .await
2059 }
2060}
2061
2062/// Used for building the configuration for a [Batch]. Created by [Consumer::batch] on a [Consumer].
2063///
2064/// # Examples
2065///
2066/// ```no_run
2067/// # #[tokio::main]
2068/// # async fn main() -> Result<(), async_nats::Error> {
2069/// use async_nats::jetstream::consumer::PullConsumer;
2070/// use futures_util::StreamExt;
2071/// let client = async_nats::connect("localhost:4222").await?;
2072/// let jetstream = async_nats::jetstream::new(client);
2073///
2074/// let consumer: PullConsumer = jetstream
2075/// .get_stream("events")
2076/// .await?
2077/// .get_consumer("pull")
2078/// .await?;
2079///
2080/// let mut messages = consumer
2081/// .batch()
2082/// .max_messages(100)
2083/// .max_bytes(1024)
2084/// .messages()
2085/// .await?;
2086///
2087/// while let Some(message) = messages.next().await {
2088/// let message = message?;
2089/// println!("message: {:?}", message);
2090/// message.ack().await?;
2091/// }
2092/// # Ok(())
2093/// # }
2094/// ```
2095pub struct BatchBuilder<'a> {
2096 batch: usize,
2097 max_bytes: usize,
2098 heartbeat: Duration,
2099 expires: Duration,
2100 min_pending: Option<usize>,
2101 min_ack_pending: Option<usize>,
2102 group: Option<String>,
2103 consumer: &'a Consumer<Config>,
2104}
2105
2106impl<'a> BatchBuilder<'a> {
2107 pub fn new(consumer: &'a Consumer<Config>) -> Self {
2108 BatchBuilder {
2109 consumer,
2110 batch: 200,
2111 max_bytes: 0,
2112 expires: Duration::ZERO,
2113 heartbeat: Duration::default(),
2114 min_pending: None,
2115 min_ack_pending: None,
2116 group: None,
2117 }
2118 }
2119
2120 /// Sets the maximum number of bytes that can be buffered on the Client while already
2121 /// received messages are being processed.
2122 /// Higher values yield better performance, but can also increase memory usage if the
2123 /// application acknowledges messages much more slowly than they arrive.
2124 ///
2125 /// Default values should provide a reasonable balance between performance and memory usage.
2126 ///
2127 /// # Examples
2128 ///
2129 /// ```no_run
2130 /// # #[tokio::main]
2131 /// # async fn main() -> Result<(), async_nats::Error> {
2132 /// use async_nats::jetstream::consumer::PullConsumer;
2133 /// use futures_util::StreamExt;
2134 /// let client = async_nats::connect("localhost:4222").await?;
2135 /// let jetstream = async_nats::jetstream::new(client);
2136 ///
2137 /// let consumer: PullConsumer = jetstream
2138 /// .get_stream("events")
2139 /// .await?
2140 /// .get_consumer("pull")
2141 /// .await?;
2142 ///
2143 /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
2144 ///
2145 /// while let Some(message) = messages.next().await {
2146 /// let message = message?;
2147 /// println!("message: {:?}", message);
2148 /// message.ack().await?;
2149 /// }
2150 /// # Ok(())
2151 /// # }
2152 /// ```
2153 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
2154 self.max_bytes = max_bytes;
2155 self
2156 }
2157
2158 /// Sets the maximum number of messages that can be buffered on the Client while already
2159 /// received messages are being processed.
2160 /// Higher values yield better performance, but can also increase memory usage if the
2161 /// application acknowledges messages much more slowly than they arrive.
2162 ///
2163 /// Default values should provide a reasonable balance between performance and memory usage.
2164 ///
2165 /// # Examples
2166 ///
2167 /// ```no_run
2168 /// # #[tokio::main]
2169 /// # async fn main() -> Result<(), async_nats::Error> {
2170 /// use async_nats::jetstream::consumer::PullConsumer;
2171 /// use futures_util::StreamExt;
2172 /// let client = async_nats::connect("localhost:4222").await?;
2173 /// let jetstream = async_nats::jetstream::new(client);
2174 ///
2175 /// let consumer: PullConsumer = jetstream
2176 /// .get_stream("events")
2177 /// .await?
2178 /// .get_consumer("pull")
2179 /// .await?;
2180 ///
2181 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2182 ///
2183 /// while let Some(message) = messages.next().await {
2184 /// let message = message?;
2185 /// println!("message: {:?}", message);
2186 /// message.ack().await?;
2187 /// }
2188 /// # Ok(())
2189 /// # }
2190 /// ```
2191 pub fn max_messages(mut self, batch: usize) -> Self {
2192 self.batch = batch;
2193 self
2194 }
2195
2196 /// Sets the heartbeat that will be sent by the server if there are no messages pending
2197 /// for a given [Consumer].
2198 ///
2199 /// # Examples
2200 ///
2201 /// ```no_run
2202 /// # #[tokio::main]
2203 /// # async fn main() -> Result<(), async_nats::Error> {
2204 /// use async_nats::jetstream::consumer::PullConsumer;
2205 /// use futures_util::StreamExt;
2206 /// let client = async_nats::connect("localhost:4222").await?;
2207 /// let jetstream = async_nats::jetstream::new(client);
2208 ///
2209 /// let consumer: PullConsumer = jetstream
2210 /// .get_stream("events")
2211 /// .await?
2212 /// .get_consumer("pull")
2213 /// .await?;
2214 ///
2215 /// let mut messages = consumer
2216 /// .batch()
2217 /// .heartbeat(std::time::Duration::from_secs(10))
2218 /// .messages()
2219 /// .await?;
2220 ///
2221 /// while let Some(message) = messages.next().await {
2222 /// let message = message?;
2223 /// println!("message: {:?}", message);
2224 /// message.ack().await?;
2225 /// }
2226 /// # Ok(())
2227 /// # }
2228 /// ```
2229 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
2230 self.heartbeat = heartbeat;
2231 self
2232 }
2233
2234 /// Sets the overflow threshold: the minimum number of pending messages required before this
2235 /// stream starts receiving messages.
2236 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
2237 /// [PriorityPolicy::Overflow] set.
2238 ///
2239 /// # Examples
2240 ///
2241 /// ```no_run
2242 /// # #[tokio::main]
2243 /// # async fn main() -> Result<(), async_nats::Error> {
2244 /// use async_nats::jetstream::consumer::PullConsumer;
2245 /// use futures_util::StreamExt;
2246 ///
2247 /// let client = async_nats::connect("localhost:4222").await?;
2248 /// let jetstream = async_nats::jetstream::new(client);
2249 ///
2250 /// let consumer: PullConsumer = jetstream
2251 /// .get_stream("events")
2252 /// .await?
2253 /// .get_consumer("pull")
2254 /// .await?;
2255 ///
2256 /// let mut messages = consumer
2257 /// .batch()
2258 /// .expires(std::time::Duration::from_secs(30))
2259 /// .group("A")
2260 /// .min_pending(100)
2261 /// .messages()
2262 /// .await?;
2263 ///
2264 /// while let Some(message) = messages.next().await {
2265 /// let message = message?;
2266 /// println!("message: {:?}", message);
2267 /// message.ack().await?;
2268 /// }
2269 /// # Ok(())
2270 /// # }
2271 /// ```
2272 pub fn min_pending(mut self, min_pending: usize) -> Self {
2273 self.min_pending = Some(min_pending);
2274 self
2275 }
2276
2277 /// Sets the overflow threshold: the minimum number of pending acknowledgments required before
2278 /// this stream starts receiving messages.
2279 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
2280 /// [PriorityPolicy::Overflow] set.
2281 ///
2282 /// # Examples
2283 ///
2284 /// ```no_run
2285 /// # #[tokio::main]
2286 /// # async fn main() -> Result<(), async_nats::Error> {
2287 /// use async_nats::jetstream::consumer::PullConsumer;
2288 /// use futures_util::StreamExt;
2289 ///
2290 /// let client = async_nats::connect("localhost:4222").await?;
2291 /// let jetstream = async_nats::jetstream::new(client);
2292 ///
2293 /// let consumer: PullConsumer = jetstream
2294 /// .get_stream("events")
2295 /// .await?
2296 /// .get_consumer("pull")
2297 /// .await?;
2298 ///
2299 /// let mut messages = consumer
2300 /// .batch()
2301 /// .expires(std::time::Duration::from_secs(30))
2302 /// .group("A")
2303 /// .min_ack_pending(100)
2304 /// .messages()
2305 /// .await?;
2306 ///
2307 /// while let Some(message) = messages.next().await {
2308 /// let message = message?;
2309 /// println!("message: {:?}", message);
2310 /// message.ack().await?;
2311 /// }
2312 /// # Ok(())
2313 /// # }
2314 /// ```
2315 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
2316 self.min_ack_pending = Some(min_ack_pending);
2317 self
2318 }
2319
2320 /// Sets the group when using a [Consumer] with [PriorityPolicy].
2321 ///
2322 /// # Examples
2323 ///
2324 /// ```no_run
2325 /// # #[tokio::main]
2326 /// # async fn main() -> Result<(), async_nats::Error> {
2327 /// use async_nats::jetstream::consumer::PullConsumer;
2328 /// use futures_util::StreamExt;
2329 ///
2330 /// let client = async_nats::connect("localhost:4222").await?;
2331 /// let jetstream = async_nats::jetstream::new(client);
2332 ///
2333 /// let consumer: PullConsumer = jetstream
2334 /// .get_stream("events")
2335 /// .await?
2336 /// .get_consumer("pull")
2337 /// .await?;
2338 ///
2339 /// let mut messages = consumer
2340 /// .batch()
2341 /// .expires(std::time::Duration::from_secs(30))
2342 /// .group("A")
2343 /// .min_ack_pending(100)
2344 /// .messages()
2345 /// .await?;
2346 ///
2347 /// while let Some(message) = messages.next().await {
2348 /// let message = message?;
2349 /// println!("message: {:?}", message);
2350 /// message.ack().await?;
2351 /// }
2352 /// # Ok(())
2353 /// # }
2354 /// ```
2355 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2356 self.group = Some(group.into());
2357 self
2358 }
2359
2360 /// Low level API that does not need tweaking for most use cases.
2361 /// Sets how long each batch request waits for the whole batch of messages before timing
2362 /// out.
2363 ///
2364 /// # Examples
2365 ///
2366 /// ```no_run
2367 /// # #[tokio::main]
2368 /// # async fn main() -> Result<(), async_nats::Error> {
2369 /// use async_nats::jetstream::consumer::PullConsumer;
2370 /// use futures_util::StreamExt;
2371 /// let client = async_nats::connect("localhost:4222").await?;
2372 /// let jetstream = async_nats::jetstream::new(client);
2373 ///
2374 /// let consumer: PullConsumer = jetstream
2375 /// .get_stream("events")
2376 /// .await?
2377 /// .get_consumer("pull")
2378 /// .await?;
2379 ///
2380 /// let mut messages = consumer
2381 /// .batch()
2382 /// .expires(std::time::Duration::from_secs(30))
2383 /// .messages()
2384 /// .await?;
2385 ///
2386 /// while let Some(message) = messages.next().await {
2387 /// let message = message?;
2388 /// println!("message: {:?}", message);
2389 /// message.ack().await?;
2390 /// }
2391 /// # Ok(())
2392 /// # }
2393 /// ```
2394 pub fn expires(mut self, expires: Duration) -> Self {
2395 self.expires = expires;
2396 self
2397 }
2398
2399 /// Creates the actual [Batch] with the provided configuration.
2400 ///
2401 /// # Examples
2402 ///
2403 /// ```no_run
2404 /// # #[tokio::main]
2405 /// # async fn main() -> Result<(), async_nats::Error> {
2406 /// use async_nats::jetstream::consumer::PullConsumer;
2407 /// use futures_util::StreamExt;
2408 /// let client = async_nats::connect("localhost:4222").await?;
2409 /// let jetstream = async_nats::jetstream::new(client);
2410 ///
2411 /// let consumer: PullConsumer = jetstream
2412 /// .get_stream("events")
2413 /// .await?
2414 /// .get_consumer("pull")
2415 /// .await?;
2416 ///
2417 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2418 ///
2419 /// while let Some(message) = messages.next().await {
2420 /// let message = message?;
2421 /// println!("message: {:?}", message);
2422 /// message.ack().await?;
2423 /// }
2424 /// # Ok(())
2425 /// # }
2426 /// ```
2427 pub async fn messages(self) -> Result<Batch, BatchError> {
2428 let config = BatchConfig {
2429 batch: self.batch,
2430 expires: Some(self.expires),
2431 no_wait: false,
2432 max_bytes: self.max_bytes,
2433 idle_heartbeat: self.heartbeat,
2434 min_pending: self.min_pending,
2435 min_ack_pending: self.min_ack_pending,
2436 group: self.group,
2437 #[cfg(feature = "server_2_12")]
2438 priority: None,
2439 };
2440 Batch::batch(config, self.consumer).await
2441 }
2442}
2443
2444/// Used for the next pull request of a Pull Consumer.
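///
/// The stream, fetch and batch builders (see [BatchBuilder]) normally fill this in for you;
/// constructing it by hand is only needed for low level use. A minimal sketch of a request for
/// up to 100 messages or 1 MiB, whichever limit is reached first (the values are illustrative):
///
/// ```
/// use async_nats::jetstream::consumer::pull::BatchConfig;
/// use std::time::Duration;
///
/// let request = BatchConfig {
///     batch: 100,
///     max_bytes: 1024 * 1024,
///     expires: Some(Duration::from_secs(30)),
///     // `no_wait`, `idle_heartbeat` and the priority related fields keep their defaults.
///     ..Default::default()
/// };
/// assert_eq!(request.batch, 100);
/// ```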
2445#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
2446pub struct BatchConfig {
2447 /// The number of messages that are being requested to be delivered.
2448 pub batch: usize,
2449 /// The optional number of nanoseconds that the server will store this next request for
2450 /// before forgetting about the pending batch size.
2451 #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
2452 pub expires: Option<Duration>,
2453 /// When set, the server does not store this pending request at all; if there are no
2454 /// messages to deliver, it sends a message with an empty (nil) payload and a 404 Status header, so
2455 /// you can tell, for example, that you have reached the end of the stream. A 409 is returned if
2456 /// the Consumer has reached its MaxAckPending limit.
2457 #[serde(skip_serializing_if = "is_default")]
2458 pub no_wait: bool,
2459
2460 /// Sets the maximum total number of bytes for a given batch. This works together with `batch`:
2461 /// whichever limit is reached first completes the batch.
2462 pub max_bytes: usize,
2463
2464 /// Setting this to a non-zero value will cause the server to send 100 Idle Heartbeat status
2465 /// messages to the client.
2466 #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
2467 pub idle_heartbeat: Duration,
2468
2469 pub min_pending: Option<usize>,
2470 pub min_ack_pending: Option<usize>,
2471 pub group: Option<String>,
2472 #[cfg(feature = "server_2_12")]
2473 pub priority: Option<usize>,
2474}
2475
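/// Serde helper for `skip_serializing_if`: a field is skipped when it equals its type's default.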
2476fn is_default<T: Default + Eq>(t: &T) -> bool {
2477 t == &T::default()
2478}
2479
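/// Configuration for a pull [Consumer].
///
/// A minimal sketch of building a durable pull consumer configuration; the field values are
/// illustrative, and the resulting config is typically passed to a stream's `create_consumer`
/// or `get_or_create_consumer`:
///
/// ```
/// use async_nats::jetstream::consumer::{pull, AckPolicy};
/// use std::time::Duration;
///
/// let config = pull::Config {
///     durable_name: Some("processor".to_string()),
///     ack_policy: AckPolicy::Explicit,
///     ack_wait: Duration::from_secs(30),
///     max_deliver: 5,
///     ..Default::default()
/// };
/// assert_eq!(config.max_deliver, 5);
/// ```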
2480#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2481pub struct Config {
2482 /// Setting `durable_name` to `Some(...)` will cause this consumer
2483 /// to be "durable". This may be a good choice for workloads that
2484 /// benefit from the `JetStream` server or cluster remembering the
2485 /// progress of consumers for fault tolerance purposes. If a consumer
2486 /// crashes, the `JetStream` server or cluster will remember which
2487 /// messages the consumer acknowledged. When the consumer recovers,
2488 /// this information will allow the consumer to resume processing
2489 /// where it left off. If you're unsure, set this to `Some(...)`.
2490 ///
2491 /// Setting `durable_name` to `None` will cause this consumer to
2492 /// be "ephemeral". This may be a good choice for workloads where
2493 /// you don't need the `JetStream` server to remember the consumer's
2494 /// progress in the case of a crash, such as certain "high churn"
2495 /// workloads or workloads where a crashed instance is not required
2496 /// to recover.
2497 #[serde(default, skip_serializing_if = "Option::is_none")]
2498 pub durable_name: Option<String>,
2499 /// The name of the consumer. Can be specified for both durable and ephemeral
2500 /// consumers.
2501 #[serde(default, skip_serializing_if = "Option::is_none")]
2502 pub name: Option<String>,
2503 /// A short description of the purpose of this consumer.
2504 #[serde(default, skip_serializing_if = "Option::is_none")]
2505 pub description: Option<String>,
2506 /// Allows for a variety of options that determine how this consumer will receive messages
2507 #[serde(flatten)]
2508 pub deliver_policy: DeliverPolicy,
2509 /// How messages should be acknowledged
2510 pub ack_policy: AckPolicy,
2511 /// How long to allow messages to remain un-acknowledged before attempting redelivery
2512 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2513 pub ack_wait: Duration,
2514 /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2515 #[serde(default, skip_serializing_if = "is_default")]
2516 pub max_deliver: i64,
2517 /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2518 #[serde(default, skip_serializing_if = "is_default")]
2519 pub filter_subject: String,
2520 #[cfg(feature = "server_2_10")]
2521 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2522 #[serde(default, skip_serializing_if = "is_default")]
2523 pub filter_subjects: Vec<String>,
2524 /// Whether messages are sent as quickly as possible or at the rate of receipt
2525 pub replay_policy: ReplayPolicy,
2526 /// The rate of message delivery in bits per second
2527 #[serde(default, skip_serializing_if = "is_default")]
2528 pub rate_limit: u64,
2529 /// What percentage of acknowledgments should be sampled for observability, 0-100
2530 #[serde(
2531 rename = "sample_freq",
2532 with = "super::sample_freq_deser",
2533 default,
2534 skip_serializing_if = "is_default"
2535 )]
2536 pub sample_frequency: u8,
2537 /// The maximum number of waiting consumers.
2538 #[serde(default, skip_serializing_if = "is_default")]
2539 pub max_waiting: i64,
2540 /// The maximum number of unacknowledged messages that may be
2541 /// in-flight before pausing sending additional messages to
2542 /// this consumer.
2543 #[serde(default, skip_serializing_if = "is_default")]
2544 pub max_ack_pending: i64,
2545 /// Only deliver headers without payloads.
2546 #[serde(default, skip_serializing_if = "is_default")]
2547 pub headers_only: bool,
2548 /// Maximum size of a request batch
2549 #[serde(default, skip_serializing_if = "is_default")]
2550 pub max_batch: i64,
2551 /// Maximum value of request max_bytes
2552 #[serde(default, skip_serializing_if = "is_default")]
2553 pub max_bytes: i64,
2554 /// Maximum value for request expiration
2555 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2556 pub max_expires: Duration,
2557 /// Threshold for consumer inactivity
2558 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2559 pub inactive_threshold: Duration,
2560 /// Number of consumer replicas
2561 #[serde(default, skip_serializing_if = "is_default")]
2562 pub num_replicas: usize,
2563 /// Force consumer to use memory storage.
2564 #[serde(default, skip_serializing_if = "is_default")]
2565 pub memory_storage: bool,
2566 #[cfg(feature = "server_2_10")]
2567 /// Additional consumer metadata.
2568 #[serde(default, skip_serializing_if = "is_default")]
2569 pub metadata: HashMap<String, String>,
2570 /// Custom backoff for missed acknowledgments.
2571 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2572 pub backoff: Vec<Duration>,
2573
2574 /// Priority policy for this consumer. Requires [Config::priority_groups] to be set.
2575 #[cfg(feature = "server_2_11")]
2576 #[serde(default, skip_serializing_if = "is_default")]
2577 pub priority_policy: PriorityPolicy,
2578 /// Priority groups for this consumer. Currently only one group is supported and is used
2579 /// in conjunction with [Config::priority_policy].
2580 #[cfg(feature = "server_2_11")]
2581 #[serde(default, skip_serializing_if = "is_default")]
2582 pub priority_groups: Vec<String>,
2583 /// For suspending the consumer until the deadline.
2584 #[cfg(feature = "server_2_11")]
2585 #[serde(
2586 default,
2587 with = "rfc3339::option",
2588 skip_serializing_if = "Option::is_none"
2589 )]
2590 pub pause_until: Option<OffsetDateTime>,
2591}
2592
2593impl IntoConsumerConfig for &Config {
2594 fn into_consumer_config(self) -> consumer::Config {
2595 self.clone().into_consumer_config()
2596 }
2597}
2598
2599impl IntoConsumerConfig for Config {
2600 fn into_consumer_config(self) -> consumer::Config {
2601 jetstream::consumer::Config {
2602 deliver_subject: None,
2603 name: self.name,
2604 durable_name: self.durable_name,
2605 description: self.description,
2606 deliver_group: None,
2607 deliver_policy: self.deliver_policy,
2608 ack_policy: self.ack_policy,
2609 ack_wait: self.ack_wait,
2610 max_deliver: self.max_deliver,
2611 filter_subject: self.filter_subject,
2612 #[cfg(feature = "server_2_10")]
2613 filter_subjects: self.filter_subjects,
2614 replay_policy: self.replay_policy,
2615 rate_limit: self.rate_limit,
2616 sample_frequency: self.sample_frequency,
2617 max_waiting: self.max_waiting,
2618 max_ack_pending: self.max_ack_pending,
2619 headers_only: self.headers_only,
2620 flow_control: false,
2621 idle_heartbeat: Duration::default(),
2622 max_batch: self.max_batch,
2623 max_bytes: self.max_bytes,
2624 max_expires: self.max_expires,
2625 inactive_threshold: self.inactive_threshold,
2626 num_replicas: self.num_replicas,
2627 memory_storage: self.memory_storage,
2628 #[cfg(feature = "server_2_10")]
2629 metadata: self.metadata,
2630 backoff: self.backoff,
2631 #[cfg(feature = "server_2_11")]
2632 priority_policy: self.priority_policy,
2633 #[cfg(feature = "server_2_11")]
2634 priority_groups: self.priority_groups,
2635 #[cfg(feature = "server_2_11")]
2636 pause_until: self.pause_until,
2637 }
2638 }
2639}
2640impl FromConsumer for Config {
2641 fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2642 if config.deliver_subject.is_some() {
2643 return Err(Box::new(std::io::Error::other(
2644 "pull consumer cannot have delivery subject",
2645 )));
2646 }
2647 Ok(Config {
2648 durable_name: config.durable_name,
2649 name: config.name,
2650 description: config.description,
2651 deliver_policy: config.deliver_policy,
2652 ack_policy: config.ack_policy,
2653 ack_wait: config.ack_wait,
2654 max_deliver: config.max_deliver,
2655 filter_subject: config.filter_subject,
2656 #[cfg(feature = "server_2_10")]
2657 filter_subjects: config.filter_subjects,
2658 replay_policy: config.replay_policy,
2659 rate_limit: config.rate_limit,
2660 sample_frequency: config.sample_frequency,
2661 max_waiting: config.max_waiting,
2662 max_ack_pending: config.max_ack_pending,
2663 headers_only: config.headers_only,
2664 max_batch: config.max_batch,
2665 max_bytes: config.max_bytes,
2666 max_expires: config.max_expires,
2667 inactive_threshold: config.inactive_threshold,
2668 num_replicas: config.num_replicas,
2669 memory_storage: config.memory_storage,
2670 #[cfg(feature = "server_2_10")]
2671 metadata: config.metadata,
2672 backoff: config.backoff,
2673 #[cfg(feature = "server_2_11")]
2674 priority_policy: config.priority_policy,
2675 #[cfg(feature = "server_2_11")]
2676 priority_groups: config.priority_groups,
2677 #[cfg(feature = "server_2_11")]
2678 pause_until: config.pause_until,
2679 })
2680 }
2681}
2682
2683#[derive(Clone, Copy, Debug, PartialEq)]
2684pub enum BatchRequestErrorKind {
2685 Publish,
2686 Flush,
2687 Serialize,
2688}
2689
2690impl std::fmt::Display for BatchRequestErrorKind {
2691 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2692 match self {
2693 Self::Publish => write!(f, "publish failed"),
2694 Self::Flush => write!(f, "flush failed"),
2695 Self::Serialize => write!(f, "serialize failed"),
2696 }
2697 }
2698}
2699
2700pub type BatchRequestError = Error<BatchRequestErrorKind>;
2701
2702#[derive(Clone, Copy, Debug, PartialEq)]
2703pub enum BatchErrorKind {
2704 Subscribe,
2705 Pull,
2706 Flush,
2707 Serialize,
2708}
2709
2710impl std::fmt::Display for BatchErrorKind {
2711 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2712 match self {
2713 Self::Pull => write!(f, "pull request failed"),
2714 Self::Flush => write!(f, "flush failed"),
2715 Self::Serialize => write!(f, "serialize failed"),
2716 Self::Subscribe => write!(f, "subscribe failed"),
2717 }
2718 }
2719}
2720
2721pub type BatchError = Error<BatchErrorKind>;
2722
2723impl From<SubscribeError> for BatchError {
2724 fn from(err: SubscribeError) -> Self {
2725 BatchError::with_source(BatchErrorKind::Subscribe, err)
2726 }
2727}
2728
2729impl From<BatchRequestError> for BatchError {
2730 fn from(err: BatchRequestError) -> Self {
2731 BatchError::with_source(BatchErrorKind::Pull, err)
2732 }
2733}
2734
2735#[derive(Clone, Copy, Debug, PartialEq)]
2736pub enum ConsumerRecreateErrorKind {
2737 GetStream,
2738 Recreate,
2739 TimedOut,
2740}
2741
2742impl std::fmt::Display for ConsumerRecreateErrorKind {
2743 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2744 match self {
2745 Self::GetStream => write!(f, "error getting stream"),
2746 Self::Recreate => write!(f, "consumer creation failed"),
2747 Self::TimedOut => write!(f, "timed out"),
2748 }
2749 }
2750}
2751
2752pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2753
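/// Best-effort deletes the existing ordered consumer, recreates it so delivery resumes at
/// `sequence + 1` (or from the beginning when `sequence` is 0), and builds a fresh message
/// [Stream] for it. Each step is bounded by a 5 second timeout.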
2754async fn recreate_consumer_stream(
2755 context: &Context,
2756 config: &OrderedConfig,
2757 stream_name: &str,
2758 consumer_name: &str,
2759 sequence: u64,
2760) -> Result<Stream, ConsumerRecreateError> {
2761 let span = tracing::span!(
2762 tracing::Level::DEBUG,
2763 "recreate_ordered_consumer",
2764 stream_name = stream_name,
2765 consumer_name = consumer_name,
2766 sequence = sequence
2767 );
2768 let _span_handle = span.enter();
2769 let config = config.to_owned();
2770 trace!("delete old consumer before creating new one");
2771
2772 tokio::time::timeout(
2773 Duration::from_secs(5),
2774 context.delete_consumer_from_stream(consumer_name, stream_name),
2775 )
2776 .await
2777 .ok();
2778
2779 let deliver_policy = {
2780 if sequence == 0 {
2781 DeliverPolicy::All
2782 } else {
2783 DeliverPolicy::ByStartSequence {
2784 start_sequence: sequence + 1,
2785 }
2786 }
2787 };
2788 trace!("create the new ordered consumer for sequence {}", sequence);
2789 let consumer = tokio::time::timeout(
2790 Duration::from_secs(5),
2791 context.create_consumer_on_stream(
2792 jetstream::consumer::pull::OrderedConfig {
2793 deliver_policy,
2794 ..config.clone()
2795 },
2796 stream_name,
2797 ),
2798 )
2799 .await
2800 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2801 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2802
2803 let config = Consumer {
2804 config: config.clone().into(),
2805 context: context.clone(),
2806 info: consumer.info,
2807 };
2808
2809 trace!("create iterator");
2810 let stream = tokio::time::timeout(
2811 Duration::from_secs(5),
2812 Stream::stream(
2813 BatchConfig {
2814 batch: 500,
2815 expires: Some(Duration::from_secs(30)),
2816 no_wait: false,
2817 max_bytes: 0,
2818 idle_heartbeat: Duration::from_secs(15),
2819 min_pending: None,
2820 min_ack_pending: None,
2821 group: None,
2822 #[cfg(feature = "server_2_12")]
2823 priority: None,
2824 },
2825 &config,
2826 ),
2827 )
2828 .await
2829 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2830 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2831 trace!("recreated consumer");
2832 stream
2833}