Skip to main content

nats/
subscription.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use std::io;
15use std::sync::Arc;
16use std::thread;
17use std::time::Duration;
18
19use crossbeam_channel as channel;
20
21use crate::client::Client;
22use crate::message::Message;
23
/// State shared by every clone of a `Subscription`.
///
/// `Subscription` keeps this behind an `Arc`; the `Drop` implementation of
/// this type asks the client to unsubscribe `sid`.
#[derive(Debug)]
struct Inner {
    /// Subscription ID, passed to the client when unsubscribing.
    pub(crate) sid: u64,

    /// Subject of the subscription. Also used to name handler threads.
    pub(crate) subject: String,

    /// MSG operations received from the server.
    pub(crate) messages: channel::Receiver<Message>,

    /// Client associated with subscription.
    pub(crate) client: Client,
}
38
39impl Drop for Inner {
40    fn drop(&mut self) {
41        self.client.unsubscribe(self.sid).ok();
42    }
43}
44
/// A `Subscription` receives `Message`s published
/// to specific NATS `Subject`s.
///
/// Clones share the same underlying state through an `Arc`, so the
/// automatic unsubscribe performed on drop only happens once the last
/// clone has been dropped.
#[derive(Clone, Debug)]
pub struct Subscription(Arc<Inner>);
49
50impl Subscription {
51    /// Creates a subscription.
52    pub(crate) fn new(
53        sid: u64,
54        subject: String,
55        messages: channel::Receiver<Message>,
56        client: Client,
57    ) -> Subscription {
58        Subscription(Arc::new(Inner {
59            sid,
60            subject,
61            messages,
62            client,
63        }))
64    }
65
66    /// Get a crossbeam Receiver for subscription messages.
67    /// Useful for `crossbeam_channel::select` macro
68    ///
69    /// # Example
70    /// ```
71    /// # fn main() -> std::io::Result<()> {
72    /// # let nc = nats::connect("demo.nats.io")?;
73    /// # let sub1 = nc.subscribe("foo")?;
74    /// # let sub2 = nc.subscribe("bar")?;
75    /// # nc.publish("foo", "hello")?;
76    /// let sub1_ch = sub1.receiver();
77    /// let sub2_ch = sub2.receiver();
78    /// crossbeam_channel::select! {
79    ///     recv(sub1_ch) -> msg => {
80    ///         println!("Got message from sub1: {:?}", msg);
81    ///         Ok(())
82    ///     }
83    ///     recv(sub2_ch) -> msg => {
84    ///         println!("Got message from sub2: {:?}", msg);
85    ///         Ok(())
86    ///     }
87    /// }
88    /// # }
89    /// ```
90    pub fn receiver(&self) -> &channel::Receiver<Message> {
91        &self.0.messages
92    }
93
94    /// Get the next message, or None if the subscription
95    /// has been unsubscribed or the connection closed.
96    ///
97    /// # Example
98    /// ```
99    /// # fn main() -> std::io::Result<()> {
100    /// # let nc = nats::connect("demo.nats.io")?;
101    /// # let sub = nc.subscribe("foo")?;
102    /// # nc.publish("foo", "hello")?;
103    /// if let Some(msg) = sub.next() {}
104    /// # Ok(())
105    /// # }
106    /// ```
107    pub fn next(&self) -> Option<Message> {
108        self.0.messages.recv().ok()
109    }
110
111    /// Try to get the next message, or None if no messages
112    /// are present or if the subscription has been unsubscribed
113    /// or the connection closed.
114    ///
115    /// # Example
116    /// ```
117    /// # fn main() -> std::io::Result<()> {
118    /// # let nc = nats::connect("demo.nats.io")?;
119    /// # let sub = nc.subscribe("foo")?;
120    /// if let Some(msg) = sub.try_next() {
121    ///     println!("Received {}", msg);
122    /// }
123    /// # Ok(())
124    /// # }
125    /// ```
126    pub fn try_next(&self) -> Option<Message> {
127        self.0.messages.try_recv().ok()
128    }
129
130    /// Get the next message, or a timeout error
131    /// if no messages are available for timeout.
132    ///
133    /// # Example
134    /// ```
135    /// # fn main() -> std::io::Result<()> {
136    /// # let nc = nats::connect("demo.nats.io")?;
137    /// # let sub = nc.subscribe("foo")?;
138    /// if let Ok(msg) = sub.next_timeout(std::time::Duration::from_secs(1)) {}
139    /// # Ok(())
140    /// # }
141    /// ```
142    pub fn next_timeout(&self, timeout: Duration) -> io::Result<Message> {
143        match self.0.messages.recv_timeout(timeout) {
144            Ok(msg) => Ok(msg),
145            Err(channel::RecvTimeoutError::Timeout) => Err(io::Error::new(
146                io::ErrorKind::TimedOut,
147                "next_timeout: timed out",
148            )),
149            Err(channel::RecvTimeoutError::Disconnected) => Err(io::Error::new(
150                io::ErrorKind::Other,
151                "next_timeout: unsubscribed",
152            )),
153        }
154    }
155
156    /// Returns a blocking message iterator.
157    /// Same as calling `iter()`.
158    ///
159    /// # Example
160    /// ```no_run
161    /// # fn main() -> std::io::Result<()> {
162    /// # let nc = nats::connect("demo.nats.io")?;
163    /// # let sub = nc.subscribe("foo")?;
164    /// for msg in sub.messages() {}
165    /// # Ok(())
166    /// # }
167    /// ```
168    pub fn messages(&self) -> Iter<'_> {
169        Iter { subscription: self }
170    }
171
    /// Returns a blocking message iterator that borrows this subscription.
    ///
    /// Each step of the iterator calls `next()` on the subscription, so it
    /// blocks until a message arrives and ends when `next()` returns `None`.
    ///
    /// # Example
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// # let nc = nats::connect("demo.nats.io")?;
    /// # let sub = nc.subscribe("foo")?;
    /// for msg in sub.iter() {}
    /// # Ok(())
    /// # }
    /// ```
    pub fn iter(&self) -> Iter<'_> {
        Iter { subscription: self }
    }
