ruststream_nats/
subscribe_options.rs1use std::time::Duration;
7
8pub use async_nats::jetstream::consumer::DeliverPolicy;
9use ruststream::SubscriptionSource;
10
11use crate::{NatsBroker, error::NatsError, subscriber::NatsSubscriber};
12
13#[derive(Debug, Clone)]
37#[must_use]
38pub struct SubscribeOptions {
39 subject: String,
40 queue_group: Option<String>,
41 stream: Option<String>,
42 durable: Option<String>,
43 filter_subject: Option<String>,
44 ack_wait: Option<Duration>,
45 max_ack_pending: Option<i64>,
46 deliver_policy: Option<DeliverPolicy>,
47 pull_batch: Option<usize>,
48 pull_expires: Option<Duration>,
49}
50
51impl SubscribeOptions {
52 pub fn new(subject: impl Into<String>) -> Self {
54 Self {
55 subject: subject.into(),
56 queue_group: None,
57 stream: None,
58 durable: None,
59 filter_subject: None,
60 ack_wait: None,
61 max_ack_pending: None,
62 deliver_policy: None,
63 pull_batch: None,
64 pull_expires: None,
65 }
66 }
67
68 pub fn queue_group(mut self, name: impl Into<String>) -> Self {
71 self.queue_group = Some(name.into());
72 self
73 }
74
75 pub fn jetstream(mut self, stream: impl Into<String>) -> Self {
77 self.stream = Some(stream.into());
78 self
79 }
80
81 pub fn durable(mut self, name: impl Into<String>) -> Self {
84 self.durable = Some(name.into());
85 self
86 }
87
88 pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
91 self.filter_subject = Some(subject.into());
92 self
93 }
94
95 pub const fn ack_wait(mut self, ack_wait: Duration) -> Self {
98 self.ack_wait = Some(ack_wait);
99 self
100 }
101
102 pub const fn max_ack_pending(mut self, max: i64) -> Self {
104 self.max_ack_pending = Some(max);
105 self
106 }
107
108 pub const fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
110 self.deliver_policy = Some(policy);
111 self
112 }
113
114 pub const fn pull_batch(mut self, max_messages: usize) -> Self {
119 self.pull_batch = Some(max_messages);
120 self
121 }
122
123 pub const fn pull_expires(mut self, expires: Duration) -> Self {
127 self.pull_expires = Some(expires);
128 self
129 }
130
131 #[must_use]
133 pub fn subject(&self) -> &str {
134 &self.subject
135 }
136
137 #[must_use]
139 pub const fn is_jetstream(&self) -> bool {
140 self.stream.is_some()
141 }
142
143 pub(crate) fn queue_group_ref(&self) -> Option<&str> {
144 self.queue_group.as_deref()
145 }
146
147 pub(crate) fn stream_ref(&self) -> Option<&str> {
148 self.stream.as_deref()
149 }
150
151 pub(crate) fn durable_ref(&self) -> Option<&str> {
152 self.durable.as_deref()
153 }
154
155 pub(crate) fn filter_subject_or_default(&self) -> String {
156 self.filter_subject
157 .clone()
158 .unwrap_or_else(|| self.subject.clone())
159 }
160
161 pub(crate) fn ack_wait_or_default(&self) -> Duration {
162 self.ack_wait.unwrap_or(Duration::from_secs(30))
163 }
164
165 pub(crate) fn max_ack_pending_or_default(&self) -> i64 {
166 self.max_ack_pending.unwrap_or(1024)
167 }
168
169 pub(crate) fn deliver_policy_or_default(&self) -> DeliverPolicy {
170 self.deliver_policy.unwrap_or(DeliverPolicy::All)
171 }
172
173 pub(crate) fn pull_batch_or_default(&self) -> usize {
174 self.pull_batch.unwrap_or(100)
175 }
176
177 pub(crate) fn pull_expires_or_default(&self) -> Duration {
178 self.pull_expires.unwrap_or(Duration::from_secs(5))
179 }
180
181 pub fn validate(&self) -> Result<(), NatsError> {
192 if self.subject.is_empty() {
193 return Err(NatsError::InvalidOptions(
194 "subject must be non-empty".into(),
195 ));
196 }
197 if self.stream.is_some() && self.queue_group.is_some() {
198 return Err(NatsError::InvalidOptions(
199 "queue_group is Core NATS only and cannot be combined with jetstream(_)".into(),
200 ));
201 }
202 if self.stream.is_none() {
203 let js_only = [
204 ("durable", self.durable.is_some()),
205 ("ack_wait", self.ack_wait.is_some()),
206 ("max_ack_pending", self.max_ack_pending.is_some()),
207 ("deliver_policy", self.deliver_policy.is_some()),
208 ("filter_subject", self.filter_subject.is_some()),
209 ("pull_batch", self.pull_batch.is_some()),
210 ("pull_expires", self.pull_expires.is_some()),
211 ];
212 if let Some((field, _)) = js_only.iter().find(|(_, set)| *set) {
213 return Err(NatsError::InvalidOptions(format!(
214 "{field}(_) requires jetstream(stream_name)"
215 )));
216 }
217 }
218 Ok(())
219 }
220}
221
222impl SubscriptionSource<NatsBroker> for SubscribeOptions {
223 type Subscriber = NatsSubscriber;
224
225 fn name(&self) -> &str {
226 self.subject()
227 }
228
229 async fn subscribe(self, broker: &NatsBroker) -> Result<Self::Subscriber, NatsError> {
230 broker.subscribe(self).await
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237
238 #[test]
239 fn core_subscription_validates() {
240 let opts = SubscribeOptions::new("orders.*");
241 opts.validate().expect("core ok");
242 assert!(!opts.is_jetstream());
243 }
244
245 #[test]
246 fn core_with_queue_group_validates() {
247 SubscribeOptions::new("x")
248 .queue_group("workers")
249 .validate()
250 .expect("queue group ok on core");
251 }
252
253 #[test]
254 fn jetstream_subscription_validates() {
255 let opts = SubscribeOptions::new("orders.*")
256 .jetstream("ORDERS")
257 .durable("worker")
258 .ack_wait(Duration::from_secs(5))
259 .max_ack_pending(64)
260 .deliver_policy(DeliverPolicy::New);
261 opts.validate().expect("jetstream ok");
262 assert!(opts.is_jetstream());
263 }
264
265 #[test]
266 fn durable_without_jetstream_is_rejected() {
267 let err = SubscribeOptions::new("x")
268 .durable("worker")
269 .validate()
270 .unwrap_err();
271 assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("durable")));
272 }
273
274 #[test]
275 fn ack_wait_without_jetstream_is_rejected() {
276 let err = SubscribeOptions::new("x")
277 .ack_wait(Duration::from_secs(1))
278 .validate()
279 .unwrap_err();
280 assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("ack_wait")));
281 }
282
283 #[test]
284 fn queue_group_plus_jetstream_is_rejected() {
285 let err = SubscribeOptions::new("x")
286 .queue_group("w")
287 .jetstream("S")
288 .validate()
289 .unwrap_err();
290 assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("queue_group")));
291 }
292
293 #[test]
294 fn pull_batch_without_jetstream_is_rejected() {
295 let err = SubscribeOptions::new("x")
296 .pull_batch(64)
297 .validate()
298 .unwrap_err();
299 assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("pull_batch")));
300 }
301
302 #[test]
303 fn empty_subject_is_rejected() {
304 let err = SubscribeOptions::new("").validate().unwrap_err();
305 assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains("subject")));
306 }
307}