ruststream_nats/
subscribe_options.rs1use std::time::Duration;
7
8pub use async_nats::jetstream::consumer::DeliverPolicy;
9use ruststream::SubscriptionSource;
10
11use crate::{NatsBroker, error::NatsError, subscriber::NatsSubscriber};
12
13#[derive(Debug, Clone)]
37#[must_use]
38pub struct SubscribeOptions {
39 subject: String,
40 queue_group: Option<String>,
41 stream: Option<String>,
42 durable: Option<String>,
43 filter_subject: Option<String>,
44 ack_wait: Option<Duration>,
45 max_ack_pending: Option<i64>,
46 deliver_policy: Option<DeliverPolicy>,
47}
48
49impl SubscribeOptions {
50 pub fn new(subject: impl Into<String>) -> Self {
52 Self {
53 subject: subject.into(),
54 queue_group: None,
55 stream: None,
56 durable: None,
57 filter_subject: None,
58 ack_wait: None,
59 max_ack_pending: None,
60 deliver_policy: None,
61 }
62 }
63
64 pub fn queue_group(mut self, name: impl Into<String>) -> Self {
67 self.queue_group = Some(name.into());
68 self
69 }
70
71 pub fn jetstream(mut self, stream: impl Into<String>) -> Self {
73 self.stream = Some(stream.into());
74 self
75 }
76
77 pub fn durable(mut self, name: impl Into<String>) -> Self {
80 self.durable = Some(name.into());
81 self
82 }
83
84 pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
87 self.filter_subject = Some(subject.into());
88 self
89 }
90
91 pub const fn ack_wait(mut self, ack_wait: Duration) -> Self {
94 self.ack_wait = Some(ack_wait);
95 self
96 }
97
98 pub const fn max_ack_pending(mut self, max: i64) -> Self {
100 self.max_ack_pending = Some(max);
101 self
102 }
103
104 pub const fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
106 self.deliver_policy = Some(policy);
107 self
108 }
109
110 #[must_use]
112 pub fn subject(&self) -> &str {
113 &self.subject
114 }
115
116 #[must_use]
118 pub const fn is_jetstream(&self) -> bool {
119 self.stream.is_some()
120 }
121
122 pub(crate) fn queue_group_ref(&self) -> Option<&str> {
123 self.queue_group.as_deref()
124 }
125
126 pub(crate) fn stream_ref(&self) -> Option<&str> {
127 self.stream.as_deref()
128 }
129
130 pub(crate) fn durable_ref(&self) -> Option<&str> {
131 self.durable.as_deref()
132 }
133
134 pub(crate) fn filter_subject_or_default(&self) -> String {
135 self.filter_subject
136 .clone()
137 .unwrap_or_else(|| self.subject.clone())
138 }
139
140 pub(crate) fn ack_wait_or_default(&self) -> Duration {
141 self.ack_wait.unwrap_or(Duration::from_secs(30))
142 }
143
144 pub(crate) fn max_ack_pending_or_default(&self) -> i64 {
145 self.max_ack_pending.unwrap_or(1024)
146 }
147
148 pub(crate) fn deliver_policy_or_default(&self) -> DeliverPolicy {
149 self.deliver_policy.unwrap_or(DeliverPolicy::All)
150 }
151
152 pub fn validate(&self) -> Result<(), NatsError> {
162 if self.subject.is_empty() {
163 return Err(NatsError::InvalidOptions(
164 "subject must be non-empty".into(),
165 ));
166 }
167 if self.stream.is_some() && self.queue_group.is_some() {
168 return Err(NatsError::InvalidOptions(
169 "queue_group is Core NATS only and cannot be combined with jetstream(_)".into(),
170 ));
171 }
172 if self.stream.is_none() {
173 let js_only = [
174 ("durable", self.durable.is_some()),
175 ("ack_wait", self.ack_wait.is_some()),
176 ("max_ack_pending", self.max_ack_pending.is_some()),
177 ("deliver_policy", self.deliver_policy.is_some()),
178 ("filter_subject", self.filter_subject.is_some()),
179 ];
180 if let Some((field, _)) = js_only.iter().find(|(_, set)| *set) {
181 return Err(NatsError::InvalidOptions(format!(
182 "{field}(_) requires jetstream(stream_name)"
183 )));
184 }
185 }
186 Ok(())
187 }
188}
189
190impl SubscriptionSource<NatsBroker> for SubscribeOptions {
191 type Subscriber = NatsSubscriber;
192
193 fn name(&self) -> &str {
194 self.subject()
195 }
196
197 async fn subscribe(self, broker: &NatsBroker) -> Result<Self::Subscriber, NatsError> {
198 broker.subscribe(self).await
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// Validates `opts` and returns the error; panics if validation succeeds.
    fn rejection(opts: &SubscribeOptions) -> NatsError {
        opts.validate().unwrap_err()
    }

    /// True when `err` is `InvalidOptions` and its message contains `needle`.
    fn mentions(err: &NatsError, needle: &str) -> bool {
        matches!(err, NatsError::InvalidOptions(msg) if msg.contains(needle))
    }

    // --- accepted combinations -------------------------------------------

    #[test]
    fn core_subscription_validates() {
        let opts = SubscribeOptions::new("orders.*");
        opts.validate().expect("core ok");
        assert!(!opts.is_jetstream());
    }

    #[test]
    fn core_with_queue_group_validates() {
        let opts = SubscribeOptions::new("x").queue_group("workers");
        opts.validate().expect("queue group ok on core");
    }

    #[test]
    fn jetstream_subscription_validates() {
        let opts = SubscribeOptions::new("orders.*")
            .jetstream("ORDERS")
            .durable("worker")
            .ack_wait(Duration::from_secs(5))
            .max_ack_pending(64)
            .deliver_policy(DeliverPolicy::New);
        opts.validate().expect("jetstream ok");
        assert!(opts.is_jetstream());
    }

    // --- rejected combinations -------------------------------------------

    #[test]
    fn durable_without_jetstream_is_rejected() {
        let opts = SubscribeOptions::new("x").durable("worker");
        let err = rejection(&opts);
        assert!(mentions(&err, "durable"));
    }

    #[test]
    fn ack_wait_without_jetstream_is_rejected() {
        let opts = SubscribeOptions::new("x").ack_wait(Duration::from_secs(1));
        let err = rejection(&opts);
        assert!(mentions(&err, "ack_wait"));
    }

    #[test]
    fn queue_group_plus_jetstream_is_rejected() {
        let opts = SubscribeOptions::new("x").queue_group("w").jetstream("S");
        let err = rejection(&opts);
        assert!(mentions(&err, "queue_group"));
    }

    #[test]
    fn empty_subject_is_rejected() {
        let opts = SubscribeOptions::new("");
        let err = rejection(&opts);
        assert!(mentions(&err, "subject"));
    }
}