Skip to main content

dynomite/embed/
events.rs

1//! Server-wide event stream.
2//!
3//! The embedding API exposes a [`tokio::sync::broadcast`] of
4//! [`ServerEvent`] values so consumers can observe peer
5//! transitions, gossip rounds, configuration reloads, and other
6//! cluster-wide signals. The handle returned by
7//! [`crate::embed::ServerHandle::subscribe_events`] wraps the
8//! broadcast receiver in [`EventStream`] for ergonomic polling.
9//!
10//! Only the engine publishes to the bus; embedding programs
11//! subscribe through [`EventBus::subscribe`] and consume via
12//! [`EventStream::recv`] / [`EventStream::try_recv`].
13//!
14//! # Examples
15//!
16//! ```
17//! use dynomite::embed::events::{ConnRoleTag, CloseReason, PeerDownReason, EventBus};
18//! let bus = EventBus::new(16);
19//! let _rx = bus.subscribe();
20//! assert_eq!(bus.subscriber_count(), 1);
21//! # let _ = (ConnRoleTag::Client, CloseReason::PeerEof, PeerDownReason::FailureDetector);
22//! ```
23
24use std::net::SocketAddr;
25
26use tokio::sync::broadcast;
27
/// Numeric identifier assigned to every connection the server accepts.
pub type ConnId = u64;
30
/// Numeric identifier of a peer within the cluster pool.
pub type PeerId = u32;
33
/// Coarse classification of a connection's role, recorded at accept
/// time.
///
/// This is the embed-facing counterpart of
/// [`crate::io::reactor::ConnRole`]. It carries its own name so the
/// embed surface stays independent of the I/O substrate.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ConnRoleTag {
    /// A listener that accepted a client connection.
    Proxy,
    /// A connected client.
    Client,
    /// An outbound connection to the datastore.
    Server,
    /// A listener that accepted a peer connection.
    DnodeProxy,
    /// An inbound connection from a peer.
    DnodeClient,
    /// An outbound connection to a peer.
    DnodeServer,
}
54
/// Explains why a connection ended; carried by
/// [`ServerEvent::ConnectionClosed`].
#[non_exhaustive]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum CloseReason {
    /// The remote end closed the connection cleanly (FIN / EOF).
    PeerEof,
    /// The close was initiated locally (graceful shutdown).
    LocalClose,
    /// An I/O error occurred.
    IoError,
    /// The connection stayed idle longer than the configured timeout.
    Timeout,
}
68
/// Explains why a peer became unroutable; carried by
/// [`ServerEvent::PeerDown`].
#[non_exhaustive]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum PeerDownReason {
    /// The failure detector marked the peer as down.
    FailureDetector,
    /// The auto-eject hosts policy ejected the peer.
    AutoEjected,
    /// An operator-initiated reload removed the peer.
    Reconfigured,
    /// The peer announced a graceful shutdown.
    Leaving,
}
82
83/// Cluster-wide event published on the broadcast channel.
84///
85/// The variants are non-exhaustive at the type level; consumers
86/// must use a wildcard arm so future additions remain
87/// non-breaking.
88///
89/// # Examples
90///
91/// ```
92/// use dynomite::embed::events::{ServerEvent, PeerDownReason};
93/// let e = ServerEvent::PeerDown { peer: 7, reason: PeerDownReason::FailureDetector };
94/// match e {
95///     ServerEvent::PeerDown { peer, .. } => assert_eq!(peer, 7),
96///     _ => panic!(),
97/// }
98/// ```
99#[derive(Debug, Clone)]
100#[non_exhaustive]
101pub enum ServerEvent {
102    /// A peer transitioned to a routable state.
103    PeerUp(PeerId),
104    /// A peer transitioned to an unroutable state.
105    PeerDown {
106        /// Peer index.
107        peer: PeerId,
108        /// Why the transition fired.
109        reason: PeerDownReason,
110    },
111    /// A configuration reload completed successfully.
112    ConfigReloaded {
113        /// Monotonic generation id.
114        generation: u64,
115    },
116    /// A periodic gossip pass finished.
117    GossipRound {
118        /// Round number (monotonic).
119        round: u64,
120        /// Number of distinct peers known after the round.
121        peers: u32,
122    },
123    /// Auto-eject policy ejected a peer.
124    AutoEjected {
125        /// Peer index.
126        peer: PeerId,
127        /// Failure count at eject time.
128        failures: u32,
129    },
130    /// Read-repair was triggered for a key.
131    RepairTriggered {
132        /// Hash of the key whose responses diverged.
133        key_hash: u64,
134        /// Datacenter that initiated the repair.
135        dc: String,
136    },
137    /// A connection was accepted on a listener.
138    ConnectionAccepted {
139        /// Synthetic connection id.
140        conn_id: ConnId,
141        /// Role of the accepted connection.
142        role: ConnRoleTag,
143        /// Local socket address; `None` for non-socket transports.
144        local_addr: Option<SocketAddr>,
145    },
146    /// A connection closed.
147    ConnectionClosed {
148        /// Connection id from the prior `ConnectionAccepted`.
149        conn_id: ConnId,
150        /// Why the connection closed.
151        reason: CloseReason,
152    },
153    /// The receiver fell behind the broadcast tail and missed
154    /// `missed` events. The next `recv` will resume from the
155    /// freshest event in the buffer.
156    Lagged {
157        /// Number of events the receiver missed.
158        missed: u64,
159    },
160}
161
162/// Event publisher held by the embed runtime.
163///
164/// `EventBus` is a thin wrapper around a
165/// [`tokio::sync::broadcast::Sender`] that drops events silently
166/// when no receivers are attached.
167#[derive(Debug, Clone)]
168pub struct EventBus {
169    tx: broadcast::Sender<ServerEvent>,
170}
171
172impl EventBus {
173    /// Build a fresh bus with the supplied channel capacity.
174    ///
175    /// # Examples
176    ///
177    /// ```
178    /// use dynomite::embed::events::EventBus;
179    /// let bus = EventBus::new(8);
180    /// assert_eq!(bus.subscriber_count(), 0);
181    /// ```
182    #[must_use]
183    pub fn new(capacity: usize) -> Self {
184        let cap = capacity.max(1);
185        let (tx, _) = broadcast::channel(cap);
186        Self { tx }
187    }
188
189    /// Subscribe a fresh receiver.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use dynomite::embed::events::EventBus;
195    /// let bus = EventBus::new(4);
196    /// let _rx = bus.subscribe();
197    /// assert!(bus.subscriber_count() >= 1);
198    /// ```
199    #[must_use]
200    pub fn subscribe(&self) -> EventStream {
201        EventStream {
202            rx: self.tx.subscribe(),
203        }
204    }
205
206    /// Publish an event. Returns the number of live subscribers
207    /// the event was delivered to (zero is normal during quiet
208    /// periods and is never an error).
209    ///
210    /// `pub(crate)` because only the engine itself is allowed to
211    /// publish events; embedding programs subscribe through
212    /// [`EventBus::subscribe`] and consume via
213    /// [`EventStream::recv`].
214    pub(crate) fn send(&self, event: ServerEvent) -> usize {
215        self.tx.send(event).unwrap_or(0)
216    }
217
218    /// Number of attached subscribers.
219    #[must_use]
220    pub fn subscriber_count(&self) -> usize {
221        self.tx.receiver_count()
222    }
223}
224
225/// Pollable async stream of [`ServerEvent`]s.
226///
227/// `EventStream` is the public-facing receiver returned by
228/// [`crate::embed::ServerHandle::subscribe_events`]. It wraps a
229/// [`tokio::sync::broadcast::Receiver`] and translates the
230/// `RecvError::Lagged` shape into a synthesized
231/// [`ServerEvent::Lagged`] so consumers can stay on the happy
232/// path.
233#[derive(Debug)]
234pub struct EventStream {
235    rx: broadcast::Receiver<ServerEvent>,
236}
237
238impl EventStream {
239    /// Receive the next event.
240    ///
241    /// Returns `None` when the bus is closed (the server has shut
242    /// down and dropped its [`EventBus`]). On lag, returns a
243    /// synthesized [`ServerEvent::Lagged`] and resumes from the
244    /// freshest event in the buffer.
245    ///
246    /// # Examples
247    ///
248    /// ```
249    /// use dynomite::embed::events::EventBus;
250    /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
251    /// // The bus is published-to only by the engine; subscribers
252    /// // poll for events. Dropping the bus closes the stream.
253    /// let bus = EventBus::new(2);
254    /// let mut s = bus.subscribe();
255    /// drop(bus);
256    /// assert!(s.recv().await.is_none());
257    /// # });
258    /// ```
259    pub async fn recv(&mut self) -> Option<ServerEvent> {
260        match self.rx.recv().await {
261            Ok(evt) => Some(evt),
262            Err(broadcast::error::RecvError::Closed) => None,
263            Err(broadcast::error::RecvError::Lagged(missed)) => {
264                Some(ServerEvent::Lagged { missed })
265            }
266        }
267    }
268
269    /// Non-blocking poll: returns the next event if one is
270    /// already buffered, else `None`.
271    ///
272    /// # Examples
273    ///
274    /// ```
275    /// use dynomite::embed::events::EventBus;
276    /// let bus = EventBus::new(2);
277    /// let mut s = bus.subscribe();
278    /// assert!(s.try_recv().is_none());
279    /// ```
280    pub fn try_recv(&mut self) -> Option<ServerEvent> {
281        match self.rx.try_recv() {
282            Ok(evt) => Some(evt),
283            Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => {
284                None
285            }
286            Err(broadcast::error::TryRecvError::Lagged(missed)) => {
287                Some(ServerEvent::Lagged { missed })
288            }
289        }
290    }
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// An event published on the bus reaches an attached subscriber.
    #[tokio::test]
    async fn round_trip_ok() {
        let bus = EventBus::new(4);
        let mut s = bus.subscribe();
        bus.send(ServerEvent::PeerUp(1));
        let evt = s.recv().await.unwrap();
        assert!(matches!(evt, ServerEvent::PeerUp(1)));
    }

    /// Overrunning the channel must surface as a synthesized `Lagged`
    /// event first, followed by the retained events in order.
    #[tokio::test]
    async fn lagged_synthesised() {
        const SENT: u64 = 8;

        let bus = EventBus::new(2);
        let mut s = bus.subscribe();
        for i in 0..SENT {
            bus.send(ServerEvent::ConfigReloaded { generation: i });
        }

        // More events were sent than the channel can hold, so the
        // first thing the subscriber sees must be the lag marker.
        let missed = match s.recv().await {
            Some(ServerEvent::Lagged { missed }) => missed,
            other => panic!("expected Lagged first, got {:?}", other),
        };
        assert!(missed > 0 && missed < SENT);

        // The surviving events follow, oldest first, and together
        // with the missed ones account for everything that was sent.
        let mut survivors = Vec::new();
        while let Some(evt) = s.try_recv() {
            match evt {
                ServerEvent::ConfigReloaded { generation } => survivors.push(generation),
                other => panic!("unexpected event after lag: {:?}", other),
            }
        }
        let expected: Vec<u64> = (missed..SENT).collect();
        assert_eq!(survivors, expected);
    }

    /// Polling a fresh subscriber without any published event yields
    /// nothing.
    #[test]
    fn try_recv_empty() {
        let bus = EventBus::new(2);
        let mut s = bus.subscribe();
        assert!(s.try_recv().is_none());
    }
}