async_nats/jetstream/consumer/
mod.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Push and Pull [Consumer] API.
15
16pub mod pull;
17pub mod push;
18#[cfg(feature = "server_2_10")]
19use std::collections::HashMap;
20use std::time::Duration;
21
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24use time::serde::rfc3339;
25
26#[cfg(feature = "server_2_11")]
27use time::OffsetDateTime;
28
29use super::context::RequestError;
30use super::stream::ClusterInfo;
31use super::Context;
32use crate::error::Error;
33use crate::jetstream::consumer;
34
35pub trait IntoConsumerConfig {
36    fn into_consumer_config(self) -> Config;
37}
38
39#[allow(dead_code)]
40#[derive(Clone, Debug)]
41pub struct Consumer<T: IntoConsumerConfig> {
42    pub(crate) context: Context,
43    pub(crate) config: T,
44    pub(crate) info: Info,
45}
46
47impl<T: IntoConsumerConfig> Consumer<T> {
48    pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
49        Self {
50            config,
51            info,
52            context,
53        }
54    }
55}
56impl<T: IntoConsumerConfig> Consumer<T> {
57    /// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside
58    /// [Consumer] and returns it.
59    ///
60    /// # Examples
61    ///
62    /// ```no_run
63    /// # #[tokio::main]
64    /// # async fn main() -> Result<(), async_nats::Error> {
65    /// use async_nats::jetstream::consumer::PullConsumer;
66    /// let client = async_nats::connect("localhost:4222").await?;
67    /// let jetstream = async_nats::jetstream::new(client);
68    ///
69    /// let mut consumer: PullConsumer = jetstream
70    ///     .get_stream("events")
71    ///     .await?
72    ///     .get_consumer("pull")
73    ///     .await?;
74    ///
75    /// let info = consumer.info().await?;
76    /// # Ok(())
77    /// # }
78    /// ```
79    pub async fn info(&mut self) -> Result<&consumer::Info, RequestError> {
80        let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
81
82        let info = self.context.request(subject, &json!({})).await?;
83        self.info = info;
84        Ok(&self.info)
85    }
86
87    async fn fetch_info(&self) -> Result<consumer::Info, RequestError> {
88        let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
89        self.context.request(subject, &json!({})).await
90    }
91
92    /// Returns cached [Info] for the [Consumer].
93    /// Cache is either from initial creation/retrieval of the [Consumer] or last call to
94    /// [Info].
95    ///
96    /// # Examples
97    ///
98    /// ```no_run
99    /// # #[tokio::main]
100    /// # async fn main() -> Result<(), async_nats::Error> {
101    /// use async_nats::jetstream::consumer::PullConsumer;
102    /// let client = async_nats::connect("localhost:4222").await?;
103    /// let jetstream = async_nats::jetstream::new(client);
104    ///
105    /// let consumer: PullConsumer = jetstream
106    ///     .get_stream("events")
107    ///     .await?
108    ///     .get_consumer("pull")
109    ///     .await?;
110    ///
111    /// let info = consumer.cached_info();
112    /// # Ok(())
113    /// # }
114    /// ```
115    pub fn cached_info(&self) -> &consumer::Info {
116        &self.info
117    }
118}
119
120/// Trait used to convert generic [Stream Config][crate::jetstream::consumer::Config] into either
121/// [Pull][crate::jetstream::consumer::pull::Config] or
122/// [Push][crate::jetstream::consumer::push::Config] config. It validates if given config is
123/// a valid target one.
124pub trait FromConsumer {
125    fn try_from_consumer_config(
126        config: crate::jetstream::consumer::Config,
127    ) -> Result<Self, crate::Error>
128    where
129        Self: Sized;
130}
131
132pub type PullConsumer = Consumer<self::pull::Config>;
133pub type PushConsumer = Consumer<self::push::Config>;
134pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
135pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;
136
137/// Information about a consumer
138#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
139pub struct Info {
140    /// The stream being consumed
141    pub stream_name: String,
142    /// The consumer's unique name
143    pub name: String,
144    /// The time the consumer was created
145    #[serde(with = "rfc3339")]
146    pub created: time::OffsetDateTime,
147    /// The consumer's configuration
148    pub config: Config,
149    /// Statistics for delivered messages
150    pub delivered: SequenceInfo,
151    /// Statistics for acknowledged messages
152    pub ack_floor: SequenceInfo,
153    /// The number of messages delivered but not yet acknowledged
154    pub num_ack_pending: usize,
155    /// The number of messages re-sent after acknowledgment was not received within the configured
156    /// time threshold
157    pub num_redelivered: usize,
158    /// The number of pull requests waiting for messages
159    pub num_waiting: usize,
160    /// The number of messages pending delivery
161    pub num_pending: u64,
162    /// Information about the consumer's cluster
163    #[serde(skip_serializing_if = "is_default")]
164    pub cluster: Option<ClusterInfo>,
165    /// Indicates if any client is connected and receiving messages from a push consumer
166    #[serde(default, skip_serializing_if = "is_default")]
167    pub push_bound: bool,
168    #[cfg(feature = "server_2_11")]
169    /// Indicates if the consumer is paused
170    #[serde(default)]
171    pub paused: bool,
172    #[cfg(feature = "server_2_11")]
173    /// The remaining time the consumer is paused
174    #[serde(default, with = "serde_nanos")]
175    pub pause_remaining: Option<Duration>,
176}
177
178/// Information about a consumer and the stream it is consuming
179#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
180pub struct SequenceInfo {
181    /// How far along the consumer has progressed
182    #[serde(rename = "consumer_seq")]
183    pub consumer_sequence: u64,
184    /// The aggregate for all stream consumers
185    #[serde(rename = "stream_seq")]
186    pub stream_sequence: u64,
187    // Last activity for the sequence
188    #[serde(
189        default,
190        with = "rfc3339::option",
191        skip_serializing_if = "Option::is_none"
192    )]
193    pub last_active: Option<time::OffsetDateTime>,
194}
195
196/// Configuration for consumers. From a high level, the
197/// `durable_name` and `deliver_subject` fields have a particularly
198/// strong influence on the consumer's overall behavior.
199#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
200pub struct Config {
201    /// Setting `deliver_subject` to `Some(...)` will cause this consumer
202    /// to be "push-based". This is analogous in some ways to a normal
203    /// NATS subscription (rather than a queue subscriber) in that the
204    /// consumer will receive all messages published to the stream that
205    /// the consumer is interested in. Acknowledgment policies such as
206    /// `AckPolicy::None` and `AckPolicy::All` may be enabled for such
207    /// push-based consumers, which reduce the amount of effort spent
208    /// tracking delivery. Combining `AckPolicy::All` with
209    /// `Consumer::process_batch` enables particularly nice throughput
210    /// optimizations.
211    ///
212    /// Setting `deliver_subject` to `None` will cause this consumer to
213    /// be "pull-based", and will require explicit acknowledgment of
214    /// each message. This is analogous in some ways to a normal NATS
215    /// queue subscriber, where a message will be delivered to a single
216    /// subscriber. Pull-based consumers are intended to be used for
217    /// workloads where it is desirable to have a single process receive
218    /// a message. The only valid `ack_policy` for pull-based consumers
219    /// is the default of `AckPolicy::Explicit`, which acknowledges each
220    /// processed message individually. Pull-based consumers may be a
221    /// good choice for work queue-like workloads where you want messages
222    /// to be handled by a single consumer process. Note that it is
223    /// possible to deliver a message to multiple consumers if the
224    /// consumer crashes or is slow to acknowledge the delivered message.
225    /// This is a fundamental behavior present in all distributed systems
226    /// that attempt redelivery when a consumer fails to acknowledge a message.
227    /// This is known as "at least once" message processing. To achieve
228    /// "exactly once" semantics, it is necessary to implement idempotent
229    /// semantics in any system that is written to as a result of processing
230    /// a message.
231    #[serde(default, skip_serializing_if = "Option::is_none")]
232    pub deliver_subject: Option<String>,
233
234    /// Setting `durable_name` to `Some(...)` will cause this consumer
235    /// to be "durable". This may be a good choice for workloads that
236    /// benefit from the `JetStream` server or cluster remembering the
237    /// progress of consumers for fault tolerance purposes. If a consumer
238    /// crashes, the `JetStream` server or cluster will remember which
239    /// messages the consumer acknowledged. When the consumer recovers,
240    /// this information will allow the consumer to resume processing
241    /// where it left off. If you're unsure, set this to `Some(...)`.
242    ///
243    /// Setting `durable_name` to `None` will cause this consumer to
244    /// be "ephemeral". This may be a good choice for workloads where
245    /// you don't need the `JetStream` server to remember the consumer's
246    /// progress in the case of a crash, such as certain "high churn"
247    /// workloads or workloads where a crashed instance is not required
248    /// to recover.
249    #[serde(default, skip_serializing_if = "Option::is_none")]
250    pub durable_name: Option<String>,
251    /// A name of the consumer. Can be specified for both durable and ephemeral
252    /// consumers.
253    #[serde(default, skip_serializing_if = "Option::is_none")]
254    pub name: Option<String>,
255    /// A short description of the purpose of this consumer.
256    #[serde(default, skip_serializing_if = "Option::is_none")]
257    pub description: Option<String>,
258    /// Deliver group to use.
259    #[serde(default, skip_serializing_if = "Option::is_none")]
260    pub deliver_group: Option<String>,
261    /// Allows for a variety of options that determine how this consumer will receive messages
262    #[serde(flatten)]
263    pub deliver_policy: DeliverPolicy,
264    /// How messages should be acknowledged
265    pub ack_policy: AckPolicy,
266    /// How long to allow messages to remain un-acknowledged before attempting redelivery
267    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
268    pub ack_wait: Duration,
269    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
270    #[serde(default, skip_serializing_if = "is_default")]
271    pub max_deliver: i64,
272    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
273    #[serde(default, skip_serializing_if = "is_default")]
274    pub filter_subject: String,
275    #[cfg(feature = "server_2_10")]
276    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
277    #[serde(default, skip_serializing_if = "is_default")]
278    pub filter_subjects: Vec<String>,
279    /// Whether messages are sent as quickly as possible or at the rate of receipt
280    pub replay_policy: ReplayPolicy,
281    /// The rate of message delivery in bits per second
282    #[serde(default, skip_serializing_if = "is_default")]
283    pub rate_limit: u64,
284    /// What percentage of acknowledgments should be samples for observability, 0-100
285    #[serde(
286        rename = "sample_freq",
287        with = "sample_freq_deser",
288        default,
289        skip_serializing_if = "is_default"
290    )]
291    pub sample_frequency: u8,
292    /// The maximum number of waiting consumers.
293    #[serde(default, skip_serializing_if = "is_default")]
294    pub max_waiting: i64,
295    /// The maximum number of unacknowledged messages that may be
296    /// in-flight before pausing sending additional messages to
297    /// this consumer.
298    #[serde(default, skip_serializing_if = "is_default")]
299    pub max_ack_pending: i64,
300    /// Only deliver headers without payloads.
301    #[serde(default, skip_serializing_if = "is_default")]
302    pub headers_only: bool,
303    /// Enable flow control messages
304    #[serde(default, skip_serializing_if = "is_default")]
305    pub flow_control: bool,
306    /// Enable idle heartbeat messages
307    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
308    pub idle_heartbeat: Duration,
309    /// Maximum size of a request batch
310    #[serde(default, skip_serializing_if = "is_default")]
311    pub max_batch: i64,
312    /// Maximum size of a request max_bytes
313    #[serde(default, skip_serializing_if = "is_default")]
314    pub max_bytes: i64,
315    /// Maximum value for request expiration
316    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
317    pub max_expires: Duration,
318    /// Threshold for ephemeral consumer inactivity
319    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
320    pub inactive_threshold: Duration,
321    /// Number of consumer replicas
322    #[serde(default, skip_serializing_if = "is_default")]
323    pub num_replicas: usize,
324    /// Force consumer to use memory storage.
325    #[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
326    pub memory_storage: bool,
327
328    #[cfg(feature = "server_2_10")]
329    /// Additional consumer metadata.
330    #[serde(default, skip_serializing_if = "is_default")]
331    pub metadata: HashMap<String, String>,
332    /// Custom backoff for missed acknowledgments.
333    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
334    pub backoff: Vec<Duration>,
335    #[cfg(feature = "server_2_11")]
336    #[serde(default, skip_serializing_if = "is_default")]
337    pub priority_policy: PriorityPolicy,
338    #[cfg(feature = "server_2_11")]
339    #[serde(default, skip_serializing_if = "is_default")]
340    pub priority_groups: Vec<String>,
341    /// For suspending the consumer until the deadline.
342    #[cfg(feature = "server_2_11")]
343    #[serde(
344        default,
345        with = "rfc3339::option",
346        skip_serializing_if = "Option::is_none"
347    )]
348    pub pause_until: Option<OffsetDateTime>,
349}
350
351#[cfg(feature = "server_2_11")]
352#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
353pub enum PriorityPolicy {
354    #[serde(rename = "overflow")]
355    Overflow,
356    /// This feature is not yet supported by the client.
357    /// It's part of the enum to ensure that the client can deserialize
358    /// Consumer configurations that used [PriorityPolicy::PinnedClient].
359    #[serde(rename = "pinned_client")]
360    PinnedClient,
361    #[serde(rename = "none")]
362    #[default]
363    None,
364}
365
366impl From<&Config> for Config {
367    fn from(cc: &Config) -> Config {
368        cc.clone()
369    }
370}
371
372impl From<&str> for Config {
373    fn from(s: &str) -> Config {
374        Config {
375            durable_name: Some(s.to_string()),
376            ..Default::default()
377        }
378    }
379}
380
381impl IntoConsumerConfig for Config {
382    fn into_consumer_config(self) -> Config {
383        self
384    }
385}
386impl IntoConsumerConfig for &Config {
387    fn into_consumer_config(self) -> Config {
388        self.clone()
389    }
390}
391
392impl FromConsumer for Config {
393    fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
394    where
395        Self: Sized,
396    {
397        Ok(config)
398    }
399}
400
401/// `DeliverPolicy` determines how the consumer should select the first message to deliver.
402#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
403#[repr(u8)]
404#[serde(tag = "deliver_policy")]
405pub enum DeliverPolicy {
406    /// All causes the consumer to receive the oldest messages still present in the system.
407    /// This is the default.
408    #[default]
409    #[serde(rename = "all")]
410    All,
411    /// Last will start the consumer with the last sequence received.
412    #[serde(rename = "last")]
413    Last,
414    /// New will only deliver new messages that are received by the `JetStream` server
415    /// after the consumer is created.
416    #[serde(rename = "new")]
417    New,
418    /// `ByStartSeq` will look for a defined starting sequence to the consumer's configured `opt_start_seq`
419    /// parameter.
420    #[serde(rename = "by_start_sequence")]
421    ByStartSequence {
422        #[serde(rename = "opt_start_seq")]
423        start_sequence: u64,
424    },
425    /// `ByStartTime` will select the first message with a timestamp >= to the consumer's
426    /// configured `opt_start_time` parameter.
427    #[serde(rename = "by_start_time")]
428    ByStartTime {
429        #[serde(rename = "opt_start_time", with = "rfc3339")]
430        start_time: time::OffsetDateTime,
431    },
432    /// `LastPerSubject` will start the consumer with the last message
433    /// for all subjects received.
434    #[serde(rename = "last_per_subject")]
435    LastPerSubject,
436}
437
438/// Determines whether messages will be acknowledged individually,
439/// in batches, or never.
440#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
441#[repr(u8)]
442pub enum AckPolicy {
443    /// All messages will be individually acknowledged. This is the default.
444    #[default]
445    #[serde(rename = "explicit")]
446    Explicit = 2,
447    /// No messages are acknowledged.
448    #[serde(rename = "none")]
449    None = 0,
450    /// Acknowledges all messages with lower sequence numbers when a later
451    /// message is acknowledged. Useful for "batching" acknowledgment.
452    #[serde(rename = "all")]
453    All = 1,
454}
455
456/// `ReplayPolicy` controls whether messages are sent to a consumer
457/// as quickly as possible or at the rate that they were originally received at.
458#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
459#[repr(u8)]
460pub enum ReplayPolicy {
461    /// Sends all messages in a stream to the consumer as quickly as possible. This is the default.
462    #[default]
463    #[serde(rename = "instant")]
464    Instant = 0,
465    /// Sends messages to a consumer in a rate-limited fashion based on the rate of receipt. This
466    /// is useful for replaying traffic in a testing or staging environment based on production
467    /// traffic patterns.
468    #[serde(rename = "original")]
469    Original = 1,
470}
471
472fn is_default<T: Default + Eq>(t: &T) -> bool {
473    t == &T::default()
474}
475
476pub(crate) mod sample_freq_deser {
477    pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
478    where
479        T: std::str::FromStr,
480        T::Err: std::fmt::Display,
481        D: serde::Deserializer<'de>,
482    {
483        let s = <String as serde::Deserialize>::deserialize(deserializer)?;
484
485        let mut spliterator = s.split('%');
486        match (spliterator.next(), spliterator.next()) {
487            // No percentage occurred, parse as number
488            (Some(number), None) => T::from_str(number).map_err(serde::de::Error::custom),
489            // A percentage sign occurred right at the end
490            (Some(number), Some("")) => T::from_str(number).map_err(serde::de::Error::custom),
491            _ => Err(serde::de::Error::custom(format!(
492                "Malformed sample frequency: {s}"
493            ))),
494        }
495    }
496
497    pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
498    where
499        T: std::fmt::Display,
500        S: serde::Serializer,
501    {
502        serializer.serialize_str(&value.to_string())
503    }
504}
505
506#[derive(Clone, Copy, Debug, PartialEq)]
507pub enum StreamErrorKind {
508    TimedOut,
509    Other,
510}
511
512impl std::fmt::Display for StreamErrorKind {
513    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
514        match self {
515            Self::TimedOut => write!(f, "timed out"),
516            Self::Other => write!(f, "failed"),
517        }
518    }
519}
520
521pub type StreamError = Error<StreamErrorKind>;