Skip to main content

dynomite/events/
mod.rs

1//! Cluster events: a structured fan-out channel for state
2//! transitions that test harnesses, admin tools, and OTLP
3//! appenders can consume programmatically.
4//!
5//! The engine already publishes cluster signals to two
6//! observability surfaces - tracing spans and Prometheus
7//! counters - which are excellent for humans and dashboards but
8//! awkward to consume from code. This module adds a third
9//! surface: a strongly-typed, OTP `gen_event`-style broadcast of
10//! [`ClusterEvent`] values backed by a
11//! [`tokio::sync::broadcast`] channel. Consumers subscribe via
12//! [`EventManager::subscribe`] and read events through
13//! [`Subscriber::recv`] / [`Subscriber::try_recv`].
14//!
//! Slow subscribers do not block publishers. When a subscriber
//! falls behind the channel tail, [`Subscriber::recv`] returns
//! [`SubscriberError::Lagged`] reporting the count of dropped
//! events, then resumes from the oldest event still retained in
//! the buffer.
19//!
20//! # Examples
21//!
22//! ```
23//! use std::time::SystemTime;
24//! use dynomite::events::{ClusterEvent, EventManager};
25//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
26//! let mgr = EventManager::new(16);
27//! let mut sub = mgr.subscribe();
28//! mgr.publish(ClusterEvent::PeerUp {
29//!     peer_id: 7,
30//!     dc: "dc1".to_string(),
31//!     ts: SystemTime::now(),
32//! });
33//! match sub.recv().await.unwrap() {
34//!     ClusterEvent::PeerUp { peer_id, .. } => assert_eq!(peer_id, 7),
35//!     _ => panic!("unexpected event"),
36//! }
37//! # });
38//! ```
39
40mod manager;
41mod subscriber;
42
43use std::time::{Duration, SystemTime};
44
45pub use self::manager::EventManager;
46pub use self::subscriber::{Subscriber, SubscriberError, TryRecvError};
47
48use crate::hashkit::DynToken;
49
/// Numeric index that identifies a peer within the cluster pool.
///
/// Kept as a plain `u32` so that it lines up with
/// [`crate::cluster::peer::Peer::idx`] and with the
/// [`crate::embed::events::PeerId`] alias. Indices can therefore
/// be passed between the two event surfaces without conversion.
pub type PeerId = u32;
56
57/// Half-open token range `[start, end)` covering one slice of
58/// the consistent-hashing ring.
59///
60/// Used by AAE exchanges to identify which partition is being
61/// reconciled. The range is interpreted as wrap-around when
62/// `start >= end`, matching the ring traversal semantics in
63/// [`crate::cluster::pool`].
64///
65/// # Examples
66///
67/// ```
68/// use dynomite::events::TokenRange;
69/// use dynomite::hashkit::DynToken;
70/// let r = TokenRange::new(DynToken::from_u32(0), DynToken::from_u32(1024));
71/// assert_eq!(r.start(), &DynToken::from_u32(0));
72/// assert_eq!(r.end(), &DynToken::from_u32(1024));
73/// ```
74#[derive(Clone, Debug, PartialEq, Eq, Hash)]
75pub struct TokenRange {
76    start: DynToken,
77    end: DynToken,
78}
79
80impl TokenRange {
81    /// Build a fresh token range covering `[start, end)`.
82    #[must_use]
83    pub fn new(start: DynToken, end: DynToken) -> Self {
84        Self { start, end }
85    }
86
87    /// Inclusive lower bound.
88    #[must_use]
89    pub fn start(&self) -> &DynToken {
90        &self.start
91    }
92
93    /// Exclusive upper bound.
94    #[must_use]
95    pub fn end(&self) -> &DynToken {
96        &self.end
97    }
98}
99
100/// Structured cluster-event payload published on the
101/// [`EventManager`] broadcast.
102///
103/// Variants are non-exhaustive at the type level; consumers must
104/// always include a wildcard arm so future additions remain
105/// non-breaking.
106///
107/// # Examples
108///
109/// ```
110/// use std::time::SystemTime;
111/// use dynomite::events::ClusterEvent;
112/// let e = ClusterEvent::PeerUp { peer_id: 1, dc: "dc1".into(), ts: SystemTime::now() };
113/// match e {
114///     ClusterEvent::PeerUp { peer_id, .. } => assert_eq!(peer_id, 1),
115///     _ => panic!("unexpected"),
116/// }
117/// ```
118#[derive(Clone, Debug)]
119#[non_exhaustive]
120pub enum ClusterEvent {
121    /// A peer transitioned to a routable state.
122    PeerUp {
123        /// Peer index.
124        peer_id: PeerId,
125        /// Datacenter the peer belongs to.
126        dc: String,
127        /// Wall-clock timestamp at which the engine observed
128        /// the transition.
129        ts: SystemTime,
130    },
131    /// A peer transitioned to an unroutable state because the
132    /// failure detector crossed its phi threshold.
133    PeerDown {
134        /// Peer index.
135        peer_id: PeerId,
136        /// Datacenter the peer belongs to.
137        dc: String,
138        /// Phi value observed at the moment of the transition.
139        phi: f64,
140        /// Wall-clock timestamp at which the engine observed
141        /// the transition.
142        ts: SystemTime,
143    },
144    /// One periodic gossip pass finished.
145    GossipRoundComplete {
146        /// Wall-clock duration spent in the round.
147        duration: Duration,
148        /// Number of distinct peers visited during the round.
149        peers_seen: usize,
150        /// Wall-clock timestamp at which the round finished.
151        ts: SystemTime,
152    },
153    /// An anti-entropy exchange started against a peer.
154    AaeExchangeStarted {
155        /// Peer the exchange is running against.
156        with_peer: PeerId,
157        /// Token range the exchange covers.
158        partition: TokenRange,
159        /// Wall-clock timestamp at which the exchange started.
160        ts: SystemTime,
161    },
162    /// An anti-entropy exchange finished against a peer.
163    AaeExchangeCompleted {
164        /// Peer the exchange ran against.
165        with_peer: PeerId,
166        /// Token range the exchange covered.
167        partition: TokenRange,
168        /// Number of keys repaired during the exchange.
169        repaired: u64,
170        /// Wall-clock timestamp at which the exchange finished.
171        ts: SystemTime,
172    },
173    /// A peer was observed restarting (its incarnation changed
174    /// or its peering session was re-established after a clean
175    /// shutdown).
176    RestartObserved {
177        /// Peer index.
178        peer_id: PeerId,
179        /// Wall-clock timestamp at which the restart was
180        /// observed.
181        ts: SystemTime,
182    },
183    /// The token ring topology changed: a peer was added,
184    /// removed, or its tokens were reassigned.
185    RingChanged {
186        /// Free-form tag describing the trigger (e.g.
187        /// `"seed-discovery"`, `"reconfigure"`, `"shutdown"`).
188        tag: String,
189        /// Wall-clock timestamp at which the change was
190        /// applied.
191        ts: SystemTime,
192    },
193}