Skip to main content

async_nats/jetstream/consumer/
mod.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Push and Pull [Consumer] API.
15
16pub mod pull;
17pub mod push;
18#[cfg(feature = "server_2_10")]
19use std::collections::HashMap;
20use std::time::Duration;
21
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24use time::serde::rfc3339;
25
26#[cfg(feature = "server_2_11")]
27use time::OffsetDateTime;
28
29use super::context::{ConsumerInfoError, RequestError};
30use super::response::Response;
31use super::stream::ClusterInfo;
32use super::Context;
33use crate::error::Error;
34use crate::jetstream::consumer;
35
36#[cfg(feature = "server_2_14")]
37use crate::jetstream::stream::{ConsumerResetError, ConsumerResetRequest, ConsumerResetResponse};
38
39pub trait IntoConsumerConfig {
40    fn into_consumer_config(self) -> Config;
41}
42
43#[allow(dead_code)]
44#[derive(Clone, Debug)]
45pub struct Consumer<T: IntoConsumerConfig> {
46    pub(crate) context: Context,
47    pub(crate) config: T,
48    pub(crate) info: Info,
49}
50
51impl<T: IntoConsumerConfig> Consumer<T> {
52    pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
53        Self {
54            config,
55            info,
56            context,
57        }
58    }
59}
60impl<T: IntoConsumerConfig> Consumer<T> {
61    /// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside
62    /// [Consumer] and returns it.
63    ///
64    /// When possible, use [`Message::info()`][crate::jetstream::Message::info] instead.
65    /// message metadata often already contains the needed information and does not require a server call.
66    ///
67    /// # Examples
68    ///
69    /// ```no_run
70    /// # #[tokio::main]
71    /// # async fn main() -> Result<(), async_nats::Error> {
72    /// use async_nats::jetstream::consumer::PullConsumer;
73    /// let client = async_nats::connect("localhost:4222").await?;
74    /// let jetstream = async_nats::jetstream::new(client);
75    ///
76    /// let mut consumer: PullConsumer = jetstream
77    ///     .get_stream("events")
78    ///     .await?
79    ///     .get_consumer("pull")
80    ///     .await?;
81    ///
82    /// let info = consumer.info().await?;
83    /// # Ok(())
84    /// # }
85    /// ```
86    pub async fn info(&mut self) -> Result<&consumer::Info, ConsumerInfoError> {
87        let info = self.get_info().await?;
88        self.info = info;
89        Ok(&self.info)
90    }
91
92    /// Retrieves `info` about [Consumer] from the server. Does not update the cache.
93    ///
94    /// When possible, use [`Message::info()`][crate::jetstream::Message::info] instead.
95    /// message metadata often already contains the needed information and does not require a server call.
96    ///
97    /// # Examples
98    ///
99    /// ```no_run
100    /// # #[tokio::main]
101    /// # async fn main() -> Result<(), async_nats::Error> {
102    /// use async_nats::jetstream::consumer::PullConsumer;
103    /// let client = async_nats::connect("localhost:4222").await?;
104    /// let jetstream = async_nats::jetstream::new(client);
105    ///
106    /// let mut consumer: PullConsumer = jetstream
107    ///     .get_stream("events")
108    ///     .await?
109    ///     .get_consumer("pull")
110    ///     .await?;
111    ///
112    /// let info = consumer.get_info().await?;
113    /// # Ok(())
114    /// # }
115    /// ```
116    pub async fn get_info(&self) -> Result<consumer::Info, ConsumerInfoError> {
117        let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
118        match self.context.request(subject, &json!({})).await? {
119            Response::Err { error } => Err(error.into()),
120            Response::Ok(info) => Ok(info),
121        }
122    }
123
124    /// Returns cached [Info] for the [Consumer].
125    /// Cache is either from initial creation/retrieval of the [Consumer] or last call to
126    /// [Info].
127    ///
128    /// # Examples
129    ///
130    /// ```no_run
131    /// # #[tokio::main]
132    /// # async fn main() -> Result<(), async_nats::Error> {
133    /// use async_nats::jetstream::consumer::PullConsumer;
134    /// let client = async_nats::connect("localhost:4222").await?;
135    /// let jetstream = async_nats::jetstream::new(client);
136    ///
137    /// let consumer: PullConsumer = jetstream
138    ///     .get_stream("events")
139    ///     .await?
140    ///     .get_consumer("pull")
141    ///     .await?;
142    ///
143    /// let info = consumer.cached_info();
144    /// # Ok(())
145    /// # }
146    /// ```
147    pub fn cached_info(&self) -> &consumer::Info {
148        &self.info
149    }
150
151    /// Reset this consumer's delivery state (ADR-60).
152    ///
153    /// `seq` semantics:
154    /// - `None` (or `Some(0)`): reset back to the consumer's ack floor.
155    /// - `Some(n)` with `n > 0`: next delivered message will have a stream
156    ///   sequence of at least `n`.
157    ///
158    /// On success the cached [Info] is updated from the response.
159    ///
160    /// # Examples
161    ///
162    /// ```no_run
163    /// # #[tokio::main]
164    /// # async fn main() -> Result<(), async_nats::Error> {
165    /// use async_nats::jetstream::consumer::PullConsumer;
166    /// let client = async_nats::connect("localhost:4222").await?;
167    /// let jetstream = async_nats::jetstream::new(client);
168    ///
169    /// let mut consumer: PullConsumer = jetstream
170    ///     .get_stream("events")
171    ///     .await?
172    ///     .get_consumer("processor")
173    ///     .await?;
174    ///
175    /// consumer.reset(Some(42)).await?;
176    /// # Ok(())
177    /// # }
178    /// ```
179    #[cfg(feature = "server_2_14")]
180    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
181    pub async fn reset(
182        &mut self,
183        seq: Option<u64>,
184    ) -> Result<ConsumerResetResponse, ConsumerResetError> {
185        let subject = format!(
186            "CONSUMER.RESET.{}.{}",
187            self.info.stream_name, self.info.name
188        );
189        let payload = ConsumerResetRequest {
190            seq: seq.unwrap_or(0),
191        };
192        match self.context.request(subject, &payload).await? {
193            Response::Ok::<ConsumerResetResponse>(resp) => {
194                self.info = resp.info.clone();
195                Ok(resp)
196            }
197            Response::Err { error } => Err(error.into()),
198        }
199    }
200}
201
202/// Trait used to convert generic [Stream Config][crate::jetstream::consumer::Config] into either
203/// [Pull][crate::jetstream::consumer::pull::Config] or
204/// [Push][crate::jetstream::consumer::push::Config] config. It validates if given config is
205/// a valid target one.
206pub trait FromConsumer {
207    fn try_from_consumer_config(
208        config: crate::jetstream::consumer::Config,
209    ) -> Result<Self, crate::Error>
210    where
211        Self: Sized;
212}
213
214pub type PullConsumer = Consumer<self::pull::Config>;
215pub type PushConsumer = Consumer<self::push::Config>;
216pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
217pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;
218
219/// Information about a consumer
220#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
221pub struct Info {
222    /// The stream being consumed
223    pub stream_name: String,
224    /// The consumer's unique name
225    pub name: String,
226    /// The time the consumer was created
227    #[serde(with = "rfc3339")]
228    pub created: time::OffsetDateTime,
229    /// The consumer's configuration
230    pub config: Config,
231    /// Statistics for delivered messages
232    pub delivered: SequenceInfo,
233    /// Statistics for acknowledged messages
234    pub ack_floor: SequenceInfo,
235    /// The number of messages delivered but not yet acknowledged
236    pub num_ack_pending: usize,
237    /// The number of messages re-sent after acknowledgment was not received within the configured
238    /// time threshold
239    pub num_redelivered: usize,
240    /// The number of pull requests waiting for messages
241    pub num_waiting: usize,
242    /// The number of messages pending delivery
243    pub num_pending: u64,
244    /// Information about the consumer's cluster
245    #[serde(skip_serializing_if = "is_default")]
246    pub cluster: Option<ClusterInfo>,
247    /// Indicates if any client is connected and receiving messages from a push consumer
248    #[serde(default, skip_serializing_if = "is_default")]
249    pub push_bound: bool,
250    #[cfg(feature = "server_2_11")]
251    /// Indicates if the consumer is paused
252    #[serde(default)]
253    pub paused: bool,
254    #[cfg(feature = "server_2_11")]
255    /// The remaining time the consumer is paused
256    #[serde(default, with = "serde_nanos")]
257    pub pause_remaining: Option<Duration>,
258}
259
260/// Information about a consumer and the stream it is consuming
261#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
262pub struct SequenceInfo {
263    /// How far along the consumer has progressed
264    #[serde(rename = "consumer_seq")]
265    pub consumer_sequence: u64,
266    /// The aggregate for all stream consumers
267    #[serde(rename = "stream_seq")]
268    pub stream_sequence: u64,
269    // Last activity for the sequence
270    #[serde(
271        default,
272        with = "rfc3339::option",
273        skip_serializing_if = "Option::is_none"
274    )]
275    pub last_active: Option<time::OffsetDateTime>,
276}
277
278/// Configuration for consumers. From a high level, the
279/// `durable_name` and `deliver_subject` fields have a particularly
280/// strong influence on the consumer's overall behavior.
281#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
282pub struct Config {
283    /// Setting `deliver_subject` to `Some(...)` will cause this consumer
284    /// to be "push-based". This is analogous in some ways to a normal
285    /// NATS subscription (rather than a queue subscriber) in that the
286    /// consumer will receive all messages published to the stream that
287    /// the consumer is interested in. Acknowledgment policies such as
288    /// `AckPolicy::None` and `AckPolicy::All` may be enabled for such
289    /// push-based consumers, which reduce the amount of effort spent
290    /// tracking delivery. Combining `AckPolicy::All` with
291    /// `Consumer::process_batch` enables particularly nice throughput
292    /// optimizations.
293    ///
294    /// Setting `deliver_subject` to `None` will cause this consumer to
295    /// be "pull-based", and will require explicit acknowledgment of
296    /// each message. This is analogous in some ways to a normal NATS
297    /// queue subscriber, where a message will be delivered to a single
298    /// subscriber. Pull-based consumers are intended to be used for
299    /// workloads where it is desirable to have a single process receive
300    /// a message. The only valid `ack_policy` for pull-based consumers
301    /// is the default of `AckPolicy::Explicit`, which acknowledges each
302    /// processed message individually. Pull-based consumers may be a
303    /// good choice for work queue-like workloads where you want messages
304    /// to be handled by a single consumer process. Note that it is
305    /// possible to deliver a message to multiple consumers if the
306    /// consumer crashes or is slow to acknowledge the delivered message.
307    /// This is a fundamental behavior present in all distributed systems
308    /// that attempt redelivery when a consumer fails to acknowledge a message.
309    /// This is known as "at least once" message processing. To achieve
310    /// "exactly once" semantics, it is necessary to implement idempotent
311    /// semantics in any system that is written to as a result of processing
312    /// a message.
313    #[serde(default, skip_serializing_if = "Option::is_none")]
314    pub deliver_subject: Option<String>,
315
316    /// Setting `durable_name` to `Some(...)` will cause this consumer
317    /// to be "durable". This may be a good choice for workloads that
318    /// benefit from the `JetStream` server or cluster remembering the
319    /// progress of consumers for fault tolerance purposes. If a consumer
320    /// crashes, the `JetStream` server or cluster will remember which
321    /// messages the consumer acknowledged. When the consumer recovers,
322    /// this information will allow the consumer to resume processing
323    /// where it left off. If you're unsure, set this to `Some(...)`.
324    ///
325    /// Setting `durable_name` to `None` will cause this consumer to
326    /// be "ephemeral". This may be a good choice for workloads where
327    /// you don't need the `JetStream` server to remember the consumer's
328    /// progress in the case of a crash, such as certain "high churn"
329    /// workloads or workloads where a crashed instance is not required
330    /// to recover.
331    #[serde(default, skip_serializing_if = "Option::is_none")]
332    pub durable_name: Option<String>,
333    /// A name of the consumer. Can be specified for both durable and ephemeral
334    /// consumers.
335    #[serde(default, skip_serializing_if = "Option::is_none")]
336    pub name: Option<String>,
337    /// A short description of the purpose of this consumer.
338    #[serde(default, skip_serializing_if = "Option::is_none")]
339    pub description: Option<String>,
340    /// Deliver group to use.
341    #[serde(default, skip_serializing_if = "Option::is_none")]
342    pub deliver_group: Option<String>,
343    /// Allows for a variety of options that determine how this consumer will receive messages
344    #[serde(flatten)]
345    pub deliver_policy: DeliverPolicy,
346    /// How messages should be acknowledged
347    #[serde(default)]
348    pub ack_policy: AckPolicy,
349    /// How long to allow messages to remain un-acknowledged before attempting redelivery
350    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
351    pub ack_wait: Duration,
352    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
353    #[serde(default, skip_serializing_if = "is_default")]
354    pub max_deliver: i64,
355    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
356    #[serde(default, skip_serializing_if = "is_default")]
357    pub filter_subject: String,
358    #[cfg(feature = "server_2_10")]
359    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
360    #[serde(default, skip_serializing_if = "is_default")]
361    pub filter_subjects: Vec<String>,
362    /// Whether messages are sent as quickly as possible or at the rate of receipt
363    #[serde(default)]
364    pub replay_policy: ReplayPolicy,
365    /// The rate of message delivery in bits per second
366    #[serde(rename = "rate_limit_bps", default, skip_serializing_if = "is_default")]
367    pub rate_limit: u64,
368    /// What percentage of acknowledgments should be samples for observability, 0-100
369    #[serde(
370        rename = "sample_freq",
371        with = "sample_freq_deser",
372        default,
373        skip_serializing_if = "is_default"
374    )]
375    pub sample_frequency: u8,
376    /// The maximum number of waiting consumers.
377    #[serde(default, skip_serializing_if = "is_default")]
378    pub max_waiting: i64,
379    /// The maximum number of unacknowledged messages that may be
380    /// in-flight before pausing sending additional messages to
381    /// this consumer.
382    #[serde(default, skip_serializing_if = "is_default")]
383    pub max_ack_pending: i64,
384    /// Only deliver headers without payloads.
385    #[serde(default, skip_serializing_if = "is_default")]
386    pub headers_only: bool,
387    /// Enable flow control messages
388    #[serde(default, skip_serializing_if = "is_default")]
389    pub flow_control: bool,
390    /// Enable idle heartbeat messages
391    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
392    pub idle_heartbeat: Duration,
393    /// Maximum size of a request batch
394    #[serde(default, skip_serializing_if = "is_default")]
395    pub max_batch: i64,
396    /// Maximum size of a request max_bytes
397    #[serde(default, skip_serializing_if = "is_default")]
398    pub max_bytes: i64,
399    /// Maximum value for request expiration
400    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
401    pub max_expires: Duration,
402    /// Threshold for ephemeral consumer inactivity
403    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
404    pub inactive_threshold: Duration,
405    /// Number of consumer replicas
406    #[serde(default, skip_serializing_if = "is_default")]
407    pub num_replicas: usize,
408    /// Force consumer to use memory storage.
409    #[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
410    pub memory_storage: bool,
411
412    #[cfg(feature = "server_2_10")]
413    /// Additional consumer metadata.
414    #[serde(default, skip_serializing_if = "is_default")]
415    pub metadata: HashMap<String, String>,
416    /// Custom backoff for missed acknowledgments.
417    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
418    pub backoff: Vec<Duration>,
419    #[cfg(feature = "server_2_11")]
420    #[serde(default, skip_serializing_if = "is_default")]
421    pub priority_policy: PriorityPolicy,
422    #[cfg(feature = "server_2_11")]
423    #[serde(default, skip_serializing_if = "is_default")]
424    pub priority_groups: Vec<String>,
425    /// For suspending the consumer until the deadline.
426    #[cfg(feature = "server_2_11")]
427    #[serde(
428        default,
429        with = "rfc3339::option",
430        skip_serializing_if = "Option::is_none"
431    )]
432    pub pause_until: Option<OffsetDateTime>,
433}
434
435#[cfg(feature = "server_2_11")]
436#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
437pub enum PriorityPolicy {
438    #[serde(rename = "overflow")]
439    Overflow,
440    /// This feature is not yet supported by the client.
441    /// It's part of the enum to ensure that the client can deserialize
442    /// Consumer configurations that used [PriorityPolicy::PinnedClient].
443    #[serde(rename = "pinned_client")]
444    PinnedClient,
445    #[cfg(feature = "server_2_12")]
446    #[serde(rename = "prioritized")]
447    Prioritized,
448    #[serde(rename = "none")]
449    #[default]
450    None,
451}
452
453impl From<&Config> for Config {
454    fn from(cc: &Config) -> Config {
455        cc.clone()
456    }
457}
458
459impl From<&str> for Config {
460    fn from(s: &str) -> Config {
461        Config {
462            durable_name: Some(s.to_string()),
463            ..Default::default()
464        }
465    }
466}
467
468impl IntoConsumerConfig for Config {
469    fn into_consumer_config(self) -> Config {
470        self
471    }
472}
473impl IntoConsumerConfig for &Config {
474    fn into_consumer_config(self) -> Config {
475        self.clone()
476    }
477}
478
479impl FromConsumer for Config {
480    fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
481    where
482        Self: Sized,
483    {
484        Ok(config)
485    }
486}
487
488/// `DeliverPolicy` determines how the consumer should select the first message to deliver.
489#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
490#[repr(u8)]
491#[serde(tag = "deliver_policy")]
492pub enum DeliverPolicy {
493    /// All causes the consumer to receive the oldest messages still present in the system.
494    /// This is the default.
495    #[default]
496    #[serde(rename = "all")]
497    All,
498    /// Last will start the consumer with the last sequence received.
499    #[serde(rename = "last")]
500    Last,
501    /// New will only deliver new messages that are received by the `JetStream` server
502    /// after the consumer is created.
503    #[serde(rename = "new")]
504    New,
505    /// `ByStartSeq` will look for a defined starting sequence to the consumer's configured `opt_start_seq`
506    /// parameter.
507    #[serde(rename = "by_start_sequence")]
508    ByStartSequence {
509        #[serde(rename = "opt_start_seq")]
510        start_sequence: u64,
511    },
512    /// `ByStartTime` will select the first message with a timestamp >= to the consumer's
513    /// configured `opt_start_time` parameter.
514    #[serde(rename = "by_start_time")]
515    ByStartTime {
516        #[serde(rename = "opt_start_time", with = "rfc3339")]
517        start_time: time::OffsetDateTime,
518    },
519    /// `LastPerSubject` will start the consumer with the last message
520    /// for all subjects received.
521    #[serde(rename = "last_per_subject")]
522    LastPerSubject,
523}
524
525/// Determines whether messages will be acknowledged individually,
526/// in batches, or never.
527#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
528#[repr(u8)]
529pub enum AckPolicy {
530    /// All messages will be individually acknowledged. This is the default.
531    #[default]
532    #[serde(rename = "explicit")]
533    Explicit = 2,
534    /// No messages are acknowledged.
535    #[serde(rename = "none")]
536    None = 0,
537    /// Acknowledges all messages with lower sequence numbers when a later
538    /// message is acknowledged. Useful for "batching" acknowledgment.
539    #[serde(rename = "all")]
540    All = 1,
541    /// Used by server-managed durable sourcing/mirroring consumers (ADR-60).
542    /// Behaves like [`AckPolicy::All`] but acknowledgements are driven by
543    /// flow-control responses from the receiving server.
544    // Discriminant `3` matches the server-side enum order (consumer.go:335).
545    #[cfg(feature = "server_2_14")]
546    #[cfg_attr(docsrs, doc(cfg(feature = "server_2_14")))]
547    #[serde(rename = "flow_control")]
548    FlowControl = 3,
549}
550
551/// `ReplayPolicy` controls whether messages are sent to a consumer
552/// as quickly as possible or at the rate that they were originally received at.
553#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
554#[repr(u8)]
555pub enum ReplayPolicy {
556    /// Sends all messages in a stream to the consumer as quickly as possible. This is the default.
557    #[default]
558    #[serde(rename = "instant")]
559    Instant = 0,
560    /// Sends messages to a consumer in a rate-limited fashion based on the rate of receipt. This
561    /// is useful for replaying traffic in a testing or staging environment based on production
562    /// traffic patterns.
563    #[serde(rename = "original")]
564    Original = 1,
565}
566
567fn is_default<T: Default + Eq>(t: &T) -> bool {
568    t == &T::default()
569}
570
571pub(crate) mod sample_freq_deser {
572    pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
573    where
574        T: std::str::FromStr,
575        T::Err: std::fmt::Display,
576        D: serde::Deserializer<'de>,
577    {
578        let s = <String as serde::Deserialize>::deserialize(deserializer)?;
579
580        let mut spliterator = s.split('%');
581        match (spliterator.next(), spliterator.next()) {
582            // No percentage occurred, parse as number
583            (Some(number), None) => T::from_str(number).map_err(serde::de::Error::custom),
584            // A percentage sign occurred right at the end
585            (Some(number), Some("")) => T::from_str(number).map_err(serde::de::Error::custom),
586            _ => Err(serde::de::Error::custom(format!(
587                "Malformed sample frequency: {s}"
588            ))),
589        }
590    }
591
592    pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
593    where
594        T: std::fmt::Display,
595        S: serde::Serializer,
596    {
597        serializer.serialize_str(&value.to_string())
598    }
599}
600
601#[derive(Clone, Copy, Debug, PartialEq)]
602pub enum StreamErrorKind {
603    TimedOut,
604    Other,
605}
606
607impl std::fmt::Display for StreamErrorKind {
608    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
609        match self {
610            Self::TimedOut => write!(f, "timed out"),
611            Self::Other => write!(f, "failed"),
612        }
613    }
614}
615
616pub type StreamError = Error<StreamErrorKind>;
617
618fn backoff(attempt: u32, _: &impl std::error::Error) -> Duration {
619    if attempt < 5 {
620        Duration::from_millis(500 * attempt as u64)
621    } else {
622        Duration::from_secs(10)
623    }
624}