Skip to main content

async_nats/jetstream/consumer/
mod.rs

// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//! Push and Pull [Consumer] API.
15
16pub mod pull;
17pub mod push;
18#[cfg(feature = "server_2_10")]
19use std::collections::HashMap;
20use std::time::Duration;
21
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24use time::serde::rfc3339;
25
26#[cfg(feature = "server_2_11")]
27use time::OffsetDateTime;
28
29use super::context::{ConsumerInfoError, RequestError};
30use super::response::Response;
31use super::stream::ClusterInfo;
32use super::Context;
33use crate::error::Error;
34use crate::jetstream::consumer;
35
36pub trait IntoConsumerConfig {
37    fn into_consumer_config(self) -> Config;
38}
39
40#[allow(dead_code)]
41#[derive(Clone, Debug)]
42pub struct Consumer<T: IntoConsumerConfig> {
43    pub(crate) context: Context,
44    pub(crate) config: T,
45    pub(crate) info: Info,
46}
47
48impl<T: IntoConsumerConfig> Consumer<T> {
49    pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
50        Self {
51            config,
52            info,
53            context,
54        }
55    }
56}
57impl<T: IntoConsumerConfig> Consumer<T> {
58    /// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside
59    /// [Consumer] and returns it.
60    ///
61    /// When possible, use [`Message::info()`][crate::jetstream::Message::info] instead.
62    /// message metadata often already contains the needed information and does not require a server call.
63    ///
64    /// # Examples
65    ///
66    /// ```no_run
67    /// # #[tokio::main]
68    /// # async fn main() -> Result<(), async_nats::Error> {
69    /// use async_nats::jetstream::consumer::PullConsumer;
70    /// let client = async_nats::connect("localhost:4222").await?;
71    /// let jetstream = async_nats::jetstream::new(client);
72    ///
73    /// let mut consumer: PullConsumer = jetstream
74    ///     .get_stream("events")
75    ///     .await?
76    ///     .get_consumer("pull")
77    ///     .await?;
78    ///
79    /// let info = consumer.info().await?;
80    /// # Ok(())
81    /// # }
82    /// ```
83    pub async fn info(&mut self) -> Result<&consumer::Info, ConsumerInfoError> {
84        let info = self.get_info().await?;
85        self.info = info;
86        Ok(&self.info)
87    }
88
89    /// Retrieves `info` about [Consumer] from the server. Does not update the cache.
90    ///
91    /// When possible, use [`Message::info()`][crate::jetstream::Message::info] instead.
92    /// message metadata often already contains the needed information and does not require a server call.
93    ///
94    /// # Examples
95    ///
96    /// ```no_run
97    /// # #[tokio::main]
98    /// # async fn main() -> Result<(), async_nats::Error> {
99    /// use async_nats::jetstream::consumer::PullConsumer;
100    /// let client = async_nats::connect("localhost:4222").await?;
101    /// let jetstream = async_nats::jetstream::new(client);
102    ///
103    /// let mut consumer: PullConsumer = jetstream
104    ///     .get_stream("events")
105    ///     .await?
106    ///     .get_consumer("pull")
107    ///     .await?;
108    ///
109    /// let info = consumer.get_info().await?;
110    /// # Ok(())
111    /// # }
112    /// ```
113    pub async fn get_info(&self) -> Result<consumer::Info, ConsumerInfoError> {
114        let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
115        match self.context.request(subject, &json!({})).await? {
116            Response::Err { error } => Err(error.into()),
117            Response::Ok(info) => Ok(info),
118        }
119    }
120
121    /// Returns cached [Info] for the [Consumer].
122    /// Cache is either from initial creation/retrieval of the [Consumer] or last call to
123    /// [Info].
124    ///
125    /// # Examples
126    ///
127    /// ```no_run
128    /// # #[tokio::main]
129    /// # async fn main() -> Result<(), async_nats::Error> {
130    /// use async_nats::jetstream::consumer::PullConsumer;
131    /// let client = async_nats::connect("localhost:4222").await?;
132    /// let jetstream = async_nats::jetstream::new(client);
133    ///
134    /// let consumer: PullConsumer = jetstream
135    ///     .get_stream("events")
136    ///     .await?
137    ///     .get_consumer("pull")
138    ///     .await?;
139    ///
140    /// let info = consumer.cached_info();
141    /// # Ok(())
142    /// # }
143    /// ```
144    pub fn cached_info(&self) -> &consumer::Info {
145        &self.info
146    }
147}
148
149/// Trait used to convert generic [Stream Config][crate::jetstream::consumer::Config] into either
150/// [Pull][crate::jetstream::consumer::pull::Config] or
151/// [Push][crate::jetstream::consumer::push::Config] config. It validates if given config is
152/// a valid target one.
153pub trait FromConsumer {
154    fn try_from_consumer_config(
155        config: crate::jetstream::consumer::Config,
156    ) -> Result<Self, crate::Error>
157    where
158        Self: Sized;
159}
160
161pub type PullConsumer = Consumer<self::pull::Config>;
162pub type PushConsumer = Consumer<self::push::Config>;
163pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
164pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;
165
166/// Information about a consumer
167#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
168pub struct Info {
169    /// The stream being consumed
170    pub stream_name: String,
171    /// The consumer's unique name
172    pub name: String,
173    /// The time the consumer was created
174    #[serde(with = "rfc3339")]
175    pub created: time::OffsetDateTime,
176    /// The consumer's configuration
177    pub config: Config,
178    /// Statistics for delivered messages
179    pub delivered: SequenceInfo,
180    /// Statistics for acknowledged messages
181    pub ack_floor: SequenceInfo,
182    /// The number of messages delivered but not yet acknowledged
183    pub num_ack_pending: usize,
184    /// The number of messages re-sent after acknowledgment was not received within the configured
185    /// time threshold
186    pub num_redelivered: usize,
187    /// The number of pull requests waiting for messages
188    pub num_waiting: usize,
189    /// The number of messages pending delivery
190    pub num_pending: u64,
191    /// Information about the consumer's cluster
192    #[serde(skip_serializing_if = "is_default")]
193    pub cluster: Option<ClusterInfo>,
194    /// Indicates if any client is connected and receiving messages from a push consumer
195    #[serde(default, skip_serializing_if = "is_default")]
196    pub push_bound: bool,
197    #[cfg(feature = "server_2_11")]
198    /// Indicates if the consumer is paused
199    #[serde(default)]
200    pub paused: bool,
201    #[cfg(feature = "server_2_11")]
202    /// The remaining time the consumer is paused
203    #[serde(default, with = "serde_nanos")]
204    pub pause_remaining: Option<Duration>,
205}
206
207/// Information about a consumer and the stream it is consuming
208#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
209pub struct SequenceInfo {
210    /// How far along the consumer has progressed
211    #[serde(rename = "consumer_seq")]
212    pub consumer_sequence: u64,
213    /// The aggregate for all stream consumers
214    #[serde(rename = "stream_seq")]
215    pub stream_sequence: u64,
216    // Last activity for the sequence
217    #[serde(
218        default,
219        with = "rfc3339::option",
220        skip_serializing_if = "Option::is_none"
221    )]
222    pub last_active: Option<time::OffsetDateTime>,
223}
224
225/// Configuration for consumers. From a high level, the
226/// `durable_name` and `deliver_subject` fields have a particularly
227/// strong influence on the consumer's overall behavior.
228#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
229pub struct Config {
230    /// Setting `deliver_subject` to `Some(...)` will cause this consumer
231    /// to be "push-based". This is analogous in some ways to a normal
232    /// NATS subscription (rather than a queue subscriber) in that the
233    /// consumer will receive all messages published to the stream that
234    /// the consumer is interested in. Acknowledgment policies such as
235    /// `AckPolicy::None` and `AckPolicy::All` may be enabled for such
236    /// push-based consumers, which reduce the amount of effort spent
237    /// tracking delivery. Combining `AckPolicy::All` with
238    /// `Consumer::process_batch` enables particularly nice throughput
239    /// optimizations.
240    ///
241    /// Setting `deliver_subject` to `None` will cause this consumer to
242    /// be "pull-based", and will require explicit acknowledgment of
243    /// each message. This is analogous in some ways to a normal NATS
244    /// queue subscriber, where a message will be delivered to a single
245    /// subscriber. Pull-based consumers are intended to be used for
246    /// workloads where it is desirable to have a single process receive
247    /// a message. The only valid `ack_policy` for pull-based consumers
248    /// is the default of `AckPolicy::Explicit`, which acknowledges each
249    /// processed message individually. Pull-based consumers may be a
250    /// good choice for work queue-like workloads where you want messages
251    /// to be handled by a single consumer process. Note that it is
252    /// possible to deliver a message to multiple consumers if the
253    /// consumer crashes or is slow to acknowledge the delivered message.
254    /// This is a fundamental behavior present in all distributed systems
255    /// that attempt redelivery when a consumer fails to acknowledge a message.
256    /// This is known as "at least once" message processing. To achieve
257    /// "exactly once" semantics, it is necessary to implement idempotent
258    /// semantics in any system that is written to as a result of processing
259    /// a message.
260    #[serde(default, skip_serializing_if = "Option::is_none")]
261    pub deliver_subject: Option<String>,
262
263    /// Setting `durable_name` to `Some(...)` will cause this consumer
264    /// to be "durable". This may be a good choice for workloads that
265    /// benefit from the `JetStream` server or cluster remembering the
266    /// progress of consumers for fault tolerance purposes. If a consumer
267    /// crashes, the `JetStream` server or cluster will remember which
268    /// messages the consumer acknowledged. When the consumer recovers,
269    /// this information will allow the consumer to resume processing
270    /// where it left off. If you're unsure, set this to `Some(...)`.
271    ///
272    /// Setting `durable_name` to `None` will cause this consumer to
273    /// be "ephemeral". This may be a good choice for workloads where
274    /// you don't need the `JetStream` server to remember the consumer's
275    /// progress in the case of a crash, such as certain "high churn"
276    /// workloads or workloads where a crashed instance is not required
277    /// to recover.
278    #[serde(default, skip_serializing_if = "Option::is_none")]
279    pub durable_name: Option<String>,
280    /// A name of the consumer. Can be specified for both durable and ephemeral
281    /// consumers.
282    #[serde(default, skip_serializing_if = "Option::is_none")]
283    pub name: Option<String>,
284    /// A short description of the purpose of this consumer.
285    #[serde(default, skip_serializing_if = "Option::is_none")]
286    pub description: Option<String>,
287    /// Deliver group to use.
288    #[serde(default, skip_serializing_if = "Option::is_none")]
289    pub deliver_group: Option<String>,
290    /// Allows for a variety of options that determine how this consumer will receive messages
291    #[serde(flatten)]
292    pub deliver_policy: DeliverPolicy,
293    /// How messages should be acknowledged
294    #[serde(default)]
295    pub ack_policy: AckPolicy,
296    /// How long to allow messages to remain un-acknowledged before attempting redelivery
297    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
298    pub ack_wait: Duration,
299    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
300    #[serde(default, skip_serializing_if = "is_default")]
301    pub max_deliver: i64,
302    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
303    #[serde(default, skip_serializing_if = "is_default")]
304    pub filter_subject: String,
305    #[cfg(feature = "server_2_10")]
306    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
307    #[serde(default, skip_serializing_if = "is_default")]
308    pub filter_subjects: Vec<String>,
309    /// Whether messages are sent as quickly as possible or at the rate of receipt
310    #[serde(default)]
311    pub replay_policy: ReplayPolicy,
312    /// The rate of message delivery in bits per second
313    #[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
314    pub rate_limit: u64,
315    /// What percentage of acknowledgments should be samples for observability, 0-100
316    #[serde(
317        rename = "sample_freq",
318        with = "sample_freq_deser",
319        default,
320        skip_serializing_if = "is_default"
321    )]
322    pub sample_frequency: u8,
323    /// The maximum number of waiting consumers.
324    #[serde(default, skip_serializing_if = "is_default")]
325    pub max_waiting: i64,
326    /// The maximum number of unacknowledged messages that may be
327    /// in-flight before pausing sending additional messages to
328    /// this consumer.
329    #[serde(default, skip_serializing_if = "is_default")]
330    pub max_ack_pending: i64,
331    /// Only deliver headers without payloads.
332    #[serde(default, skip_serializing_if = "is_default")]
333    pub headers_only: bool,
334    /// Enable flow control messages
335    #[serde(default, skip_serializing_if = "is_default")]
336    pub flow_control: bool,
337    /// Enable idle heartbeat messages
338    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
339    pub idle_heartbeat: Duration,
340    /// Maximum size of a request batch
341    #[serde(default, skip_serializing_if = "is_default")]
342    pub max_batch: i64,
343    /// Maximum size of a request max_bytes
344    #[serde(default, skip_serializing_if = "is_default")]
345    pub max_bytes: i64,
346    /// Maximum value for request expiration
347    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
348    pub max_expires: Duration,
349    /// Threshold for ephemeral consumer inactivity
350    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
351    pub inactive_threshold: Duration,
352    /// Number of consumer replicas
353    #[serde(default, skip_serializing_if = "is_default")]
354    pub num_replicas: usize,
355    /// Force consumer to use memory storage.
356    #[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
357    pub memory_storage: bool,
358
359    #[cfg(feature = "server_2_10")]
360    /// Additional consumer metadata.
361    #[serde(default, skip_serializing_if = "is_default")]
362    pub metadata: HashMap<String, String>,
363    /// Custom backoff for missed acknowledgments.
364    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
365    pub backoff: Vec<Duration>,
366    #[cfg(feature = "server_2_11")]
367    #[serde(default, skip_serializing_if = "is_default")]
368    pub priority_policy: PriorityPolicy,
369    #[cfg(feature = "server_2_11")]
370    #[serde(default, skip_serializing_if = "is_default")]
371    pub priority_groups: Vec<String>,
372    /// For suspending the consumer until the deadline.
373    #[cfg(feature = "server_2_11")]
374    #[serde(
375        default,
376        with = "rfc3339::option",
377        skip_serializing_if = "Option::is_none"
378    )]
379    pub pause_until: Option<OffsetDateTime>,
380}
381
/// Priority policy of a consumer, as carried in [`Config::priority_policy`].
///
/// Each variant is serialized as the lowercase string given in its `serde(rename)`.
#[cfg(feature = "server_2_11")]
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum PriorityPolicy {
    /// Serialized as `"overflow"`.
    #[serde(rename = "overflow")]
    Overflow,
    /// This feature is not yet supported by the client.
    /// It's part of the enum to ensure that the client can deserialize
    /// Consumer configurations that used [PriorityPolicy::PinnedClient].
    #[serde(rename = "pinned_client")]
    PinnedClient,
    /// Serialized as `"prioritized"`. Only available with the `server_2_12` feature.
    #[cfg(feature = "server_2_12")]
    #[serde(rename = "prioritized")]
    Prioritized,
    /// No priority policy. This is the default; serialized as `"none"`.
    #[serde(rename = "none")]
    #[default]
    None,
}
399
400impl From<&Config> for Config {
401    fn from(cc: &Config) -> Config {
402        cc.clone()
403    }
404}
405
406impl From<&str> for Config {
407    fn from(s: &str) -> Config {
408        Config {
409            durable_name: Some(s.to_string()),
410            ..Default::default()
411        }
412    }
413}
414
415impl IntoConsumerConfig for Config {
416    fn into_consumer_config(self) -> Config {
417        self
418    }
419}
420impl IntoConsumerConfig for &Config {
421    fn into_consumer_config(self) -> Config {
422        self.clone()
423    }
424}
425
426impl FromConsumer for Config {
427    fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
428    where
429        Self: Sized,
430    {
431        Ok(config)
432    }
433}
434
435/// `DeliverPolicy` determines how the consumer should select the first message to deliver.
436#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
437#[repr(u8)]
438#[serde(tag = "deliver_policy")]
439pub enum DeliverPolicy {
440    /// All causes the consumer to receive the oldest messages still present in the system.
441    /// This is the default.
442    #[default]
443    #[serde(rename = "all")]
444    All,
445    /// Last will start the consumer with the last sequence received.
446    #[serde(rename = "last")]
447    Last,
448    /// New will only deliver new messages that are received by the `JetStream` server
449    /// after the consumer is created.
450    #[serde(rename = "new")]
451    New,
452    /// `ByStartSeq` will look for a defined starting sequence to the consumer's configured `opt_start_seq`
453    /// parameter.
454    #[serde(rename = "by_start_sequence")]
455    ByStartSequence {
456        #[serde(rename = "opt_start_seq")]
457        start_sequence: u64,
458    },
459    /// `ByStartTime` will select the first message with a timestamp >= to the consumer's
460    /// configured `opt_start_time` parameter.
461    #[serde(rename = "by_start_time")]
462    ByStartTime {
463        #[serde(rename = "opt_start_time", with = "rfc3339")]
464        start_time: time::OffsetDateTime,
465    },
466    /// `LastPerSubject` will start the consumer with the last message
467    /// for all subjects received.
468    #[serde(rename = "last_per_subject")]
469    LastPerSubject,
470}
471
472/// Determines whether messages will be acknowledged individually,
473/// in batches, or never.
474#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
475#[repr(u8)]
476pub enum AckPolicy {
477    /// All messages will be individually acknowledged. This is the default.
478    #[default]
479    #[serde(rename = "explicit")]
480    Explicit = 2,
481    /// No messages are acknowledged.
482    #[serde(rename = "none")]
483    None = 0,
484    /// Acknowledges all messages with lower sequence numbers when a later
485    /// message is acknowledged. Useful for "batching" acknowledgment.
486    #[serde(rename = "all")]
487    All = 1,
488}
489
490/// `ReplayPolicy` controls whether messages are sent to a consumer
491/// as quickly as possible or at the rate that they were originally received at.
492#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
493#[repr(u8)]
494pub enum ReplayPolicy {
495    /// Sends all messages in a stream to the consumer as quickly as possible. This is the default.
496    #[default]
497    #[serde(rename = "instant")]
498    Instant = 0,
499    /// Sends messages to a consumer in a rate-limited fashion based on the rate of receipt. This
500    /// is useful for replaying traffic in a testing or staging environment based on production
501    /// traffic patterns.
502    #[serde(rename = "original")]
503    Original = 1,
504}
505
/// Returns `true` when `t` equals `T::default()`.
///
/// Used as a `skip_serializing_if` predicate so that fields still holding their default
/// value are left out of the serialized output.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    *t == T::default()
}
509
510pub(crate) mod sample_freq_deser {
511    pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
512    where
513        T: std::str::FromStr,
514        T::Err: std::fmt::Display,
515        D: serde::Deserializer<'de>,
516    {
517        let s = <String as serde::Deserialize>::deserialize(deserializer)?;
518
519        let mut spliterator = s.split('%');
520        match (spliterator.next(), spliterator.next()) {
521            // No percentage occurred, parse as number
522            (Some(number), None) => T::from_str(number).map_err(serde::de::Error::custom),
523            // A percentage sign occurred right at the end
524            (Some(number), Some("")) => T::from_str(number).map_err(serde::de::Error::custom),
525            _ => Err(serde::de::Error::custom(format!(
526                "Malformed sample frequency: {s}"
527            ))),
528        }
529    }
530
531    pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
532    where
533        T: std::fmt::Display,
534        S: serde::Serializer,
535    {
536        serializer.serialize_str(&value.to_string())
537    }
538}
539
/// The kinds of error carried by [`StreamError`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamErrorKind {
    /// Displayed as `timed out`.
    TimedOut,
    /// Any other failure; displayed as `failed`.
    Other,
}

impl std::fmt::Display for StreamErrorKind {
    /// Writes the short, lowercase description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "timed out",
            Self::Other => "failed",
        };
        f.write_str(description)
    }
}
554
555pub type StreamError = Error<StreamErrorKind>;
556
/// Computes the delay to wait before retry number `attempt`.
///
/// For attempts below 5 the delay grows linearly, `500 ms * attempt` (so attempt `0` yields
/// no delay). From attempt 5 onwards it is a flat 10 seconds. The error argument is
/// accepted but ignored.
fn backoff(attempt: u32, _: &impl std::error::Error) -> Duration {
    if attempt < 5 {
        // `u64::from` is a lossless widening, unlike an `as` cast.
        Duration::from_millis(500 * u64::from(attempt))
    } else {
        Duration::from_secs(10)
    }
}