dynomite/embed/events.rs
1//! Server-wide event stream.
2//!
3//! The embedding API exposes a [`tokio::sync::broadcast`] of
4//! [`ServerEvent`] values so consumers can observe peer
5//! transitions, gossip rounds, configuration reloads, and other
6//! cluster-wide signals. The handle returned by
7//! [`crate::embed::ServerHandle::subscribe_events`] wraps the
8//! broadcast receiver in [`EventStream`] for ergonomic polling.
9//!
10//! Only the engine publishes to the bus; embedding programs
11//! subscribe through [`EventBus::subscribe`] and consume via
12//! [`EventStream::recv`] / [`EventStream::try_recv`].
13//!
14//! # Examples
15//!
16//! ```
17//! use dynomite::embed::events::{ConnRoleTag, CloseReason, PeerDownReason, EventBus};
18//! let bus = EventBus::new(16);
19//! let _rx = bus.subscribe();
20//! assert_eq!(bus.subscriber_count(), 1);
21//! # let _ = (ConnRoleTag::Client, CloseReason::PeerEof, PeerDownReason::FailureDetector);
22//! ```
23
24use std::net::SocketAddr;
25
26use tokio::sync::broadcast;
27
/// Numeric id assigned to each accepted connection.
///
/// The same value appears in [`ServerEvent::ConnectionAccepted`] and the
/// matching [`ServerEvent::ConnectionClosed`].
pub type ConnId = u64;
30
/// Numeric id of a peer within the cluster pool.
pub type PeerId = u32;
33
/// Role of a connection, recorded when the connection is accepted.
///
/// This is the embed-facing counterpart of
/// [`crate::io::reactor::ConnRole`]; it is a separate type so the embed
/// surface does not depend on the I/O substrate.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ConnRoleTag {
    /// The listener on which client connections are accepted.
    Proxy,
    /// A client that is connected to the server.
    Client,
    /// An outbound connection to the datastore.
    Server,
    /// The listener on which peer connections are accepted.
    DnodeProxy,
    /// A peer connection that was accepted (inbound).
    DnodeClient,
    /// A peer connection that was dialed (outbound).
    DnodeServer,
}
54
/// Why a connection went away; attached to
/// [`ServerEvent::ConnectionClosed`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CloseReason {
    /// The remote end closed the connection cleanly (FIN / EOF).
    PeerEof,
    /// This side started the close as part of a graceful shutdown.
    LocalClose,
    /// The connection failed with an I/O error.
    IoError,
    /// The connection stayed idle longer than the configured timeout.
    Timeout,
}
68
/// Why a peer stopped being routable; attached to
/// [`ServerEvent::PeerDown`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PeerDownReason {
    /// The failure detector marked the peer as down.
    FailureDetector,
    /// The auto-eject hosts policy ejected the peer.
    AutoEjected,
    /// An operator-initiated reload removed the peer.
    Reconfigured,
    /// The peer announced that it is shutting down gracefully.
    Leaving,
}
82
83/// Cluster-wide event published on the broadcast channel.
84///
85/// The variants are non-exhaustive at the type level; consumers
86/// must use a wildcard arm so future additions remain
87/// non-breaking.
88///
89/// # Examples
90///
91/// ```
92/// use dynomite::embed::events::{ServerEvent, PeerDownReason};
93/// let e = ServerEvent::PeerDown { peer: 7, reason: PeerDownReason::FailureDetector };
94/// match e {
95/// ServerEvent::PeerDown { peer, .. } => assert_eq!(peer, 7),
96/// _ => panic!(),
97/// }
98/// ```
99#[derive(Debug, Clone)]
100#[non_exhaustive]
101pub enum ServerEvent {
102 /// A peer transitioned to a routable state.
103 PeerUp(PeerId),
104 /// A peer transitioned to an unroutable state.
105 PeerDown {
106 /// Peer index.
107 peer: PeerId,
108 /// Why the transition fired.
109 reason: PeerDownReason,
110 },
111 /// A configuration reload completed successfully.
112 ConfigReloaded {
113 /// Monotonic generation id.
114 generation: u64,
115 },
116 /// A periodic gossip pass finished.
117 GossipRound {
118 /// Round number (monotonic).
119 round: u64,
120 /// Number of distinct peers known after the round.
121 peers: u32,
122 },
123 /// Auto-eject policy ejected a peer.
124 AutoEjected {
125 /// Peer index.
126 peer: PeerId,
127 /// Failure count at eject time.
128 failures: u32,
129 },
130 /// Read-repair was triggered for a key.
131 RepairTriggered {
132 /// Hash of the key whose responses diverged.
133 key_hash: u64,
134 /// Datacenter that initiated the repair.
135 dc: String,
136 },
137 /// A connection was accepted on a listener.
138 ConnectionAccepted {
139 /// Synthetic connection id.
140 conn_id: ConnId,
141 /// Role of the accepted connection.
142 role: ConnRoleTag,
143 /// Local socket address; `None` for non-socket transports.
144 local_addr: Option<SocketAddr>,
145 },
146 /// A connection closed.
147 ConnectionClosed {
148 /// Connection id from the prior `ConnectionAccepted`.
149 conn_id: ConnId,
150 /// Why the connection closed.
151 reason: CloseReason,
152 },
153 /// The receiver fell behind the broadcast tail and missed
154 /// `missed` events. The next `recv` will resume from the
155 /// freshest event in the buffer.
156 Lagged {
157 /// Number of events the receiver missed.
158 missed: u64,
159 },
160}
161
162/// Event publisher held by the embed runtime.
163///
164/// `EventBus` is a thin wrapper around a
165/// [`tokio::sync::broadcast::Sender`] that drops events silently
166/// when no receivers are attached.
167#[derive(Debug, Clone)]
168pub struct EventBus {
169 tx: broadcast::Sender<ServerEvent>,
170}
171
172impl EventBus {
173 /// Build a fresh bus with the supplied channel capacity.
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// use dynomite::embed::events::EventBus;
179 /// let bus = EventBus::new(8);
180 /// assert_eq!(bus.subscriber_count(), 0);
181 /// ```
182 #[must_use]
183 pub fn new(capacity: usize) -> Self {
184 let cap = capacity.max(1);
185 let (tx, _) = broadcast::channel(cap);
186 Self { tx }
187 }
188
189 /// Subscribe a fresh receiver.
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// use dynomite::embed::events::EventBus;
195 /// let bus = EventBus::new(4);
196 /// let _rx = bus.subscribe();
197 /// assert!(bus.subscriber_count() >= 1);
198 /// ```
199 #[must_use]
200 pub fn subscribe(&self) -> EventStream {
201 EventStream {
202 rx: self.tx.subscribe(),
203 }
204 }
205
206 /// Publish an event. Returns the number of live subscribers
207 /// the event was delivered to (zero is normal during quiet
208 /// periods and is never an error).
209 ///
210 /// `pub(crate)` because only the engine itself is allowed to
211 /// publish events; embedding programs subscribe through
212 /// [`EventBus::subscribe`] and consume via
213 /// [`EventStream::recv`].
214 pub(crate) fn send(&self, event: ServerEvent) -> usize {
215 self.tx.send(event).unwrap_or(0)
216 }
217
218 /// Number of attached subscribers.
219 #[must_use]
220 pub fn subscriber_count(&self) -> usize {
221 self.tx.receiver_count()
222 }
223}
224
225/// Pollable async stream of [`ServerEvent`]s.
226///
227/// `EventStream` is the public-facing receiver returned by
228/// [`crate::embed::ServerHandle::subscribe_events`]. It wraps a
229/// [`tokio::sync::broadcast::Receiver`] and translates the
230/// `RecvError::Lagged` shape into a synthesized
231/// [`ServerEvent::Lagged`] so consumers can stay on the happy
232/// path.
233#[derive(Debug)]
234pub struct EventStream {
235 rx: broadcast::Receiver<ServerEvent>,
236}
237
238impl EventStream {
239 /// Receive the next event.
240 ///
241 /// Returns `None` when the bus is closed (the server has shut
242 /// down and dropped its [`EventBus`]). On lag, returns a
243 /// synthesized [`ServerEvent::Lagged`] and resumes from the
244 /// freshest event in the buffer.
245 ///
246 /// # Examples
247 ///
248 /// ```
249 /// use dynomite::embed::events::EventBus;
250 /// # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
251 /// // The bus is published-to only by the engine; subscribers
252 /// // poll for events. Dropping the bus closes the stream.
253 /// let bus = EventBus::new(2);
254 /// let mut s = bus.subscribe();
255 /// drop(bus);
256 /// assert!(s.recv().await.is_none());
257 /// # });
258 /// ```
259 pub async fn recv(&mut self) -> Option<ServerEvent> {
260 match self.rx.recv().await {
261 Ok(evt) => Some(evt),
262 Err(broadcast::error::RecvError::Closed) => None,
263 Err(broadcast::error::RecvError::Lagged(missed)) => {
264 Some(ServerEvent::Lagged { missed })
265 }
266 }
267 }
268
269 /// Non-blocking poll: returns the next event if one is
270 /// already buffered, else `None`.
271 ///
272 /// # Examples
273 ///
274 /// ```
275 /// use dynomite::embed::events::EventBus;
276 /// let bus = EventBus::new(2);
277 /// let mut s = bus.subscribe();
278 /// assert!(s.try_recv().is_none());
279 /// ```
280 pub fn try_recv(&mut self) -> Option<ServerEvent> {
281 match self.rx.try_recv() {
282 Ok(evt) => Some(evt),
283 Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => {
284 None
285 }
286 Err(broadcast::error::TryRecvError::Lagged(missed)) => {
287 Some(ServerEvent::Lagged { missed })
288 }
289 }
290 }
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// A published event reaches an attached subscriber unchanged.
    #[tokio::test]
    async fn round_trip_ok() {
        let bus = EventBus::new(4);
        let mut s = bus.subscribe();
        bus.send(ServerEvent::PeerUp(1));
        let evt = s.recv().await.unwrap();
        assert!(matches!(evt, ServerEvent::PeerUp(1)));
    }

    /// Overrunning the buffer yields a synthesized `Lagged`, after which
    /// the stream resumes from the oldest retained event.
    #[tokio::test]
    async fn lagged_synthesised() {
        let bus = EventBus::new(2);
        let mut s = bus.subscribe();
        for i in 0..8u64 {
            bus.send(ServerEvent::ConfigReloaded { generation: i });
        }
        // Capacity 2 with 8 sends: generations 0..=5 were overwritten, so
        // the receiver must be told it missed exactly 6 events.
        let first = s.recv().await.unwrap();
        assert!(
            matches!(first, ServerEvent::Lagged { missed: 6 }),
            "expected Lagged {{ missed: 6 }}, got {:?}",
            first
        );
        // After the lag notice the two retained events arrive in order.
        let second = s.recv().await.unwrap();
        assert!(
            matches!(second, ServerEvent::ConfigReloaded { generation: 6 }),
            "expected generation 6, got {:?}",
            second
        );
        let third = s.recv().await.unwrap();
        assert!(
            matches!(third, ServerEvent::ConfigReloaded { generation: 7 }),
            "expected generation 7, got {:?}",
            third
        );
    }

    /// Dropping the bus closes the stream, but only after buffered events
    /// have been delivered.
    #[tokio::test]
    async fn recv_none_after_bus_dropped() {
        let bus = EventBus::new(2);
        let mut s = bus.subscribe();
        bus.send(ServerEvent::PeerUp(9));
        drop(bus);
        assert!(matches!(s.recv().await, Some(ServerEvent::PeerUp(9))));
        assert!(s.recv().await.is_none());
    }

    /// With nothing published, the non-blocking poll returns `None`.
    #[test]
    fn try_recv_empty() {
        let bus = EventBus::new(2);
        let mut s = bus.subscribe();
        assert!(s.try_recv().is_none());
    }

    /// The non-blocking poll also reports lag as a synthesized event.
    #[test]
    fn try_recv_reports_lag() {
        let bus = EventBus::new(2);
        let mut s = bus.subscribe();
        for i in 0..5u32 {
            bus.send(ServerEvent::PeerUp(i));
        }
        assert!(matches!(
            s.try_recv(),
            Some(ServerEvent::Lagged { missed: 3 })
        ));
        assert!(matches!(s.try_recv(), Some(ServerEvent::PeerUp(3))));
        assert!(matches!(s.try_recv(), Some(ServerEvent::PeerUp(4))));
        assert!(s.try_recv().is_none());
    }
}
327}