Skip to main content

ruststream_nats/
subscribe_options.rs

1//! Builder describing one NATS subscription, Core or `JetStream`.
2//!
3//! Designed to map cleanly onto the `#[subscriber(...)]` proc-macro from Phase 10 -- each
4//! keyword in the macro corresponds to a single builder method on this struct.
5
6use std::time::Duration;
7
8pub use async_nats::jetstream::consumer::DeliverPolicy;
9
10use crate::error::NatsError;
11
12/// Builder describing one subscription against [`crate::NatsBroker`] (or its test counterpart).
13///
14/// Core NATS is the default. Calling [`SubscribeOptions::jetstream`] switches to a `JetStream`
15/// pull-consumer; remaining `JetStream`-only fields (durable name, ack wait, max ack pending,
16/// deliver policy, filter subject) only take effect when `jetstream(_)` was called and are
17/// rejected otherwise.
18///
19/// # Examples
20///
21/// ```
22/// use std::time::Duration;
23/// use ruststream_nats::SubscribeOptions;
24///
25/// // Core NATS subscription with a queue group.
26/// let core = SubscribeOptions::new("orders.*").queue_group("workers");
27///
28/// // JetStream pull consumer.
29/// let js = SubscribeOptions::new("orders.*")
30///     .jetstream("ORDERS")
31///     .durable("worker-1")
32///     .ack_wait(Duration::from_secs(30));
33/// # let _ = (core, js);
34/// ```
35#[derive(Debug, Clone)]
36#[must_use]
37pub struct SubscribeOptions {
38    subject: String,
39    queue_group: Option<String>,
40    stream: Option<String>,
41    durable: Option<String>,
42    filter_subject: Option<String>,
43    ack_wait: Option<Duration>,
44    max_ack_pending: Option<i64>,
45    deliver_policy: Option<DeliverPolicy>,
46}
47
48impl SubscribeOptions {
49    /// Constructs a fresh Core NATS subscription options set for `subject`.
50    pub fn new(subject: impl Into<String>) -> Self {
51        Self {
52            subject: subject.into(),
53            queue_group: None,
54            stream: None,
55            durable: None,
56            filter_subject: None,
57            ack_wait: None,
58            max_ack_pending: None,
59            deliver_policy: None,
60        }
61    }
62
63    /// Sets the Core NATS queue group used for load-balanced delivery. Mutually exclusive with
64    /// [`Self::jetstream`].
65    pub fn queue_group(mut self, name: impl Into<String>) -> Self {
66        self.queue_group = Some(name.into());
67        self
68    }
69
70    /// Switches the subscription to a `JetStream` pull-consumer backed by the named stream.
71    pub fn jetstream(mut self, stream: impl Into<String>) -> Self {
72        self.stream = Some(stream.into());
73        self
74    }
75
76    /// Marks the `JetStream` consumer as durable under the given name. Without this the
77    /// consumer is ephemeral and discarded when the subscriber drops.
78    pub fn durable(mut self, name: impl Into<String>) -> Self {
79        self.durable = Some(name.into());
80        self
81    }
82
83    /// Narrows which subjects of the parent `JetStream` stream this consumer reads. Defaults to
84    /// [`Self::subject`] when not set.
85    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
86        self.filter_subject = Some(subject.into());
87        self
88    }
89
90    /// Per-message acknowledgement timeout. After this window an unacked delivery is
91    /// redelivered.
92    pub const fn ack_wait(mut self, ack_wait: Duration) -> Self {
93        self.ack_wait = Some(ack_wait);
94        self
95    }
96
97    /// Soft cap on in-flight unacked deliveries.
98    pub const fn max_ack_pending(mut self, max: i64) -> Self {
99        self.max_ack_pending = Some(max);
100        self
101    }
102
103    /// `JetStream` delivery starting position. Defaults to `DeliverPolicy::All` when unset.
104    pub const fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
105        self.deliver_policy = Some(policy);
106        self
107    }
108
109    /// The subject pattern this subscription will receive messages on.
110    #[must_use]
111    pub fn subject(&self) -> &str {
112        &self.subject
113    }
114
115    /// True when [`Self::jetstream`] has been set.
116    #[must_use]
117    pub const fn is_jetstream(&self) -> bool {
118        self.stream.is_some()
119    }
120
121    pub(crate) fn queue_group_ref(&self) -> Option<&str> {
122        self.queue_group.as_deref()
123    }
124
125    pub(crate) fn stream_ref(&self) -> Option<&str> {
126        self.stream.as_deref()
127    }
128
129    pub(crate) fn durable_ref(&self) -> Option<&str> {
130        self.durable.as_deref()
131    }
132
133    pub(crate) fn filter_subject_or_default(&self) -> String {
134        self.filter_subject
135            .clone()
136            .unwrap_or_else(|| self.subject.clone())
137    }
138
139    pub(crate) fn ack_wait_or_default(&self) -> Duration {
140        self.ack_wait.unwrap_or(Duration::from_secs(30))
141    }
142
143    pub(crate) fn max_ack_pending_or_default(&self) -> i64 {
144        self.max_ack_pending.unwrap_or(1024)
145    }
146
147    pub(crate) fn deliver_policy_or_default(&self) -> DeliverPolicy {
148        self.deliver_policy.unwrap_or(DeliverPolicy::All)
149    }
150
151    /// Rejects combinations the broker cannot honour. Called by every `subscribe`
152    /// implementation before any work.
153    ///
154    /// # Errors
155    ///
156    /// Returns [`NatsError::InvalidOptions`] when:
157    /// * `queue_group` is set together with `jetstream` (queue groups are Core-only);
158    /// * any `JetStream`-only field (`durable`, `ack_wait`, `max_ack_pending`,
159    ///   `deliver_policy`, `filter_subject`) is set without `jetstream`.
160    pub fn validate(&self) -> Result<(), NatsError> {
161        if self.subject.is_empty() {
162            return Err(NatsError::InvalidOptions(
163                "subject must be non-empty".into(),
164            ));
165        }
166        if self.stream.is_some() && self.queue_group.is_some() {
167            return Err(NatsError::InvalidOptions(
168                "queue_group is Core NATS only and cannot be combined with jetstream(_)".into(),
169            ));
170        }
171        if self.stream.is_none() {
172            let js_only = [
173                ("durable", self.durable.is_some()),
174                ("ack_wait", self.ack_wait.is_some()),
175                ("max_ack_pending", self.max_ack_pending.is_some()),
176                ("deliver_policy", self.deliver_policy.is_some()),
177                ("filter_subject", self.filter_subject.is_some()),
178            ];
179            if let Some((field, _)) = js_only.iter().find(|(_, set)| *set) {
180                return Err(NatsError::InvalidOptions(format!(
181                    "{field}(_) requires jetstream(stream_name)"
182                )));
183            }
184        }
185        Ok(())
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Validates `opts`, asserts it is rejected with `InvalidOptions`, and hands back the
    /// message so each test only states the part it cares about.
    fn rejection(opts: &SubscribeOptions) -> String {
        match opts.validate() {
            Err(NatsError::InvalidOptions(msg)) => msg,
            other => panic!("expected InvalidOptions, got {other:?}"),
        }
    }

    #[test]
    fn core_subscription_validates() {
        let opts = SubscribeOptions::new("orders.*");
        opts.validate().expect("core ok");
        assert!(!opts.is_jetstream());
        assert_eq!(opts.subject(), "orders.*");
    }

    #[test]
    fn core_with_queue_group_validates() {
        let opts = SubscribeOptions::new("x").queue_group("workers");
        opts.validate().expect("queue group ok on core");
        assert_eq!(opts.queue_group_ref(), Some("workers"));
    }

    #[test]
    fn jetstream_subscription_validates() {
        let opts = SubscribeOptions::new("orders.*")
            .jetstream("ORDERS")
            .durable("worker")
            .ack_wait(Duration::from_secs(5))
            .max_ack_pending(64)
            .deliver_policy(DeliverPolicy::New);
        opts.validate().expect("jetstream ok");
        assert!(opts.is_jetstream());
        assert_eq!(opts.stream_ref(), Some("ORDERS"));
        assert_eq!(opts.durable_ref(), Some("worker"));
    }

    #[test]
    fn jetstream_with_filter_subject_validates() {
        let opts = SubscribeOptions::new("orders.*")
            .jetstream("ORDERS")
            .filter_subject("orders.eu");
        opts.validate().expect("filter subject ok on jetstream");
        assert_eq!(opts.filter_subject_or_default(), "orders.eu");
    }

    #[test]
    fn durable_without_jetstream_is_rejected() {
        let msg = rejection(&SubscribeOptions::new("x").durable("worker"));
        assert!(msg.contains("durable"));
    }

    #[test]
    fn ack_wait_without_jetstream_is_rejected() {
        let msg = rejection(&SubscribeOptions::new("x").ack_wait(Duration::from_secs(1)));
        assert!(msg.contains("ack_wait"));
    }

    #[test]
    fn max_ack_pending_without_jetstream_is_rejected() {
        let msg = rejection(&SubscribeOptions::new("x").max_ack_pending(8));
        assert!(msg.contains("max_ack_pending"));
    }

    #[test]
    fn deliver_policy_without_jetstream_is_rejected() {
        let msg = rejection(&SubscribeOptions::new("x").deliver_policy(DeliverPolicy::New));
        assert!(msg.contains("deliver_policy"));
    }

    #[test]
    fn filter_subject_without_jetstream_is_rejected() {
        let msg = rejection(&SubscribeOptions::new("x").filter_subject("x.y"));
        assert!(msg.contains("filter_subject"));
    }

    #[test]
    fn queue_group_plus_jetstream_is_rejected() {
        let msg = rejection(&SubscribeOptions::new("x").queue_group("w").jetstream("S"));
        assert!(msg.contains("queue_group"));
    }

    #[test]
    fn empty_subject_is_rejected() {
        let msg = rejection(&SubscribeOptions::new(""));
        assert!(msg.contains("subject"));
    }

    #[test]
    fn unset_fields_fall_back_to_defaults() {
        let opts = SubscribeOptions::new("orders.*").jetstream("ORDERS");
        assert_eq!(opts.durable_ref(), None);
        assert_eq!(opts.queue_group_ref(), None);
        assert_eq!(opts.filter_subject_or_default(), "orders.*");
        assert_eq!(opts.ack_wait_or_default(), Duration::from_secs(30));
        assert_eq!(opts.max_ack_pending_or_default(), 1024);
        assert!(matches!(opts.deliver_policy_or_default(), DeliverPolicy::All));
    }

    #[test]
    fn later_builder_calls_override_earlier_ones() {
        let opts = SubscribeOptions::new("x")
            .jetstream("A")
            .jetstream("B")
            .ack_wait(Duration::from_secs(1))
            .ack_wait(Duration::from_secs(2));
        assert_eq!(opts.stream_ref(), Some("B"));
        assert_eq!(opts.ack_wait_or_default(), Duration::from_secs(2));
    }
}
253}