async_nats_wrpc/jetstream/consumer/
mod.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13//
14//! Push and Pull [Consumer] API.
15
16pub mod pull;
17pub mod push;
18#[cfg(feature = "server_2_10")]
19use std::collections::HashMap;
20use std::time::Duration;
21
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24use time::serde::rfc3339;
25
26use super::context::RequestError;
27use super::stream::ClusterInfo;
28use super::Context;
29use crate::error::Error;
30use crate::jetstream::consumer;
31
32pub trait IntoConsumerConfig {
33    fn into_consumer_config(self) -> Config;
34}
35
36#[allow(dead_code)]
37#[derive(Clone, Debug)]
38pub struct Consumer<T: IntoConsumerConfig> {
39    pub(crate) context: Context,
40    pub(crate) config: T,
41    pub(crate) info: Info,
42}
43
44impl<T: IntoConsumerConfig> Consumer<T> {
45    pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
46        Self {
47            config,
48            info,
49            context,
50        }
51    }
52}
53impl<T: IntoConsumerConfig> Consumer<T> {
54    /// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside
55    /// [Consumer] and returns it.
56    ///
57    /// # Examples
58    ///
59    /// ```no_run
60    /// # #[tokio::main]
61    /// # async fn main() -> Result<(), async_nats::Error> {
62    /// use async_nats::jetstream::consumer::PullConsumer;
63    /// let client = async_nats::connect("localhost:4222").await?;
64    /// let jetstream = async_nats::jetstream::new(client);
65    ///
66    /// let mut consumer: PullConsumer = jetstream
67    ///     .get_stream("events")
68    ///     .await?
69    ///     .get_consumer("pull")
70    ///     .await?;
71    ///
72    /// let info = consumer.info().await?;
73    /// # Ok(())
74    /// # }
75    /// ```
76    pub async fn info(&mut self) -> Result<&consumer::Info, RequestError> {
77        let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
78
79        let info = self.context.request(subject, &json!({})).await?;
80        self.info = info;
81        Ok(&self.info)
82    }
83
84    async fn fetch_info(&self) -> Result<consumer::Info, RequestError> {
85        let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
86        self.context.request(subject, &json!({})).await
87    }
88
89    /// Returns cached [Info] for the [Consumer].
90    /// Cache is either from initial creation/retrieval of the [Consumer] or last call to
91    /// [Info].
92    ///
93    /// # Examples
94    ///
95    /// ```no_run
96    /// # #[tokio::main]
97    /// # async fn main() -> Result<(), async_nats::Error> {
98    /// use async_nats::jetstream::consumer::PullConsumer;
99    /// let client = async_nats::connect("localhost:4222").await?;
100    /// let jetstream = async_nats::jetstream::new(client);
101    ///
102    /// let consumer: PullConsumer = jetstream
103    ///     .get_stream("events")
104    ///     .await?
105    ///     .get_consumer("pull")
106    ///     .await?;
107    ///
108    /// let info = consumer.cached_info();
109    /// # Ok(())
110    /// # }
111    /// ```
112    pub fn cached_info(&self) -> &consumer::Info {
113        &self.info
114    }
115}
116
117/// Trait used to convert generic [Stream Config][crate::jetstream::consumer::Config] into either
118/// [Pull][crate::jetstream::consumer::pull::Config] or
119/// [Push][crate::jetstream::consumer::push::Config] config. It validates if given config is
120/// a valid target one.
121pub trait FromConsumer {
122    fn try_from_consumer_config(
123        config: crate::jetstream::consumer::Config,
124    ) -> Result<Self, crate::Error>
125    where
126        Self: Sized;
127}
128
129pub type PullConsumer = Consumer<self::pull::Config>;
130pub type PushConsumer = Consumer<self::push::Config>;
131pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
132pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;
133
134/// Information about a consumer
135#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
136pub struct Info {
137    /// The stream being consumed
138    pub stream_name: String,
139    /// The consumer's unique name
140    pub name: String,
141    /// The time the consumer was created
142    #[serde(with = "rfc3339")]
143    pub created: time::OffsetDateTime,
144    /// The consumer's configuration
145    pub config: Config,
146    /// Statistics for delivered messages
147    pub delivered: SequenceInfo,
148    /// Statistics for acknowledged messages
149    pub ack_floor: SequenceInfo,
150    /// The difference between delivered and acknowledged messages
151    pub num_ack_pending: usize,
152    /// The number of messages re-sent after acknowledgment was not received within the configured
153    /// time threshold
154    pub num_redelivered: usize,
155    /// The number of waiting
156    pub num_waiting: usize,
157    /// The number of pending
158    pub num_pending: u64,
159    /// Information about the consumer's cluster
160    #[serde(skip_serializing_if = "is_default")]
161    pub cluster: Option<ClusterInfo>,
162    /// Indicates if any client is connected and receiving messages from a push consumer
163    #[serde(default, skip_serializing_if = "is_default")]
164    pub push_bound: bool,
165}
166
167/// Information about a consumer and the stream it is consuming
168#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
169pub struct SequenceInfo {
170    /// How far along the consumer has progressed
171    #[serde(rename = "consumer_seq")]
172    pub consumer_sequence: u64,
173    /// The aggregate for all stream consumers
174    #[serde(rename = "stream_seq")]
175    pub stream_sequence: u64,
176    // Last activity for the sequence
177    #[serde(
178        default,
179        with = "rfc3339::option",
180        skip_serializing_if = "Option::is_none"
181    )]
182    pub last_active: Option<time::OffsetDateTime>,
183}
184
185/// Configuration for consumers. From a high level, the
186/// `durable_name` and `deliver_subject` fields have a particularly
187/// strong influence on the consumer's overall behavior.
188#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
189pub struct Config {
190    /// Setting `deliver_subject` to `Some(...)` will cause this consumer
191    /// to be "push-based". This is analogous in some ways to a normal
192    /// NATS subscription (rather than a queue subscriber) in that the
193    /// consumer will receive all messages published to the stream that
194    /// the consumer is interested in. Acknowledgment policies such as
195    /// `AckPolicy::None` and `AckPolicy::All` may be enabled for such
196    /// push-based consumers, which reduce the amount of effort spent
197    /// tracking delivery. Combining `AckPolicy::All` with
198    /// `Consumer::process_batch` enables particularly nice throughput
199    /// optimizations.
200    ///
201    /// Setting `deliver_subject` to `None` will cause this consumer to
202    /// be "pull-based", and will require explicit acknowledgment of
203    /// each message. This is analogous in some ways to a normal NATS
204    /// queue subscriber, where a message will be delivered to a single
205    /// subscriber. Pull-based consumers are intended to be used for
206    /// workloads where it is desirable to have a single process receive
207    /// a message. The only valid `ack_policy` for pull-based consumers
208    /// is the default of `AckPolicy::Explicit`, which acknowledges each
209    /// processed message individually. Pull-based consumers may be a
210    /// good choice for work queue-like workloads where you want messages
211    /// to be handled by a single consumer process. Note that it is
212    /// possible to deliver a message to multiple consumers if the
213    /// consumer crashes or is slow to acknowledge the delivered message.
214    /// This is a fundamental behavior present in all distributed systems
215    /// that attempt redelivery when a consumer fails to acknowledge a message.
216    /// This is known as "at least once" message processing. To achieve
217    /// "exactly once" semantics, it is necessary to implement idempotent
218    /// semantics in any system that is written to as a result of processing
219    /// a message.
220    #[serde(default, skip_serializing_if = "Option::is_none")]
221    pub deliver_subject: Option<String>,
222
223    /// Setting `durable_name` to `Some(...)` will cause this consumer
224    /// to be "durable". This may be a good choice for workloads that
225    /// benefit from the `JetStream` server or cluster remembering the
226    /// progress of consumers for fault tolerance purposes. If a consumer
227    /// crashes, the `JetStream` server or cluster will remember which
228    /// messages the consumer acknowledged. When the consumer recovers,
229    /// this information will allow the consumer to resume processing
230    /// where it left off. If you're unsure, set this to `Some(...)`.
231    ///
232    /// Setting `durable_name` to `None` will cause this consumer to
233    /// be "ephemeral". This may be a good choice for workloads where
234    /// you don't need the `JetStream` server to remember the consumer's
235    /// progress in the case of a crash, such as certain "high churn"
236    /// workloads or workloads where a crashed instance is not required
237    /// to recover.
238    #[serde(default, skip_serializing_if = "Option::is_none")]
239    pub durable_name: Option<String>,
240    /// A name of the consumer. Can be specified for both durable and ephemeral
241    /// consumers.
242    #[serde(default, skip_serializing_if = "Option::is_none")]
243    pub name: Option<String>,
244    /// A short description of the purpose of this consumer.
245    #[serde(default, skip_serializing_if = "Option::is_none")]
246    pub description: Option<String>,
247    /// Deliver group to use.
248    #[serde(default, skip_serializing_if = "Option::is_none")]
249    pub deliver_group: Option<String>,
250    /// Allows for a variety of options that determine how this consumer will receive messages
251    #[serde(flatten)]
252    pub deliver_policy: DeliverPolicy,
253    /// How messages should be acknowledged
254    pub ack_policy: AckPolicy,
255    /// How long to allow messages to remain un-acknowledged before attempting redelivery
256    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
257    pub ack_wait: Duration,
258    /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
259    #[serde(default, skip_serializing_if = "is_default")]
260    pub max_deliver: i64,
261    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
262    #[serde(default, skip_serializing_if = "is_default")]
263    pub filter_subject: String,
264    #[cfg(feature = "server_2_10")]
265    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
266    #[serde(default, skip_serializing_if = "is_default")]
267    pub filter_subjects: Vec<String>,
268    /// Whether messages are sent as quickly as possible or at the rate of receipt
269    pub replay_policy: ReplayPolicy,
270    /// The rate of message delivery in bits per second
271    #[serde(default, skip_serializing_if = "is_default")]
272    pub rate_limit: u64,
273    /// What percentage of acknowledgments should be samples for observability, 0-100
274    #[serde(default, skip_serializing_if = "is_default")]
275    pub sample_frequency: u8,
276    /// The maximum number of waiting consumers.
277    #[serde(default, skip_serializing_if = "is_default")]
278    pub max_waiting: i64,
279    /// The maximum number of unacknowledged messages that may be
280    /// in-flight before pausing sending additional messages to
281    /// this consumer.
282    #[serde(default, skip_serializing_if = "is_default")]
283    pub max_ack_pending: i64,
284    /// Only deliver headers without payloads.
285    #[serde(default, skip_serializing_if = "is_default")]
286    pub headers_only: bool,
287    /// Enable flow control messages
288    #[serde(default, skip_serializing_if = "is_default")]
289    pub flow_control: bool,
290    /// Enable idle heartbeat messages
291    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
292    pub idle_heartbeat: Duration,
293    /// Maximum size of a request batch
294    #[serde(default, skip_serializing_if = "is_default")]
295    pub max_batch: i64,
296    /// Maximum size of a request max_bytes
297    #[serde(default, skip_serializing_if = "is_default")]
298    pub max_bytes: i64,
299    /// Maximum value for request expiration
300    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
301    pub max_expires: Duration,
302    /// Threshold for ephemeral consumer inactivity
303    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
304    pub inactive_threshold: Duration,
305    /// Number of consumer replicas
306    #[serde(default, skip_serializing_if = "is_default")]
307    pub num_replicas: usize,
308    /// Force consumer to use memory storage.
309    #[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
310    pub memory_storage: bool,
311
312    #[cfg(feature = "server_2_10")]
313    /// Additional consumer metadata.
314    #[serde(default, skip_serializing_if = "is_default")]
315    pub metadata: HashMap<String, String>,
316    /// Custom backoff for missed acknowledgments.
317    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
318    pub backoff: Vec<Duration>,
319}
320
321impl From<&Config> for Config {
322    fn from(cc: &Config) -> Config {
323        cc.clone()
324    }
325}
326
327impl From<&str> for Config {
328    fn from(s: &str) -> Config {
329        Config {
330            durable_name: Some(s.to_string()),
331            ..Default::default()
332        }
333    }
334}
335
336impl IntoConsumerConfig for Config {
337    fn into_consumer_config(self) -> Config {
338        self
339    }
340}
341impl IntoConsumerConfig for &Config {
342    fn into_consumer_config(self) -> Config {
343        self.clone()
344    }
345}
346
347impl FromConsumer for Config {
348    fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
349    where
350        Self: Sized,
351    {
352        Ok(config)
353    }
354}
355
356/// `DeliverPolicy` determines how the consumer should select the first message to deliver.
357#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
358#[repr(u8)]
359#[serde(tag = "deliver_policy")]
360pub enum DeliverPolicy {
361    /// All causes the consumer to receive the oldest messages still present in the system.
362    /// This is the default.
363    #[default]
364    #[serde(rename = "all")]
365    All,
366    /// Last will start the consumer with the last sequence received.
367    #[serde(rename = "last")]
368    Last,
369    /// New will only deliver new messages that are received by the `JetStream` server
370    /// after the consumer is created.
371    #[serde(rename = "new")]
372    New,
373    /// `ByStartSeq` will look for a defined starting sequence to the consumer's configured `opt_start_seq`
374    /// parameter.
375    #[serde(rename = "by_start_sequence")]
376    ByStartSequence {
377        #[serde(rename = "opt_start_seq")]
378        start_sequence: u64,
379    },
380    /// `ByStartTime` will select the first message with a timestamp >= to the consumer's
381    /// configured `opt_start_time` parameter.
382    #[serde(rename = "by_start_time")]
383    ByStartTime {
384        #[serde(rename = "opt_start_time", with = "rfc3339")]
385        start_time: time::OffsetDateTime,
386    },
387    /// `LastPerSubject` will start the consumer with the last message
388    /// for all subjects received.
389    #[serde(rename = "last_per_subject")]
390    LastPerSubject,
391}
392
393/// Determines whether messages will be acknowledged individually,
394/// in batches, or never.
395#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
396#[repr(u8)]
397pub enum AckPolicy {
398    /// All messages will be individually acknowledged. This is the default.
399    #[default]
400    #[serde(rename = "explicit")]
401    Explicit = 2,
402    /// No messages are acknowledged.
403    #[serde(rename = "none")]
404    None = 0,
405    /// Acknowledges all messages with lower sequence numbers when a later
406    /// message is acknowledged. Useful for "batching" acknowledgment.
407    #[serde(rename = "all")]
408    All = 1,
409}
410
411/// `ReplayPolicy` controls whether messages are sent to a consumer
412/// as quickly as possible or at the rate that they were originally received at.
413#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
414#[repr(u8)]
415pub enum ReplayPolicy {
416    /// Sends all messages in a stream to the consumer as quickly as possible. This is the default.
417    #[default]
418    #[serde(rename = "instant")]
419    Instant = 0,
420    /// Sends messages to a consumer in a rate-limited fashion based on the rate of receipt. This
421    /// is useful for replaying traffic in a testing or staging environment based on production
422    /// traffic patterns.
423    #[serde(rename = "original")]
424    Original = 1,
425}
426
427fn is_default<T: Default + Eq>(t: &T) -> bool {
428    t == &T::default()
429}
430
431#[derive(Clone, Copy, Debug, PartialEq)]
432pub enum StreamErrorKind {
433    TimedOut,
434    Other,
435}
436
437impl std::fmt::Display for StreamErrorKind {
438    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439        match self {
440            Self::TimedOut => write!(f, "timed out"),
441            Self::Other => write!(f, "failed"),
442        }
443    }
444}
445
446pub type StreamError = Error<StreamErrorKind>;