Skip to main content

ruststream_nats/
subscribe_options.rs

1//! Builder describing one NATS subscription, Core or `JetStream`.
2//!
3//! Designed to map cleanly onto the `#[subscriber(...)]` proc-macro from Phase 10 -- each
4//! keyword in the macro corresponds to a single builder method on this struct.
5
6use std::time::Duration;
7
8pub use async_nats::jetstream::consumer::DeliverPolicy;
9use ruststream::SubscriptionSource;
10
11use crate::{NatsBroker, error::NatsError, subscriber::NatsSubscriber};
12
13/// Builder describing one subscription against [`crate::NatsBroker`] (or its test counterpart).
14///
15/// Core NATS is the default. Calling [`SubscribeOptions::jetstream`] switches to a `JetStream`
16/// pull-consumer; remaining `JetStream`-only fields (durable name, ack wait, max ack pending,
17/// deliver policy, filter subject) only take effect when `jetstream(_)` was called and are
18/// rejected otherwise.
19///
20/// # Examples
21///
22/// ```
23/// use std::time::Duration;
24/// use ruststream_nats::SubscribeOptions;
25///
26/// // Core NATS subscription with a queue group.
27/// let core = SubscribeOptions::new("orders.*").queue_group("workers");
28///
29/// // JetStream pull consumer.
30/// let js = SubscribeOptions::new("orders.*")
31///     .jetstream("ORDERS")
32///     .durable("worker-1")
33///     .ack_wait(Duration::from_secs(30));
34/// # let _ = (core, js);
35/// ```
36#[derive(Debug, Clone)]
37#[must_use]
38pub struct SubscribeOptions {
39    subject: String,
40    queue_group: Option<String>,
41    stream: Option<String>,
42    durable: Option<String>,
43    filter_subject: Option<String>,
44    ack_wait: Option<Duration>,
45    max_ack_pending: Option<i64>,
46    deliver_policy: Option<DeliverPolicy>,
47}
48
49impl SubscribeOptions {
50    /// Constructs a fresh Core NATS subscription options set for `subject`.
51    pub fn new(subject: impl Into<String>) -> Self {
52        Self {
53            subject: subject.into(),
54            queue_group: None,
55            stream: None,
56            durable: None,
57            filter_subject: None,
58            ack_wait: None,
59            max_ack_pending: None,
60            deliver_policy: None,
61        }
62    }
63
64    /// Sets the Core NATS queue group used for load-balanced delivery. Mutually exclusive with
65    /// [`Self::jetstream`].
66    pub fn queue_group(mut self, name: impl Into<String>) -> Self {
67        self.queue_group = Some(name.into());
68        self
69    }
70
71    /// Switches the subscription to a `JetStream` pull-consumer backed by the named stream.
72    pub fn jetstream(mut self, stream: impl Into<String>) -> Self {
73        self.stream = Some(stream.into());
74        self
75    }
76
77    /// Marks the `JetStream` consumer as durable under the given name. Without this the
78    /// consumer is ephemeral and discarded when the subscriber drops.
79    pub fn durable(mut self, name: impl Into<String>) -> Self {
80        self.durable = Some(name.into());
81        self
82    }
83
84    /// Narrows which subjects of the parent `JetStream` stream this consumer reads. Defaults to
85    /// [`Self::subject`] when not set.
86    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
87        self.filter_subject = Some(subject.into());
88        self
89    }
90
91    /// Per-message acknowledgement timeout. After this window an unacked delivery is
92    /// redelivered.
93    pub const fn ack_wait(mut self, ack_wait: Duration) -> Self {
94        self.ack_wait = Some(ack_wait);
95        self
96    }
97
98    /// Soft cap on in-flight unacked deliveries.
99    pub const fn max_ack_pending(mut self, max: i64) -> Self {
100        self.max_ack_pending = Some(max);
101        self
102    }
103
104    /// `JetStream` delivery starting position. Defaults to `DeliverPolicy::All` when unset.
105    pub const fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
106        self.deliver_policy = Some(policy);
107        self
108    }
109
110    /// The subject pattern this subscription will receive messages on.
111    #[must_use]
112    pub fn subject(&self) -> &str {
113        &self.subject
114    }
115
116    /// True when [`Self::jetstream`] has been set.
117    #[must_use]
118    pub const fn is_jetstream(&self) -> bool {
119        self.stream.is_some()
120    }
121
122    pub(crate) fn queue_group_ref(&self) -> Option<&str> {
123        self.queue_group.as_deref()
124    }
125
126    pub(crate) fn stream_ref(&self) -> Option<&str> {
127        self.stream.as_deref()
128    }
129
130    pub(crate) fn durable_ref(&self) -> Option<&str> {
131        self.durable.as_deref()
132    }
133
134    pub(crate) fn filter_subject_or_default(&self) -> String {
135        self.filter_subject
136            .clone()
137            .unwrap_or_else(|| self.subject.clone())
138    }
139
140    pub(crate) fn ack_wait_or_default(&self) -> Duration {
141        self.ack_wait.unwrap_or(Duration::from_secs(30))
142    }
143
144    pub(crate) fn max_ack_pending_or_default(&self) -> i64 {
145        self.max_ack_pending.unwrap_or(1024)
146    }
147
148    pub(crate) fn deliver_policy_or_default(&self) -> DeliverPolicy {
149        self.deliver_policy.unwrap_or(DeliverPolicy::All)
150    }
151
152    /// Rejects combinations the broker cannot honour. Called by every `subscribe`
153    /// implementation before any work.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`NatsError::InvalidOptions`] when:
158    /// * `queue_group` is set together with `jetstream` (queue groups are Core-only);
159    /// * any `JetStream`-only field (`durable`, `ack_wait`, `max_ack_pending`,
160    ///   `deliver_policy`, `filter_subject`) is set without `jetstream`.
161    pub fn validate(&self) -> Result<(), NatsError> {
162        if self.subject.is_empty() {
163            return Err(NatsError::InvalidOptions(
164                "subject must be non-empty".into(),
165            ));
166        }
167        if self.stream.is_some() && self.queue_group.is_some() {
168            return Err(NatsError::InvalidOptions(
169                "queue_group is Core NATS only and cannot be combined with jetstream(_)".into(),
170            ));
171        }
172        if self.stream.is_none() {
173            let js_only = [
174                ("durable", self.durable.is_some()),
175                ("ack_wait", self.ack_wait.is_some()),
176                ("max_ack_pending", self.max_ack_pending.is_some()),
177                ("deliver_policy", self.deliver_policy.is_some()),
178                ("filter_subject", self.filter_subject.is_some()),
179            ];
180            if let Some((field, _)) = js_only.iter().find(|(_, set)| *set) {
181                return Err(NatsError::InvalidOptions(format!(
182                    "{field}(_) requires jetstream(stream_name)"
183                )));
184            }
185        }
186        Ok(())
187    }
188}
189
190impl SubscriptionSource<NatsBroker> for SubscribeOptions {
191    type Subscriber = NatsSubscriber;
192
193    fn name(&self) -> &str {
194        self.subject()
195    }
196
197    async fn subscribe(self, broker: &NatsBroker) -> Result<Self::Subscriber, NatsError> {
198        broker.subscribe(self).await
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `opts` fails validation with an `InvalidOptions` message mentioning
    /// `needle`, printing the actual outcome when it does not.
    fn assert_rejected(opts: &SubscribeOptions, needle: &str) {
        match opts.validate() {
            Err(NatsError::InvalidOptions(msg)) => {
                assert!(
                    msg.contains(needle),
                    "message {msg:?} does not mention {needle:?}"
                );
            }
            other => panic!("expected InvalidOptions mentioning {needle:?}, got {other:?}"),
        }
    }

    #[test]
    fn core_subscription_validates() {
        let opts = SubscribeOptions::new("orders.*");
        opts.validate().expect("core ok");
        assert!(!opts.is_jetstream());
    }

    #[test]
    fn core_with_queue_group_validates() {
        SubscribeOptions::new("x")
            .queue_group("workers")
            .validate()
            .expect("queue group ok on core");
    }

    #[test]
    fn jetstream_subscription_validates() {
        let opts = SubscribeOptions::new("orders.*")
            .jetstream("ORDERS")
            .durable("worker")
            .ack_wait(Duration::from_secs(5))
            .max_ack_pending(64)
            .deliver_policy(DeliverPolicy::New);
        opts.validate().expect("jetstream ok");
        assert!(opts.is_jetstream());
    }

    #[test]
    fn jetstream_with_filter_subject_validates() {
        let opts = SubscribeOptions::new("orders.*")
            .jetstream("ORDERS")
            .filter_subject("orders.eu");
        opts.validate().expect("filter subject ok on jetstream");
        assert_eq!(opts.filter_subject_or_default(), "orders.eu");
    }

    #[test]
    fn durable_without_jetstream_is_rejected() {
        assert_rejected(&SubscribeOptions::new("x").durable("worker"), "durable");
    }

    #[test]
    fn ack_wait_without_jetstream_is_rejected() {
        assert_rejected(
            &SubscribeOptions::new("x").ack_wait(Duration::from_secs(1)),
            "ack_wait",
        );
    }

    #[test]
    fn max_ack_pending_without_jetstream_is_rejected() {
        assert_rejected(
            &SubscribeOptions::new("x").max_ack_pending(8),
            "max_ack_pending",
        );
    }

    #[test]
    fn deliver_policy_without_jetstream_is_rejected() {
        assert_rejected(
            &SubscribeOptions::new("x").deliver_policy(DeliverPolicy::New),
            "deliver_policy",
        );
    }

    #[test]
    fn filter_subject_without_jetstream_is_rejected() {
        assert_rejected(
            &SubscribeOptions::new("x").filter_subject("x.y"),
            "filter_subject",
        );
    }

    #[test]
    fn queue_group_plus_jetstream_is_rejected() {
        assert_rejected(
            &SubscribeOptions::new("x").queue_group("w").jetstream("S"),
            "queue_group",
        );
    }

    #[test]
    fn empty_subject_is_rejected() {
        assert_rejected(&SubscribeOptions::new(""), "subject");
    }

    #[test]
    fn unset_jetstream_fields_fall_back_to_defaults() {
        let opts = SubscribeOptions::new("orders.*").jetstream("ORDERS");
        assert_eq!(opts.filter_subject_or_default(), "orders.*");
        assert_eq!(opts.ack_wait_or_default(), Duration::from_secs(30));
        assert_eq!(opts.max_ack_pending_or_default(), 1024);
        assert!(matches!(
            opts.deliver_policy_or_default(),
            DeliverPolicy::All
        ));
    }
}