Skip to main content

ruststream_nats/
subscribe_options.rs

1//! Builder describing one NATS subscription, Core or `JetStream`.
2//!
3//! Designed to map cleanly onto the `#[subscriber(...)]` proc-macro from Phase 10 -- each
4//! keyword in the macro corresponds to a single builder method on this struct.
5
6use std::time::Duration;
7
8pub use async_nats::jetstream::consumer::DeliverPolicy;
9use ruststream::SubscriptionSource;
10
11use crate::{NatsBroker, error::NatsError, subscriber::NatsSubscriber};
12
13/// Builder describing one subscription against [`crate::NatsBroker`] (or its test counterpart).
14///
15/// Core NATS is the default. Calling [`SubscribeOptions::jetstream`] switches to a `JetStream`
16/// pull-consumer; remaining `JetStream`-only fields (durable name, ack wait, max ack pending,
17/// deliver policy, filter subject) only take effect when `jetstream(_)` was called and are
18/// rejected otherwise.
19///
20/// # Examples
21///
22/// ```
23/// use std::time::Duration;
24/// use ruststream_nats::SubscribeOptions;
25///
26/// // Core NATS subscription with a queue group.
27/// let core = SubscribeOptions::new("orders.*").queue_group("workers");
28///
29/// // JetStream pull consumer.
30/// let js = SubscribeOptions::new("orders.*")
31///     .jetstream("ORDERS")
32///     .durable("worker-1")
33///     .ack_wait(Duration::from_secs(30));
34/// # let _ = (core, js);
35/// ```
36#[derive(Debug, Clone)]
37#[must_use]
38pub struct SubscribeOptions {
39    subject: String,
40    queue_group: Option<String>,
41    stream: Option<String>,
42    durable: Option<String>,
43    filter_subject: Option<String>,
44    ack_wait: Option<Duration>,
45    max_ack_pending: Option<i64>,
46    deliver_policy: Option<DeliverPolicy>,
47    pull_batch: Option<usize>,
48    pull_expires: Option<Duration>,
49}
50
51impl SubscribeOptions {
52    /// Constructs a fresh Core NATS subscription options set for `subject`.
53    pub fn new(subject: impl Into<String>) -> Self {
54        Self {
55            subject: subject.into(),
56            queue_group: None,
57            stream: None,
58            durable: None,
59            filter_subject: None,
60            ack_wait: None,
61            max_ack_pending: None,
62            deliver_policy: None,
63            pull_batch: None,
64            pull_expires: None,
65        }
66    }
67
68    /// Sets the Core NATS queue group used for load-balanced delivery. Mutually exclusive with
69    /// [`Self::jetstream`].
70    pub fn queue_group(mut self, name: impl Into<String>) -> Self {
71        self.queue_group = Some(name.into());
72        self
73    }
74
75    /// Switches the subscription to a `JetStream` pull-consumer backed by the named stream.
76    pub fn jetstream(mut self, stream: impl Into<String>) -> Self {
77        self.stream = Some(stream.into());
78        self
79    }
80
81    /// Marks the `JetStream` consumer as durable under the given name. Without this the
82    /// consumer is ephemeral and discarded when the subscriber drops.
83    pub fn durable(mut self, name: impl Into<String>) -> Self {
84        self.durable = Some(name.into());
85        self
86    }
87
88    /// Narrows which subjects of the parent `JetStream` stream this consumer reads. Defaults to
89    /// [`Self::subject`] when not set.
90    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
91        self.filter_subject = Some(subject.into());
92        self
93    }
94
95    /// Per-message acknowledgement timeout. After this window an unacked delivery is
96    /// redelivered.
97    pub const fn ack_wait(mut self, ack_wait: Duration) -> Self {
98        self.ack_wait = Some(ack_wait);
99        self
100    }
101
102    /// Soft cap on in-flight unacked deliveries.
103    pub const fn max_ack_pending(mut self, max: i64) -> Self {
104        self.max_ack_pending = Some(max);
105        self
106    }
107
108    /// `JetStream` delivery starting position. Defaults to `DeliverPolicy::All` when unset.
109    pub const fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
110        self.deliver_policy = Some(policy);
111        self
112    }
113
114    /// Upper bound on messages per batch when the subscriber is driven through
115    /// [`BatchSubscriber::batches`](ruststream::BatchSubscriber::batches): one batch is one
116    /// `JetStream` fetch of up to this many messages. Defaults to 100. Has no effect on the
117    /// per-message [`Subscriber::stream`](ruststream::Subscriber::stream) path.
118    pub const fn pull_batch(mut self, max_messages: usize) -> Self {
119        self.pull_batch = Some(max_messages);
120        self
121    }
122
123    /// How long one `JetStream` fetch waits before delivering a partial (or retrying an empty)
124    /// batch. Defaults to 5 seconds. Has no effect on the per-message
125    /// [`Subscriber::stream`](ruststream::Subscriber::stream) path.
126    pub const fn pull_expires(mut self, expires: Duration) -> Self {
127        self.pull_expires = Some(expires);
128        self
129    }
130
131    /// The subject pattern this subscription will receive messages on.
132    #[must_use]
133    pub fn subject(&self) -> &str {
134        &self.subject
135    }
136
137    /// True when [`Self::jetstream`] has been set.
138    #[must_use]
139    pub const fn is_jetstream(&self) -> bool {
140        self.stream.is_some()
141    }
142
143    pub(crate) fn queue_group_ref(&self) -> Option<&str> {
144        self.queue_group.as_deref()
145    }
146
147    pub(crate) fn stream_ref(&self) -> Option<&str> {
148        self.stream.as_deref()
149    }
150
151    pub(crate) fn durable_ref(&self) -> Option<&str> {
152        self.durable.as_deref()
153    }
154
155    pub(crate) fn filter_subject_or_default(&self) -> String {
156        self.filter_subject
157            .clone()
158            .unwrap_or_else(|| self.subject.clone())
159    }
160
161    pub(crate) fn ack_wait_or_default(&self) -> Duration {
162        self.ack_wait.unwrap_or(Duration::from_secs(30))
163    }
164
165    pub(crate) fn max_ack_pending_or_default(&self) -> i64 {
166        self.max_ack_pending.unwrap_or(1024)
167    }
168
169    pub(crate) fn deliver_policy_or_default(&self) -> DeliverPolicy {
170        self.deliver_policy.unwrap_or(DeliverPolicy::All)
171    }
172
173    pub(crate) fn pull_batch_or_default(&self) -> usize {
174        self.pull_batch.unwrap_or(100)
175    }
176
177    pub(crate) fn pull_expires_or_default(&self) -> Duration {
178        self.pull_expires.unwrap_or(Duration::from_secs(5))
179    }
180
181    /// Rejects combinations the broker cannot honour. Called by every `subscribe`
182    /// implementation before any work.
183    ///
184    /// # Errors
185    ///
186    /// Returns [`NatsError::InvalidOptions`] when:
187    /// * `queue_group` is set together with `jetstream` (queue groups are Core-only);
188    /// * any `JetStream`-only field (`durable`, `ack_wait`, `max_ack_pending`,
189    ///   `deliver_policy`, `filter_subject`, `pull_batch`, `pull_expires`) is set without
190    ///   `jetstream`.
191    pub fn validate(&self) -> Result<(), NatsError> {
192        if self.subject.is_empty() {
193            return Err(NatsError::InvalidOptions(
194                "subject must be non-empty".into(),
195            ));
196        }
197        if self.stream.is_some() && self.queue_group.is_some() {
198            return Err(NatsError::InvalidOptions(
199                "queue_group is Core NATS only and cannot be combined with jetstream(_)".into(),
200            ));
201        }
202        if self.stream.is_none() {
203            let js_only = [
204                ("durable", self.durable.is_some()),
205                ("ack_wait", self.ack_wait.is_some()),
206                ("max_ack_pending", self.max_ack_pending.is_some()),
207                ("deliver_policy", self.deliver_policy.is_some()),
208                ("filter_subject", self.filter_subject.is_some()),
209                ("pull_batch", self.pull_batch.is_some()),
210                ("pull_expires", self.pull_expires.is_some()),
211            ];
212            if let Some((field, _)) = js_only.iter().find(|(_, set)| *set) {
213                return Err(NatsError::InvalidOptions(format!(
214                    "{field}(_) requires jetstream(stream_name)"
215                )));
216            }
217        }
218        Ok(())
219    }
220}
221
222impl SubscriptionSource<NatsBroker> for SubscribeOptions {
223    type Subscriber = NatsSubscriber;
224
225    fn name(&self) -> &str {
226        self.subject()
227    }
228
229    async fn subscribe(self, broker: &NatsBroker) -> Result<Self::Subscriber, NatsError> {
230        broker.subscribe(self).await
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237
238    #[test]
239    fn core_subscription_validates() {
240        let opts = SubscribeOptions::new("orders.*");
241        opts.validate().expect("core ok");
242        assert!(!opts.is_jetstream());
243    }
244
245    #[test]
246    fn core_with_queue_group_validates() {
247        SubscribeOptions::new("x")
248            .queue_group("workers")
249            .validate()
250            .expect("queue group ok on core");
251    }
252
253    #[test]
254    fn jetstream_subscription_validates() {
255        let opts = SubscribeOptions::new("orders.*")
256            .jetstream("ORDERS")
257            .durable("worker")
258            .ack_wait(Duration::from_secs(5))
259            .max_ack_pending(64)
260            .deliver_policy(DeliverPolicy::New);
261        opts.validate().expect("jetstream ok");
262        assert!(opts.is_jetstream());
263    }
264
265    #[test]
266    fn durable_without_jetstream_is_rejected() {
267        let err = SubscribeOptions::new("x")
268            .durable("worker")
269            .validate()
270            .unwrap_err();
271        assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("durable")));
272    }
273
274    #[test]
275    fn ack_wait_without_jetstream_is_rejected() {
276        let err = SubscribeOptions::new("x")
277            .ack_wait(Duration::from_secs(1))
278            .validate()
279            .unwrap_err();
280        assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("ack_wait")));
281    }
282
283    #[test]
284    fn queue_group_plus_jetstream_is_rejected() {
285        let err = SubscribeOptions::new("x")
286            .queue_group("w")
287            .jetstream("S")
288            .validate()
289            .unwrap_err();
290        assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("queue_group")));
291    }
292
293    #[test]
294    fn pull_batch_without_jetstream_is_rejected() {
295        let err = SubscribeOptions::new("x")
296            .pull_batch(64)
297            .validate()
298            .unwrap_err();
299        assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("pull_batch")));
300    }
301
302    #[test]
303    fn empty_subject_is_rejected() {
304        let err = SubscribeOptions::new("").validate().unwrap_err();
305        assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("subject")));
306    }
307}