nats/subscription.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use std::io;
15use std::sync::Arc;
16use std::thread;
17use std::time::Duration;
18
19use crossbeam_channel as channel;
20
21use crate::client::Client;
22use crate::message::Message;
23
24#[derive(Debug)]
25struct Inner {
26 /// Subscription ID.
27 pub(crate) sid: u64,
28
29 /// Subject.
30 pub(crate) subject: String,
31
32 /// MSG operations received from the server.
33 pub(crate) messages: channel::Receiver<Message>,
34
35 /// Client associated with subscription.
36 pub(crate) client: Client,
37}
38
39impl Drop for Inner {
40 fn drop(&mut self) {
41 self.client.unsubscribe(self.sid).ok();
42 }
43}
44
45/// A `Subscription` receives `Message`s published
46/// to specific NATS `Subject`s.
47#[derive(Clone, Debug)]
48pub struct Subscription(Arc<Inner>);
49
50impl Subscription {
51 /// Creates a subscription.
52 pub(crate) fn new(
53 sid: u64,
54 subject: String,
55 messages: channel::Receiver<Message>,
56 client: Client,
57 ) -> Subscription {
58 Subscription(Arc::new(Inner {
59 sid,
60 subject,
61 messages,
62 client,
63 }))
64 }
65
66 /// Get a crossbeam Receiver for subscription messages.
67 /// Useful for `crossbeam_channel::select` macro
68 ///
69 /// # Example
70 /// ```
71 /// # fn main() -> std::io::Result<()> {
72 /// # let nc = nats::connect("demo.nats.io")?;
73 /// # let sub1 = nc.subscribe("foo")?;
74 /// # let sub2 = nc.subscribe("bar")?;
75 /// # nc.publish("foo", "hello")?;
76 /// let sub1_ch = sub1.receiver();
77 /// let sub2_ch = sub2.receiver();
78 /// crossbeam_channel::select! {
79 /// recv(sub1_ch) -> msg => {
80 /// println!("Got message from sub1: {:?}", msg);
81 /// Ok(())
82 /// }
83 /// recv(sub2_ch) -> msg => {
84 /// println!("Got message from sub2: {:?}", msg);
85 /// Ok(())
86 /// }
87 /// }
88 /// # }
89 /// ```
90 pub fn receiver(&self) -> &channel::Receiver<Message> {
91 &self.0.messages
92 }
93
94 /// Get the next message, or None if the subscription
95 /// has been unsubscribed or the connection closed.
96 ///
97 /// # Example
98 /// ```
99 /// # fn main() -> std::io::Result<()> {
100 /// # let nc = nats::connect("demo.nats.io")?;
101 /// # let sub = nc.subscribe("foo")?;
102 /// # nc.publish("foo", "hello")?;
103 /// if let Some(msg) = sub.next() {}
104 /// # Ok(())
105 /// # }
106 /// ```
107 pub fn next(&self) -> Option<Message> {
108 self.0.messages.recv().ok()
109 }
110
111 /// Try to get the next message, or None if no messages
112 /// are present or if the subscription has been unsubscribed
113 /// or the connection closed.
114 ///
115 /// # Example
116 /// ```
117 /// # fn main() -> std::io::Result<()> {
118 /// # let nc = nats::connect("demo.nats.io")?;
119 /// # let sub = nc.subscribe("foo")?;
120 /// if let Some(msg) = sub.try_next() {
121 /// println!("Received {}", msg);
122 /// }
123 /// # Ok(())
124 /// # }
125 /// ```
126 pub fn try_next(&self) -> Option<Message> {
127 self.0.messages.try_recv().ok()
128 }
129
130 /// Get the next message, or a timeout error
131 /// if no messages are available for timeout.
132 ///
133 /// # Example
134 /// ```
135 /// # fn main() -> std::io::Result<()> {
136 /// # let nc = nats::connect("demo.nats.io")?;
137 /// # let sub = nc.subscribe("foo")?;
138 /// if let Ok(msg) = sub.next_timeout(std::time::Duration::from_secs(1)) {}
139 /// # Ok(())
140 /// # }
141 /// ```
142 pub fn next_timeout(&self, timeout: Duration) -> io::Result<Message> {
143 match self.0.messages.recv_timeout(timeout) {
144 Ok(msg) => Ok(msg),
145 Err(channel::RecvTimeoutError::Timeout) => Err(io::Error::new(
146 io::ErrorKind::TimedOut,
147 "next_timeout: timed out",
148 )),
149 Err(channel::RecvTimeoutError::Disconnected) => Err(io::Error::new(
150 io::ErrorKind::Other,
151 "next_timeout: unsubscribed",
152 )),
153 }
154 }
155
156 /// Returns a blocking message iterator.
157 /// Same as calling `iter()`.
158 ///
159 /// # Example
160 /// ```no_run
161 /// # fn main() -> std::io::Result<()> {
162 /// # let nc = nats::connect("demo.nats.io")?;
163 /// # let sub = nc.subscribe("foo")?;
164 /// for msg in sub.messages() {}
165 /// # Ok(())
166 /// # }
167 /// ```
168 pub fn messages(&self) -> Iter<'_> {
169 Iter { subscription: self }
170 }
171
172 /// Returns a blocking message iterator.
173 ///
174 /// # Example
175 /// ```no_run
176 /// # fn main() -> std::io::Result<()> {
177 /// # let nc = nats::connect("demo.nats.io")?;
178 /// # let sub = nc.subscribe("foo")?;
179 /// for msg in sub.iter() {}
180 /// # Ok(())
181 /// # }
182 /// ```
183 pub fn iter(&self) -> Iter<'_> {
184 Iter { subscription: self }
185 }
186
187 /// Returns a non-blocking message iterator.
188 ///
189 /// # Example
190 /// ```
191 /// # fn main() -> std::io::Result<()> {
192 /// # let nc = nats::connect("demo.nats.io")?;
193 /// # let sub = nc.subscribe("foo")?;
194 /// for msg in sub.try_iter() {}
195 /// # Ok(())
196 /// # }
197 /// ```
198 pub fn try_iter(&self) -> TryIter<'_> {
199 TryIter { subscription: self }
200 }
201
202 /// Returns a blocking message iterator with a time
203 /// deadline for blocking.
204 ///
205 /// # Example
206 /// ```
207 /// # fn main() -> std::io::Result<()> {
208 /// # let nc = nats::connect("demo.nats.io")?;
209 /// # let sub = nc.subscribe("foo")?;
210 /// for msg in sub.timeout_iter(std::time::Duration::from_secs(1)) {}
211 /// # Ok(())
212 /// # }
213 /// ```
214 pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
215 TimeoutIter {
216 subscription: self,
217 to: timeout,
218 }
219 }
220
221 /// Attach a closure to handle messages. This closure will execute in a
222 /// separate thread. The result of this call is a `Handler` which can
223 /// not be iterated and must be unsubscribed or closed directly to
224 /// unregister interest. A `Handler` will not unregister interest with
225 /// the server when `drop(&mut self)` is called.
226 ///
227 /// # Example
228 /// ```
229 /// # fn main() -> std::io::Result<()> {
230 /// # let nc = nats::connect("demo.nats.io")?;
231 /// # nc.publish("bar", b"data")?;
232 /// nc.subscribe("bar")?.with_handler(move |msg| {
233 /// println!("Received {}", &msg);
234 /// Ok(())
235 /// });
236 /// # Ok(())
237 /// # }
238 /// ```
239 pub fn with_handler<F>(self, handler: F) -> Handler
240 where
241 F: Fn(Message) -> io::Result<()> + Send + 'static,
242 {
243 // This will allow us to not have to capture the return. When it is
244 // dropped it will not unsubscribe from the server.
245 let sub = self.clone();
246 thread::Builder::new()
247 .name(format!("nats_subscriber_{}_{}", self.0.sid, self.0.subject))
248 .spawn(move || {
249 for m in &sub {
250 if let Err(e) = handler(m) {
251 // TODO(dlc) - Capture for last error?
252 log::error!("Error in callback! {:?}", e);
253 }
254 }
255 })
256 .expect("threads should be spawnable");
257 Handler { sub: self }
258 }
259
260 /// Sets limit of how many messages can wait in internal queue.
261 /// If limit will be reached, `error_callback` will be fired with information
262 /// which subscription is affected
263 ///
264 /// # Example
265 /// ```
266 /// # fn main() -> std::io::Result<()> {
267 /// # let nc = nats::connect("demo.nats.io")?;
268 /// let sub = nc.subscribe("bar")?;
269 /// sub.set_message_limits(1000);
270 /// # Ok(())
271 /// # }
272 /// ```
273
274 pub fn set_message_limits(&self, limit: usize) {
275 self.0
276 .client
277 .state
278 .read
279 .lock()
280 .subscriptions
281 .entry(self.0.sid)
282 .and_modify(|sub| sub.pending_messages_limit = Some(limit));
283 }
284
285 /// Returns number of dropped messages for this Subscription.
286 /// Dropped messages occur when `set_message_limits` is set and threshold is reached,
287 /// triggering `slow consumer` error.
288 ///
289 /// # Example:
290 /// ```
291 /// # fn main() -> std::io::Result<()> {
292 /// # let nc = nats::connect("demo.nats.io")?;
293 /// let sub = nc.subscribe("bar")?;
294 /// sub.set_message_limits(1000);
295 /// println!("dropped messages: {}", sub.dropped_messages()?);
296 /// # Ok(())
297 /// # }
298 /// ```
299 pub fn dropped_messages(&self) -> io::Result<usize> {
300 self.0
301 .client
302 .state
303 .read
304 .lock()
305 .subscriptions
306 .get(&self.0.sid)
307 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "subscription not found"))
308 .map(|subscription| subscription.dropped_messages)
309 }
310
311 /// Unsubscribe a subscription immediately without draining.
312 /// Use `drain` instead if you want any pending messages
313 /// to be processed by a handler, if one is configured.
314 ///
315 /// # Example
316 /// ```
317 /// # fn main() -> std::io::Result<()> {
318 /// # let nc = nats::connect("demo.nats.io")?;
319 /// let sub = nc.subscribe("foo")?;
320 /// sub.unsubscribe()?;
321 /// # Ok(())
322 /// # }
323 /// ```
324 pub fn unsubscribe(self) -> io::Result<()> {
325 self.0.client.unsubscribe(self.0.sid)?;
326 // Discard all queued messages.
327 while self.0.messages.try_recv().is_ok() {}
328 Ok(())
329 }
330
331 /// Close a subscription. Same as `unsubscribe`
332 ///
333 /// Use `drain` instead if you want any pending messages
334 /// to be processed by a handler, if one is configured.
335 ///
336 /// # Example
337 /// ```
338 /// # fn main() -> std::io::Result<()> {
339 /// # let nc = nats::connect("demo.nats.io")?;
340 /// let sub = nc.subscribe("foo")?;
341 /// sub.close()?;
342 /// # Ok(())
343 /// # }
344 /// ```
345 pub fn close(self) -> io::Result<()> {
346 self.unsubscribe()
347 }
348
349 /// Send an unsubscription then flush the connection,
350 /// allowing any unprocessed messages to be handled
351 /// by a handler function if one is configured.
352 ///
353 /// After the flush returns, we know that a round-trip
354 /// to the server has happened after it received our
355 /// unsubscription, so we shut down the subscriber
356 /// afterwards.
357 ///
358 /// A similar method exists on the `Connection` struct
359 /// which will drain all subscriptions for the NATS
360 /// client, and transition the entire system into
361 /// the closed state afterward.
362 ///
363 /// # Example
364 ///
365 /// ```
366 /// # use std::sync::{Arc, atomic::{AtomicBool, Ordering::SeqCst}};
367 /// # use std::thread;
368 /// # use std::time::Duration;
369 /// # fn main() -> std::io::Result<()> {
370 /// # let nc = nats::connect("demo.nats.io")?;
371 ///
372 /// let mut sub = nc.subscribe("test.drain")?;
373 ///
374 /// nc.publish("test.drain", "message")?;
375 /// sub.drain()?;
376 ///
377 /// let mut received = false;
378 /// for _ in sub {
379 /// received = true;
380 /// }
381 ///
382 /// assert!(received);
383 ///
384 /// # Ok(())
385 /// # }
386 /// ```
387 pub fn drain(&self) -> io::Result<()> {
388 self.0.client.flush(crate::DEFAULT_FLUSH_TIMEOUT)?;
389 self.0.client.unsubscribe(self.0.sid)?;
390 Ok(())
391 }
392}
393
394impl IntoIterator for Subscription {
395 type Item = Message;
396 type IntoIter = IntoIter;
397
398 fn into_iter(self) -> IntoIter {
399 IntoIter { subscription: self }
400 }
401}
402
403impl<'a> IntoIterator for &'a Subscription {
404 type Item = Message;
405 type IntoIter = Iter<'a>;
406
407 fn into_iter(self) -> Iter<'a> {
408 Iter { subscription: self }
409 }
410}
411
412/// A `Handler` may be used to unsubscribe a handler thread.
413pub struct Handler {
414 sub: Subscription,
415}
416
417impl Handler {
418 /// Unsubscribe a subscription.
419 ///
420 /// # Example
421 /// ```
422 /// # fn main() -> std::io::Result<()> {
423 /// # let nc = nats::connect("demo.nats.io")?;
424 /// let sub = nc.subscribe("foo")?.with_handler(move |msg| {
425 /// println!("Received {}", &msg);
426 /// Ok(())
427 /// });
428 /// sub.unsubscribe()?;
429 /// # Ok(())
430 /// # }
431 /// ```
432 pub fn unsubscribe(self) -> io::Result<()> {
433 self.sub.drain()
434 }
435}
436
437/// A non-blocking iterator over messages from a `Subscription`
438pub struct TryIter<'a> {
439 subscription: &'a Subscription,
440}
441
442impl<'a> Iterator for TryIter<'a> {
443 type Item = Message;
444 fn next(&mut self) -> Option<Self::Item> {
445 self.subscription.try_next()
446 }
447}
448
449/// An iterator over messages from a `Subscription`
450pub struct Iter<'a> {
451 subscription: &'a Subscription,
452}
453
454impl<'a> Iterator for Iter<'a> {
455 type Item = Message;
456 fn next(&mut self) -> Option<Self::Item> {
457 self.subscription.next()
458 }
459}
460
461/// An iterator over messages from a `Subscription`
462pub struct IntoIter {
463 subscription: Subscription,
464}
465
466impl Iterator for IntoIter {
467 type Item = Message;
468 fn next(&mut self) -> Option<Self::Item> {
469 self.subscription.next()
470 }
471}
472
473/// An iterator over messages from a `Subscription`
474/// where `None` will be returned if a new `Message`
475/// has not been received by the end of a timeout.
476pub struct TimeoutIter<'a> {
477 subscription: &'a Subscription,
478 to: Duration,
479}
480
481impl<'a> Iterator for TimeoutIter<'a> {
482 type Item = Message;
483 fn next(&mut self) -> Option<Self::Item> {
484 self.subscription.next_timeout(self.to).ok()
485 }
486}