186
    /// Returns a non-blocking message iterator that borrows this
    /// subscription.
    ///
    /// Each step of the iterator calls `try_next()`, so iteration ends as
    /// soon as no message is immediately available.
    ///
    /// # Example
    /// ```
    /// # fn main() -> std::io::Result<()> {
    /// # let nc = nats::connect("demo.nats.io")?;
    /// # let sub = nc.subscribe("foo")?;
    /// for msg in sub.try_iter() {}
    /// # Ok(())
    /// # }
    /// ```
    pub fn try_iter(&self) -> TryIter<'_> {
        TryIter { subscription: self }
    }
201
    /// Returns a blocking message iterator with a time
    /// deadline for blocking.
    ///
    /// The same `timeout` is applied to every step: each step calls
    /// `next_timeout(timeout)` and iteration ends the first time that
    /// call fails (timeout or disconnected channel).
    ///
    /// # Example
    /// ```
    /// # fn main() -> std::io::Result<()> {
    /// # let nc = nats::connect("demo.nats.io")?;
    /// # let sub = nc.subscribe("foo")?;
    /// for msg in sub.timeout_iter(std::time::Duration::from_secs(1)) {}
    /// # Ok(())
    /// # }
    /// ```
    pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
        TimeoutIter {
            subscription: self,
            to: timeout,
        }
    }
