dynomite/events/subscriber.rs
1//! [`Subscriber`] - the receive side of the cluster events
2//! channel.
3//!
4//! A [`Subscriber`] wraps a [`tokio::sync::broadcast::Receiver`]
5//! and translates the broadcast error shapes into typed
6//! `Result<ClusterEvent, _>` returns.
7
8use tokio::sync::broadcast::{self, error};
9
10use super::ClusterEvent;
11
12/// Error produced by [`Subscriber::recv`].
13#[derive(Debug, thiserror::Error)]
14pub enum SubscriberError {
15 /// The [`super::EventManager`] was dropped; no further
16 /// events will arrive on this subscriber.
17 #[error("event manager closed")]
18 Closed,
19 /// The receiver fell behind the channel tail and missed
20 /// `n` events. The next `recv` resumes from the freshest
21 /// event in the buffer.
22 #[error("subscriber lagged by {0} events")]
23 Lagged(u64),
24}
25
26/// Error produced by [`Subscriber::try_recv`].
27#[derive(Debug, thiserror::Error)]
28pub enum TryRecvError {
29 /// No event is currently buffered.
30 #[error("no event available")]
31 Empty,
32 /// The [`super::EventManager`] was dropped; no further
33 /// events will arrive on this subscriber.
34 #[error("event manager closed")]
35 Closed,
36 /// The receiver fell behind the channel tail and missed
37 /// `n` events. The next call resumes from the freshest
38 /// event in the buffer.
39 #[error("subscriber lagged by {0} events")]
40 Lagged(u64),
41}
42
43/// Receive side of the cluster events channel.
44///
45/// Construct via [`super::EventManager::subscribe`]. Multiple
46/// subscribers are independent: each receives its own copy of
47/// every event published after the subscribe call.
48#[derive(Debug)]
49pub struct Subscriber {
50 rx: broadcast::Receiver<ClusterEvent>,
51}
52
53impl Subscriber {
54 pub(super) fn new(rx: broadcast::Receiver<ClusterEvent>) -> Self {
55 Self { rx }
56 }
57
58 /// Await the next event.
59 ///
60 /// Returns [`SubscriberError::Closed`] when the upstream
61 /// [`super::EventManager`] has been dropped, and
62 /// [`SubscriberError::Lagged`] if the receiver fell behind
63 /// the channel tail. After [`SubscriberError::Lagged`], the
64 /// subscriber is still usable; the next call resumes from
65 /// the freshest event in the buffer.
66 ///
67 /// # Examples
68 ///
69 /// ```
70 /// use std::time::SystemTime;
71 /// use dynomite::events::{ClusterEvent, EventManager};
72 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
73 /// let mgr = EventManager::new(4);
74 /// let mut sub = mgr.subscribe();
75 /// mgr.publish(ClusterEvent::RingChanged {
76 /// tag: "x".into(),
77 /// ts: SystemTime::now(),
78 /// });
79 /// let evt = sub.recv().await.unwrap();
80 /// assert!(matches!(evt, ClusterEvent::RingChanged { .. }));
81 /// # });
82 /// ```
83 pub async fn recv(&mut self) -> Result<ClusterEvent, SubscriberError> {
84 match self.rx.recv().await {
85 Ok(evt) => Ok(evt),
86 Err(error::RecvError::Closed) => Err(SubscriberError::Closed),
87 Err(error::RecvError::Lagged(n)) => Err(SubscriberError::Lagged(n)),
88 }
89 }
90
91 /// Non-blocking poll for the next event.
92 ///
93 /// Returns [`TryRecvError::Empty`] when no event is yet
94 /// available, [`TryRecvError::Closed`] when the upstream
95 /// manager has been dropped and the buffer drained, and
96 /// [`TryRecvError::Lagged`] if the receiver fell behind the
97 /// channel tail.
98 ///
99 /// # Examples
100 ///
101 /// ```
102 /// use dynomite::events::{EventManager, TryRecvError};
103 /// let mgr = EventManager::new(4);
104 /// let mut sub = mgr.subscribe();
105 /// assert!(matches!(sub.try_recv(), Err(TryRecvError::Empty)));
106 /// ```
107 pub fn try_recv(&mut self) -> Result<ClusterEvent, TryRecvError> {
108 match self.rx.try_recv() {
109 Ok(evt) => Ok(evt),
110 Err(error::TryRecvError::Empty) => Err(TryRecvError::Empty),
111 Err(error::TryRecvError::Closed) => Err(TryRecvError::Closed),
112 Err(error::TryRecvError::Lagged(n)) => Err(TryRecvError::Lagged(n)),
113 }
114 }
115}