Skip to main content

dynomite/cluster/
peer.rs

1//! Cluster peer state.
2//!
//! A [`Peer`] is the in-memory record for one dynomite node in the
//! ring. Each peer carries its endpoint, rack, datacenter, token list,
//! liveness state, a consecutive-failure counter, and a phi-accrual
//! failure detector. It holds no connection handle of its own.
7//!
8//! The data shape mirrors the reference engine's per-peer node record
9//! (rack, dc, secure flag, same-DC flag, token list, state). Peers are
10//! held by [`crate::cluster::ServerPool`] in an `Arc<RwLock<_>>` so
11//! gossip and dispatch can both observe the table without taking out
12//! exclusive locks for read.
13//!
14//! # Examples
15//!
16//! ```
17//! use dynomite::cluster::peer::{Peer, PeerEndpoint, PeerState};
18//! use dynomite::hashkit::DynToken;
19//!
20//! let p = Peer::new(
21//!     0,
22//!     PeerEndpoint::tcp("127.0.0.1".into(), 8101),
23//!     "rack1".into(),
24//!     "dc1".into(),
25//!     vec![DynToken::from_u32(101_134_286)],
26//!     true,
27//!     true,
28//!     false,
29//! );
30//! assert_eq!(p.rack(), "rack1");
31//! assert_eq!(p.state(), PeerState::Joining);
32//! ```
33
34use crate::hashkit::DynToken;
35
/// Where a peer sits in its gossip lifecycle.
///
/// The explicit discriminants line up with the reference engine's
/// node-state constants (`UNKNOWN`, `JOINING`, `NORMAL`, `STANDBY`,
/// `DOWN`, `RESET`, `LEAVING`), so `state as u8` yields the same number
/// the reference engine uses.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PeerState {
    /// Nothing has been observed yet; this is the `Default`.
    #[default]
    Unknown = 0,
    /// The peer is bootstrapping into the ring.
    Joining = 1,
    /// The peer is healthy and serving traffic.
    Normal = 2,
    /// Warm standby: requests are not routed to the peer.
    Standby = 3,
    /// The failure detector has marked the peer down.
    Down = 4,
    /// The peer's connection pool is being re-established.
    Reset = 5,
    /// The peer is preparing to leave the ring.
    Leaving = 6,
}

impl PeerState {
    /// Upper-case label for this state, spelled like the matching
    /// reference-engine constant. The label for a given variant never
    /// changes, so it is safe to use in logs and metrics.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::cluster::peer::PeerState;
    /// assert_eq!(PeerState::Normal.name(), "NORMAL");
    /// ```
    #[must_use]
    pub fn name(self) -> &'static str {
        match self {
            Self::Unknown => "UNKNOWN",
            Self::Joining => "JOINING",
            Self::Normal => "NORMAL",
            Self::Standby => "STANDBY",
            Self::Down => "DOWN",
            Self::Reset => "RESET",
            Self::Leaving => "LEAVING",
        }
    }

    /// Whether the dispatcher may pick a peer in this state when routing.
    ///
    /// Only `Normal` and `Joining` qualify. `Joining` is included because
    /// the reference engine keeps joining peers in the continuum until
    /// they move to `Down` or `Leaving`.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::cluster::peer::PeerState;
    /// assert!(PeerState::Normal.is_routable());
    /// assert!(!PeerState::Down.is_routable());
    /// ```
    #[must_use]
    pub fn is_routable(self) -> bool {
        // Spelled out exhaustively so adding a variant forces a decision here.
        match self {
            Self::Joining | Self::Normal => true,
            Self::Unknown | Self::Standby | Self::Down | Self::Reset | Self::Leaving => false,
        }
    }
}
100
/// Network address of a peer: a host (name or numeric IP) plus a TCP port.
///
/// Equality and hashing cover both fields, so two endpoints compare equal
/// only when the host strings and the ports both match.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct PeerEndpoint {
    host: String,
    port: u16,
}

