async_nats/jetstream/consumer/pull.rs
1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use bytes::Bytes;
15use futures::{
16 future::{BoxFuture, Either},
17 FutureExt, StreamExt,
18};
19
20#[cfg(feature = "server_2_11")]
21use time::{serde::rfc3339, OffsetDateTime};
22
23#[cfg(feature = "server_2_10")]
24use std::collections::HashMap;
25use std::{future, pin::Pin, task::Poll, time::Duration};
26use tokio::{task::JoinHandle, time::Sleep};
27
28use serde::{Deserialize, Serialize};
29use tracing::{debug, trace};
30
31use crate::{
32 connection::State,
33 error::Error,
34 jetstream::{self, Context},
35 StatusCode, SubscribeError, Subscriber,
36};
37
38use crate::subject::Subject;
39
40#[cfg(feature = "server_2_11")]
41use super::PriorityPolicy;
42
43use super::{
44 backoff, AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy,
45 StreamError, StreamErrorKind,
46};
47use jetstream::consumer;
48
49impl Consumer<Config> {
50 /// Returns a stream of messages for Pull Consumer.
51 ///
52 /// # Example
53 ///
54 /// ```no_run
55 /// # #[tokio::main]
56 /// # async fn main() -> Result<(), async_nats::Error> {
57 /// use futures::StreamExt;
58 /// use futures::TryStreamExt;
59 ///
60 /// let client = async_nats::connect("localhost:4222").await?;
61 /// let jetstream = async_nats::jetstream::new(client);
62 ///
63 /// let stream = jetstream
64 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
65 /// name: "events".to_string(),
66 /// max_messages: 10_000,
67 /// ..Default::default()
68 /// })
69 /// .await?;
70 ///
71 /// jetstream.publish("events", "data".into()).await?;
72 ///
73 /// let consumer = stream
74 /// .get_or_create_consumer(
75 /// "consumer",
76 /// async_nats::jetstream::consumer::pull::Config {
77 /// durable_name: Some("consumer".to_string()),
78 /// ..Default::default()
79 /// },
80 /// )
81 /// .await?;
82 ///
83 /// let mut messages = consumer.messages().await?.take(100);
84 /// while let Some(Ok(message)) = messages.next().await {
85 /// println!("got message {:?}", message);
86 /// message.ack().await?;
87 /// }
88 /// Ok(())
89 /// # }
90 /// ```
91 pub async fn messages(&self) -> Result<Stream, StreamError> {
92 Stream::stream(
93 BatchConfig {
94 batch: 200,
95 expires: Some(Duration::from_secs(30)),
96 no_wait: false,
97 max_bytes: 0,
98 idle_heartbeat: Duration::from_secs(15),
99 min_pending: None,
100 min_ack_pending: None,
101 group: None,
102 },
103 self,
104 )
105 .await
106 }
107
108 /// Enables customization of [Stream] by setting timeouts, heartbeats, maximum number of
109 /// messages or bytes buffered.
110 ///
111 /// # Examples
112 ///
113 /// ```no_run
114 /// # #[tokio::main]
115 /// # async fn main() -> Result<(), async_nats::Error> {
116 /// use async_nats::jetstream::consumer::PullConsumer;
117 /// use futures::StreamExt;
118 /// let client = async_nats::connect("localhost:4222").await?;
119 /// let jetstream = async_nats::jetstream::new(client);
120 ///
121 /// let consumer: PullConsumer = jetstream
122 /// .get_stream("events")
123 /// .await?
124 /// .get_consumer("pull")
125 /// .await?;
126 ///
127 /// let mut messages = consumer
128 /// .stream()
129 /// .max_messages_per_batch(100)
130 /// .max_bytes_per_batch(1024)
131 /// .messages()
132 /// .await?;
133 ///
134 /// while let Some(message) = messages.next().await {
135 /// let message = message?;
136 /// println!("message: {:?}", message);
137 /// message.ack().await?;
138 /// }
139 /// # Ok(())
140 /// # }
141 /// ```
142 pub fn stream(&self) -> StreamBuilder<'_> {
143 StreamBuilder::new(self)
144 }
145
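/// Sends a low-level pull request asking the server to deliver a batch of messages to the
/// provided `inbox` subject. This is the building block used by `batch()`, `fetch()` and
/// `messages()`; most applications should prefer those higher-level APIs.
///
/// # Example
///
/// A minimal sketch (the stream/consumer names and the batch size are illustrative; the
/// caller must subscribe to the inbox before requesting the batch):
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer::PullConsumer;
/// use futures::StreamExt;
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client.clone());
///
/// let consumer: PullConsumer = jetstream
///     .get_stream("events")
///     .await?
///     .get_consumer("pull")
///     .await?;
///
/// // Subscribe to an inbox first, then ask the server to deliver a batch there.
/// let inbox = client.new_inbox();
/// let mut responses = client.subscribe(inbox.clone()).await?;
/// consumer
///     .request_batch(
///         async_nats::jetstream::consumer::pull::BatchConfig {
///             batch: 10,
///             ..Default::default()
///         },
///         inbox.into(),
///     )
///     .await?;
///
/// while let Some(message) = responses.next().await {
///     println!("received raw message: {:?}", message);
/// }
/// # Ok(())
/// # }
/// ```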
146 pub async fn request_batch<I: Into<BatchConfig>>(
147 &self,
148 batch: I,
149 inbox: Subject,
150 ) -> Result<(), BatchRequestError> {
151 debug!("sending batch");
152 let subject = format!(
153 "{}.CONSUMER.MSG.NEXT.{}.{}",
154 self.context.prefix, self.info.stream_name, self.info.name
155 );
156
157 let payload = serde_json::to_vec(&batch.into())
158 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
159
160 self.context
161 .client
162 .publish_with_reply(subject, inbox, payload.into())
163 .await
164 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
165 debug!("batch request sent");
166 Ok(())
167 }
168
169 /// Returns a batch of a specified number of messages, or, if there are fewer messages on the
170 /// [Stream] than requested, all available messages.
171 ///
172 /// # Example
173 ///
174 /// ```no_run
175 /// # #[tokio::main]
176 /// # async fn main() -> Result<(), async_nats::Error> {
177 /// use futures::StreamExt;
178 /// use futures::TryStreamExt;
179 ///
180 /// let client = async_nats::connect("localhost:4222").await?;
181 /// let jetstream = async_nats::jetstream::new(client);
182 ///
183 /// let stream = jetstream
184 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
185 /// name: "events".to_string(),
186 /// max_messages: 10_000,
187 /// ..Default::default()
188 /// })
189 /// .await?;
190 ///
191 /// jetstream.publish("events", "data".into()).await?;
192 ///
193 /// let consumer = stream
194 /// .get_or_create_consumer(
195 /// "consumer",
196 /// async_nats::jetstream::consumer::pull::Config {
197 /// durable_name: Some("consumer".to_string()),
198 /// ..Default::default()
199 /// },
200 /// )
201 /// .await?;
202 ///
203 /// for _ in 0..100 {
204 /// jetstream.publish("events", "data".into()).await?;
205 /// }
206 ///
207 /// let mut messages = consumer.fetch().max_messages(200).messages().await?;
208 /// // will finish after 100 messages, as that is the number of messages available on the
209 /// // stream.
210 /// while let Some(Ok(message)) = messages.next().await {
211 /// println!("got message {:?}", message);
212 /// message.ack().await?;
213 /// }
214 /// Ok(())
215 /// # }
216 /// ```
217 pub fn fetch(&self) -> FetchBuilder {
218 FetchBuilder::new(self)
219 }
220
221 /// Returns a batch of a specified number of messages unless the timeout happens first.
222 ///
223 /// # Example
224 ///
225 /// ```no_run
226 /// # #[tokio::main]
227 /// # async fn main() -> Result<(), async_nats::Error> {
228 /// use futures::StreamExt;
229 /// use futures::TryStreamExt;
230 ///
231 /// let client = async_nats::connect("localhost:4222").await?;
232 /// let jetstream = async_nats::jetstream::new(client);
233 ///
234 /// let stream = jetstream
235 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
236 /// name: "events".to_string(),
237 /// max_messages: 10_000,
238 /// ..Default::default()
239 /// })
240 /// .await?;
241 ///
242 /// jetstream.publish("events", "data".into()).await?;
243 ///
244 /// let consumer = stream
245 /// .get_or_create_consumer(
246 /// "consumer",
247 /// async_nats::jetstream::consumer::pull::Config {
248 /// durable_name: Some("consumer".to_string()),
249 /// ..Default::default()
250 /// },
251 /// )
252 /// .await?;
253 ///
254 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
255 /// while let Some(Ok(message)) = messages.next().await {
256 /// println!("got message {:?}", message);
257 /// message.ack().await?;
258 /// }
259 /// Ok(())
260 /// # }
261 /// ```
262 pub fn batch(&self) -> BatchBuilder {
263 BatchBuilder::new(self)
264 }
265
266 /// Returns a sequence of [Batches][Batch], allowing iteration over batches, and then over
267 /// messages in those batches.
268 ///
269 /// # Example
270 ///
271 /// ```no_run
272 /// # #[tokio::main]
273 /// # async fn main() -> Result<(), async_nats::Error> {
274 /// use futures::StreamExt;
275 /// use futures::TryStreamExt;
276 ///
277 /// let client = async_nats::connect("localhost:4222").await?;
278 /// let jetstream = async_nats::jetstream::new(client);
279 ///
280 /// let stream = jetstream
281 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
282 /// name: "events".to_string(),
283 /// max_messages: 10_000,
284 /// ..Default::default()
285 /// })
286 /// .await?;
287 ///
288 /// jetstream.publish("events", "data".into()).await?;
289 ///
290 /// let consumer = stream
291 /// .get_or_create_consumer(
292 /// "consumer",
293 /// async_nats::jetstream::consumer::pull::Config {
294 /// durable_name: Some("consumer".to_string()),
295 /// ..Default::default()
296 /// },
297 /// )
298 /// .await?;
299 ///
300 /// let mut iter = consumer.sequence(50).unwrap().take(10);
301 /// while let Ok(Some(mut batch)) = iter.try_next().await {
302 /// while let Ok(Some(message)) = batch.try_next().await {
303 /// println!("message received: {:?}", message);
304 /// }
305 /// }
306 /// Ok(())
307 /// # }
308 /// ```
309 pub fn sequence(&self, batch: usize) -> Result<Sequence, BatchError> {
310 let context = self.context.clone();
311 let subject = format!(
312 "{}.CONSUMER.MSG.NEXT.{}.{}",
313 self.context.prefix, self.info.stream_name, self.info.name
314 );
315
316 let request = serde_json::to_vec(&BatchConfig {
317 batch,
318 expires: Some(Duration::from_secs(60)),
319 ..Default::default()
320 })
321 .map(Bytes::from)
322 .map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Serialize, err))?;
323
324 Ok(Sequence {
325 context,
326 subject,
327 request,
328 pending_messages: batch,
329 next: None,
330 })
331 }
332}
333
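/// A batch of messages delivered in response to a single pull request. The stream ends once
/// the requested number of messages has been received or the request expires on the server.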
334pub struct Batch {
335 pending_messages: usize,
336 subscriber: Subscriber,
337 context: Context,
338 timeout: Option<Pin<Box<Sleep>>>,
339 terminated: bool,
340}
341
342impl Batch {
343 async fn batch(batch: BatchConfig, consumer: &Consumer<Config>) -> Result<Batch, BatchError> {
344 let inbox = Subject::from(consumer.context.client.new_inbox());
345 let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
346 consumer.request_batch(batch.clone(), inbox.clone()).await?;
347
348 let sleep = batch.expires.map(|expires| {
349 Box::pin(tokio::time::sleep(
350 expires.saturating_add(Duration::from_secs(5)),
351 ))
352 });
353
354 Ok(Batch {
355 pending_messages: batch.batch,
356 subscriber: subscription,
357 context: consumer.context.clone(),
358 terminated: false,
359 timeout: sleep,
360 })
361 }
362}
363
364impl futures::Stream for Batch {
365 type Item = Result<jetstream::Message, crate::Error>;
366
367 fn poll_next(
368 mut self: std::pin::Pin<&mut Self>,
369 cx: &mut std::task::Context<'_>,
370 ) -> std::task::Poll<Option<Self::Item>> {
371 if self.terminated {
372 return Poll::Ready(None);
373 }
374 if self.pending_messages == 0 {
375 self.terminated = true;
376 return Poll::Ready(None);
377 }
378 if let Some(sleep) = self.timeout.as_mut() {
379 match sleep.poll_unpin(cx) {
380 Poll::Ready(_) => {
381 debug!("batch timeout timer triggered");
382 // TODO(tp): Maybe we can be smarter here and, before timing out, check whether
383 // we have consumed all the messages from the subscription buffer, in case the
384 // user is consuming messages slowly. Keep in mind that we time out here only if
385 // for some reason we missed the timeout from the server and a few seconds have
386 // passed since the expected timeout message.
387 self.terminated = true;
388 return Poll::Ready(None);
389 }
390 Poll::Pending => (),
391 }
392 }
393 match self.subscriber.receiver.poll_recv(cx) {
394 Poll::Ready(maybe_message) => match maybe_message {
395 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
396 StatusCode::TIMEOUT => {
397 debug!("received timeout. Iterator done");
398 self.terminated = true;
399 Poll::Ready(None)
400 }
401 StatusCode::IDLE_HEARTBEAT => {
402 debug!("received heartbeat");
403 Poll::Pending
404 }
405 // If this is the fetch variant, terminate when there are no more messages.
406 // We do not need to check whether this is a fetch rather than a batch,
407 // as only fetch will send back the `NO_MESSAGES` status.
408 StatusCode::NOT_FOUND => {
409 debug!("received `NO_MESSAGES`. Iterator done");
410 self.terminated = true;
411 Poll::Ready(None)
412 }
413 StatusCode::OK => {
414 debug!("received message");
415 self.pending_messages -= 1;
416 Poll::Ready(Some(Ok(jetstream::Message {
417 context: self.context.clone(),
418 message,
419 })))
420 }
421 status => {
422 debug!("received error");
423 self.terminated = true;
424 Poll::Ready(Some(Err(Box::new(std::io::Error::other(format!(
425 "error while processing messages from the stream: {}, {:?}",
426 status, message.description
427 ))))))
428 }
429 },
430 None => Poll::Ready(None),
431 },
432 std::task::Poll::Pending => std::task::Poll::Pending,
433 }
434 }
435}
436
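/// A stream of [Batch]es, each backed by its own pull request. Created by [Consumer::sequence].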
437pub struct Sequence {
438 context: Context,
439 subject: String,
440 request: Bytes,
441 pending_messages: usize,
442 next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
443}
444
445impl futures::Stream for Sequence {
446 type Item = Result<Batch, MessagesError>;
447
448 fn poll_next(
449 mut self: std::pin::Pin<&mut Self>,
450 cx: &mut std::task::Context<'_>,
451 ) -> std::task::Poll<Option<Self::Item>> {
452 match self.next.as_mut() {
453 None => {
454 let context = self.context.clone();
455 let subject = self.subject.clone();
456 let request = self.request.clone();
457 let pending_messages = self.pending_messages;
458
459 let next = self.next.insert(Box::pin(async move {
460 let inbox = context.client.new_inbox();
461 let subscriber = context
462 .client
463 .subscribe(inbox.clone())
464 .await
465 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
466
467 context
468 .client
469 .publish_with_reply(subject, inbox, request)
470 .await
471 .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
472
473 // TODO(tp): Add timeout config and defaults.
474 Ok(Batch {
475 pending_messages,
476 subscriber,
477 context,
478 terminated: false,
479 timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
480 })
481 }));
482
483 match next.as_mut().poll(cx) {
484 Poll::Ready(result) => {
485 self.next = None;
486 Poll::Ready(Some(result.map_err(|err| {
487 MessagesError::with_source(MessagesErrorKind::Pull, err)
488 })))
489 }
490 Poll::Pending => Poll::Pending,
491 }
492 }
493
494 Some(next) => match next.as_mut().poll(cx) {
495 Poll::Ready(result) => {
496 self.next = None;
497 Poll::Ready(Some(result.map_err(|err| {
498 MessagesError::with_source(MessagesErrorKind::Pull, err)
499 })))
500 }
501 Poll::Pending => Poll::Pending,
502 },
503 }
504 }
505}
506
507impl Consumer<OrderedConfig> {
508 /// Returns a stream of messages for Ordered Pull Consumer.
509 ///
510 /// Ordered consumers use a single-replica ephemeral consumer, regardless of the replication factor of the
511 /// Stream. They do not use acks; instead, they track sequences and recreate themselves whenever they
512 /// see a mismatch.
513 ///
514 /// # Example
515 ///
516 /// ```no_run
517 /// # #[tokio::main]
518 /// # async fn main() -> Result<(), async_nats::Error> {
519 /// use futures::StreamExt;
520 /// use futures::TryStreamExt;
521 ///
522 /// let client = async_nats::connect("localhost:4222").await?;
523 /// let jetstream = async_nats::jetstream::new(client);
524 ///
525 /// let stream = jetstream
526 /// .get_or_create_stream(async_nats::jetstream::stream::Config {
527 /// name: "events".to_string(),
528 /// max_messages: 10_000,
529 /// ..Default::default()
530 /// })
531 /// .await?;
532 ///
533 /// jetstream.publish("events", "data".into()).await?;
534 ///
535 /// let consumer = stream
536 /// .get_or_create_consumer(
537 /// "consumer",
538 /// async_nats::jetstream::consumer::pull::OrderedConfig {
539 /// name: Some("consumer".to_string()),
540 /// ..Default::default()
541 /// },
542 /// )
543 /// .await?;
544 ///
545 /// let mut messages = consumer.messages().await?.take(100);
546 /// while let Some(Ok(message)) = messages.next().await {
547 /// println!("got message {:?}", message);
548 /// }
549 /// Ok(())
550 /// # }
551 /// ```
552 pub async fn messages(self) -> Result<Ordered, StreamError> {
553 let config = Consumer {
554 config: self.config.clone().into(),
555 context: self.context.clone(),
556 info: self.info.clone(),
557 };
558 let stream = Stream::stream(
559 BatchConfig {
560 batch: 500,
561 expires: Some(Duration::from_secs(30)),
562 no_wait: false,
563 max_bytes: 0,
564 idle_heartbeat: Duration::from_secs(15),
565 min_pending: None,
566 min_ack_pending: None,
567 group: None,
568 },
569 &config,
570 )
571 .await?;
572
573 Ok(Ordered {
574 consumer_sequence: 0,
575 stream_sequence: 0,
576 missed_heartbeats: false,
577 create_stream: None,
578 context: self.context.clone(),
579 consumer_name: self
580 .config
581 .name
582 .clone()
583 .unwrap_or_else(|| self.context.client.new_inbox()),
584 consumer: self.config,
585 stream: Some(stream),
586 stream_name: self.info.stream_name.clone(),
587 })
588 }
589}
590
591/// Configuration for an ordered pull consumer. Ordered consumers are ephemeral,
592/// single-replica, and do not acknowledge messages, so many of the fields available
593/// on [Config] do not apply here.
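///
/// # Example
///
/// A minimal sketch of creating an ordered pull consumer (the stream name and filter subject
/// are illustrative):
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer = jetstream
///     .get_stream("events")
///     .await?
///     .create_consumer(async_nats::jetstream::consumer::pull::OrderedConfig {
///         filter_subject: "events.>".to_string(),
///         ..Default::default()
///     })
///     .await?;
/// # Ok(())
/// # }
/// ```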
594#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
595pub struct OrderedConfig {
596 /// A name of the consumer. Can be specified for both durable and ephemeral
597 /// consumers.
598 #[serde(default, skip_serializing_if = "Option::is_none")]
599 pub name: Option<String>,
600 /// A short description of the purpose of this consumer.
601 #[serde(default, skip_serializing_if = "Option::is_none")]
602 pub description: Option<String>,
603 #[serde(default, skip_serializing_if = "is_default")]
604 pub filter_subject: String,
605 #[cfg(feature = "server_2_10")]
606 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
607 #[serde(default, skip_serializing_if = "is_default")]
608 pub filter_subjects: Vec<String>,
609 /// Whether messages are sent as quickly as possible or at the rate of receipt
610 pub replay_policy: ReplayPolicy,
611 /// The rate of message delivery in bits per second
612 #[serde(default, skip_serializing_if = "is_default")]
613 pub rate_limit: u64,
614 /// What percentage of acknowledgments should be sampled for observability, 0-100
615 #[serde(
616 rename = "sample_freq",
617 with = "super::sample_freq_deser",
618 default,
619 skip_serializing_if = "is_default"
620 )]
621 pub sample_frequency: u8,
622 /// Only deliver headers without payloads.
623 #[serde(default, skip_serializing_if = "is_default")]
624 pub headers_only: bool,
625 /// Allows for a variety of options that determine how this consumer will receive messages
626 #[serde(flatten)]
627 pub deliver_policy: DeliverPolicy,
628 /// The maximum number of waiting consumers.
629 #[serde(default, skip_serializing_if = "is_default")]
630 pub max_waiting: i64,
631 #[cfg(feature = "server_2_10")]
632 // Additional consumer metadata.
633 #[serde(default, skip_serializing_if = "is_default")]
634 pub metadata: HashMap<String, String>,
635 // Maximum number of messages that can be requested in a single pull request.
636 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
637 // [stream].
638 pub max_batch: i64,
639 // Maximum number of bytes that can be requested in a single pull request.
640 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
641 // [stream].
642 pub max_bytes: i64,
643 // Maximum expiry that can be set for a single pull request.
644 // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
645 // [stream].
646 pub max_expires: Duration,
647}
648
649impl From<OrderedConfig> for Config {
650 fn from(config: OrderedConfig) -> Self {
651 Config {
652 durable_name: None,
653 name: config.name,
654 description: config.description,
655 deliver_policy: config.deliver_policy,
656 ack_policy: AckPolicy::None,
657 ack_wait: Duration::default(),
658 max_deliver: 1,
659 filter_subject: config.filter_subject,
660 #[cfg(feature = "server_2_10")]
661 filter_subjects: config.filter_subjects,
662 replay_policy: config.replay_policy,
663 rate_limit: config.rate_limit,
664 sample_frequency: config.sample_frequency,
665 max_waiting: config.max_waiting,
666 max_ack_pending: 0,
667 headers_only: config.headers_only,
668 max_batch: config.max_batch,
669 max_bytes: config.max_bytes,
670 max_expires: config.max_expires,
671 inactive_threshold: Duration::from_secs(30),
672 num_replicas: 1,
673 memory_storage: true,
674 #[cfg(feature = "server_2_10")]
675 metadata: config.metadata,
676 backoff: Vec::new(),
677 #[cfg(feature = "server_2_11")]
678 priority_policy: PriorityPolicy::None,
679 #[cfg(feature = "server_2_11")]
680 priority_groups: Vec::new(),
681 #[cfg(feature = "server_2_11")]
682 pause_until: None,
683 }
684 }
685}
686
687impl FromConsumer for OrderedConfig {
688 fn try_from_consumer_config(
689 config: crate::jetstream::consumer::Config,
690 ) -> Result<Self, crate::Error>
691 where
692 Self: Sized,
693 {
694 Ok(OrderedConfig {
695 name: config.name,
696 description: config.description,
697 filter_subject: config.filter_subject,
698 #[cfg(feature = "server_2_10")]
699 filter_subjects: config.filter_subjects,
700 replay_policy: config.replay_policy,
701 rate_limit: config.rate_limit,
702 sample_frequency: config.sample_frequency,
703 headers_only: config.headers_only,
704 deliver_policy: config.deliver_policy,
705 max_waiting: config.max_waiting,
706 #[cfg(feature = "server_2_10")]
707 metadata: config.metadata,
708 max_batch: config.max_batch,
709 max_bytes: config.max_bytes,
710 max_expires: config.max_expires,
711 })
712 }
713}
714
715impl IntoConsumerConfig for OrderedConfig {
716 fn into_consumer_config(self) -> super::Config {
717 jetstream::consumer::Config {
718 deliver_subject: None,
719 durable_name: None,
720 name: self.name,
721 description: self.description,
722 deliver_group: None,
723 deliver_policy: self.deliver_policy,
724 ack_policy: AckPolicy::None,
725 ack_wait: Duration::default(),
726 max_deliver: 1,
727 filter_subject: self.filter_subject,
728 #[cfg(feature = "server_2_10")]
729 filter_subjects: self.filter_subjects,
730 replay_policy: self.replay_policy,
731 rate_limit: self.rate_limit,
732 sample_frequency: self.sample_frequency,
733 max_waiting: self.max_waiting,
734 max_ack_pending: 0,
735 headers_only: self.headers_only,
736 flow_control: false,
737 idle_heartbeat: Duration::default(),
738 max_batch: 0,
739 max_bytes: 0,
740 max_expires: Duration::default(),
741 inactive_threshold: Duration::from_secs(30),
742 num_replicas: 1,
743 memory_storage: true,
744 #[cfg(feature = "server_2_10")]
745 metadata: self.metadata,
746 backoff: Vec::new(),
747 #[cfg(feature = "server_2_11")]
748 priority_policy: PriorityPolicy::None,
749 #[cfg(feature = "server_2_11")]
750 priority_groups: Vec::new(),
751 #[cfg(feature = "server_2_11")]
752 pause_until: None,
753 }
754 }
755}
756
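/// A stream of messages from an ordered pull consumer. It transparently recreates the
/// underlying ephemeral consumer whenever it detects a sequence mismatch or a second missed
/// heartbeat in a row.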
757pub struct Ordered {
758 context: Context,
759 stream_name: String,
760 consumer: OrderedConfig,
761 consumer_name: String,
762 stream: Option<Stream>,
763 create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
764 consumer_sequence: u64,
765 stream_sequence: u64,
766 missed_heartbeats: bool,
767}
768
769impl futures::Stream for Ordered {
770 type Item = Result<jetstream::Message, OrderedError>;
771
772 fn poll_next(
773 mut self: Pin<&mut Self>,
774 cx: &mut std::task::Context<'_>,
775 ) -> Poll<Option<Self::Item>> {
776 let mut recreate = false;
777 // Poll messages
778 if let Some(stream) = self.stream.as_mut() {
779 match stream.poll_next_unpin(cx) {
780 Poll::Ready(message) => match message {
781 Some(message) => match message {
782 Ok(message) => {
783 self.missed_heartbeats = false;
784 let info = message.info().map_err(|err| {
785 OrderedError::with_source(OrderedErrorKind::Other, err)
786 })?;
787 trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
788 self.consumer_sequence,
789 self.stream_sequence,
790 info.consumer_sequence,
791 info.stream_sequence);
792 if info.consumer_sequence != self.consumer_sequence + 1 {
793 debug!(
794 "ordered consumer mismatch. current {}, info: {}",
795 self.consumer_sequence, info.consumer_sequence
796 );
797 recreate = true;
798 self.consumer_sequence = 0;
799 } else {
800 self.stream_sequence = info.stream_sequence;
801 self.consumer_sequence = info.consumer_sequence;
802 return Poll::Ready(Some(Ok(message)));
803 }
804 }
805 Err(err) => match err.kind() {
806 MessagesErrorKind::MissingHeartbeat => {
807 // If missed_heartbeats is already set, this is the second
808 // missed heartbeat in a row, so we need to recreate the consumer.
809 if self.missed_heartbeats {
810 self.consumer_sequence = 0;
811 recreate = true;
812 } else {
813 self.missed_heartbeats = true;
814 }
815 }
816 MessagesErrorKind::ConsumerDeleted
817 | MessagesErrorKind::NoResponders => {
818 recreate = true;
819 self.consumer_sequence = 0;
820 }
821 MessagesErrorKind::Pull
822 | MessagesErrorKind::PushBasedConsumer
823 | MessagesErrorKind::Other => {
824 return Poll::Ready(Some(Err(err.into())));
825 }
826 },
827 },
828 None => return Poll::Ready(None),
829 },
830 Poll::Pending => (),
831 }
832 }
833 // Recreate consumer if needed
834 if recreate {
835 self.stream = None;
836 self.create_stream = Some(Box::pin({
837 let context = self.context.clone();
838 let config = self.consumer.clone();
839 let stream_name = self.stream_name.clone();
840 let consumer_name = self.consumer_name.clone();
841 let sequence = self.stream_sequence;
842 async move {
843 tryhard::retry_fn(|| {
844 recreate_consumer_stream(
845 &context,
846 &config,
847 &stream_name,
848 &consumer_name,
849 sequence,
850 )
851 })
852 .retries(u32::MAX)
853 .custom_backoff(backoff)
854 .await
855 }
856 }))
857 }
858 // check for recreation future
859 if let Some(result) = self.create_stream.as_mut() {
860 match result.poll_unpin(cx) {
861 Poll::Ready(result) => match result {
862 Ok(stream) => {
863 self.create_stream = None;
864 self.stream = Some(stream);
865 return self.poll_next(cx);
866 }
867 Err(err) => {
868 return Poll::Ready(Some(Err(OrderedError::with_source(
869 OrderedErrorKind::Recreate,
870 err,
871 ))))
872 }
873 },
874 Poll::Pending => (),
875 }
876 }
877 Poll::Pending
878 }
879}
880
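/// The message stream of a pull consumer, created by `messages()` or [StreamBuilder::messages].
/// A background task keeps issuing pull requests so that messages are buffered ahead of the
/// caller and refilled as they are consumed.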
881pub struct Stream {
882 pending_messages: usize,
883 pending_bytes: usize,
884 request_result_rx: tokio::sync::mpsc::Receiver<Result<bool, super::RequestError>>,
885 request_tx: tokio::sync::watch::Sender<()>,
886 subscriber: Subscriber,
887 batch_config: BatchConfig,
888 context: Context,
889 pending_request: bool,
890 task_handle: JoinHandle<()>,
891 terminated: bool,
892 heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
893 started: Option<tokio::sync::oneshot::Sender<()>>,
894}
895
896impl Drop for Stream {
897 fn drop(&mut self) {
898 self.task_handle.abort();
899 }
900}
901
902impl Stream {
903 async fn stream(
904 batch_config: BatchConfig,
905 consumer: &Consumer<Config>,
906 ) -> Result<Stream, StreamError> {
907 let inbox = consumer.context.client.new_inbox();
908 let subscription = consumer
909 .context
910 .client
911 .subscribe(inbox.clone())
912 .await
913 .map_err(|err| StreamError::with_source(StreamErrorKind::Other, err))?;
914 let subject = format!(
915 "{}.CONSUMER.MSG.NEXT.{}.{}",
916 consumer.context.prefix, consumer.info.stream_name, consumer.info.name
917 );
918
919 let (request_result_tx, request_result_rx) = tokio::sync::mpsc::channel(1);
920 let (request_tx, mut request_rx) = tokio::sync::watch::channel(());
921 let (started_tx, started_rx) = tokio::sync::oneshot::channel();
922 let task_handle = tokio::task::spawn({
923 let batch = batch_config.clone();
924 let consumer = consumer.clone();
925 let mut context = consumer.context.clone();
926 let inbox = inbox.clone();
927 async move {
928 started_rx.await.ok();
929 loop {
930 // This is just for the edge case of a missing response for some reason.
931 let expires = batch_config
932 .expires
933 .map(|expires| {
934 if expires.is_zero() {
935 Either::Left(future::pending())
936 } else {
937 Either::Right(tokio::time::sleep(
938 expires.saturating_add(Duration::from_secs(5)),
939 ))
940 }
941 })
942 .unwrap_or_else(|| Either::Left(future::pending()));
943 // Need to check previous state, as `changed` will always fire on first
944 // call.
945 let prev_state = context.client.state.borrow().to_owned();
946 let mut pending_reset = false;
947
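// Wait for whichever happens first: a connection state change (used to detect
// reconnects), a request from the `Stream` for another pull, or expiry of the
// current pull request.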
948 tokio::select! {
949 _ = context.client.state.changed() => {
950 let state = context.client.state.borrow().to_owned();
951 if !(state == crate::connection::State::Connected
952 && prev_state != State::Connected) {
953 continue;
954 }
955 debug!("detected !Connected -> Connected state change");
956
957 match tryhard::retry_fn(|| consumer.fetch_info())
958 .retries(5).custom_backoff(backoff).await
959 .map_err(|err| crate::RequestError::with_source(crate::RequestErrorKind::Other, err).into()) {
960 Ok(info) => {
961 if info.num_waiting == 0 {
962 pending_reset = true;
963 }
964 }
965 Err(err) => {
966 if let Err(err) = request_result_tx.send(Err(err)).await {
967 debug!("failed to sent request result: {}", err);
968 }
969 },
970 }
971 },
972 _ = request_rx.changed() => debug!("task received a request for a new pull"),
973 _ = expires => {
974 pending_reset = true;
975 debug!("expired pull request")},
976 }
977
978 let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap();
979 let result = context
980 .client
981 .publish_with_reply(subject.clone(), inbox.clone(), request.clone())
982 .await
983 .map(|_| pending_reset);
984 // TODO: add tracing instead of ignoring this.
985 request_result_tx
986 .send(result.map(|_| pending_reset).map_err(|err| {
987 crate::RequestError::with_source(crate::RequestErrorKind::Other, err)
988 .into()
989 }))
990 .await
991 .ok();
992 trace!("result send over tx");
993 }
994 }
995 });
996
997 Ok(Stream {
998 task_handle,
999 request_result_rx,
1000 request_tx,
1001 batch_config,
1002 pending_messages: 0,
1003 pending_bytes: 0,
1004 subscriber: subscription,
1005 context: consumer.context.clone(),
1006 pending_request: false,
1007 terminated: false,
1008 heartbeat_timeout: None,
1009 started: Some(started_tx),
1010 })
1011 }
1012}
1013
1014#[derive(Clone, Copy, Debug, PartialEq)]
1015pub enum OrderedErrorKind {
1016 MissingHeartbeat,
1017 ConsumerDeleted,
1018 Pull,
1019 PushBasedConsumer,
1020 Recreate,
1021 NoResponders,
1022 Other,
1023}
1024
1025impl std::fmt::Display for OrderedErrorKind {
1026 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1027 match self {
1028 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1029 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1030 Self::Pull => write!(f, "pull request failed"),
1031 Self::Other => write!(f, "error"),
1032 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1033 Self::Recreate => write!(f, "consumer recreation failed"),
1034 Self::NoResponders => write!(f, "no responders"),
1035 }
1036 }
1037}
1038
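/// Error returned by the [Ordered] message stream.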
1039pub type OrderedError = Error<OrderedErrorKind>;
1040
1041impl From<MessagesError> for OrderedError {
1042 fn from(err: MessagesError) -> Self {
1043 match err.kind() {
1044 MessagesErrorKind::MissingHeartbeat => {
1045 OrderedError::new(OrderedErrorKind::MissingHeartbeat)
1046 }
1047 MessagesErrorKind::ConsumerDeleted => {
1048 OrderedError::new(OrderedErrorKind::ConsumerDeleted)
1049 }
1050 MessagesErrorKind::Pull => OrderedError {
1051 kind: OrderedErrorKind::Pull,
1052 source: err.source,
1053 },
1054 MessagesErrorKind::PushBasedConsumer => {
1055 OrderedError::new(OrderedErrorKind::PushBasedConsumer)
1056 }
1057 MessagesErrorKind::Other => OrderedError {
1058 kind: OrderedErrorKind::Other,
1059 source: err.source,
1060 },
1061 MessagesErrorKind::NoResponders => OrderedError::new(OrderedErrorKind::NoResponders),
1062 }
1063 }
1064}
1065
1066#[derive(Clone, Copy, Debug, PartialEq)]
1067pub enum MessagesErrorKind {
1068 MissingHeartbeat,
1069 ConsumerDeleted,
1070 Pull,
1071 PushBasedConsumer,
1072 NoResponders,
1073 Other,
1074}
1075
1076impl std::fmt::Display for MessagesErrorKind {
1077 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1078 match self {
1079 Self::MissingHeartbeat => write!(f, "missed idle heartbeat"),
1080 Self::ConsumerDeleted => write!(f, "consumer deleted"),
1081 Self::Pull => write!(f, "pull request failed"),
1082 Self::Other => write!(f, "error"),
1083 Self::NoResponders => write!(f, "no responders"),
1084 Self::PushBasedConsumer => write!(f, "cannot use with push consumer"),
1085 }
1086 }
1087}
1088
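/// Error returned by the [Stream] of messages.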
1089pub type MessagesError = Error<MessagesErrorKind>;
1090
1091impl futures::Stream for Stream {
1092 type Item = Result<jetstream::Message, MessagesError>;
1093
1094 fn poll_next(
1095 mut self: std::pin::Pin<&mut Self>,
1096 cx: &mut std::task::Context<'_>,
1097 ) -> std::task::Poll<Option<Self::Item>> {
1098 if let Some(started) = self.started.take() {
1099 trace!("stream started, sending started signal");
1100 if started.send(()).is_err() {
1101 debug!("failed to send started signal");
1102 }
1103 }
1104 if self.terminated {
1105 return Poll::Ready(None);
1106 }
1107
1108 if !self.batch_config.idle_heartbeat.is_zero() {
1109 trace!("checking idle hearbeats");
1110 let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
1111 match self
1112 .heartbeat_timeout
1113 .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
1114 .poll_unpin(cx)
1115 {
1116 Poll::Ready(_) => {
1117 self.heartbeat_timeout = None;
1118 return Poll::Ready(Some(Err(MessagesError::new(
1119 MessagesErrorKind::MissingHeartbeat,
1120 ))));
1121 }
1122 Poll::Pending => (),
1123 }
1124 }
1125
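// On each pass: ask the background task for a new pull request once the buffered
// capacity drops below half of the configured batch, then drain pull request results,
// and finally drain messages from the subscription.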
1126 loop {
1127 trace!("pending messages: {}", self.pending_messages);
1128 if (self.pending_messages <= self.batch_config.batch / 2
1129 || (self.batch_config.max_bytes > 0
1130 && self.pending_bytes <= self.batch_config.max_bytes / 2))
1131 && !self.pending_request
1132 {
1133 debug!("pending messages reached threshold to send new fetch request");
1134 self.request_tx.send(()).ok();
1135 self.pending_request = true;
1136 }
1137
1138 match self.request_result_rx.poll_recv(cx) {
1139 Poll::Ready(resp) => match resp {
1140 Some(resp) => match resp {
1141 Ok(reset) => {
1142 trace!("request response: {:?}", reset);
1143 debug!("request sent, setting pending messages");
1144 if reset {
1145 self.pending_messages = self.batch_config.batch;
1146 self.pending_bytes = self.batch_config.max_bytes;
1147 } else {
1148 self.pending_messages += self.batch_config.batch;
1149 self.pending_bytes += self.batch_config.max_bytes;
1150 }
1151 self.pending_request = false;
1152 continue;
1153 }
1154 Err(err) => {
1155 return Poll::Ready(Some(Err(MessagesError::with_source(
1156 MessagesErrorKind::Pull,
1157 err,
1158 ))))
1159 }
1160 },
1161 None => return Poll::Ready(None),
1162 },
1163 Poll::Pending => {
1164 trace!("pending result");
1165 }
1166 }
1167
1168 trace!("polling subscriber");
1169 match self.subscriber.receiver.poll_recv(cx) {
1170 Poll::Ready(maybe_message) => {
1171 self.heartbeat_timeout = None;
1172 match maybe_message {
1173 Some(message) => match message.status.unwrap_or(StatusCode::OK) {
1174 StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
1175 debug!("received status message: {:?}", message);
1176 // If the consumer has been deleted, error and shut down the iterator.
1177 if message.description.as_deref() == Some("Consumer Deleted") {
1178 self.terminated = true;
1179 return Poll::Ready(Some(Err(MessagesError::new(
1180 MessagesErrorKind::ConsumerDeleted,
1181 ))));
1182 }
1183 // If the consumer is not pull-based, error and shut down the iterator.
1184 if message.description.as_deref() == Some("Consumer is push based")
1185 {
1186 self.terminated = true;
1187 return Poll::Ready(Some(Err(MessagesError::new(
1188 MessagesErrorKind::PushBasedConsumer,
1189 ))));
1190 }
1191
1192 // Do accounting for messages left after terminated/completed pull request.
1193 let pending_messages = message
1194 .headers
1195 .as_ref()
1196 .and_then(|headers| headers.get("Nats-Pending-Messages"))
1197 .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse())
1198 .map_err(|err| {
1199 MessagesError::with_source(MessagesErrorKind::Other, err)
1200 })?;
1201
1202 let pending_bytes = message
1203 .headers
1204 .as_ref()
1205 .and_then(|headers| headers.get("Nats-Pending-Bytes"))
1206 .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse())
1207 .map_err(|err| {
1208 MessagesError::with_source(MessagesErrorKind::Other, err)
1209 })?;
1210
1211 debug!(
1212 "timeout reached. remaining messages: {}, bytes {}",
1213 pending_messages, pending_bytes
1214 );
1215 self.pending_messages =
1216 self.pending_messages.saturating_sub(pending_messages);
1217 trace!("message bytes len: {}", pending_bytes);
1218 self.pending_bytes =
1219 self.pending_bytes.saturating_sub(pending_bytes);
1220 continue;
1221 }
1222 // Idle Heartbeat means we have no messages, but the consumer is fine.
1223 StatusCode::IDLE_HEARTBEAT => {
1224 debug!("received idle heartbeat");
1225 continue;
1226 }
1227 // We got a message from the stream.
1228 StatusCode::OK => {
1229 trace!("message received");
1230 self.pending_messages = self.pending_messages.saturating_sub(1);
1231 self.pending_bytes =
1232 self.pending_bytes.saturating_sub(message.length);
1233 return Poll::Ready(Some(Ok(jetstream::Message {
1234 context: self.context.clone(),
1235 message,
1236 })));
1237 }
1238 StatusCode::NO_RESPONDERS => {
1239 debug!("received no responders");
1240 return Poll::Ready(Some(Err(MessagesError::new(
1241 MessagesErrorKind::NoResponders,
1242 ))));
1243 }
1244 status => {
1245 debug!("received unknown message: {:?}", message);
1246 return Poll::Ready(Some(Err(MessagesError::with_source(
1247 MessagesErrorKind::Other,
1248 format!(
1249 "error while processing messages from the stream: {}, {:?}",
1250 status, message.description
1251 ),
1252 ))));
1253 }
1254 },
1255 None => return Poll::Ready(None),
1256 }
1257 }
1258 Poll::Pending => {
1259 debug!("subscriber still pending");
1260 return std::task::Poll::Pending;
1261 }
1262 }
1263 }
1264 }
1265}
1266
1267/// Used for building the configuration of a [Stream]. Created by [Consumer::stream] on a [Consumer].
1268///
1269/// # Examples
1270///
1271/// ```no_run
1272/// # #[tokio::main]
1273/// # async fn main() -> Result<(), async_nats::Error> {
1274/// use futures::StreamExt;
1275/// use async_nats::jetstream::consumer::PullConsumer;
1276/// let client = async_nats::connect("localhost:4222").await?;
1277/// let jetstream = async_nats::jetstream::new(client);
1278///
1279/// let consumer: PullConsumer = jetstream
1280/// .get_stream("events").await?
1281/// .get_consumer("pull").await?;
1282///
1283/// let mut messages = consumer.stream()
1284/// .max_messages_per_batch(100)
1285/// .max_bytes_per_batch(1024)
1286/// .messages().await?;
1287///
1288/// while let Some(message) = messages.next().await {
1289/// let message = message?;
1290/// println!("message: {:?}", message);
1291/// message.ack().await?;
1292/// }
1293/// # Ok(())
1294/// # }
/// ```
1295pub struct StreamBuilder<'a> {
1296 batch: usize,
1297 max_bytes: usize,
1298 heartbeat: Duration,
1299 expires: Duration,
1300 group: Option<String>,
1301 min_pending: Option<usize>,
1302 min_ack_pending: Option<usize>,
1303 consumer: &'a Consumer<Config>,
1304}
1305
1306impl<'a> StreamBuilder<'a> {
1307 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1308 StreamBuilder {
1309 consumer,
1310 batch: 200,
1311 max_bytes: 0,
1312 expires: Duration::from_secs(30),
1313 heartbeat: Duration::default(),
1314 group: None,
1315 min_pending: None,
1316 min_ack_pending: None,
1317 }
1318 }
1319
1320 /// Sets the maximum number of bytes that can be buffered on the Client while processing already received
1321 /// messages.
1322 /// Higher values will yield better performance, but also potentially increase memory usage if
1323 /// the application is acknowledging messages much more slowly than they arrive.
1324 ///
1325 /// Default values should provide reasonable balance between performance and memory usage.
1326 ///
1327 /// # Examples
1328 ///
1329 /// ```no_run
1330 /// # #[tokio::main]
1331 /// # async fn main() -> Result<(), async_nats::Error> {
1332 /// use async_nats::jetstream::consumer::PullConsumer;
1333 /// use futures::StreamExt;
1334 /// let client = async_nats::connect("localhost:4222").await?;
1335 /// let jetstream = async_nats::jetstream::new(client);
1336 ///
1337 /// let consumer: PullConsumer = jetstream
1338 /// .get_stream("events")
1339 /// .await?
1340 /// .get_consumer("pull")
1341 /// .await?;
1342 ///
1343 /// let mut messages = consumer
1344 /// .stream()
1345 /// .max_bytes_per_batch(1024)
1346 /// .messages()
1347 /// .await?;
1348 ///
1349 /// while let Some(message) = messages.next().await {
1350 /// let message = message?;
1351 /// println!("message: {:?}", message);
1352 /// message.ack().await?;
1353 /// }
1354 /// # Ok(())
1355 /// # }
1356 /// ```
1357 pub fn max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
1358 self.max_bytes = max_bytes;
1359 self
1360 }
1361
1362 /// Sets the maximum number of messages that can be buffered on the Client while processing already received
1363 /// messages.
1364 /// Higher values will yield better performance, but also potentially increase memory usage if
1365 /// the application is acknowledging messages much more slowly than they arrive.
1366 ///
1367 /// Default values should provide reasonable balance between performance and memory usage.
1368 ///
1369 /// # Examples
1370 ///
1371 /// ```no_run
1372 /// # #[tokio::main]
1373 /// # async fn main() -> Result<(), async_nats::Error> {
1374 /// use async_nats::jetstream::consumer::PullConsumer;
1375 /// use futures::StreamExt;
1376 /// let client = async_nats::connect("localhost:4222").await?;
1377 /// let jetstream = async_nats::jetstream::new(client);
1378 ///
1379 /// let consumer: PullConsumer = jetstream
1380 /// .get_stream("events")
1381 /// .await?
1382 /// .get_consumer("pull")
1383 /// .await?;
1384 ///
1385 /// let mut messages = consumer
1386 /// .stream()
1387 /// .max_messages_per_batch(100)
1388 /// .messages()
1389 /// .await?;
1390 ///
1391 /// while let Some(message) = messages.next().await {
1392 /// let message = message?;
1393 /// println!("message: {:?}", message);
1394 /// message.ack().await?;
1395 /// }
1396 /// # Ok(())
1397 /// # }
1398 /// ```
1399 pub fn max_messages_per_batch(mut self, batch: usize) -> Self {
1400 self.batch = batch;
1401 self
1402 }
1403
1404 /// Sets the heartbeat which will be sent by the server if there are no messages pending for a given
1405 /// [Consumer].
1406 ///
1407 /// # Examples
1408 ///
1409 /// ```no_run
1410 /// # #[tokio::main]
1411 /// # async fn main() -> Result<(), async_nats::Error> {
1412 /// use async_nats::jetstream::consumer::PullConsumer;
1413 /// use futures::StreamExt;
1414 /// let client = async_nats::connect("localhost:4222").await?;
1415 /// let jetstream = async_nats::jetstream::new(client);
1416 ///
1417 /// let consumer: PullConsumer = jetstream
1418 /// .get_stream("events")
1419 /// .await?
1420 /// .get_consumer("pull")
1421 /// .await?;
1422 ///
1423 /// let mut messages = consumer
1424 /// .stream()
1425 /// .heartbeat(std::time::Duration::from_secs(10))
1426 /// .messages()
1427 /// .await?;
1428 ///
1429 /// while let Some(message) = messages.next().await {
1430 /// let message = message?;
1431 /// println!("message: {:?}", message);
1432 /// message.ack().await?;
1433 /// }
1434 /// # Ok(())
1435 /// # }
1436 /// ```
1437 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1438 self.heartbeat = heartbeat;
1439 self
1440 }
1441
1442 /// Low-level API that does not need tweaking for most use cases.
1443 /// Sets how long each batch request waits for the whole batch of messages before timing out.
1445 ///
1446 /// # Examples
1447 ///
1448 /// ```no_run
1449 /// # #[tokio::main]
1450 /// # async fn main() -> Result<(), async_nats::Error> {
1451 /// use async_nats::jetstream::consumer::PullConsumer;
1452 /// use futures::StreamExt;
1453 /// let client = async_nats::connect("localhost:4222").await?;
1454 /// let jetstream = async_nats::jetstream::new(client);
1455 ///
1456 /// let consumer: PullConsumer = jetstream
1457 /// .get_stream("events")
1458 /// .await?
1459 /// .get_consumer("pull")
1460 /// .await?;
1461 ///
1462 /// let mut messages = consumer
1463 /// .stream()
1464 /// .expires(std::time::Duration::from_secs(30))
1465 /// .messages()
1466 /// .await?;
1467 ///
1468 /// while let Some(message) = messages.next().await {
1469 /// let message = message?;
1470 /// println!("message: {:?}", message);
1471 /// message.ack().await?;
1472 /// }
1473 /// # Ok(())
1474 /// # }
1475 /// ```
1476 pub fn expires(mut self, expires: Duration) -> Self {
1477 self.expires = expires;
1478 self
1479 }
1480
1481 /// Sets the overflow threshold of minimum pending messages that must be reached before this stream
1482 /// starts getting messages for a [Consumer].
1483 /// To use overflow, the [Consumer] needs [Config::priority_groups] set and [PriorityPolicy::Overflow] enabled.
1484 ///
1485 /// # Examples
1486 ///
1487 /// ```no_run
1488 /// # #[tokio::main]
1489 /// # async fn main() -> Result<(), async_nats::Error> {
1490 /// use async_nats::jetstream::consumer::PullConsumer;
1491 /// use futures::StreamExt;
1492 /// let client = async_nats::connect("localhost:4222").await?;
1493 /// let jetstream = async_nats::jetstream::new(client);
1494 ///
1495 /// let consumer: PullConsumer = jetstream
1496 /// .get_stream("events")
1497 /// .await?
1498 /// .get_consumer("pull")
1499 /// .await?;
1500 ///
1501 /// let mut messages = consumer
1502 /// .stream()
1503 /// .expires(std::time::Duration::from_secs(30))
1504 /// .group("A")
1505 /// .min_pending(100)
1506 /// .messages()
1507 /// .await?;
1508 ///
1509 /// while let Some(message) = messages.next().await {
1510 /// let message = message?;
1511 /// println!("message: {:?}", message);
1512 /// message.ack().await?;
1513 /// }
1514 /// # Ok(())
1515 /// # }
1516 /// ```
1517 pub fn min_pending(mut self, min_pending: usize) -> Self {
1518 self.min_pending = Some(min_pending);
1519 self
1520 }
1521
1522 /// Sets the overflow threshold of minimum pending acknowledgements that must be reached before this stream
1523 /// starts getting messages for a [Consumer].
1524 /// To use overflow, the [Consumer] needs [Config::priority_groups] set and [PriorityPolicy::Overflow] enabled.
1525 ///
1526 /// # Examples
1527 ///
1528 /// ```no_run
1529 /// # #[tokio::main]
1530 /// # async fn main() -> Result<(), async_nats::Error> {
1531 /// use async_nats::jetstream::consumer::PullConsumer;
1532 /// use futures::StreamExt;
1533 /// let client = async_nats::connect("localhost:4222").await?;
1534 /// let jetstream = async_nats::jetstream::new(client);
1535 ///
1536 /// let consumer: PullConsumer = jetstream
1537 /// .get_stream("events")
1538 /// .await?
1539 /// .get_consumer("pull")
1540 /// .await?;
1541 ///
1542 /// let mut messages = consumer
1543 /// .stream()
1544 /// .expires(std::time::Duration::from_secs(30))
1545 /// .group("A")
1546 /// .min_ack_pending(100)
1547 /// .messages()
1548 /// .await?;
1549 ///
1550 /// while let Some(message) = messages.next().await {
1551 /// let message = message?;
1552 /// println!("message: {:?}", message);
1553 /// message.ack().await?;
1554 /// }
1555 /// # Ok(())
1556 /// # }
1557 /// ```
1558 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1559 self.min_ack_pending = Some(min_ack_pending);
1560 self
1561 }
1562
1563 /// Sets the group when using a [Consumer] with [Config::priority_groups].
1564 ///
1565 /// # Examples
1566 ///
1567 /// ```no_run
1568 /// # #[tokio::main]
1569 /// # async fn main() -> Result<(), async_nats::Error> {
1570 /// use async_nats::jetstream::consumer::PullConsumer;
1571 /// use futures::StreamExt;
1572 /// let client = async_nats::connect("localhost:4222").await?;
1573 /// let jetstream = async_nats::jetstream::new(client);
1574 ///
1575 /// let consumer: PullConsumer = jetstream
1576 /// .get_stream("events")
1577 /// .await?
1578 /// .get_consumer("pull")
1579 /// .await?;
1580 ///
1581 /// let mut messages = consumer
1582 /// .stream()
1583 /// .expires(std::time::Duration::from_secs(30))
1584 /// .group("A")
1585 /// .min_ack_pending(100)
1586 /// .messages()
1587 /// .await?;
1588 ///
1589 /// while let Some(message) = messages.next().await {
1590 /// let message = message?;
1591 /// println!("message: {:?}", message);
1592 /// message.ack().await?;
1593 /// }
1594 /// # Ok(())
1595 /// # }
1596 /// ```
1597 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1598 self.group = Some(group.into());
1599 self
1600 }
1601
1602 /// Creates the actual [Stream] with the provided configuration.
1603 ///
1604 /// # Examples
1605 ///
1606 /// ```no_run
1607 /// # #[tokio::main]
1608 /// # async fn main() -> Result<(), async_nats::Error> {
1609 /// use async_nats::jetstream::consumer::PullConsumer;
1610 /// use futures::StreamExt;
1611 /// let client = async_nats::connect("localhost:4222").await?;
1612 /// let jetstream = async_nats::jetstream::new(client);
1613 ///
1614 /// let consumer: PullConsumer = jetstream
1615 /// .get_stream("events")
1616 /// .await?
1617 /// .get_consumer("pull")
1618 /// .await?;
1619 ///
1620 /// let mut messages = consumer
1621 /// .stream()
1622 /// .max_messages_per_batch(100)
1623 /// .messages()
1624 /// .await?;
1625 ///
1626 /// while let Some(message) = messages.next().await {
1627 /// let message = message?;
1628 /// println!("message: {:?}", message);
1629 /// message.ack().await?;
1630 /// }
1631 /// # Ok(())
1632 /// # }
1633 /// ```
1634 pub async fn messages(self) -> Result<Stream, StreamError> {
1635 Stream::stream(
1636 BatchConfig {
1637 batch: self.batch,
1638 expires: Some(self.expires),
1639 no_wait: false,
1640 max_bytes: self.max_bytes,
1641 idle_heartbeat: self.heartbeat,
1642 min_pending: self.min_pending,
1643 group: self.group,
1644 min_ack_pending: self.min_ack_pending,
1645 },
1646 self.consumer,
1647 )
1648 .await
1649 }
1650}
1651
1652/// Used for building the configuration of a [Batch] with `fetch()` semantics. Created by [Consumer::fetch] on a [Consumer].
1653///
1654/// # Examples
1655///
1656/// ```no_run
1657/// # #[tokio::main]
1658/// # async fn main() -> Result<(), async_nats::Error> {
1659/// use async_nats::jetstream::consumer::PullConsumer;
1660/// use futures::StreamExt;
1661/// let client = async_nats::connect("localhost:4222").await?;
1662/// let jetstream = async_nats::jetstream::new(client);
1663///
1664/// let consumer: PullConsumer = jetstream
1665/// .get_stream("events")
1666/// .await?
1667/// .get_consumer("pull")
1668/// .await?;
1669///
1670/// let mut messages = consumer
1671/// .fetch()
1672/// .max_messages(100)
1673/// .max_bytes(1024)
1674/// .messages()
1675/// .await?;
1676///
1677/// while let Some(message) = messages.next().await {
1678/// let message = message?;
1679/// println!("message: {:?}", message);
1680/// message.ack().await?;
1681/// }
1682/// # Ok(())
1683/// # }
1684/// ```
1685pub struct FetchBuilder<'a> {
1686 batch: usize,
1687 max_bytes: usize,
1688 heartbeat: Duration,
1689 expires: Option<Duration>,
1690 min_pending: Option<usize>,
1691 min_ack_pending: Option<usize>,
1692 group: Option<String>,
1693 consumer: &'a Consumer<Config>,
1694}
1695
1696impl<'a> FetchBuilder<'a> {
1697 pub fn new(consumer: &'a Consumer<Config>) -> Self {
1698 FetchBuilder {
1699 consumer,
1700 batch: 200,
1701 max_bytes: 0,
1702 expires: None,
1703 min_pending: None,
1704 min_ack_pending: None,
1705 group: None,
1706 heartbeat: Duration::default(),
1707 }
1708 }
1709
1710 /// Sets the maximum number of bytes that can be buffered on the Client while processing already received
1711 /// messages.
1712 /// Higher values will yield better performance, but also potentially increase memory usage if
1713 /// the application is acknowledging messages much more slowly than they arrive.
1714 ///
1715 /// Default values should provide reasonable balance between performance and memory usage.
1716 ///
1717 /// # Examples
1718 ///
1719 /// ```no_run
1720 /// # #[tokio::main]
1721 /// # async fn main() -> Result<(), async_nats::Error> {
1722 /// use futures::StreamExt;
1723 /// let client = async_nats::connect("localhost:4222").await?;
1724 /// let jetstream = async_nats::jetstream::new(client);
1725 ///
1726 /// let consumer = jetstream
1727 /// .get_stream("events")
1728 /// .await?
1729 /// .get_consumer("pull")
1730 /// .await?;
1731 ///
1732 /// let mut messages = consumer.fetch().max_bytes(1024).messages().await?;
1733 ///
1734 /// while let Some(message) = messages.next().await {
1735 /// let message = message?;
1736 /// println!("message: {:?}", message);
1737 /// message.ack().await?;
1738 /// }
1739 /// # Ok(())
1740 /// # }
1741 /// ```
1742 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
1743 self.max_bytes = max_bytes;
1744 self
1745 }
1746
1747 /// Sets the maximum number of messages that can be buffered on the Client while processing already received
1748 /// messages.
1749 /// Higher values will yield better performance, but also potentially increase memory usage if
1750 /// the application is acknowledging messages much more slowly than they arrive.
1751 ///
1752 /// Default values should provide reasonable balance between performance and memory usage.
1753 ///
1754 /// # Examples
1755 ///
1756 /// ```no_run
1757 /// # #[tokio::main]
1758 /// # async fn main() -> Result<(), async_nats::Error> {
1759 /// use futures::StreamExt;
1760 /// let client = async_nats::connect("localhost:4222").await?;
1761 /// let jetstream = async_nats::jetstream::new(client);
1762 ///
1763 /// let consumer = jetstream
1764 /// .get_stream("events")
1765 /// .await?
1766 /// .get_consumer("pull")
1767 /// .await?;
1768 ///
1769 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
1770 ///
1771 /// while let Some(message) = messages.next().await {
1772 /// let message = message?;
1773 /// println!("message: {:?}", message);
1774 /// message.ack().await?;
1775 /// }
1776 /// # Ok(())
1777 /// # }
1778 /// ```
1779 pub fn max_messages(mut self, batch: usize) -> Self {
1780 self.batch = batch;
1781 self
1782 }
1783
1784 /// Sets the heartbeat which will be sent by the server if there are no messages pending for a given
1785 /// [Consumer].
1786 ///
1787 /// # Examples
1788 ///
1789 /// ```no_run
1790 /// # #[tokio::main]
1791 /// # async fn main() -> Result<(), async_nats::Error> {
1792 /// use async_nats::jetstream::consumer::PullConsumer;
1793 /// use futures::StreamExt;
1794 /// let client = async_nats::connect("localhost:4222").await?;
1795 /// let jetstream = async_nats::jetstream::new(client);
1796 ///
1797 /// let consumer = jetstream
1798 /// .get_stream("events")
1799 /// .await?
1800 /// .get_consumer("pull")
1801 /// .await?;
1802 ///
1803 /// let mut messages = consumer
1804 /// .fetch()
1805 /// .heartbeat(std::time::Duration::from_secs(10))
1806 /// .messages()
1807 /// .await?;
1808 ///
1809 /// while let Some(message) = messages.next().await {
1810 /// let message = message?;
1811 /// println!("message: {:?}", message);
1812 /// message.ack().await?;
1813 /// }
1814 /// # Ok(())
1815 /// # }
1816 /// ```
1817 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
1818 self.heartbeat = heartbeat;
1819 self
1820 }
1821
1822 /// Low-level API that does not need tweaking for most use cases.
1823 /// Sets how long each batch request waits for the whole batch of messages before timing out.
1825 ///
1826 /// # Examples
1827 ///
1828 /// ```no_run
1829 /// # #[tokio::main]
1830 /// # async fn main() -> Result<(), async_nats::Error> {
1831 /// use async_nats::jetstream::consumer::PullConsumer;
1832 /// use futures::StreamExt;
1833 ///
1834 /// let client = async_nats::connect("localhost:4222").await?;
1835 /// let jetstream = async_nats::jetstream::new(client);
1836 ///
1837 /// let consumer: PullConsumer = jetstream
1838 /// .get_stream("events")
1839 /// .await?
1840 /// .get_consumer("pull")
1841 /// .await?;
1842 ///
1843 /// let mut messages = consumer
1844 /// .fetch()
1845 /// .expires(std::time::Duration::from_secs(30))
1846 /// .messages()
1847 /// .await?;
1848 ///
1849 /// while let Some(message) = messages.next().await {
1850 /// let message = message?;
1851 /// println!("message: {:?}", message);
1852 /// message.ack().await?;
1853 /// }
1854 /// # Ok(())
1855 /// # }
1856 /// ```
1857 pub fn expires(mut self, expires: Duration) -> Self {
1858 self.expires = Some(expires);
1859 self
1860 }
1861
1862 /// Sets the overflow threshold: the minimum number of pending messages on the [Consumer]
1863 /// required before this request starts receiving messages.
1864 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
1865 /// [PriorityPolicy::Overflow] set.
1866 ///
1867 /// # Examples
1868 ///
1869 /// ```no_run
1870 /// # #[tokio::main]
1871 /// # async fn main() -> Result<(), async_nats::Error> {
1872 /// use async_nats::jetstream::consumer::PullConsumer;
1873 /// use futures::StreamExt;
1874 ///
1875 /// let client = async_nats::connect("localhost:4222").await?;
1876 /// let jetstream = async_nats::jetstream::new(client);
1877 ///
1878 /// let consumer: PullConsumer = jetstream
1879 /// .get_stream("events")
1880 /// .await?
1881 /// .get_consumer("pull")
1882 /// .await?;
1883 ///
1884 /// let mut messages = consumer
1885 /// .fetch()
1886 /// .expires(std::time::Duration::from_secs(30))
1887 /// .group("A")
1888 /// .min_pending(100)
1889 /// .messages()
1890 /// .await?;
1891 ///
1892 /// while let Some(message) = messages.next().await {
1893 /// let message = message?;
1894 /// println!("message: {:?}", message);
1895 /// message.ack().await?;
1896 /// }
1897 /// # Ok(())
1898 /// # }
1899 /// ```
1900 pub fn min_pending(mut self, min_pending: usize) -> Self {
1901 self.min_pending = Some(min_pending);
1902 self
1903 }
1904
1905 /// Sets the overflow threshold: the minimum number of pending acknowledgments on the
1906 /// [Consumer] required before this request starts receiving messages.
1907 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
1908 /// [PriorityPolicy::Overflow] set.
1909 ///
1910 /// # Examples
1911 ///
1912 /// ```no_run
1913 /// # #[tokio::main]
1914 /// # async fn main() -> Result<(), async_nats::Error> {
1915 /// use async_nats::jetstream::consumer::PullConsumer;
1916 /// use futures::StreamExt;
1917 ///
1918 /// let client = async_nats::connect("localhost:4222").await?;
1919 /// let jetstream = async_nats::jetstream::new(client);
1920 ///
1921 /// let consumer: PullConsumer = jetstream
1922 /// .get_stream("events")
1923 /// .await?
1924 /// .get_consumer("pull")
1925 /// .await?;
1926 ///
1927 /// let mut messages = consumer
1928 /// .fetch()
1929 /// .expires(std::time::Duration::from_secs(30))
1930 /// .group("A")
1931 /// .min_ack_pending(100)
1932 /// .messages()
1933 /// .await?;
1934 ///
1935 /// while let Some(message) = messages.next().await {
1936 /// let message = message?;
1937 /// println!("message: {:?}", message);
1938 /// message.ack().await?;
1939 /// }
1940 /// # Ok(())
1941 /// # }
1942 /// ```
1943 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
1944 self.min_ack_pending = Some(min_ack_pending);
1945 self
1946 }
1947
1948 /// Sets the priority group used when consuming from a [Consumer] with a [PriorityPolicy].
1949 ///
1950 /// # Examples
1951 ///
1952 /// ```no_run
1953 /// # #[tokio::main]
1954 /// # async fn main() -> Result<(), async_nats::Error> {
1955 /// use async_nats::jetstream::consumer::PullConsumer;
1956 /// use futures::StreamExt;
1957 ///
1958 /// let client = async_nats::connect("localhost:4222").await?;
1959 /// let jetstream = async_nats::jetstream::new(client);
1960 ///
1961 /// let consumer: PullConsumer = jetstream
1962 /// .get_stream("events")
1963 /// .await?
1964 /// .get_consumer("pull")
1965 /// .await?;
1966 ///
1967 /// let mut messages = consumer
1968 /// .fetch()
1969 /// .expires(std::time::Duration::from_secs(30))
1970 /// .group("A")
1971 /// .min_ack_pending(100)
1972 /// .messages()
1973 /// .await?;
1974 ///
1975 /// while let Some(message) = messages.next().await {
1976 /// let message = message?;
1977 /// println!("message: {:?}", message);
1978 /// message.ack().await?;
1979 /// }
1980 /// # Ok(())
1981 /// # }
1982 /// ```
1983 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
1984 self.group = Some(group.into());
1985 self
1986 }
1987
1988 /// Creates the [Batch] of messages with the provided configuration.
1989 ///
1990 /// # Examples
1991 ///
1992 /// ```no_run
1993 /// # #[tokio::main]
1994 /// # async fn main() -> Result<(), async_nats::Error> {
1995 /// use async_nats::jetstream::consumer::PullConsumer;
1996 /// use futures::StreamExt;
1997 /// let client = async_nats::connect("localhost:4222").await?;
1998 /// let jetstream = async_nats::jetstream::new(client);
1999 ///
2000 /// let consumer: PullConsumer = jetstream
2001 /// .get_stream("events")
2002 /// .await?
2003 /// .get_consumer("pull")
2004 /// .await?;
2005 ///
2006 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
2007 ///
2008 /// while let Some(message) = messages.next().await {
2009 /// let message = message?;
2010 /// println!("message: {:?}", message);
2011 /// message.ack().await?;
2012 /// }
2013 /// # Ok(())
2014 /// # }
2015 /// ```
2016 pub async fn messages(self) -> Result<Batch, BatchError> {
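 // Note: `fetch` issues a single pull request with `no_wait` set, so the server replies
 // immediately with whatever messages are currently available (up to the configured batch
 // size and byte limit) instead of waiting for the full batch.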
2017 Batch::batch(
2018 BatchConfig {
2019 batch: self.batch,
2020 expires: self.expires,
2021 no_wait: true,
2022 max_bytes: self.max_bytes,
2023 idle_heartbeat: self.heartbeat,
2024 min_pending: self.min_pending,
2025 min_ack_pending: self.min_ack_pending,
2026 group: self.group,
2027 },
2028 self.consumer,
2029 )
2030 .await
2031 }
2032}
2033
2034/// Used for building the configuration of a [Batch]. Created by calling [Consumer::batch] on a [Consumer].
2035///
2036/// # Examples
2037///
2038/// ```no_run
2039/// # #[tokio::main]
2040/// # async fn main() -> Result<(), async_nats::Error> {
2041/// use async_nats::jetstream::consumer::PullConsumer;
2042/// use futures::StreamExt;
2043/// let client = async_nats::connect("localhost:4222").await?;
2044/// let jetstream = async_nats::jetstream::new(client);
2045///
2046/// let consumer: PullConsumer = jetstream
2047/// .get_stream("events")
2048/// .await?
2049/// .get_consumer("pull")
2050/// .await?;
2051///
2052/// let mut messages = consumer
2053/// .batch()
2054/// .max_messages(100)
2055/// .max_bytes(1024)
2056/// .messages()
2057/// .await?;
2058///
2059/// while let Some(message) = messages.next().await {
2060/// let message = message?;
2061/// println!("message: {:?}", message);
2062/// message.ack().await?;
2063/// }
2064/// # Ok(())
2065/// # }
2066/// ```
2067pub struct BatchBuilder<'a> {
2068 batch: usize,
2069 max_bytes: usize,
2070 heartbeat: Duration,
2071 expires: Duration,
2072 min_pending: Option<usize>,
2073 min_ack_pending: Option<usize>,
2074 group: Option<String>,
2075 consumer: &'a Consumer<Config>,
2076}
2077
2078impl<'a> BatchBuilder<'a> {
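 /// Creates a new [BatchBuilder] with default settings: it requests up to 200 messages, with
 /// no byte limit and with zero `expires` and `heartbeat`.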
2079 pub fn new(consumer: &'a Consumer<Config>) -> Self {
2080 BatchBuilder {
2081 consumer,
2082 batch: 200,
2083 max_bytes: 0,
2084 expires: Duration::ZERO,
2085 heartbeat: Duration::default(),
2086 min_pending: None,
2087 min_ack_pending: None,
2088 group: None,
2089 }
2090 }
2091
2092 /// Sets the maximum number of bytes that can be buffered on the client while already
2093 /// received messages are being processed.
2094 /// Higher values yield better performance, but can also increase memory usage if the
2095 /// application acknowledges messages much more slowly than they arrive.
2096 ///
2097 /// The default value should provide a reasonable balance between performance and memory usage.
2098 ///
2099 /// # Examples
2100 ///
2101 /// ```no_run
2102 /// # #[tokio::main]
2103 /// # async fn main() -> Result<(), async_nats::Error> {
2104 /// use async_nats::jetstream::consumer::PullConsumer;
2105 /// use futures::StreamExt;
2106 /// let client = async_nats::connect("localhost:4222").await?;
2107 /// let jetstream = async_nats::jetstream::new(client);
2108 ///
2109 /// let consumer: PullConsumer = jetstream
2110 /// .get_stream("events")
2111 /// .await?
2112 /// .get_consumer("pull")
2113 /// .await?;
2114 ///
2115 /// let mut messages = consumer.batch().max_bytes(1024).messages().await?;
2116 ///
2117 /// while let Some(message) = messages.next().await {
2118 /// let message = message?;
2119 /// println!("message: {:?}", message);
2120 /// message.ack().await?;
2121 /// }
2122 /// # Ok(())
2123 /// # }
2124 /// ```
2125 pub fn max_bytes(mut self, max_bytes: usize) -> Self {
2126 self.max_bytes = max_bytes;
2127 self
2128 }
2129
2130 /// Sets the maximum number of messages that can be buffered on the client while already
2131 /// received messages are being processed.
2132 /// Higher values yield better performance, but can also increase memory usage if the
2133 /// application acknowledges messages much more slowly than they arrive.
2134 ///
2135 /// The default value should provide a reasonable balance between performance and memory usage.
2136 ///
2137 /// # Examples
2138 ///
2139 /// ```no_run
2140 /// # #[tokio::main]
2141 /// # async fn main() -> Result<(), async_nats::Error> {
2142 /// use async_nats::jetstream::consumer::PullConsumer;
2143 /// use futures::StreamExt;
2144 /// let client = async_nats::connect("localhost:4222").await?;
2145 /// let jetstream = async_nats::jetstream::new(client);
2146 ///
2147 /// let consumer: PullConsumer = jetstream
2148 /// .get_stream("events")
2149 /// .await?
2150 /// .get_consumer("pull")
2151 /// .await?;
2152 ///
2153 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2154 ///
2155 /// while let Some(message) = messages.next().await {
2156 /// let message = message?;
2157 /// println!("message: {:?}", message);
2158 /// message.ack().await?;
2159 /// }
2160 /// # Ok(())
2161 /// # }
2162 /// ```
2163 pub fn max_messages(mut self, batch: usize) -> Self {
2164 self.batch = batch;
2165 self
2166 }
2167
2168 /// Sets the idle heartbeat which will be sent by the server if there are no messages
2169 /// pending for a given [Consumer].
2170 ///
2171 /// # Examples
2172 ///
2173 /// ```no_run
2174 /// # #[tokio::main]
2175 /// # async fn main() -> Result<(), async_nats::Error> {
2176 /// use async_nats::jetstream::consumer::PullConsumer;
2177 /// use futures::StreamExt;
2178 /// let client = async_nats::connect("localhost:4222").await?;
2179 /// let jetstream = async_nats::jetstream::new(client);
2180 ///
2181 /// let consumer: PullConsumer = jetstream
2182 /// .get_stream("events")
2183 /// .await?
2184 /// .get_consumer("pull")
2185 /// .await?;
2186 ///
2187 /// let mut messages = consumer
2188 /// .batch()
2189 /// .heartbeat(std::time::Duration::from_secs(10))
2190 /// .messages()
2191 /// .await?;
2192 ///
2193 /// while let Some(message) = messages.next().await {
2194 /// let message = message?;
2195 /// println!("message: {:?}", message);
2196 /// message.ack().await?;
2197 /// }
2198 /// # Ok(())
2199 /// # }
2200 /// ```
2201 pub fn heartbeat(mut self, heartbeat: Duration) -> Self {
2202 self.heartbeat = heartbeat;
2203 self
2204 }
2205
2206 /// Sets the overflow threshold: the minimum number of pending messages on the [Consumer]
2207 /// required before this request starts receiving messages.
2208 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
2209 /// [PriorityPolicy::Overflow] set.
2210 ///
2211 /// # Examples
2212 ///
2213 /// ```no_run
2214 /// # #[tokio::main]
2215 /// # async fn main() -> Result<(), async_nats::Error> {
2216 /// use async_nats::jetstream::consumer::PullConsumer;
2217 /// use futures::StreamExt;
2218 ///
2219 /// let client = async_nats::connect("localhost:4222").await?;
2220 /// let jetstream = async_nats::jetstream::new(client);
2221 ///
2222 /// let consumer: PullConsumer = jetstream
2223 /// .get_stream("events")
2224 /// .await?
2225 /// .get_consumer("pull")
2226 /// .await?;
2227 ///
2228 /// let mut messages = consumer
2229 /// .batch()
2230 /// .expires(std::time::Duration::from_secs(30))
2231 /// .group("A")
2232 /// .min_pending(100)
2233 /// .messages()
2234 /// .await?;
2235 ///
2236 /// while let Some(message) = messages.next().await {
2237 /// let message = message?;
2238 /// println!("message: {:?}", message);
2239 /// message.ack().await?;
2240 /// }
2241 /// # Ok(())
2242 /// # }
2243 /// ```
2244 pub fn min_pending(mut self, min_pending: usize) -> Self {
2245 self.min_pending = Some(min_pending);
2246 self
2247 }
2248
2249 /// Sets the overflow threshold: the minimum number of pending acknowledgments on the
2250 /// [Consumer] required before this request starts receiving messages.
2251 /// To use overflow, the [Consumer] needs [Config::priority_groups] configured and
2252 /// [PriorityPolicy::Overflow] set.
2253 ///
2254 /// # Examples
2255 ///
2256 /// ```no_run
2257 /// # #[tokio::main]
2258 /// # async fn main() -> Result<(), async_nats::Error> {
2259 /// use async_nats::jetstream::consumer::PullConsumer;
2260 /// use futures::StreamExt;
2261 ///
2262 /// let client = async_nats::connect("localhost:4222").await?;
2263 /// let jetstream = async_nats::jetstream::new(client);
2264 ///
2265 /// let consumer: PullConsumer = jetstream
2266 /// .get_stream("events")
2267 /// .await?
2268 /// .get_consumer("pull")
2269 /// .await?;
2270 ///
2271 /// let mut messages = consumer
2272 /// .batch()
2273 /// .expires(std::time::Duration::from_secs(30))
2274 /// .group("A")
2275 /// .min_ack_pending(100)
2276 /// .messages()
2277 /// .await?;
2278 ///
2279 /// while let Some(message) = messages.next().await {
2280 /// let message = message?;
2281 /// println!("message: {:?}", message);
2282 /// message.ack().await?;
2283 /// }
2284 /// # Ok(())
2285 /// # }
2286 /// ```
2287 pub fn min_ack_pending(mut self, min_ack_pending: usize) -> Self {
2288 self.min_ack_pending = Some(min_ack_pending);
2289 self
2290 }
2291
2292 /// Sets the priority group used when consuming from a [Consumer] with a [PriorityPolicy].
2293 ///
2294 /// # Examples
2295 ///
2296 /// ```no_run
2297 /// # #[tokio::main]
2298 /// # async fn main() -> Result<(), async_nats::Error> {
2299 /// use async_nats::jetstream::consumer::PullConsumer;
2300 /// use futures::StreamExt;
2301 ///
2302 /// let client = async_nats::connect("localhost:4222").await?;
2303 /// let jetstream = async_nats::jetstream::new(client);
2304 ///
2305 /// let consumer: PullConsumer = jetstream
2306 /// .get_stream("events")
2307 /// .await?
2308 /// .get_consumer("pull")
2309 /// .await?;
2310 ///
2311 /// let mut messages = consumer
2312 /// .batch()
2313 /// .expires(std::time::Duration::from_secs(30))
2314 /// .group("A")
2315 /// .min_ack_pending(100)
2316 /// .messages()
2317 /// .await?;
2318 ///
2319 /// while let Some(message) = messages.next().await {
2320 /// let message = message?;
2321 /// println!("message: {:?}", message);
2322 /// message.ack().await?;
2323 /// }
2324 /// # Ok(())
2325 /// # }
2326 /// ```
2327 pub fn group<T: Into<String>>(mut self, group: T) -> Self {
2328 self.group = Some(group.into());
2329 self
2330 }
2331
2332 /// Low-level API that does not need tweaking for most use cases.
2333 /// Sets how long each batch request waits for the whole batch of messages before timing
2334 /// out.
2335 ///
2336 /// # Examples
2337 ///
2338 /// ```no_run
2339 /// # #[tokio::main]
2340 /// # async fn main() -> Result<(), async_nats::Error> {
2341 /// use async_nats::jetstream::consumer::PullConsumer;
2342 /// use futures::StreamExt;
2343 /// let client = async_nats::connect("localhost:4222").await?;
2344 /// let jetstream = async_nats::jetstream::new(client);
2345 ///
2346 /// let consumer: PullConsumer = jetstream
2347 /// .get_stream("events")
2348 /// .await?
2349 /// .get_consumer("pull")
2350 /// .await?;
2351 ///
2352 /// let mut messages = consumer
2353 /// .batch()
2354 /// .expires(std::time::Duration::from_secs(30))
2355 /// .messages()
2356 /// .await?;
2357 ///
2358 /// while let Some(message) = messages.next().await {
2359 /// let message = message?;
2360 /// println!("message: {:?}", message);
2361 /// message.ack().await?;
2362 /// }
2363 /// # Ok(())
2364 /// # }
2365 /// ```
2366 pub fn expires(mut self, expires: Duration) -> Self {
2367 self.expires = expires;
2368 self
2369 }
2370
2371 /// Creates the [Batch] of messages with the provided configuration.
2372 ///
2373 /// # Examples
2374 ///
2375 /// ```no_run
2376 /// # #[tokio::main]
2377 /// # async fn main() -> Result<(), async_nats::Error> {
2378 /// use async_nats::jetstream::consumer::PullConsumer;
2379 /// use futures::StreamExt;
2380 /// let client = async_nats::connect("localhost:4222").await?;
2381 /// let jetstream = async_nats::jetstream::new(client);
2382 ///
2383 /// let consumer: PullConsumer = jetstream
2384 /// .get_stream("events")
2385 /// .await?
2386 /// .get_consumer("pull")
2387 /// .await?;
2388 ///
2389 /// let mut messages = consumer.batch().max_messages(100).messages().await?;
2390 ///
2391 /// while let Some(message) = messages.next().await {
2392 /// let message = message?;
2393 /// println!("message: {:?}", message);
2394 /// message.ack().await?;
2395 /// }
2396 /// # Ok(())
2397 /// # }
2398 /// ```
2399 pub async fn messages(self) -> Result<Batch, BatchError> {
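 // Unlike `fetch`, a batch request leaves `no_wait` unset, so the server keeps the pull
 // request open until the batch is filled or `expires` elapses.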
2400 let config = BatchConfig {
2401 batch: self.batch,
2402 expires: Some(self.expires),
2403 no_wait: false,
2404 max_bytes: self.max_bytes,
2405 idle_heartbeat: self.heartbeat,
2406 min_pending: self.min_pending,
2407 min_ack_pending: self.min_ack_pending,
2408 group: self.group,
2409 };
2410 Batch::batch(config, self.consumer).await
2411 }
2412}
2413
2414/// Used for the next pull request of a Pull Consumer.
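///
/// The example below is a minimal sketch of constructing such a request by hand; the field
/// values (batch size, byte limit, expiry) are illustrative assumptions, not recommended
/// defaults.
///
/// ```no_run
/// use std::time::Duration;
///
/// // Ask for up to 100 messages or 1 MiB, whichever limit is reached first,
/// // and let the server keep the request pending for up to 30 seconds.
/// let request = async_nats::jetstream::consumer::pull::BatchConfig {
///     batch: 100,
///     max_bytes: 1024 * 1024,
///     expires: Some(Duration::from_secs(30)),
///     ..Default::default()
/// };
/// # let _ = request;
/// ```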
2415#[derive(Debug, Default, Serialize, Clone, PartialEq, Eq)]
2416pub struct BatchConfig {
2417 /// The number of messages that are being requested to be delivered.
2418 pub batch: usize,
2419 /// The optional amount of time that the server will keep this request pending
2420 /// before forgetting about the remaining batch size.
2421 #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
2422 pub expires: Option<Duration>,
2423 /// This optionally causes the server not to store this pending request at all. When there are
2424 /// no messages to deliver, the server will instead send an empty message with a Status header
2425 /// of 404, so you can tell, for example, that you have reached the end of the stream. A 409 is
2426 /// returned if the Consumer has reached its MaxAckPending limit.
2427 #[serde(skip_serializing_if = "is_default")]
2428 pub no_wait: bool,
2429
2430 /// Sets the maximum total number of bytes for a given batch. This works together with `batch`:
2431 /// whichever limit is reached first completes the batch.
2432 pub max_bytes: usize,
2433
2434 /// Setting this to a non-zero value will cause the server to send `100 Idle Heartbeat` status
2435 /// messages to the client at that interval.
2436 #[serde(with = "serde_nanos", skip_serializing_if = "is_default")]
2437 pub idle_heartbeat: Duration,
2438
 /// Overflow threshold: the minimum number of pending messages on the consumer required
 /// before this request starts receiving messages. Used with [PriorityPolicy::Overflow].
 #[serde(skip_serializing_if = "Option::is_none")]
 pub min_pending: Option<usize>,
 /// Overflow threshold: the minimum number of pending acknowledgments on the consumer
 /// required before this request starts receiving messages. Used with [PriorityPolicy::Overflow].
 #[serde(skip_serializing_if = "Option::is_none")]
 pub min_ack_pending: Option<usize>,
 /// The priority group this request belongs to, used with [PriorityPolicy].
 #[serde(skip_serializing_if = "Option::is_none")]
 pub group: Option<String>,
2442}
2443
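// Helper for serde `skip_serializing_if` attributes: returns true when a value equals its
// type's default, so the corresponding field is omitted from the serialized request.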
2444fn is_default<T: Default + Eq>(t: &T) -> bool {
2445 t == &T::default()
2446}
2447
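/// Configuration for a pull [Consumer].
///
/// The example below is a minimal sketch of creating a durable pull consumer on an existing
/// stream; the stream name, consumer name, and ack wait are illustrative assumptions.
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use std::time::Duration;
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer = jetstream
///     .get_stream("events")
///     .await?
///     .create_consumer(async_nats::jetstream::consumer::pull::Config {
///         durable_name: Some("processor".to_string()),
///         ack_wait: Duration::from_secs(30),
///         ..Default::default()
///     })
///     .await?;
/// # let _ = consumer;
/// # Ok(())
/// # }
/// ```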
2448#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2449pub struct Config {
2450 /// Setting `durable_name` to `Some(...)` will cause this consumer
2451 /// to be "durable". This may be a good choice for workloads that
2452 /// benefit from the `JetStream` server or cluster remembering the
2453 /// progress of consumers for fault tolerance purposes. If a consumer
2454 /// crashes, the `JetStream` server or cluster will remember which
2455 /// messages the consumer acknowledged. When the consumer recovers,
2456 /// this information will allow the consumer to resume processing
2457 /// where it left off. If you're unsure, set this to `Some(...)`.
2458 ///
2459 /// Setting `durable_name` to `None` will cause this consumer to
2460 /// be "ephemeral". This may be a good choice for workloads where
2461 /// you don't need the `JetStream` server to remember the consumer's
2462 /// progress in the case of a crash, such as certain "high churn"
2463 /// workloads or workloads where a crashed instance is not required
2464 /// to recover.
2465 #[serde(default, skip_serializing_if = "Option::is_none")]
2466 pub durable_name: Option<String>,
2467 /// A name of the consumer. Can be specified for both durable and ephemeral
2468 /// consumers.
2469 #[serde(default, skip_serializing_if = "Option::is_none")]
2470 pub name: Option<String>,
2471 /// A short description of the purpose of this consumer.
2472 #[serde(default, skip_serializing_if = "Option::is_none")]
2473 pub description: Option<String>,
2474 /// Allows for a variety of options that determine how this consumer will receive messages
2475 #[serde(flatten)]
2476 pub deliver_policy: DeliverPolicy,
2477 /// How messages should be acknowledged
2478 pub ack_policy: AckPolicy,
2479 /// How long to allow messages to remain un-acknowledged before attempting redelivery
2480 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2481 pub ack_wait: Duration,
2482 /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
2483 #[serde(default, skip_serializing_if = "is_default")]
2484 pub max_deliver: i64,
2485 /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
2486 #[serde(default, skip_serializing_if = "is_default")]
2487 pub filter_subject: String,
2488 #[cfg(feature = "server_2_10")]
2489 /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
2490 #[serde(default, skip_serializing_if = "is_default")]
2491 pub filter_subjects: Vec<String>,
2492 /// Whether messages are sent as quickly as possible or at the rate of receipt
2493 pub replay_policy: ReplayPolicy,
2494 /// The rate of message delivery in bits per second
2495 #[serde(default, skip_serializing_if = "is_default")]
2496 pub rate_limit: u64,
2497 /// What percentage of acknowledgments should be sampled for observability, 0-100.
2498 #[serde(
2499 rename = "sample_freq",
2500 with = "super::sample_freq_deser",
2501 default,
2502 skip_serializing_if = "is_default"
2503 )]
2504 pub sample_frequency: u8,
2505 /// The maximum number of outstanding pull requests that may be waiting to be fulfilled.
2506 #[serde(default, skip_serializing_if = "is_default")]
2507 pub max_waiting: i64,
2508 /// The maximum number of unacknowledged messages that may be
2509 /// in-flight before pausing sending additional messages to
2510 /// this consumer.
2511 #[serde(default, skip_serializing_if = "is_default")]
2512 pub max_ack_pending: i64,
2513 /// Only deliver headers without payloads.
2514 #[serde(default, skip_serializing_if = "is_default")]
2515 pub headers_only: bool,
2516 /// Maximum size of a request batch
2517 #[serde(default, skip_serializing_if = "is_default")]
2518 pub max_batch: i64,
2519 /// Maximum value of request max_bytes
2520 #[serde(default, skip_serializing_if = "is_default")]
2521 pub max_bytes: i64,
2522 /// Maximum value for request expiration
2523 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2524 pub max_expires: Duration,
2525 /// Threshold for consumer inactivity
2526 #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
2527 pub inactive_threshold: Duration,
2528 /// Number of consumer replicas
2529 #[serde(default, skip_serializing_if = "is_default")]
2530 pub num_replicas: usize,
2531 /// Force consumer to use memory storage.
2532 #[serde(default, skip_serializing_if = "is_default")]
2533 pub memory_storage: bool,
2534 #[cfg(feature = "server_2_10")]
2535 /// Additional consumer metadata.
2536 #[serde(default, skip_serializing_if = "is_default")]
2537 pub metadata: HashMap<String, String>,
2538 /// Custom backoff for missed acknowledgments.
2539 #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
2540 pub backoff: Vec<Duration>,
2541
2542 /// Priority policy for this consumer. Requires [Config::priority_groups] to be set.
2543 #[cfg(feature = "server_2_11")]
2544 #[serde(default, skip_serializing_if = "is_default")]
2545 pub priority_policy: PriorityPolicy,
2546 /// Priority groups for this consumer. Currently only one group is supported and is used
2547 /// in conjunction with [Config::priority_policy].
2548 #[cfg(feature = "server_2_11")]
2549 #[serde(default, skip_serializing_if = "is_default")]
2550 pub priority_groups: Vec<String>,
2551 /// Suspends the consumer until the given deadline.
2552 #[cfg(feature = "server_2_11")]
2553 #[serde(
2554 default,
2555 with = "rfc3339::option",
2556 skip_serializing_if = "Option::is_none"
2557 )]
2558 pub pause_until: Option<OffsetDateTime>,
2559}
2560
2561impl IntoConsumerConfig for &Config {
2562 fn into_consumer_config(self) -> consumer::Config {
2563 self.clone().into_consumer_config()
2564 }
2565}
2566
2567impl IntoConsumerConfig for Config {
2568 fn into_consumer_config(self) -> consumer::Config {
2569 jetstream::consumer::Config {
2570 deliver_subject: None,
2571 name: self.name,
2572 durable_name: self.durable_name,
2573 description: self.description,
2574 deliver_group: None,
2575 deliver_policy: self.deliver_policy,
2576 ack_policy: self.ack_policy,
2577 ack_wait: self.ack_wait,
2578 max_deliver: self.max_deliver,
2579 filter_subject: self.filter_subject,
2580 #[cfg(feature = "server_2_10")]
2581 filter_subjects: self.filter_subjects,
2582 replay_policy: self.replay_policy,
2583 rate_limit: self.rate_limit,
2584 sample_frequency: self.sample_frequency,
2585 max_waiting: self.max_waiting,
2586 max_ack_pending: self.max_ack_pending,
2587 headers_only: self.headers_only,
2588 flow_control: false,
2589 idle_heartbeat: Duration::default(),
2590 max_batch: self.max_batch,
2591 max_bytes: self.max_bytes,
2592 max_expires: self.max_expires,
2593 inactive_threshold: self.inactive_threshold,
2594 num_replicas: self.num_replicas,
2595 memory_storage: self.memory_storage,
2596 #[cfg(feature = "server_2_10")]
2597 metadata: self.metadata,
2598 backoff: self.backoff,
2599 #[cfg(feature = "server_2_11")]
2600 priority_policy: self.priority_policy,
2601 #[cfg(feature = "server_2_11")]
2602 priority_groups: self.priority_groups,
2603 #[cfg(feature = "server_2_11")]
2604 pause_until: self.pause_until,
2605 }
2606 }
2607}
2608impl FromConsumer for Config {
2609 fn try_from_consumer_config(config: consumer::Config) -> Result<Self, crate::Error> {
2610 if config.deliver_subject.is_some() {
2611 return Err(Box::new(std::io::Error::other(
2612 "pull consumer cannot have delivery subject",
2613 )));
2614 }
2615 Ok(Config {
2616 durable_name: config.durable_name,
2617 name: config.name,
2618 description: config.description,
2619 deliver_policy: config.deliver_policy,
2620 ack_policy: config.ack_policy,
2621 ack_wait: config.ack_wait,
2622 max_deliver: config.max_deliver,
2623 filter_subject: config.filter_subject,
2624 #[cfg(feature = "server_2_10")]
2625 filter_subjects: config.filter_subjects,
2626 replay_policy: config.replay_policy,
2627 rate_limit: config.rate_limit,
2628 sample_frequency: config.sample_frequency,
2629 max_waiting: config.max_waiting,
2630 max_ack_pending: config.max_ack_pending,
2631 headers_only: config.headers_only,
2632 max_batch: config.max_batch,
2633 max_bytes: config.max_bytes,
2634 max_expires: config.max_expires,
2635 inactive_threshold: config.inactive_threshold,
2636 num_replicas: config.num_replicas,
2637 memory_storage: config.memory_storage,
2638 #[cfg(feature = "server_2_10")]
2639 metadata: config.metadata,
2640 backoff: config.backoff,
2641 #[cfg(feature = "server_2_11")]
2642 priority_policy: config.priority_policy,
2643 #[cfg(feature = "server_2_11")]
2644 priority_groups: config.priority_groups,
2645 #[cfg(feature = "server_2_11")]
2646 pause_until: config.pause_until,
2647 })
2648 }
2649}
2650
2651#[derive(Clone, Copy, Debug, PartialEq)]
2652pub enum BatchRequestErrorKind {
2653 Publish,
2654 Flush,
2655 Serialize,
2656}
2657
2658impl std::fmt::Display for BatchRequestErrorKind {
2659 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2660 match self {
2661 Self::Publish => write!(f, "publish failed"),
2662 Self::Flush => write!(f, "flush failed"),
2663 Self::Serialize => write!(f, "serialize failed"),
2664 }
2665 }
2666}
2667
2668pub type BatchRequestError = Error<BatchRequestErrorKind>;
2669
2670#[derive(Clone, Copy, Debug, PartialEq)]
2671pub enum BatchErrorKind {
2672 Subscribe,
2673 Pull,
2674 Flush,
2675 Serialize,
2676}
2677
2678impl std::fmt::Display for BatchErrorKind {
2679 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2680 match self {
2681 Self::Pull => write!(f, "pull request failed"),
2682 Self::Flush => write!(f, "flush failed"),
2683 Self::Serialize => write!(f, "serialize failed"),
2684 Self::Subscribe => write!(f, "subscribe failed"),
2685 }
2686 }
2687}
2688
2689pub type BatchError = Error<BatchErrorKind>;
2690
2691impl From<SubscribeError> for BatchError {
2692 fn from(err: SubscribeError) -> Self {
2693 BatchError::with_source(BatchErrorKind::Subscribe, err)
2694 }
2695}
2696
2697impl From<BatchRequestError> for BatchError {
2698 fn from(err: BatchRequestError) -> Self {
2699 BatchError::with_source(BatchErrorKind::Pull, err)
2700 }
2701}
2702
2703#[derive(Clone, Copy, Debug, PartialEq)]
2704pub enum ConsumerRecreateErrorKind {
2705 GetStream,
2706 Recreate,
2707 TimedOut,
2708}
2709
2710impl std::fmt::Display for ConsumerRecreateErrorKind {
2711 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2712 match self {
2713 Self::GetStream => write!(f, "error getting stream"),
2714 Self::Recreate => write!(f, "consumer creation failed"),
2715 Self::TimedOut => write!(f, "timed out"),
2716 }
2717 }
2718}
2719
2720pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
2721
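/// Deletes the previous ordered consumer, creates a new one that resumes delivery from
/// `sequence`, and returns a fresh message [Stream] for it.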
2722async fn recreate_consumer_stream(
2723 context: &Context,
2724 config: &OrderedConfig,
2725 stream_name: &str,
2726 consumer_name: &str,
2727 sequence: u64,
2728) -> Result<Stream, ConsumerRecreateError> {
2729 let span = tracing::span!(
2730 tracing::Level::DEBUG,
2731 "recreate_ordered_consumer",
2732 stream_name = stream_name,
2733 consumer_name = consumer_name,
2734 sequence = sequence
2735 );
2736 let _span_handle = span.enter();
2737 let config = config.to_owned();
2738 trace!("delete old consumer before creating new one");
2739
2740 tokio::time::timeout(
2741 Duration::from_secs(5),
2742 context.delete_consumer_from_stream(consumer_name, stream_name),
2743 )
2744 .await
2745 .ok();
2746
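 // Resume from where delivery left off: a sequence of 0 means nothing has been delivered
 // yet, so start from the beginning of the stream.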
2747 let deliver_policy = {
2748 if sequence == 0 {
2749 DeliverPolicy::All
2750 } else {
2751 DeliverPolicy::ByStartSequence {
2752 start_sequence: sequence + 1,
2753 }
2754 }
2755 };
2756 trace!("create the new ordered consumer for sequence {}", sequence);
2757 let consumer = tokio::time::timeout(
2758 Duration::from_secs(5),
2759 context.create_consumer_on_stream(
2760 jetstream::consumer::pull::OrderedConfig {
2761 deliver_policy,
2762 ..config.clone()
2763 },
2764 stream_name,
2765 ),
2766 )
2767 .await
2768 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2769 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
2770
2771 let consumer = Consumer {
2772 config: config.clone().into(),
2773 context: context.clone(),
2774 info: consumer.info,
2775 };
2776
2777 trace!("create iterator");
2778 let stream = tokio::time::timeout(
2779 Duration::from_secs(5),
2780 Stream::stream(
2781 BatchConfig {
2782 batch: 500,
2783 expires: Some(Duration::from_secs(30)),
2784 no_wait: false,
2785 max_bytes: 0,
2786 idle_heartbeat: Duration::from_secs(15),
2787 min_pending: None,
2788 min_ack_pending: None,
2789 group: None,
2790 },
2791 &consumer,
2792 ),
2793 )
2794 .await
2795 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
2796 .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err));
2797 trace!("recreated consumer");
2798 stream
2799}