220
221    /// Attach a closure to handle messages. This closure will execute in a
222    /// separate thread. The result of this call is a `Handler` which can
223    /// not be iterated and must be unsubscribed or closed directly to
224    /// unregister interest. A `Handler` will not unregister interest with
225    /// the server when `drop(&mut self)` is called.
226    ///
227    /// # Example
228    /// ```
229    /// # fn main() -> std::io::Result<()> {
230    /// # let nc = nats::connect("demo.nats.io")?;
231    /// # nc.publish("bar", b"data")?;
232    /// nc.subscribe("bar")?.with_handler(move |msg| {
233    ///     println!("Received {}", &msg);
234    ///     Ok(())
235    /// });
236    /// # Ok(())
237    /// # }
238    /// ```
239    pub fn with_handler<F>(self, handler: F) -> Handler
240    where
241        F: Fn(Message) -> io::Result<()> + Send + 'static,
242    {
243        // This will allow us to not have to capture the return. When it is
244        // dropped it will not unsubscribe from the server.
245        let sub = self.clone();
246        thread::Builder::new()
247            .name(format!("nats_subscriber_{}_{}", self.0.sid, self.0.subject))
248            .spawn(move || {
249                for m in &sub {
250                    if let Err(e) = handler(m) {
251                        // TODO(dlc) - Capture for last error?
252                        log::error!("Error in callback! {:?}", e);
253                    }
254                }
255            })
256            .expect("threads should be spawnable");
257        Handler { sub: self }
258    }
259
260    /// Sets limit of how many messages can wait in internal queue.
261    /// If limit will be reached, `error_callback` will be fired with information
262    /// which subscription is affected
263    ///
264    /// # Example
265    /// ```
266    /// # fn main() -> std::io::Result<()> {
267    /// # let nc = nats::connect("demo.nats.io")?;
268    /// let sub = nc.subscribe("bar")?;
269    /// sub.set_message_limits(1000);
270    /// # Ok(())
271    /// # }
272    /// ```
273
274    pub fn set_message_limits(&self, limit: usize) {
275        self.0
276            .client
277            .state
278            .read
279            .lock()
280            .subscriptions
281            .entry(self.0.sid)
282            .and_modify(|sub| sub.pending_messages_limit = Some(limit));
283    }
284
285    /// Returns number of dropped messages for this Subscription.
286    /// Dropped messages occur when `set_message_limits` is set and threshold is reached,
287    /// triggering `slow consumer` error.
288    ///
289    /// # Example:
290    /// ```
291    /// # fn main() -> std::io::Result<()> {
292    /// # let nc = nats::connect("demo.nats.io")?;
293    /// let sub = nc.subscribe("bar")?;
294    /// sub.set_message_limits(1000);
295    /// println!("dropped messages: {}", sub.dropped_messages()?);
296    /// # Ok(())
297    /// # }
298    /// ```
299    pub fn dropped_messages(&self) -> io::Result<usize> {
300        self.0
301            .client
302            .state
303            .read
304            .lock()
305            .subscriptions
306            .get(&self.0.sid)
307            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "subscription not found"))
308            .map(|subscription| subscription.dropped_messages)
309    }
310
311    /// Unsubscribe a subscription immediately without draining.
312    /// Use `drain` instead if you want any pending messages
313    /// to be processed by a handler, if one is configured.
314    ///
315    /// # Example
316    /// ```
317    /// # fn main() -> std::io::Result<()> {
318    /// # let nc = nats::connect("demo.nats.io")?;
319    /// let sub = nc.subscribe("foo")?;
320    /// sub.unsubscribe()?;
321    /// # Ok(())
322    /// # }
323    /// ```
324    pub fn unsubscribe(self) -> io::Result<()> {
325        self.0.client.unsubscribe(self.0.sid)?;
326        // Discard all queued messages.
327        while self.0.messages.try_recv().is_ok() {}
328        Ok(())
329    }
330
    /// Close a subscription. Same as `unsubscribe`: this method simply
    /// forwards to it and returns its result.
    ///
    /// Use `drain` instead if you want any pending messages
    /// to be processed by a handler, if one is configured.
    ///
    /// # Example
    /// ```
    /// # fn main() -> std::io::Result<()> {
    /// # let nc = nats::connect("demo.nats.io")?;
    /// let sub = nc.subscribe("foo")?;
    /// sub.close()?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn close(self) -> io::Result<()> {
        self.unsubscribe()
    }
