async_nats/jetstream/consumer/
mod.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Push and Pull [Consumer] API.
15
16pub mod pull;
17pub mod push;
18#[cfg(feature = "server_2_10")]
19use std::collections::HashMap;
20use std::time::Duration;
21
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24use time::serde::rfc3339;
25
26#[cfg(feature = "server_2_11")]
27use time::OffsetDateTime;
28
29use super::context::{ConsumerInfoError, RequestError};
30use super::response::Response;
31use super::stream::ClusterInfo;
32use super::Context;
33use crate::error::Error;
34use crate::jetstream::consumer;
35
/// Conversion of a specific consumer configuration into the generic consumer [Config].
///
/// It is the bound required of the `T` parameter of [Consumer].
pub trait IntoConsumerConfig {
    /// Consumes `self` and returns the equivalent generic [Config].
    fn into_consumer_config(self) -> Config;
}
39
/// Handle to a consumer, generic over the kind of configuration `T` it was created with.
///
/// Holds the [Context] used to issue requests, the typed configuration and the most
/// recently cached [Info].
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct Consumer<T: IntoConsumerConfig> {
    /// Context through which requests for this consumer are sent.
    pub(crate) context: Context,
    /// Typed configuration of this consumer.
    pub(crate) config: T,
    /// Cached consumer info; replaced by [Consumer::info].
    pub(crate) info: Info,
}
47
48impl<T: IntoConsumerConfig> Consumer<T> {
49    pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
50        Self {
51            config,
52            info,
53            context,
54        }
55    }
56}
57impl<T: IntoConsumerConfig> Consumer<T> {
58    /// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside
59    /// [Consumer] and returns it.
60    ///
61    /// # Examples
62    ///
63    /// ```no_run
64    /// # #[tokio::main]
65    /// # async fn main() -> Result<(), async_nats::Error> {
66    /// use async_nats::jetstream::consumer::PullConsumer;
67    /// let client = async_nats::connect("localhost:4222").await?;
68    /// let jetstream = async_nats::jetstream::new(client);
69    ///
70    /// let mut consumer: PullConsumer = jetstream
71    ///     .get_stream("events")
72    ///     .await?
73    ///     .get_consumer("pull")
74    ///     .await?;
75    ///
76    /// let info = consumer.info().await?;
77    /// # Ok(())
78    /// # }
79    /// ```
80    pub async fn info(&mut self) -> Result<&consumer::Info, ConsumerInfoError> {
81        let info = self.get_info().await?;
82        self.info = info;
83        Ok(&self.info)
84    }
85
86    /// Retrieves `info` about [Consumer] from the server. Does not update the cache.
87    ///
88    /// # Examples
89    ///
90    /// ```no_run
91    /// # #[tokio::main]
92    /// # async fn main() -> Result<(), async_nats::Error> {
93    /// use async_nats::jetstream::consumer::PullConsumer;
94    /// let client = async_nats::connect("localhost:4222").await?;
95    /// let jetstream = async_nats::jetstream::new(client);
96    ///
97    /// let mut consumer: PullConsumer = jetstream
98    ///     .get_stream("events")
99    ///     .await?
100    ///     .get_consumer("pull")
101    ///     .await?;
102    ///
103    /// let info = consumer.get_info().await?;
104    /// # Ok(())
105    /// # }
106    /// ```
107    pub async fn get_info(&self) -> Result<consumer::Info, ConsumerInfoError> {
108        let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
109        match self.context.request(subject, &json!({})).await? {
110            Response::Err { error } => Err(error.into()),
111            Response::Ok(info) => Ok(info),
112        }
113    }
114
115    /// Returns cached [Info] for the [Consumer].
116    /// Cache is either from initial creation/retrieval of the [Consumer] or last call to
117    /// [Info].
118    ///
119    /// # Examples
120    ///
121    /// ```no_run
122    /// # #[tokio::main]
123    /// # async fn main() -> Result<(), async_nats::Error> {
124    /// use async_nats::jetstream::consumer::PullConsumer;
125    /// let client = async_nats::connect("localhost:4222").await?;
126    /// let jetstream = async_nats::jetstream::new(client);
127    ///
128    /// let consumer: PullConsumer = jetstream
129    ///     .get_stream("events")
130    ///     .await?
131    ///     .get_consumer("pull")
132    ///     .await?;
133    ///
134    /// let info = consumer.cached_info();
135    /// # Ok(())
136    /// # }
137    /// ```
138    pub fn cached_info(&self) -> &consumer::Info {
139        &self.info
140    }
141}
142
/// Trait used to convert the generic [consumer Config][crate::jetstream::consumer::Config]
/// into either [Pull][crate::jetstream::consumer::pull::Config] or
/// [Push][crate::jetstream::consumer::push::Config] config. It validates if given config is
/// a valid target one.
pub trait FromConsumer {
    /// Attempts the conversion, returning an error when `config` cannot be represented
    /// as `Self`.
    fn try_from_consumer_config(
        config: crate::jetstream::consumer::Config,
    ) -> Result<Self, crate::Error>
    where
        Self: Sized;
}
154
/// [Consumer] parameterized with [pull::Config].
pub type PullConsumer = Consumer<self::pull::Config>;
/// [Consumer] parameterized with [push::Config].
pub type PushConsumer = Consumer<self::push::Config>;
/// [Consumer] parameterized with [pull::OrderedConfig].
pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
/// [Consumer] parameterized with [push::OrderedConfig].
pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;
159
/// Information about a consumer.
///
/// This type is deserialize-only; it is what [Consumer::get_info] returns and what
/// [Consumer] keeps as its cached info.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// The stream being consumed
    pub stream_name: String,
    /// The consumer's unique name
    pub name: String,
    /// The time the consumer was created
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// The consumer's configuration
    pub config: Config,
    /// Statistics for delivered messages
    pub delivered: SequenceInfo,
    /// Statistics for acknowledged messages
    pub ack_floor: SequenceInfo,
    /// The number of messages delivered but not yet acknowledged
    pub num_ack_pending: usize,
    /// The number of messages re-sent after acknowledgment was not received within the configured
    /// time threshold
    pub num_redelivered: usize,
    /// The number of pull requests waiting for messages
    pub num_waiting: usize,
    /// The number of messages pending delivery
    pub num_pending: u64,
    /// Information about the consumer's cluster
    // NOTE(review): `skip_serializing_if` only matters for `Serialize`, which this struct
    // does not derive.
    #[serde(skip_serializing_if = "is_default")]
    pub cluster: Option<ClusterInfo>,
    /// Indicates if any client is connected and receiving messages from a push consumer
    #[serde(default, skip_serializing_if = "is_default")]
    pub push_bound: bool,
    #[cfg(feature = "server_2_11")]
    /// Indicates if the consumer is paused
    #[serde(default)]
    pub paused: bool,
    #[cfg(feature = "server_2_11")]
    /// The remaining time the consumer is paused
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}
200
/// A pair of consumer and stream sequence numbers together with an optional timestamp of
/// the last activity. Used by [Info] for its `delivered` and `ack_floor` statistics.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct SequenceInfo {
    /// How far along the consumer has progressed
    #[serde(rename = "consumer_seq")]
    pub consumer_sequence: u64,
    /// The aggregate for all stream consumers
    #[serde(rename = "stream_seq")]
    pub stream_sequence: u64,
    /// Last activity for the sequence; `None` when the field is absent from the input.
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub last_active: Option<time::OffsetDateTime>,
}
218
/// Configuration for consumers. From a high level, the
/// `durable_name` and `deliver_subject` fields have a particularly
/// strong influence on the consumer's overall behavior.
///
/// Most fields are omitted from the serialized form when they hold their default value.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Setting `deliver_subject` to `Some(...)` will cause this consumer
    /// to be "push-based". This is analogous in some ways to a normal
    /// NATS subscription (rather than a queue subscriber) in that the
    /// consumer will receive all messages published to the stream that
    /// the consumer is interested in. Acknowledgment policies such as
    /// `AckPolicy::None` and `AckPolicy::All` may be enabled for such
    /// push-based consumers, which reduce the amount of effort spent
    /// tracking delivery. Combining `AckPolicy::All` with
    /// `Consumer::process_batch` enables particularly nice throughput
    /// optimizations.
    ///
    /// Setting `deliver_subject` to `None` will cause this consumer to
    /// be "pull-based", and will require explicit acknowledgment of
    /// each message. This is analogous in some ways to a normal NATS
    /// queue subscriber, where a message will be delivered to a single
    /// subscriber. Pull-based consumers are intended to be used for
    /// workloads where it is desirable to have a single process receive
    /// a message. The only valid `ack_policy` for pull-based consumers
    /// is the default of `AckPolicy::Explicit`, which acknowledges each
    /// processed message individually. Pull-based consumers may be a
    /// good choice for work queue-like workloads where you want messages
    /// to be handled by a single consumer process. Note that it is
    /// possible to deliver a message to multiple consumers if the
    /// consumer crashes or is slow to acknowledge the delivered message.
    /// This is a fundamental behavior present in all distributed systems
    /// that attempt redelivery when a consumer fails to acknowledge a message.
    /// This is known as "at least once" message processing. To achieve
    /// "exactly once" semantics, it is necessary to implement idempotent
    /// semantics in any system that is written to as a result of processing
    /// a message.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deliver_subject: Option<String>,

    /// Setting `durable_name` to `Some(...)` will cause this consumer
    /// to be "durable". This may be a good choice for workloads that
    /// benefit from the `JetStream` server or cluster remembering the
    /// progress of consumers for fault tolerance purposes. If a consumer
    /// crashes, the `JetStream` server or cluster will remember which
    /// messages the consumer acknowledged. When the consumer recovers,
    /// this information will allow the consumer to resume processing
    /// where it left off. If you're unsure, set this to `Some(...)`.
    ///
    /// Setting `durable_name` to `None` will cause this consumer to
    /// be "ephemeral". This may be a good choice for workloads where
    /// you don't need the `JetStream` server to remember the consumer's
    /// progress in the case of a crash, such as certain "high churn"
    /// workloads or workloads where a crashed instance is not required
    /// to recover.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub durable_name: Option<String>,
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Deliver group to use.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deliver_group: Option<String>,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// How messages should be acknowledged
    #[serde(default)]
    pub ack_policy: AckPolicy,
    /// How long to allow messages to remain un-acknowledged before attempting redelivery
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub ack_wait: Duration,
    /// Maximum number of times a specific message will be delivered. Use this to avoid
    /// poison pill messages that repeatedly crash your consumer processes forever.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_deliver: i64,
    /// When consuming from a Stream with many subjects, or wildcards, this selects only
    /// specific incoming subjects. Supports wildcards.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    #[serde(default)]
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100
    #[serde(
        rename = "sample_freq",
        with = "sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    /// The maximum number of unacknowledged messages that may be
    /// in-flight before pausing sending additional messages to
    /// this consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_ack_pending: i64,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Enable flow control messages
    #[serde(default, skip_serializing_if = "is_default")]
    pub flow_control: bool,
    /// Enable idle heartbeat messages
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub idle_heartbeat: Duration,
    /// Maximum size of a request batch
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_batch: i64,
    /// Maximum size of a request max_bytes
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_bytes: i64,
    /// Maximum value for request expiration
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub max_expires: Duration,
    /// Threshold for ephemeral consumer inactivity
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub inactive_threshold: Duration,
    /// Number of consumer replicas
    #[serde(default, skip_serializing_if = "is_default")]
    pub num_replicas: usize,
    /// Force consumer to use memory storage.
    #[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
    pub memory_storage: bool,

    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Custom backoff for missed acknowledgments.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub backoff: Vec<Duration>,
    /// Priority policy of the consumer; see [PriorityPolicy].
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub priority_policy: PriorityPolicy,
    /// Priority groups of the consumer.
    // NOTE(review): presumably used together with `priority_policy` — confirm against
    // the server documentation.
    #[cfg(feature = "server_2_11")]
    #[serde(default, skip_serializing_if = "is_default")]
    pub priority_groups: Vec<String>,
    /// For suspending the consumer until the deadline.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,
}
375
/// Priority policy of a consumer, used by [Config::priority_policy].
///
/// Only available with the `server_2_11` feature. Defaults to [PriorityPolicy::None].
#[cfg(feature = "server_2_11")]
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum PriorityPolicy {
    /// Serialized as `"overflow"`.
    #[serde(rename = "overflow")]
    Overflow,
    /// This feature is not yet supported by the client.
    /// It's part of the enum to ensure that the client can deserialize
    /// Consumer configurations that used [PriorityPolicy::PinnedClient].
    #[serde(rename = "pinned_client")]
    PinnedClient,
    /// Serialized as `"prioritized"`. Only available with the `server_2_12` feature.
    #[cfg(feature = "server_2_12")]
    #[serde(rename = "prioritized")]
    Prioritized,
    /// No priority policy; serialized as `"none"`. This is the default.
    #[serde(rename = "none")]
    #[default]
    None,
}
393
394impl From<&Config> for Config {
395    fn from(cc: &Config) -> Config {
396        cc.clone()
397    }
398}
399
400impl From<&str> for Config {
401    fn from(s: &str) -> Config {
402        Config {
403            durable_name: Some(s.to_string()),
404            ..Default::default()
405        }
406    }
407}
408
/// The generic [Config] converts into itself.
impl IntoConsumerConfig for Config {
    /// Returns `self` unchanged.
    fn into_consumer_config(self) -> Config {
        self
    }
}
414impl IntoConsumerConfig for &Config {
415    fn into_consumer_config(self) -> Config {
416        self.clone()
417    }
418}
419
/// Converting the generic [Config] into itself never fails.
impl FromConsumer for Config {
    /// Always returns `Ok(config)`; no validation is performed.
    fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
    where
        Self: Sized,
    {
        Ok(config)
    }
}
428
/// `DeliverPolicy` determines how the consumer should select the first message to deliver.
///
/// It is (de)serialized with an internal `deliver_policy` tag; variants carrying data add
/// their field next to that tag.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[serde(tag = "deliver_policy")]
pub enum DeliverPolicy {
    /// All causes the consumer to receive the oldest messages still present in the system.
    /// This is the default.
    #[default]
    #[serde(rename = "all")]
    All,
    /// Last will start the consumer with the last sequence received.
    #[serde(rename = "last")]
    Last,
    /// New will only deliver new messages that are received by the `JetStream` server
    /// after the consumer is created.
    #[serde(rename = "new")]
    New,
    /// `ByStartSequence` will look for a defined starting sequence to the consumer's
    /// configured `opt_start_seq` parameter.
    #[serde(rename = "by_start_sequence")]
    ByStartSequence {
        /// Starting sequence, (de)serialized as `opt_start_seq`.
        #[serde(rename = "opt_start_seq")]
        start_sequence: u64,
    },
    /// `ByStartTime` will select the first message with a timestamp >= to the consumer's
    /// configured `opt_start_time` parameter.
    #[serde(rename = "by_start_time")]
    ByStartTime {
        /// Starting time, (de)serialized as `opt_start_time` in RFC 3339 format.
        #[serde(rename = "opt_start_time", with = "rfc3339")]
        start_time: time::OffsetDateTime,
    },
    /// `LastPerSubject` will start the consumer with the last message
    /// for all subjects received.
    #[serde(rename = "last_per_subject")]
    LastPerSubject,
}
465
/// Determines whether messages will be acknowledged individually,
/// in batches, or never.
///
/// Variants are (de)serialized by their lowercase names (`"explicit"`, `"none"`, `"all"`).
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum AckPolicy {
    /// All messages will be individually acknowledged. This is the default.
    #[default]
    #[serde(rename = "explicit")]
    Explicit = 2,
    /// No messages are acknowledged.
    #[serde(rename = "none")]
    None = 0,
    /// Acknowledges all messages with lower sequence numbers when a later
    /// message is acknowledged. Useful for "batching" acknowledgment.
    #[serde(rename = "all")]
    All = 1,
}
483
/// `ReplayPolicy` controls whether messages are sent to a consumer
/// as quickly as possible or at the rate that they were originally received at.
///
/// Variants are (de)serialized as `"instant"` and `"original"`.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ReplayPolicy {
    /// Sends all messages in a stream to the consumer as quickly as possible. This is the default.
    #[default]
    #[serde(rename = "instant")]
    Instant = 0,
    /// Sends messages to a consumer in a rate-limited fashion based on the rate of receipt. This
    /// is useful for replaying traffic in a testing or staging environment based on production
    /// traffic patterns.
    #[serde(rename = "original")]
    Original = 1,
}
499
/// Returns `true` when `t` equals its type's [`Default`] value.
///
/// Used as the predicate for `#[serde(skip_serializing_if = "is_default")]`.
fn is_default<T: Default + Eq>(t: &T) -> bool {
    let default_value = T::default();
    *t == default_value
}
503
504pub(crate) mod sample_freq_deser {
505    pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
506    where
507        T: std::str::FromStr,
508        T::Err: std::fmt::Display,
509        D: serde::Deserializer<'de>,
510    {
511        let s = <String as serde::Deserialize>::deserialize(deserializer)?;
512
513        let mut spliterator = s.split('%');
514        match (spliterator.next(), spliterator.next()) {
515            // No percentage occurred, parse as number
516            (Some(number), None) => T::from_str(number).map_err(serde::de::Error::custom),
517            // A percentage sign occurred right at the end
518            (Some(number), Some("")) => T::from_str(number).map_err(serde::de::Error::custom),
519            _ => Err(serde::de::Error::custom(format!(
520                "Malformed sample frequency: {s}"
521            ))),
522        }
523    }
524
525    pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
526    where
527        T: std::fmt::Display,
528        S: serde::Serializer,
529    {
530        serializer.serialize_str(&value.to_string())
531    }
532}
533
/// Kinds of errors distinguished by [`StreamError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StreamErrorKind {
    /// Displayed as `timed out`.
    TimedOut,
    /// Any other failure; displayed as `failed`.
    Other,
}

impl std::fmt::Display for StreamErrorKind {
    /// Writes the short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "timed out",
            Self::Other => "failed",
        };
        f.write_str(description)
    }
}
548
/// The crate's generic [Error] specialized to [StreamErrorKind].
pub type StreamError = Error<StreamErrorKind>;
550
/// Computes the delay to wait before retry number `attempt`.
///
/// Attempts `0` through `4` wait `500ms * attempt` (so attempt `0` does not wait at all);
/// every later attempt waits a flat 10 seconds. The error argument is ignored.
fn backoff(attempt: u32, _: &impl std::error::Error) -> Duration {
    match attempt {
        0..=4 => Duration::from_millis(u64::from(attempt) * 500),
        _ => Duration::from_secs(10),
    }
}