async_nats/jetstream/consumer/mod.rs
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//! Push and Pull [Consumer] API.

pub mod pull;
pub mod push;
#[cfg(feature = "server_2_10")]
use std::collections::HashMap;
use std::time::Duration;

use serde::{Deserialize, Serialize};
use serde_json::json;
use time::serde::rfc3339;

#[cfg(feature = "server_2_11")]
use time::OffsetDateTime;

use super::context::{ConsumerInfoError, RequestError};
use super::response::Response;
use super::stream::ClusterInfo;
use super::Context;
use crate::error::Error;
use crate::jetstream::consumer;

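/// Trait used to convert a consumer configuration (such as [pull::Config] or
/// [push::Config]) into the generic consumer [Config].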
pub trait IntoConsumerConfig {
    fn into_consumer_config(self) -> Config;
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct Consumer<T: IntoConsumerConfig> {
    pub(crate) context: Context,
    pub(crate) config: T,
    pub(crate) info: Info,
}

impl<T: IntoConsumerConfig> Consumer<T> {
    pub fn new(config: T, info: consumer::Info, context: Context) -> Self {
        Self {
            config,
            info,
            context,
        }
    }
}
impl<T: IntoConsumerConfig> Consumer<T> {
    /// Retrieves `info` about [Consumer] from the server, updates the cached `info` inside
    /// [Consumer] and returns it.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer::PullConsumer;
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let mut consumer: PullConsumer = jetstream
    ///     .get_stream("events")
    ///     .await?
    ///     .get_consumer("pull")
    ///     .await?;
    ///
    /// let info = consumer.info().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn info(&mut self) -> Result<&consumer::Info, ConsumerInfoError> {
        let info = self.get_info().await?;
        self.info = info;
        Ok(&self.info)
    }

    /// Retrieves `info` about [Consumer] from the server. Does not update the cache.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer::PullConsumer;
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let mut consumer: PullConsumer = jetstream
    ///     .get_stream("events")
    ///     .await?
    ///     .get_consumer("pull")
    ///     .await?;
    ///
    /// let info = consumer.get_info().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get_info(&self) -> Result<consumer::Info, ConsumerInfoError> {
        let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
        match self.context.request(subject, &json!({})).await? {
            Response::Err { error } => Err(error.into()),
            Response::Ok(info) => Ok(info),
        }
    }

    /// Returns cached [Info] for the [Consumer].
    /// The cache is either from initial creation/retrieval of the [Consumer] or the last call
    /// to [info()][Self::info].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::jetstream::consumer::PullConsumer;
    /// let client = async_nats::connect("localhost:4222").await?;
    /// let jetstream = async_nats::jetstream::new(client);
    ///
    /// let consumer: PullConsumer = jetstream
    ///     .get_stream("events")
    ///     .await?
    ///     .get_consumer("pull")
    ///     .await?;
    ///
    /// let info = consumer.cached_info();
    /// # Ok(())
    /// # }
    /// ```
    pub fn cached_info(&self) -> &consumer::Info {
        &self.info
    }
}

/// Trait used to convert the generic [Consumer Config][crate::jetstream::consumer::Config] into
/// either a [Pull][crate::jetstream::consumer::pull::Config] or a
/// [Push][crate::jetstream::consumer::push::Config] config. It validates whether the given
/// config is valid for the target consumer type.
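///
/// # Examples
///
/// A minimal sketch converting the generic config into a pull-specific one; the durable
/// name is illustrative.
///
/// ```
/// # fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer::{pull, Config, FromConsumer};
///
/// let generic = Config {
///     durable_name: Some("tasks".to_string()),
///     ..Default::default()
/// };
/// // A config without a `deliver_subject` can be represented as a pull config.
/// let pull_config = pull::Config::try_from_consumer_config(generic)?;
/// # Ok(())
/// # }
/// ```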
pub trait FromConsumer {
    fn try_from_consumer_config(
        config: crate::jetstream::consumer::Config,
    ) -> Result<Self, crate::Error>
    where
        Self: Sized;
}

pub type PullConsumer = Consumer<self::pull::Config>;
pub type PushConsumer = Consumer<self::push::Config>;
pub type OrderedPullConsumer = Consumer<self::pull::OrderedConfig>;
pub type OrderedPushConsumer = Consumer<self::push::OrderedConfig>;

/// Information about a consumer
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
    /// The stream being consumed
    pub stream_name: String,
    /// The consumer's unique name
    pub name: String,
    /// The time the consumer was created
    #[serde(with = "rfc3339")]
    pub created: time::OffsetDateTime,
    /// The consumer's configuration
    pub config: Config,
    /// Statistics for delivered messages
    pub delivered: SequenceInfo,
    /// Statistics for acknowledged messages
    pub ack_floor: SequenceInfo,
    /// The number of messages delivered but not yet acknowledged
    pub num_ack_pending: usize,
    /// The number of messages re-sent after acknowledgment was not received within the configured
    /// time threshold
    pub num_redelivered: usize,
    /// The number of pull requests waiting for messages
    pub num_waiting: usize,
    /// The number of messages pending delivery
    pub num_pending: u64,
    /// Information about the consumer's cluster
    #[serde(skip_serializing_if = "is_default")]
    pub cluster: Option<ClusterInfo>,
    /// Indicates if any client is connected and receiving messages from a push consumer
    #[serde(default, skip_serializing_if = "is_default")]
    pub push_bound: bool,
    #[cfg(feature = "server_2_11")]
    /// Indicates if the consumer is paused
    #[serde(default)]
    pub paused: bool,
    #[cfg(feature = "server_2_11")]
    /// The remaining time the consumer is paused
    #[serde(default, with = "serde_nanos")]
    pub pause_remaining: Option<Duration>,
}

/// Sequence positions for a consumer and the stream it is consuming
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct SequenceInfo {
    /// How far along the consumer has progressed
    #[serde(rename = "consumer_seq")]
    pub consumer_sequence: u64,
    /// The corresponding position in the stream
    #[serde(rename = "stream_seq")]
    pub stream_sequence: u64,
    /// Last activity for the sequence
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub last_active: Option<time::OffsetDateTime>,
}

/// Configuration for consumers. From a high level, the
/// `durable_name` and `deliver_subject` fields have a particularly
/// strong influence on the consumer's overall behavior.
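///
/// # Examples
///
/// A minimal sketch of both flavors; the durable names and the delivery subject are
/// illustrative.
///
/// ```
/// use async_nats::jetstream::consumer::Config;
///
/// // Pull-based: no `deliver_subject`; messages are fetched on demand.
/// let pull = Config {
///     durable_name: Some("processor".to_string()),
///     ..Default::default()
/// };
///
/// // Push-based: messages are delivered to `deliver_subject` as they arrive.
/// let push = Config {
///     durable_name: Some("monitor".to_string()),
///     deliver_subject: Some("deliver.monitor".to_string()),
///     ..Default::default()
/// };
/// ```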
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Config {
    /// Setting `deliver_subject` to `Some(...)` will cause this consumer
    /// to be "push-based". This is analogous in some ways to a normal
    /// NATS subscription (rather than a queue subscriber) in that the
    /// consumer will receive all messages published to the stream that
    /// the consumer is interested in. Acknowledgment policies such as
    /// `AckPolicy::None` and `AckPolicy::All` may be enabled for such
    /// push-based consumers, which reduce the amount of effort spent
    /// tracking delivery. Combining `AckPolicy::All` with batched
    /// processing enables particularly nice throughput optimizations.
    ///
    /// Setting `deliver_subject` to `None` will cause this consumer to
    /// be "pull-based", and will require explicit acknowledgment of
    /// each message. This is analogous in some ways to a normal NATS
    /// queue subscriber, where a message will be delivered to a single
    /// subscriber. Pull-based consumers are intended to be used for
    /// workloads where it is desirable to have a single process receive
    /// a message. The only valid `ack_policy` for pull-based consumers
    /// is the default of `AckPolicy::Explicit`, which acknowledges each
    /// processed message individually. Pull-based consumers may be a
    /// good choice for work queue-like workloads where you want messages
    /// to be handled by a single consumer process. Note that it is
    /// possible to deliver a message to multiple consumers if the
    /// consumer crashes or is slow to acknowledge the delivered message.
    /// This is a fundamental behavior present in all distributed systems
    /// that attempt redelivery when a consumer fails to acknowledge a message.
    /// This is known as "at least once" message processing. To achieve
    /// "exactly once" semantics, it is necessary to implement idempotent
    /// semantics in any system that is written to as a result of processing
    /// a message.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deliver_subject: Option<String>,

    /// Setting `durable_name` to `Some(...)` will cause this consumer
    /// to be "durable". This may be a good choice for workloads that
    /// benefit from the `JetStream` server or cluster remembering the
    /// progress of consumers for fault tolerance purposes. If a consumer
    /// crashes, the `JetStream` server or cluster will remember which
    /// messages the consumer acknowledged. When the consumer recovers,
    /// this information will allow the consumer to resume processing
    /// where it left off. If you're unsure, set this to `Some(...)`.
    ///
    /// Setting `durable_name` to `None` will cause this consumer to
    /// be "ephemeral". This may be a good choice for workloads where
    /// you don't need the `JetStream` server to remember the consumer's
    /// progress in the case of a crash, such as certain "high churn"
    /// workloads or workloads where a crashed instance is not required
    /// to recover.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub durable_name: Option<String>,
    /// A name of the consumer. Can be specified for both durable and ephemeral
    /// consumers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// A short description of the purpose of this consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Deliver group to use.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deliver_group: Option<String>,
    /// Allows for a variety of options that determine how this consumer will receive messages
    #[serde(flatten)]
    pub deliver_policy: DeliverPolicy,
    /// How messages should be acknowledged
    pub ack_policy: AckPolicy,
    /// How long to allow messages to remain un-acknowledged before attempting redelivery
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub ack_wait: Duration,
    /// Maximum number of times a specific message will be delivered. Use this to avoid poison
    /// pill messages that repeatedly crash your consumer processes forever.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_deliver: i64,
    /// When consuming from a Stream with many subjects, or wildcards, this selects only specific
    /// incoming subjects. Supports wildcards.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subject: String,
    #[cfg(feature = "server_2_10")]
    /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects.
    #[serde(default, skip_serializing_if = "is_default")]
    pub filter_subjects: Vec<String>,
    /// Whether messages are sent as quickly as possible or at the rate of receipt
    pub replay_policy: ReplayPolicy,
    /// The rate of message delivery in bits per second
    #[serde(default, skip_serializing_if = "is_default")]
    pub rate_limit: u64,
    /// What percentage of acknowledgments should be sampled for observability, 0-100
    #[serde(
        rename = "sample_freq",
        with = "sample_freq_deser",
        default,
        skip_serializing_if = "is_default"
    )]
    pub sample_frequency: u8,
    /// The maximum number of waiting consumers.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_waiting: i64,
    /// The maximum number of unacknowledged messages that may be
    /// in-flight before pausing sending additional messages to
    /// this consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_ack_pending: i64,
    /// Only deliver headers without payloads.
    #[serde(default, skip_serializing_if = "is_default")]
    pub headers_only: bool,
    /// Enable flow control messages
    #[serde(default, skip_serializing_if = "is_default")]
    pub flow_control: bool,
    /// Enable idle heartbeat messages
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub idle_heartbeat: Duration,
    /// Maximum size of a request batch
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_batch: i64,
    /// Maximum number of bytes that a single pull request may ask for
    #[serde(default, skip_serializing_if = "is_default")]
    pub max_bytes: i64,
    /// Maximum value for request expiration
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub max_expires: Duration,
    /// Threshold for ephemeral consumer inactivity
    #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
    pub inactive_threshold: Duration,
    /// Number of consumer replicas
    #[serde(default, skip_serializing_if = "is_default")]
    pub num_replicas: usize,
    /// Force consumer to use memory storage.
    #[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
    pub memory_storage: bool,

    #[cfg(feature = "server_2_10")]
    /// Additional consumer metadata.
    #[serde(default, skip_serializing_if = "is_default")]
    pub metadata: HashMap<String, String>,
    /// Custom backoff for missed acknowledgments.
    #[serde(default, skip_serializing_if = "is_default", with = "serde_nanos")]
    pub backoff: Vec<Duration>,
    #[cfg(feature = "server_2_11")]
    /// Priority policy for the consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub priority_policy: PriorityPolicy,
    #[cfg(feature = "server_2_11")]
    /// Priority groups used by the consumer.
    #[serde(default, skip_serializing_if = "is_default")]
    pub priority_groups: Vec<String>,
    /// For suspending the consumer until the deadline.
    #[cfg(feature = "server_2_11")]
    #[serde(
        default,
        with = "rfc3339::option",
        skip_serializing_if = "Option::is_none"
    )]
    pub pause_until: Option<OffsetDateTime>,
}

#[cfg(feature = "server_2_11")]
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum PriorityPolicy {
    #[serde(rename = "overflow")]
    Overflow,
    /// This feature is not yet supported by the client.
    /// It's part of the enum to ensure that the client can deserialize
    /// Consumer configurations that used [PriorityPolicy::PinnedClient].
    #[serde(rename = "pinned_client")]
    PinnedClient,
    #[cfg(feature = "server_2_12")]
    #[serde(rename = "prioritized")]
    Prioritized,
    #[serde(rename = "none")]
    #[default]
    None,
}

impl From<&Config> for Config {
    fn from(cc: &Config) -> Config {
        cc.clone()
    }
}

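/// Treats a bare string as the durable name of the consumer, leaving every other field at its
/// default. A small sketch (the name is illustrative):
///
/// ```
/// use async_nats::jetstream::consumer::Config;
///
/// let config: Config = "processor".into();
/// assert_eq!(config.durable_name.as_deref(), Some("processor"));
/// ```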
impl From<&str> for Config {
    fn from(s: &str) -> Config {
        Config {
            durable_name: Some(s.to_string()),
            ..Default::default()
        }
    }
}

impl IntoConsumerConfig for Config {
    fn into_consumer_config(self) -> Config {
        self
    }
}
impl IntoConsumerConfig for &Config {
    fn into_consumer_config(self) -> Config {
        self.clone()
    }
}

impl FromConsumer for Config {
    fn try_from_consumer_config(config: Config) -> Result<Self, crate::Error>
    where
        Self: Sized,
    {
        Ok(config)
    }
}

/// `DeliverPolicy` determines how the consumer should select the first message to deliver.
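///
/// # Examples
///
/// A sketch of a pull consumer configuration that starts replay at a specific stream
/// sequence; the durable name and sequence are illustrative.
///
/// ```
/// use async_nats::jetstream::consumer::{pull, DeliverPolicy};
///
/// let config = pull::Config {
///     durable_name: Some("replayer".to_string()),
///     deliver_policy: DeliverPolicy::ByStartSequence { start_sequence: 42 },
///     ..Default::default()
/// };
/// ```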
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[serde(tag = "deliver_policy")]
pub enum DeliverPolicy {
    /// All causes the consumer to receive the oldest messages still present in the system.
    /// This is the default.
    #[default]
    #[serde(rename = "all")]
    All,
    /// Last will start the consumer with the last sequence received.
    #[serde(rename = "last")]
    Last,
    /// New will only deliver new messages that are received by the `JetStream` server
    /// after the consumer is created.
    #[serde(rename = "new")]
    New,
    /// `ByStartSequence` will start delivery at the sequence set in the consumer's
    /// configured `opt_start_seq` parameter.
    #[serde(rename = "by_start_sequence")]
    ByStartSequence {
        #[serde(rename = "opt_start_seq")]
        start_sequence: u64,
    },
    /// `ByStartTime` will select the first message with a timestamp greater than or equal
    /// to the consumer's configured `opt_start_time` parameter.
    #[serde(rename = "by_start_time")]
    ByStartTime {
        #[serde(rename = "opt_start_time", with = "rfc3339")]
        start_time: time::OffsetDateTime,
    },
    /// `LastPerSubject` will start the consumer with the last message
    /// received for each subject.
    #[serde(rename = "last_per_subject")]
    LastPerSubject,
}

/// Determines whether messages will be acknowledged individually,
/// in batches, or never.
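///
/// # Examples
///
/// A sketch of a push-style configuration that acknowledges in batches via
/// [AckPolicy::All]; the consumer name and delivery subject are illustrative.
///
/// ```
/// use async_nats::jetstream::consumer::{AckPolicy, Config};
///
/// let config = Config {
///     durable_name: Some("batcher".to_string()),
///     deliver_subject: Some("deliver.batcher".to_string()),
///     ack_policy: AckPolicy::All,
///     ..Default::default()
/// };
/// ```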
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum AckPolicy {
    /// All messages will be individually acknowledged. This is the default.
    #[default]
    #[serde(rename = "explicit")]
    Explicit = 2,
    /// No messages are acknowledged.
    #[serde(rename = "none")]
    None = 0,
    /// Acknowledges all messages with lower sequence numbers when a later
    /// message is acknowledged. Useful for "batching" acknowledgment.
    #[serde(rename = "all")]
    All = 1,
}

/// `ReplayPolicy` controls whether messages are sent to a consumer
/// as quickly as possible or at the rate at which they were originally received.
#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ReplayPolicy {
    /// Sends all messages in a stream to the consumer as quickly as possible. This is the default.
    #[default]
    #[serde(rename = "instant")]
    Instant = 0,
    /// Sends messages to a consumer in a rate-limited fashion based on the rate of receipt. This
    /// is useful for replaying traffic in a testing or staging environment based on production
    /// traffic patterns.
    #[serde(rename = "original")]
    Original = 1,
}

fn is_default<T: Default + Eq>(t: &T) -> bool {
    t == &T::default()
}

pub(crate) mod sample_freq_deser {
    pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
    where
        T: std::str::FromStr,
        T::Err: std::fmt::Display,
        D: serde::Deserializer<'de>,
    {
        let s = <String as serde::Deserialize>::deserialize(deserializer)?;

        let mut spliterator = s.split('%');
        match (spliterator.next(), spliterator.next()) {
            // No percentage occurred, parse as number
            (Some(number), None) => T::from_str(number).map_err(serde::de::Error::custom),
            // A percentage sign occurred right at the end
            (Some(number), Some("")) => T::from_str(number).map_err(serde::de::Error::custom),
            _ => Err(serde::de::Error::custom(format!(
                "Malformed sample frequency: {s}"
            ))),
        }
    }

    pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
    where
        T: std::fmt::Display,
        S: serde::Serializer,
    {
        serializer.serialize_str(&value.to_string())
    }
}
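
// A small sketch exercising `sample_freq_deser` end to end: the server may report the
// sampling frequency either as a bare number ("30") or with a trailing percent sign
// ("30%"), and both forms should deserialize to the same value. The module and test
// names below are illustrative.
#[cfg(test)]
mod sample_freq_serde_tests {
    use super::Config;

    #[test]
    fn parses_sample_frequency_with_and_without_percent_sign() {
        for raw in ["30", "30%"] {
            let json = format!(
                r#"{{"deliver_policy":"all","ack_policy":"explicit","replay_policy":"instant","sample_freq":"{raw}"}}"#
            );
            let config: Config = serde_json::from_str(&json).expect("config should deserialize");
            assert_eq!(config.sample_frequency, 30);
        }
    }
}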

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StreamErrorKind {
    TimedOut,
    Other,
}

impl std::fmt::Display for StreamErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TimedOut => write!(f, "timed out"),
            Self::Other => write!(f, "failed"),
        }
    }
}

pub type StreamError = Error<StreamErrorKind>;

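// Maps a retry attempt number to a delay: attempts below five back off linearly
// (0 ms, 500 ms, 1 s, 1.5 s, 2 s), and every later attempt waits a flat 10 seconds.
// The error argument is ignored.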
fn backoff(attempt: u32, _: &impl std::error::Error) -> Duration {
    if attempt < 5 {
        Duration::from_millis(500 * attempt as u64)
    } else {
        Duration::from_secs(10)
    }
}