dynomite/cluster/peer.rs
1//! Cluster peer state.
2//!
3//! A [`Peer`] is the in-memory record for one dynomite node in the
//! ring. Each peer carries its endpoint, rack, datacenter, token list,
//! liveness state, a consecutive-failure counter, and a phi-accrual
//! failure detector. The record itself does not hold a handle to the
//! outbound connection pool.
7//!
8//! The data shape mirrors the reference engine's per-peer node record
9//! (rack, dc, secure flag, same-DC flag, token list, state). Peers are
10//! held by [`crate::cluster::ServerPool`] in an `Arc<RwLock<_>>` so
11//! gossip and dispatch can both observe the table without taking out
12//! exclusive locks for read.
13//!
14//! # Examples
15//!
16//! ```
17//! use dynomite::cluster::peer::{Peer, PeerEndpoint, PeerState};
18//! use dynomite::hashkit::DynToken;
19//!
20//! let p = Peer::new(
21//! 0,
22//! PeerEndpoint::tcp("127.0.0.1".into(), 8101),
23//! "rack1".into(),
24//! "dc1".into(),
25//! vec![DynToken::from_u32(101_134_286)],
26//! true,
27//! true,
28//! false,
29//! );
30//! assert_eq!(p.rack(), "rack1");
31//! assert_eq!(p.state(), PeerState::Joining);
32//! ```
33
34use crate::hashkit::DynToken;
35
/// Gossip-view lifecycle state of a single peer.
///
/// The discriminants are pinned by `#[repr(u8)]` together with the
/// explicit values below. They follow the reference engine's per-node
/// state constants (`UNKNOWN`, `JOINING`, `NORMAL`, `STANDBY`, `DOWN`,
/// `RESET`, `LEAVING`) in that order, counting up from zero.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Default)]
#[repr(u8)]
pub enum PeerState {
    /// Nothing has been observed about the peer yet. This is also the
    /// [`Default`] value.
    #[default]
    Unknown = 0,
    /// The peer is bootstrapping into the ring.
    Joining = 1,
    /// The peer is healthy and serving traffic.
    Normal = 2,
    /// The peer is a warm standby; no requests are routed to it.
    Standby = 3,
    /// The failure detector has marked the peer as down.
    Down = 4,
    /// The peer is being re-established (connection pool reset).
    Reset = 5,
    /// The peer is getting ready to leave the ring.
    Leaving = 6,
}

impl PeerState {
    /// Upper-case label for this state.
    ///
    /// Every variant maps to a fixed `'static` string, so the result
    /// can be used in logs and stats without allocating.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::cluster::peer::PeerState;
    /// assert_eq!(PeerState::Normal.name(), "NORMAL");
    /// ```
    #[must_use]
    pub fn name(self) -> &'static str {
        match self {
            Self::Unknown => "UNKNOWN",
            Self::Joining => "JOINING",
            Self::Normal => "NORMAL",
            Self::Standby => "STANDBY",
            Self::Down => "DOWN",
            Self::Reset => "RESET",
            Self::Leaving => "LEAVING",
        }
    }

    /// Whether the dispatcher should consider a peer in this state
    /// when making routing decisions.
    ///
    /// Only `Normal` and `Joining` answer `true`. `Joining` is kept
    /// routable because the reference engine leaves such peers in the
    /// continuum until they move to `Down` or `Leaving`.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::cluster::peer::PeerState;
    /// assert!(PeerState::Normal.is_routable());
    /// assert!(!PeerState::Down.is_routable());
    /// ```
    #[must_use]
    pub fn is_routable(self) -> bool {
        // Spelled out exhaustively so that adding a variant forces a
        // deliberate routing decision at compile time.
        match self {
            Self::Joining | Self::Normal => true,
            Self::Unknown | Self::Standby | Self::Down | Self::Reset | Self::Leaving => false,
        }
    }
}
100
/// Host/port pair at which a peer can be reached.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct PeerEndpoint {
    /// Hostname or numeric IP, stored exactly as supplied.
    host: String,
    /// TCP port number.
    port: u16,
}

