Skip to main content

dynomite/events/
subscriber.rs

1//! [`Subscriber`] - the receive side of the cluster events
2//! channel.
3//!
4//! A [`Subscriber`] wraps a [`tokio::sync::broadcast::Receiver`]
5//! and translates the broadcast error shapes into typed
6//! `Result<ClusterEvent, _>` returns.
7
8use tokio::sync::broadcast::{self, error};
9
10use super::ClusterEvent;
11
12/// Error produced by [`Subscriber::recv`].
13#[derive(Debug, thiserror::Error)]
14pub enum SubscriberError {
15    /// The [`super::EventManager`] was dropped; no further
16    /// events will arrive on this subscriber.
17    #[error("event manager closed")]
18    Closed,
19    /// The receiver fell behind the channel tail and missed
20    /// `n` events. The next `recv` resumes from the freshest
21    /// event in the buffer.
22    #[error("subscriber lagged by {0} events")]
23    Lagged(u64),
24}
25
26/// Error produced by [`Subscriber::try_recv`].
27#[derive(Debug, thiserror::Error)]
28pub enum TryRecvError {
29    /// No event is currently buffered.
30    #[error("no event available")]
31    Empty,
32    /// The [`super::EventManager`] was dropped; no further
33    /// events will arrive on this subscriber.
34    #[error("event manager closed")]
35    Closed,
36    /// The receiver fell behind the channel tail and missed
37    /// `n` events. The next call resumes from the freshest
38    /// event in the buffer.
39    #[error("subscriber lagged by {0} events")]
40    Lagged(u64),
41}
42
43/// Receive side of the cluster events channel.
44///
45/// Construct via [`super::EventManager::subscribe`]. Multiple
46/// subscribers are independent: each receives its own copy of
47/// every event published after the subscribe call.
48#[derive(Debug)]
49pub struct Subscriber {
50    rx: broadcast::Receiver<ClusterEvent>,
51}
52
53impl Subscriber {
54    pub(super) fn new(rx: broadcast::Receiver<ClusterEvent>) -> Self {
55        Self { rx }
56    }
57
58    /// Await the next event.
59    ///
60    /// Returns [`SubscriberError::Closed`] when the upstream
61    /// [`super::EventManager`] has been dropped, and
62    /// [`SubscriberError::Lagged`] if the receiver fell behind
63    /// the channel tail. After [`SubscriberError::Lagged`], the
64    /// subscriber is still usable; the next call resumes from
65    /// the freshest event in the buffer.
66    ///
67    /// # Examples
68    ///
69    /// ```
70    /// use std::time::SystemTime;
71    /// use dynomite::events::{ClusterEvent, EventManager};
72    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
73    /// let mgr = EventManager::new(4);
74    /// let mut sub = mgr.subscribe();
75    /// mgr.publish(ClusterEvent::RingChanged {
76    ///     tag: "x".into(),
77    ///     ts: SystemTime::now(),
78    /// });
79    /// let evt = sub.recv().await.unwrap();
80    /// assert!(matches!(evt, ClusterEvent::RingChanged { .. }));
81    /// # });
82    /// ```
83    pub async fn recv(&mut self) -> Result<ClusterEvent, SubscriberError> {
84        match self.rx.recv().await {
85            Ok(evt) => Ok(evt),
86            Err(error::RecvError::Closed) => Err(SubscriberError::Closed),
87            Err(error::RecvError::Lagged(n)) => Err(SubscriberError::Lagged(n)),
88        }
89    }
90
91    /// Non-blocking poll for the next event.
92    ///
93    /// Returns [`TryRecvError::Empty`] when no event is yet
94    /// available, [`TryRecvError::Closed`] when the upstream
95    /// manager has been dropped and the buffer drained, and
96    /// [`TryRecvError::Lagged`] if the receiver fell behind the
97    /// channel tail.
98    ///
99    /// # Examples
100    ///
101    /// ```
102    /// use dynomite::events::{EventManager, TryRecvError};
103    /// let mgr = EventManager::new(4);
104    /// let mut sub = mgr.subscribe();
105    /// assert!(matches!(sub.try_recv(), Err(TryRecvError::Empty)));
106    /// ```
107    pub fn try_recv(&mut self) -> Result<ClusterEvent, TryRecvError> {
108        match self.rx.try_recv() {
109            Ok(evt) => Ok(evt),
110            Err(error::TryRecvError::Empty) => Err(TryRecvError::Empty),
111            Err(error::TryRecvError::Closed) => Err(TryRecvError::Closed),
112            Err(error::TryRecvError::Lagged(n)) => Err(TryRecvError::Lagged(n)),
113        }
114    }
115}