ruststream_nats/
subscribe_options.rs1use std::time::Duration;
7
8pub use async_nats::jetstream::consumer::DeliverPolicy;
9
10use crate::error::NatsError;
11
12#[derive(Debug, Clone)]
36#[must_use]
37pub struct SubscribeOptions {
38 subject: String,
39 queue_group: Option<String>,
40 stream: Option<String>,
41 durable: Option<String>,
42 filter_subject: Option<String>,
43 ack_wait: Option<Duration>,
44 max_ack_pending: Option<i64>,
45 deliver_policy: Option<DeliverPolicy>,
46}
47
48impl SubscribeOptions {
49 pub fn new(subject: impl Into<String>) -> Self {
51 Self {
52 subject: subject.into(),
53 queue_group: None,
54 stream: None,
55 durable: None,
56 filter_subject: None,
57 ack_wait: None,
58 max_ack_pending: None,
59 deliver_policy: None,
60 }
61 }
62
63 pub fn queue_group(mut self, name: impl Into<String>) -> Self {
66 self.queue_group = Some(name.into());
67 self
68 }
69
70 pub fn jetstream(mut self, stream: impl Into<String>) -> Self {
72 self.stream = Some(stream.into());
73 self
74 }
75
76 pub fn durable(mut self, name: impl Into<String>) -> Self {
79 self.durable = Some(name.into());
80 self
81 }
82
83 pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
86 self.filter_subject = Some(subject.into());
87 self
88 }
89
90 pub const fn ack_wait(mut self, ack_wait: Duration) -> Self {
93 self.ack_wait = Some(ack_wait);
94 self
95 }
96
97 pub const fn max_ack_pending(mut self, max: i64) -> Self {
99 self.max_ack_pending = Some(max);
100 self
101 }
102
103 pub const fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
105 self.deliver_policy = Some(policy);
106 self
107 }
108
109 #[must_use]
111 pub fn subject(&self) -> &str {
112 &self.subject
113 }
114
115 #[must_use]
117 pub const fn is_jetstream(&self) -> bool {
118 self.stream.is_some()
119 }
120
121 pub(crate) fn queue_group_ref(&self) -> Option<&str> {
122 self.queue_group.as_deref()
123 }
124
125 pub(crate) fn stream_ref(&self) -> Option<&str> {
126 self.stream.as_deref()
127 }
128
129 pub(crate) fn durable_ref(&self) -> Option<&str> {
130 self.durable.as_deref()
131 }
132
133 pub(crate) fn filter_subject_or_default(&self) -> String {
134 self.filter_subject
135 .clone()
136 .unwrap_or_else(|| self.subject.clone())
137 }
138
139 pub(crate) fn ack_wait_or_default(&self) -> Duration {
140 self.ack_wait.unwrap_or(Duration::from_secs(30))
141 }
142
143 pub(crate) fn max_ack_pending_or_default(&self) -> i64 {
144 self.max_ack_pending.unwrap_or(1024)
145 }
146
147 pub(crate) fn deliver_policy_or_default(&self) -> DeliverPolicy {
148 self.deliver_policy.unwrap_or(DeliverPolicy::All)
149 }
150
151 pub fn validate(&self) -> Result<(), NatsError> {
161 if self.subject.is_empty() {
162 return Err(NatsError::InvalidOptions(
163 "subject must be non-empty".into(),
164 ));
165 }
166 if self.stream.is_some() && self.queue_group.is_some() {
167 return Err(NatsError::InvalidOptions(
168 "queue_group is Core NATS only and cannot be combined with jetstream(_)".into(),
169 ));
170 }
171 if self.stream.is_none() {
172 let js_only = [
173 ("durable", self.durable.is_some()),
174 ("ack_wait", self.ack_wait.is_some()),
175 ("max_ack_pending", self.max_ack_pending.is_some()),
176 ("deliver_policy", self.deliver_policy.is_some()),
177 ("filter_subject", self.filter_subject.is_some()),
178 ];
179 if let Some((field, _)) = js_only.iter().find(|(_, set)| *set) {
180 return Err(NatsError::InvalidOptions(format!(
181 "{field}(_) requires jetstream(stream_name)"
182 )));
183 }
184 }
185 Ok(())
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Validates `opts` and asserts it is rejected with an `InvalidOptions`
    /// error whose message contains `needle`.
    #[track_caller]
    fn assert_invalid_options(opts: &SubscribeOptions, needle: &str) {
        let err = opts.validate().unwrap_err();
        assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains(needle)));
    }

    /// A bare subject is valid and is not a JetStream subscription.
    #[test]
    fn core_subscription_validates() {
        let opts = SubscribeOptions::new("orders.*");
        opts.validate().expect("core ok");
        assert!(!opts.is_jetstream());
    }

    /// A queue group on its own (no stream) passes validation.
    #[test]
    fn core_with_queue_group_validates() {
        let opts = SubscribeOptions::new("x").queue_group("workers");
        opts.validate().expect("queue group ok on core");
    }

    /// JetStream-only settings are accepted once a stream is set.
    #[test]
    fn jetstream_subscription_validates() {
        let opts = SubscribeOptions::new("orders.*")
            .jetstream("ORDERS")
            .durable("worker")
            .ack_wait(Duration::from_secs(5))
            .max_ack_pending(64)
            .deliver_policy(DeliverPolicy::New);
        opts.validate().expect("jetstream ok");
        assert!(opts.is_jetstream());
    }

    /// `durable` without a stream is rejected and named in the message.
    #[test]
    fn durable_without_jetstream_is_rejected() {
        let opts = SubscribeOptions::new("x").durable("worker");
        assert_invalid_options(&opts, "durable");
    }

    /// `ack_wait` without a stream is rejected and named in the message.
    #[test]
    fn ack_wait_without_jetstream_is_rejected() {
        let opts = SubscribeOptions::new("x").ack_wait(Duration::from_secs(1));
        assert_invalid_options(&opts, "ack_wait");
    }

    /// A queue group combined with a stream is rejected.
    #[test]
    fn queue_group_plus_jetstream_is_rejected() {
        let opts = SubscribeOptions::new("x").queue_group("w").jetstream("S");
        assert_invalid_options(&opts, "queue_group");
    }

    /// An empty subject is rejected.
    #[test]
    fn empty_subject_is_rejected() {
        let opts = SubscribeOptions::new("");
        assert_invalid_options(&opts, "subject");
    }
}