impl PeerEndpoint {
    /// Create a TCP endpoint from a host and a port; both are stored as given.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::cluster::peer::PeerEndpoint;
    /// let ep = PeerEndpoint::tcp("10.0.0.1".into(), 8101);
    /// assert_eq!(ep.host(), "10.0.0.1");
    /// assert_eq!(ep.port(), 8101);
    /// assert_eq!(ep.pname(), "10.0.0.1:8101");
    /// ```
    #[must_use]
    pub fn tcp(host: String, port: u16) -> Self {
        Self { port, host }
    }

    /// The host exactly as passed to [`PeerEndpoint::tcp`].
    #[must_use]
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// The TCP port.
    #[must_use]
    pub fn port(&self) -> u16 {
        let port = self.port;
        port
    }

    /// Printable name in `host:port` form.
    ///
    /// The host is inserted verbatim, so an IPv6 literal is not
    /// bracketed (`::1` with port 8101 gives `::1:8101`).
    #[must_use]
    pub fn pname(&self) -> String {
        format!("{host}:{port}", host = self.host, port = self.port)
    }
}
143
144/// One peer in the cluster ring.
145#[derive(Clone, Debug)]
146pub struct Peer {
147    idx: u32,
148    endpoint: PeerEndpoint,
149    rack: String,
150    dc: String,
151    tokens: Vec<DynToken>,
152    is_local: bool,
153    is_same_dc: bool,
154    is_secure: bool,
155    state: PeerState,
156    failure_count: u32,
157    last_state_ts_secs: u64,
158    /// Phi-accrual failure detector for this peer. Fed by
159    /// gossip heartbeats; queried by the gossip task to decide
160    /// when to transition `state` to [`PeerState::Down`].
161    /// Initialised lazily on the first `record_heartbeat` call.
162    fd: crate::cluster::failure_detector::PhiAccrual,
163}
164
165impl Peer {
166    /// Build a new peer record.
167    ///
168    /// `idx` is the peer's index in the pool's peer array (0 is
169    /// always the local node). `is_local` and `is_same_dc` mirror
170    /// the reference engine's flags. The initial state is
171    /// [`PeerState::Joining`] for the local node and
172    /// [`PeerState::Down`] for a remote one (the reference engine
173    /// optimistically waits for the first gossip ack before
174    /// promoting a remote peer).
175    ///
176    /// # Examples
177    ///
178    /// ```
179    /// use dynomite::cluster::peer::{Peer, PeerEndpoint, PeerState};
180    /// use dynomite::hashkit::DynToken;
181    /// let p = Peer::new(
182    ///     1,
183    ///     PeerEndpoint::tcp("h".into(), 1),
184    ///     "r".into(),
185    ///     "d".into(),
186    ///     vec![DynToken::from_u32(0)],
187    ///     false,
188    ///     true,
189    ///     false,
190    /// );
191    /// assert_eq!(p.idx(), 1);
192    /// assert_eq!(p.state(), PeerState::Down);
193    /// ```
194    #[must_use]
195    #[allow(clippy::too_many_arguments)]
196    pub fn new(
197        idx: u32,
198        endpoint: PeerEndpoint,
199        rack: String,
200        dc: String,
201        tokens: Vec<DynToken>,
202        is_local: bool,
203        is_same_dc: bool,
204        is_secure: bool,
205    ) -> Self {
206        let state = if is_local {
207            PeerState::Joining
208        } else {
209            PeerState::Down
210        };
211        Self {
212            idx,
213            endpoint,
214            rack,
215            dc,
216            tokens,
217            is_local,
218            is_same_dc,
219            is_secure,
220            state,
221            failure_count: 0,
222            last_state_ts_secs: 0,
223            fd: crate::cluster::failure_detector::PhiAccrual::default(),
224        }
225    }
226
227    /// Index of the peer in the pool's array.
228    #[must_use]
229    pub fn idx(&self) -> u32 {
230        self.idx
231    }
232
233    /// Endpoint reference.
234    #[must_use]
235    pub fn endpoint(&self) -> &PeerEndpoint {
236        &self.endpoint
237    }
238
239    /// Rack name.
240    #[must_use]
241    pub fn rack(&self) -> &str {
242        &self.rack
243    }
244
245    /// Datacenter name.
246    #[must_use]
247    pub fn dc(&self) -> &str {
248        &self.dc
249    }
250
251    /// Token list.
252    #[must_use]
253    pub fn tokens(&self) -> &[DynToken] {
254        &self.tokens
255    }
256
257    /// True for the local node.
258    #[must_use]
259    pub fn is_local(&self) -> bool {
260        self.is_local
261    }
262
263    /// True when the peer shares the local datacenter.
264    #[must_use]
265    pub fn is_same_dc(&self) -> bool {
266        self.is_same_dc
267    }
268
269    /// True when the peer expects an encrypted dnode link.
270    #[must_use]
271    pub fn is_secure(&self) -> bool {
272        self.is_secure
273    }
274
275    /// Current lifecycle state.
276    #[must_use]
277    pub fn state(&self) -> PeerState {
278        self.state
279    }
280
281    /// Update the lifecycle state. The supplied `ts_secs` mirrors
282    /// the reference engine's `gossip_node::ts` and is used by the
283    /// failure detector to age the peer out.
284    ///
285    /// # Examples
286    ///
287    /// ```
288    /// use dynomite::cluster::peer::{Peer, PeerEndpoint, PeerState};
289    /// use dynomite::hashkit::DynToken;
290    /// let mut p = Peer::new(
291    ///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
292    ///     vec![DynToken::from_u32(0)], true, true, false,
293    /// );
294    /// p.set_state(PeerState::Normal, 42);
295    /// assert_eq!(p.state(), PeerState::Normal);
296    /// assert_eq!(p.last_state_ts_secs(), 42);
297    /// ```
298    pub fn set_state(&mut self, state: PeerState, ts_secs: u64) {
299        self.state = state;
300        self.last_state_ts_secs = ts_secs;
301    }
302
303    /// Last observed gossip timestamp (epoch seconds).
304    #[must_use]
305    pub fn last_state_ts_secs(&self) -> u64 {
306        self.last_state_ts_secs
307    }
308
309    /// Increment the consecutive-failure counter.
310    pub fn record_failure(&mut self) {
311        self.failure_count = self.failure_count.saturating_add(1);
312    }
313
314    /// Reset the consecutive-failure counter.
315    pub fn record_success(&mut self) {
316        self.failure_count = 0;
317    }
318
319    /// Current consecutive-failure count.
320    #[must_use]
321    pub fn failure_count(&self) -> u32 {
322        self.failure_count
323    }
324
325    /// Borrow the phi-accrual failure detector for this peer.
326    /// Used by the gossip task to record heartbeat arrivals and
327    /// to query the suspicion level on every tick.
328    #[must_use]
329    pub fn failure_detector(&self) -> &crate::cluster::failure_detector::PhiAccrual {
330        &self.fd
331    }
332
333    /// Mutably borrow the phi-accrual failure detector. Use
334    /// from the gossip / heartbeat task.
335    pub fn failure_detector_mut(&mut self) -> &mut crate::cluster::failure_detector::PhiAccrual {
336        &mut self.fd
337    }
338
339    /// First (primary) token of this peer, if any.
340    ///
341    /// # Examples
342    ///
343    /// ```
344    /// use dynomite::cluster::peer::{Peer, PeerEndpoint};
345    /// use dynomite::hashkit::DynToken;
346    /// let p = Peer::new(
347    ///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
348    ///     vec![DynToken::from_u32(7)], true, true, false,
349    /// );
350    /// assert_eq!(p.primary_token().unwrap().get_int(), 7);
351    /// ```
352    #[must_use]
353    pub fn primary_token(&self) -> Option<&DynToken> {
354        self.tokens.first()
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `PeerState` variant, in discriminant order.
    const ALL_STATES: [PeerState; 7] = [
        PeerState::Unknown,
        PeerState::Joining,
        PeerState::Normal,
        PeerState::Standby,
        PeerState::Down,
        PeerState::Reset,
        PeerState::Leaving,
    ];

    /// Build a single-token peer. The index follows the pool convention
    /// documented on `Peer::new`: 0 for the local node, a non-zero slot
    /// for everyone else.
    fn mk(rack: &str, dc: &str, is_local: bool, is_same_dc: bool) -> Peer {
        Peer::new(
            if is_local { 0 } else { 1 },
            PeerEndpoint::tcp("127.0.0.1".into(), 8101),
            rack.into(),
            dc.into(),
            vec![DynToken::from_u32(1)],
            is_local,
            is_same_dc,
            false,
        )
    }

    #[test]
    fn local_peer_starts_joining() {
        let p = mk("r", "d", true, true);
        assert_eq!(p.state(), PeerState::Joining);
        assert_eq!(p.idx(), 0);
        assert!(p.is_local());
    }

    #[test]
    fn remote_peer_starts_down() {
        let p = mk("r", "d", false, true);
        assert_eq!(p.state(), PeerState::Down);
        assert!(!p.is_local());
    }

    #[test]
    fn new_peer_has_clean_bookkeeping() {
        let p = mk("rack1", "dc1", false, false);
        assert_eq!(p.rack(), "rack1");
        assert_eq!(p.dc(), "dc1");
        assert!(!p.is_same_dc());
        assert!(!p.is_secure());
        assert_eq!(p.failure_count(), 0);
        assert_eq!(p.last_state_ts_secs(), 0);
        assert_eq!(p.tokens().len(), 1);
        assert_eq!(p.endpoint().pname(), "127.0.0.1:8101");
    }

    #[test]
    fn state_names_are_stable() {
        let expected = [
            "UNKNOWN", "JOINING", "NORMAL", "STANDBY", "DOWN", "RESET", "LEAVING",
        ];
        for (state, want) in ALL_STATES.iter().copied().zip(expected) {
            assert_eq!(state.name(), want);
        }
    }

    #[test]
    fn discriminants_match_reference_constants() {
        for (state, want) in ALL_STATES.iter().copied().zip(0u8..) {
            assert_eq!(state as u8, want, "{}", state.name());
        }
    }

    #[test]
    fn default_state_is_unknown() {
        assert_eq!(PeerState::default(), PeerState::Unknown);
    }

    #[test]
    fn failure_counter_works() {
        let mut p = mk("r", "d", false, true);
        p.record_failure();
        p.record_failure();
        assert_eq!(p.failure_count(), 2);
        p.record_success();
        assert_eq!(p.failure_count(), 0);
    }

    #[test]
    fn failure_counter_saturates() {
        let mut p = mk("r", "d", false, true);
        // Child module: private fields of `Peer` are visible here.
        p.failure_count = u32::MAX;
        p.record_failure();
        assert_eq!(p.failure_count(), u32::MAX);
    }

    #[test]
    fn set_state_records_timestamp_and_keeps_failures() {
        let mut p = mk("r", "d", false, true);
        p.record_failure();
        p.set_state(PeerState::Normal, 42);
        assert_eq!(p.state(), PeerState::Normal);
        assert_eq!(p.last_state_ts_secs(), 42);
        assert_eq!(p.failure_count(), 1);
    }

    #[test]
    fn primary_token_handles_empty_list() {
        let p = Peer::new(
            1,
            PeerEndpoint::tcp("h".into(), 1),
            "r".into(),
            "d".into(),
            Vec::new(),
            false,
            true,
            false,
        );
        assert!(p.primary_token().is_none());
        assert!(mk("r", "d", true, true).primary_token().is_some());
    }

    #[test]
    fn routable_states() {
        for state in ALL_STATES {
            let want = matches!(state, PeerState::Normal | PeerState::Joining);
            assert_eq!(state.is_routable(), want, "{}", state.name());
        }
    }
}