dynomite/events/mod.rs
1//! Cluster events: a structured fan-out channel for state
2//! transitions that test harnesses, admin tools, and OTLP
3//! appenders can consume programmatically.
4//!
5//! The engine already publishes cluster signals to two
6//! observability surfaces - tracing spans and Prometheus
7//! counters - which are excellent for humans and dashboards but
8//! awkward to consume from code. This module adds a third
9//! surface: a strongly-typed, OTP `gen_event`-style broadcast of
10//! [`ClusterEvent`] values backed by a
11//! [`tokio::sync::broadcast`] channel. Consumers subscribe via
12//! [`EventManager::subscribe`] and read events through
13//! [`Subscriber::recv`] / [`Subscriber::try_recv`].
14//!
15//! Slow subscribers do not block publishers. When a subscriber
16//! falls behind the channel tail, [`Subscriber::recv`] returns
17//! [`SubscriberError::Lagged`] reporting the count of dropped
18//! events, then resumes from the freshest event in the buffer.
19//!
20//! # Examples
21//!
22//! ```
23//! use std::time::SystemTime;
24//! use dynomite::events::{ClusterEvent, EventManager};
25//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
26//! let mgr = EventManager::new(16);
27//! let mut sub = mgr.subscribe();
28//! mgr.publish(ClusterEvent::PeerUp {
29//! peer_id: 7,
30//! dc: "dc1".to_string(),
31//! ts: SystemTime::now(),
32//! });
33//! match sub.recv().await.unwrap() {
34//! ClusterEvent::PeerUp { peer_id, .. } => assert_eq!(peer_id, 7),
35//! _ => panic!("unexpected event"),
36//! }
37//! # });
38//! ```
39
40mod manager;
41mod subscriber;
42
43use std::time::{Duration, SystemTime};
44
45pub use self::manager::EventManager;
46pub use self::subscriber::{Subscriber, SubscriberError, TryRecvError};
47
48use crate::hashkit::DynToken;
49
/// Index-style identifier for a member of the cluster pool.
///
/// The alias tracks [`crate::cluster::peer::Peer::idx`] and the
/// pre-existing [`crate::embed::events::PeerId`] alias, which
/// keeps both event surfaces index-compatible.
pub type PeerId = u32;
56
57/// Half-open token range `[start, end)` covering one slice of
58/// the consistent-hashing ring.
59///
60/// Used by AAE exchanges to identify which partition is being
61/// reconciled. The range is interpreted as wrap-around when
62/// `start >= end`, matching the ring traversal semantics in
63/// [`crate::cluster::pool`].
64///
65/// # Examples
66///
67/// ```
68/// use dynomite::events::TokenRange;
69/// use dynomite::hashkit::DynToken;
70/// let r = TokenRange::new(DynToken::from_u32(0), DynToken::from_u32(1024));
71/// assert_eq!(r.start(), &DynToken::from_u32(0));
72/// assert_eq!(r.end(), &DynToken::from_u32(1024));
73/// ```
74#[derive(Clone, Debug, PartialEq, Eq, Hash)]
75pub struct TokenRange {
76 start: DynToken,
77 end: DynToken,
78}
79
80impl TokenRange {
81 /// Build a fresh token range covering `[start, end)`.
82 #[must_use]
83 pub fn new(start: DynToken, end: DynToken) -> Self {
84 Self { start, end }
85 }
86
87 /// Inclusive lower bound.
88 #[must_use]
89 pub fn start(&self) -> &DynToken {
90 &self.start
91 }
92
93 /// Exclusive upper bound.
94 #[must_use]
95 pub fn end(&self) -> &DynToken {
96 &self.end
97 }
98}
99
100/// Structured cluster-event payload published on the
101/// [`EventManager`] broadcast.
102///
103/// Variants are non-exhaustive at the type level; consumers must
104/// always include a wildcard arm so future additions remain
105/// non-breaking.
106///
107/// # Examples
108///
109/// ```
110/// use std::time::SystemTime;
111/// use dynomite::events::ClusterEvent;
112/// let e = ClusterEvent::PeerUp { peer_id: 1, dc: "dc1".into(), ts: SystemTime::now() };
113/// match e {
114/// ClusterEvent::PeerUp { peer_id, .. } => assert_eq!(peer_id, 1),
115/// _ => panic!("unexpected"),
116/// }
117/// ```
118#[derive(Clone, Debug)]
119#[non_exhaustive]
120pub enum ClusterEvent {
121 /// A peer transitioned to a routable state.
122 PeerUp {
123 /// Peer index.
124 peer_id: PeerId,
125 /// Datacenter the peer belongs to.
126 dc: String,
127 /// Wall-clock timestamp at which the engine observed
128 /// the transition.
129 ts: SystemTime,
130 },
131 /// A peer transitioned to an unroutable state because the
132 /// failure detector crossed its phi threshold.
133 PeerDown {
134 /// Peer index.
135 peer_id: PeerId,
136 /// Datacenter the peer belongs to.
137 dc: String,
138 /// Phi value observed at the moment of the transition.
139 phi: f64,
140 /// Wall-clock timestamp at which the engine observed
141 /// the transition.
142 ts: SystemTime,
143 },
144 /// One periodic gossip pass finished.
145 GossipRoundComplete {
146 /// Wall-clock duration spent in the round.
147 duration: Duration,
148 /// Number of distinct peers visited during the round.
149 peers_seen: usize,
150 /// Wall-clock timestamp at which the round finished.
151 ts: SystemTime,
152 },
153 /// An anti-entropy exchange started against a peer.
154 AaeExchangeStarted {
155 /// Peer the exchange is running against.
156 with_peer: PeerId,
157 /// Token range the exchange covers.
158 partition: TokenRange,
159 /// Wall-clock timestamp at which the exchange started.
160 ts: SystemTime,
161 },
162 /// An anti-entropy exchange finished against a peer.
163 AaeExchangeCompleted {
164 /// Peer the exchange ran against.
165 with_peer: PeerId,
166 /// Token range the exchange covered.
167 partition: TokenRange,
168 /// Number of keys repaired during the exchange.
169 repaired: u64,
170 /// Wall-clock timestamp at which the exchange finished.
171 ts: SystemTime,
172 },
173 /// A peer was observed restarting (its incarnation changed
174 /// or its peering session was re-established after a clean
175 /// shutdown).
176 RestartObserved {
177 /// Peer index.
178 peer_id: PeerId,
179 /// Wall-clock timestamp at which the restart was
180 /// observed.
181 ts: SystemTime,
182 },
183 /// The token ring topology changed: a peer was added,
184 /// removed, or its tokens were reassigned.
185 RingChanged {
186 /// Free-form tag describing the trigger (e.g.
187 /// `"seed-discovery"`, `"reconfigure"`, `"shutdown"`).
188 tag: String,
189 /// Wall-clock timestamp at which the change was
190 /// applied.
191 ts: SystemTime,
192 },
193}