348
    /// Flush the connection, then send an unsubscription.
    ///
    /// Unlike `unsubscribe`, this does not discard messages that are
    /// already queued locally, so they can still be iterated or handled
    /// by a handler function if one is configured.
    ///
    /// The flush uses `crate::DEFAULT_FLUSH_TIMEOUT` and is performed
    /// *before* the unsubscription is sent. If the flush fails, its error
    /// is returned and no unsubscription is attempted.
    ///
    /// NOTE(review): earlier documentation described the opposite order
    /// (unsubscribe first, then flush) — confirm which order is intended.
    ///
    /// A similar method exists on the `Connection` struct
    /// which will drain all subscriptions for the NATS
    /// client, and transition the entire system into
    /// the closed state afterward.
    ///
    /// # Example
    ///
    /// ```
    /// # use std::sync::{Arc, atomic::{AtomicBool, Ordering::SeqCst}};
    /// # use std::thread;
    /// # use std::time::Duration;
    /// # fn main() -> std::io::Result<()> {
    /// # let nc = nats::connect("demo.nats.io")?;
    ///
    /// let mut sub = nc.subscribe("test.drain")?;
    ///
    /// nc.publish("test.drain", "message")?;
    /// sub.drain()?;
    ///
    /// let mut received = false;
    /// for _ in sub {
    ///     received = true;
    /// }
    ///
    /// assert!(received);
    ///
    /// # Ok(())
    /// # }
    /// ```
    pub fn drain(&self) -> io::Result<()> {
        // Flush first, then unsubscribe; either failure is propagated.
        self.0.client.flush(crate::DEFAULT_FLUSH_TIMEOUT)?;
        self.0.client.unsubscribe(self.0.sid)?;
        Ok(())
    }
392}
393
394impl IntoIterator for Subscription {
395    type Item = Message;
396    type IntoIter = IntoIter;
397
398    fn into_iter(self) -> IntoIter {
399        IntoIter { subscription: self }
400    }
401}
402
403impl<'a> IntoIterator for &'a Subscription {
404    type Item = Message;
405    type IntoIter = Iter<'a>;
406
407    fn into_iter(self) -> Iter<'a> {
408        Iter { subscription: self }
409    }
410}
411
/// A `Handler` may be used to unsubscribe a handler thread.
///
/// It is returned by `Subscription::with_handler` and wraps the
/// subscription whose messages the handler thread consumes.
pub struct Handler {
    // Subscription that `with_handler` was called on.
    sub: Subscription,
}
416
417impl Handler {
418    /// Unsubscribe a subscription.
419    ///
420    /// # Example
421    /// ```
422    /// # fn main() -> std::io::Result<()> {
423    /// # let nc = nats::connect("demo.nats.io")?;
424    /// let sub = nc.subscribe("foo")?.with_handler(move |msg| {
425    ///     println!("Received {}", &msg);
426    ///     Ok(())
427    /// });
428    /// sub.unsubscribe()?;
429    /// # Ok(())
430    /// # }
431    /// ```
432    pub fn unsubscribe(self) -> io::Result<()> {
433        self.sub.drain()
434    }
435}
436
/// A non-blocking iterator over messages from a `Subscription`.
///
/// Created by `Subscription::try_iter`.
pub struct TryIter<'a> {
    // Subscription the messages are pulled from.
    subscription: &'a Subscription,
}
441
442impl<'a> Iterator for TryIter<'a> {
443    type Item = Message;
444    fn next(&mut self) -> Option<Self::Item> {
445        self.subscription.try_next()
446    }
447}
448
/// A blocking iterator over messages from a borrowed `Subscription`.
///
/// Created by `Subscription::iter`, `Subscription::messages`, or by
/// iterating over `&Subscription`.
pub struct Iter<'a> {
    // Subscription the messages are pulled from.
    subscription: &'a Subscription,
}
453
454impl<'a> Iterator for Iter<'a> {
455    type Item = Message;
456    fn next(&mut self) -> Option<Self::Item> {
457        self.subscription.next()
458    }
459}
460
/// A blocking iterator over messages that owns its `Subscription`.
///
/// Created by consuming a `Subscription` through `IntoIterator`.
pub struct IntoIter {
    // Subscription the messages are pulled from; owned by the iterator.
    subscription: Subscription,
}
465
466impl Iterator for IntoIter {
467    type Item = Message;
468    fn next(&mut self) -> Option<Self::Item> {
469        self.subscription.next()
470    }
471}
472
/// An iterator over messages from a `Subscription`
/// where `None` will be returned if a new `Message`
/// has not been received by the end of a timeout.
///
/// Created by `Subscription::timeout_iter`.
pub struct TimeoutIter<'a> {
    // Subscription the messages are pulled from.
    subscription: &'a Subscription,
    // Maximum time to wait for each individual message.
    to: Duration,
}
480
481impl<'a> Iterator for TimeoutIter<'a> {
482    type Item = Message;
483    fn next(&mut self) -> Option<Self::Item> {
484        self.subscription.next_timeout(self.to).ok()
485    }
486}