impl PeerEndpoint {
    /// Build a TCP endpoint from an owned host string and a port.
    ///
    /// The host is stored as given; no parsing or validation happens
    /// here.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::cluster::peer::PeerEndpoint;
    /// let ep = PeerEndpoint::tcp("10.0.0.1".into(), 8101);
    /// assert_eq!(ep.host(), "10.0.0.1");
    /// assert_eq!(ep.port(), 8101);
    /// assert_eq!(ep.pname(), "10.0.0.1:8101");
    /// ```
    #[must_use]
    pub fn tcp(host: String, port: u16) -> Self {
        PeerEndpoint { host, port }
    }

    /// The hostname or numeric IP this endpoint was built with.
    #[must_use]
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// The TCP port this endpoint was built with.
    #[must_use]
    pub fn port(&self) -> u16 {
        self.port
    }

    /// Freshly allocated `host:port` string: the host, a single
    /// colon, then the port in decimal.
    #[must_use]
    pub fn pname(&self) -> String {
        let Self { host, port } = self;
        format!("{host}:{port}")
    }
}
143
144/// One peer in the cluster ring.
145#[derive(Clone, Debug)]
146pub struct Peer {
147 idx: u32,
148 endpoint: PeerEndpoint,
149 rack: String,
150 dc: String,
151 tokens: Vec<DynToken>,
152 is_local: bool,
153 is_same_dc: bool,
154 is_secure: bool,
155 state: PeerState,
156 failure_count: u32,
157 last_state_ts_secs: u64,
158 /// Phi-accrual failure detector for this peer. Fed by
159 /// gossip heartbeats; queried by the gossip task to decide
160 /// when to transition `state` to [`PeerState::Down`].
161 /// Initialised lazily on the first `record_heartbeat` call.
162 fd: crate::cluster::failure_detector::PhiAccrual,
163}
164
165impl Peer {
166 /// Build a new peer record.
167 ///
168 /// `idx` is the peer's index in the pool's peer array (0 is
169 /// always the local node). `is_local` and `is_same_dc` mirror
170 /// the reference engine's flags. The initial state is
171 /// [`PeerState::Joining`] for the local node and
172 /// [`PeerState::Down`] for a remote one (the reference engine
173 /// optimistically waits for the first gossip ack before
174 /// promoting a remote peer).
175 ///
176 /// # Examples
177 ///
178 /// ```
179 /// use dynomite::cluster::peer::{Peer, PeerEndpoint, PeerState};
180 /// use dynomite::hashkit::DynToken;
181 /// let p = Peer::new(
182 /// 1,
183 /// PeerEndpoint::tcp("h".into(), 1),
184 /// "r".into(),
185 /// "d".into(),
186 /// vec![DynToken::from_u32(0)],
187 /// false,
188 /// true,
189 /// false,
190 /// );
191 /// assert_eq!(p.idx(), 1);
192 /// assert_eq!(p.state(), PeerState::Down);
193 /// ```
194 #[must_use]
195 #[allow(clippy::too_many_arguments)]
196 pub fn new(
197 idx: u32,
198 endpoint: PeerEndpoint,
199 rack: String,
200 dc: String,
201 tokens: Vec<DynToken>,
202 is_local: bool,
203 is_same_dc: bool,
204 is_secure: bool,
205 ) -> Self {
206 let state = if is_local {
207 PeerState::Joining
208 } else {
209 PeerState::Down
210 };
211 Self {
212 idx,
213 endpoint,
214 rack,
215 dc,
216 tokens,
217 is_local,
218 is_same_dc,
219 is_secure,
220 state,
221 failure_count: 0,
222 last_state_ts_secs: 0,
223 fd: crate::cluster::failure_detector::PhiAccrual::default(),
224 }
225 }
226
227 /// Index of the peer in the pool's array.
228 #[must_use]
229 pub fn idx(&self) -> u32 {
230 self.idx
231 }
232
233 /// Endpoint reference.
234 #[must_use]
235 pub fn endpoint(&self) -> &PeerEndpoint {
236 &self.endpoint
237 }
238
239 /// Rack name.
240 #[must_use]
241 pub fn rack(&self) -> &str {
242 &self.rack
243 }
244
245 /// Datacenter name.
246 #[must_use]
247 pub fn dc(&self) -> &str {
248 &self.dc
249 }
250
251 /// Token list.
252 #[must_use]
253 pub fn tokens(&self) -> &[DynToken] {
254 &self.tokens
255 }
256
257 /// True for the local node.
258 #[must_use]
259 pub fn is_local(&self) -> bool {
260 self.is_local
261 }
262
263 /// True when the peer shares the local datacenter.
264 #[must_use]
265 pub fn is_same_dc(&self) -> bool {
266 self.is_same_dc
267 }
268
269 /// True when the peer expects an encrypted dnode link.
270 #[must_use]
271 pub fn is_secure(&self) -> bool {
272 self.is_secure
273 }
274
275 /// Current lifecycle state.
276 #[must_use]
277 pub fn state(&self) -> PeerState {
278 self.state
279 }
280
281 /// Update the lifecycle state. The supplied `ts_secs` mirrors
282 /// the reference engine's `gossip_node::ts` and is used by the
283 /// failure detector to age the peer out.
284 ///
285 /// # Examples
286 ///
287 /// ```
288 /// use dynomite::cluster::peer::{Peer, PeerEndpoint, PeerState};
289 /// use dynomite::hashkit::DynToken;
290 /// let mut p = Peer::new(
291 /// 0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
292 /// vec![DynToken::from_u32(0)], true, true, false,
293 /// );
294 /// p.set_state(PeerState::Normal, 42);
295 /// assert_eq!(p.state(), PeerState::Normal);
296 /// assert_eq!(p.last_state_ts_secs(), 42);
297 /// ```
298 pub fn set_state(&mut self, state: PeerState, ts_secs: u64) {
299 self.state = state;
300 self.last_state_ts_secs = ts_secs;
301 }
302
303 /// Last observed gossip timestamp (epoch seconds).
304 #[must_use]
305 pub fn last_state_ts_secs(&self) -> u64 {
306 self.last_state_ts_secs
307 }
308
309 /// Increment the consecutive-failure counter.
310 pub fn record_failure(&mut self) {
311 self.failure_count = self.failure_count.saturating_add(1);
312 }
313
314 /// Reset the consecutive-failure counter.
315 pub fn record_success(&mut self) {
316 self.failure_count = 0;
317 }
318
319 /// Current consecutive-failure count.
320 #[must_use]
321 pub fn failure_count(&self) -> u32 {
322 self.failure_count
323 }
324
325 /// Borrow the phi-accrual failure detector for this peer.
326 /// Used by the gossip task to record heartbeat arrivals and
327 /// to query the suspicion level on every tick.
328 #[must_use]
329 pub fn failure_detector(&self) -> &crate::cluster::failure_detector::PhiAccrual {
330 &self.fd
331 }
332
333 /// Mutably borrow the phi-accrual failure detector. Use
334 /// from the gossip / heartbeat task.
335 pub fn failure_detector_mut(&mut self) -> &mut crate::cluster::failure_detector::PhiAccrual {
336 &mut self.fd
337 }
338
339 /// First (primary) token of this peer, if any.
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// use dynomite::cluster::peer::{Peer, PeerEndpoint};
345 /// use dynomite::hashkit::DynToken;
346 /// let p = Peer::new(
347 /// 0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
348 /// vec![DynToken::from_u32(7)], true, true, false,
349 /// );
350 /// assert_eq!(p.primary_token().unwrap().get_int(), 7);
351 /// ```
352 #[must_use]
353 pub fn primary_token(&self) -> Option<&DynToken> {
354 self.tokens.first()
355 }
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `PeerState` variant, in discriminant order.
    const ALL_STATES: [PeerState; 7] = [
        PeerState::Unknown,
        PeerState::Joining,
        PeerState::Normal,
        PeerState::Standby,
        PeerState::Down,
        PeerState::Reset,
        PeerState::Leaving,
    ];

    /// Build a peer with a fixed endpoint and a single token.
    ///
    /// The local node gets index 0 and a remote node index 1, matching
    /// the convention documented on `Peer::new`.
    fn mk(rack: &str, dc: &str, is_local: bool, is_same_dc: bool) -> Peer {
        Peer::new(
            u32::from(!is_local),
            PeerEndpoint::tcp("127.0.0.1".into(), 8101),
            rack.into(),
            dc.into(),
            vec![DynToken::from_u32(1)],
            is_local,
            is_same_dc,
            false,
        )
    }

    #[test]
    fn local_peer_starts_joining() {
        let p = mk("r", "d", true, true);
        assert_eq!(p.state(), PeerState::Joining);
        assert_eq!(p.idx(), 0);
    }

    #[test]
    fn remote_peer_starts_down() {
        let p = mk("r", "d", false, true);
        assert_eq!(p.state(), PeerState::Down);
        assert_eq!(p.idx(), 1);
    }

    #[test]
    fn new_peer_exposes_constructor_arguments() {
        let p = mk("rack1", "dc1", false, true);
        assert_eq!(p.rack(), "rack1");
        assert_eq!(p.dc(), "dc1");
        assert!(!p.is_local());
        assert!(p.is_same_dc());
        assert!(!p.is_secure());
        assert_eq!(p.endpoint().pname(), "127.0.0.1:8101");
        assert_eq!(p.tokens().len(), 1);
        assert!(p.primary_token().is_some());
        // Bookkeeping starts from a clean slate.
        assert_eq!(p.failure_count(), 0);
        assert_eq!(p.last_state_ts_secs(), 0);
    }

    #[test]
    fn primary_token_is_none_without_tokens() {
        let p = Peer::new(
            0,
            PeerEndpoint::tcp("h".into(), 1),
            "r".into(),
            "d".into(),
            Vec::new(),
            true,
            true,
            false,
        );
        assert!(p.tokens().is_empty());
        assert!(p.primary_token().is_none());
    }

    #[test]
    fn state_names_are_exact_and_unique() {
        let expected = [
            "UNKNOWN", "JOINING", "NORMAL", "STANDBY", "DOWN", "RESET", "LEAVING",
        ];
        for (state, want) in ALL_STATES.iter().zip(expected.iter()) {
            assert_eq!(state.name(), *want);
        }
        // No two states may share a label.
        let unique: std::collections::HashSet<&str> =
            ALL_STATES.iter().map(|s| s.name()).collect();
        assert_eq!(unique.len(), ALL_STATES.len());
    }

    #[test]
    fn state_discriminants_match_reference_constants() {
        for (want, state) in ALL_STATES.iter().enumerate() {
            assert_eq!(usize::from(*state as u8), want);
        }
        assert_eq!(PeerState::default(), PeerState::Unknown);
    }

    #[test]
    fn failure_counter_works() {
        let mut p = mk("r", "d", false, true);
        p.record_failure();
        p.record_failure();
        assert_eq!(p.failure_count(), 2);
        p.record_success();
        assert_eq!(p.failure_count(), 0);
    }

    #[test]
    fn failure_counter_saturates() {
        let mut p = mk("r", "d", false, true);
        p.failure_count = u32::MAX;
        p.record_failure();
        assert_eq!(p.failure_count(), u32::MAX);
    }

    #[test]
    fn set_state_records_state_and_timestamp() {
        let mut p = mk("r", "d", false, true);
        p.record_failure();
        p.set_state(PeerState::Normal, 42);
        assert_eq!(p.state(), PeerState::Normal);
        assert_eq!(p.last_state_ts_secs(), 42);
        // A state change leaves the failure counter alone.
        assert_eq!(p.failure_count(), 1);
    }

    #[test]
    fn routable_states() {
        for state in ALL_STATES.iter().copied() {
            let want = matches!(state, PeerState::Normal | PeerState::Joining);
            assert_eq!(state.is_routable(), want, "state {}", state.name());
        }
    }

    #[test]
    fn endpoint_pname_joins_host_and_port() {
        let ep = PeerEndpoint::tcp("10.0.0.1".into(), 8101);
        assert_eq!(ep.host(), "10.0.0.1");
        assert_eq!(ep.port(), 8101);
        assert_eq!(ep.pname(), "10.0.0.1:8101");
    }
}
419}