Skip to main content

dynomite/cluster/
gossip.rs

1//! Gossip state machine and seed-list bookkeeping.
2//!
3//! The reference engine runs a dedicated pthread that wakes on a
4//! fixed interval, queries the seeds provider, parses returned
5//! `host:port:rack:dc:tokens|...` blobs, and reconciles the
6//! resulting nodes against the per-DC / per-rack tables. Nodes
7//! are added when absent, replaced when their IP changes, and
8//! gossip-updated when only the timestamp / state moves. Once
9//! per round, the engine forwards either a `GOSSIP_SYN` (if
10//! joining) or the local state digest (if normal) to a randomly
11//! chosen peer.
12//!
13//! This module ports the data shape, the seed-list parser, and a
14//! deterministic state machine that the dispatcher / a tokio
15//! periodic task drives. The actual outbound dnode framing of
16//! `GOSSIP_SYN` lives in [`crate::proto::dnode`]; the cluster
17//! layer composes the two.
18//!
19//! # Examples
20//!
21//! ```
22//! use dynomite::cluster::gossip::{parse_seed_node, SeedRecord};
23//! let r = parse_seed_node("10.0.0.1:8101:rackA:dcX:1383429731").unwrap();
24//! assert_eq!(r.host, "10.0.0.1");
25//! assert_eq!(r.port, 8101);
26//! assert_eq!(r.dc, "dcX");
27//! assert_eq!(r.rack, "rackA");
28//! assert_eq!(r.tokens.len(), 1);
29//! ```
30
31use std::collections::HashMap;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use crate::cluster::failure_detector::DEFAULT_THRESHOLD;
36use crate::cluster::peer::PeerState;
37use crate::cluster::pool::ServerPool;
38use crate::events::{ClusterEvent, EventManager};
39use crate::hashkit::{token::parse_token, DynToken};
40
/// Default gossip period in milliseconds (1000 ms). Mirrors the
/// reference engine's `CONF_DEFAULT_GOS_INTERVAL`.
pub const DEFAULT_GOSSIP_INTERVAL_MS: u64 = 1_000;
44
/// Default seeds-check interval in milliseconds (30 000 ms = 30 s).
/// Mirrors the reference engine's `SEEDS_CHECK_INTERVAL`.
pub const DEFAULT_SEEDS_CHECK_INTERVAL_MS: u64 = 30_000;
47
/// Static configuration consumed by the gossip task.
///
/// The [`Default`] value has gossip disabled and takes both periods
/// from [`DEFAULT_GOSSIP_INTERVAL_MS`] and
/// [`DEFAULT_SEEDS_CHECK_INTERVAL_MS`].
#[derive(Clone, Debug)]
pub struct GossipConfig {
    /// Whether gossip is enabled.
    pub enabled: bool,
    /// Gossip period: the time between two gossip rounds.
    pub interval: Duration,
    /// Seeds-check period (the C engine queries the seeds
    /// provider at most once per `SEEDS_CHECK_INTERVAL`).
    pub seeds_check_interval: Duration,
}
59
60impl Default for GossipConfig {
61    fn default() -> Self {
62        Self {
63            enabled: false,
64            interval: Duration::from_millis(DEFAULT_GOSSIP_INTERVAL_MS),
65            seeds_check_interval: Duration::from_millis(DEFAULT_SEEDS_CHECK_INTERVAL_MS),
66        }
67    }
68}
69
/// Parsed view of one `host:port:rack:dc:tokens` entry from a
/// seeds-provider blob, as produced by [`parse_seed_node`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SeedRecord {
    /// Hostname or IP.
    pub host: String,
    /// TCP port.
    pub port: u16,
    /// Rack name.
    pub rack: String,
    /// Datacenter name.
    pub dc: String,
    /// Token list, in the order the entry listed them.
    pub tokens: Vec<DynToken>,
}
84
/// In-memory record of a node observed via gossip. Sits next to
/// [`crate::cluster::peer::Peer`]; the gossip task keeps a
/// dedicated table because the reference engine separates the two
/// (`gossip_node` vs `node`).
#[derive(Clone, Debug)]
pub struct GossipNode {
    /// Datacenter.
    pub dc: String,
    /// Rack.
    pub rack: String,
    /// Hostname or IP.
    pub host: String,
    /// TCP port.
    pub port: u16,
    /// Token list; the first entry is the one [`GossipState`] indexes on.
    pub tokens: Vec<DynToken>,
    /// Lifecycle state.
    pub state: PeerState,
    /// Epoch-seconds timestamp of the last update.
    pub ts_secs: u64,
    /// True for the local node; the failure detector skips such nodes.
    pub is_local: bool,
}
108
/// Live gossip state.
///
/// A simple `HashMap` keyed on `(dc, rack, primary token)` - the
/// primary token being the node's first token rendered as a string -
/// reproduces the reference engine's per-rack `dict_token_nodes`
/// behaviour. A second map keyed on `(dc, rack, host)` reproduces
/// the per-rack `dict_name_nodes` lookup used to detect IP
/// replacement.
#[derive(Clone, Debug, Default)]
pub struct GossipState {
    // Token index: (dc, rack, primary token string) -> node.
    by_token: HashMap<(String, String, String), GossipNode>,
    // Name index: (dc, rack, host) -> a separate copy of the node.
    by_name: HashMap<(String, String, String), GossipNode>,
    // Count of entries added through the "brand new" path.
    node_count: usize,
}
122
123impl GossipState {
124    /// Empty state.
125    ///
126    /// # Examples
127    ///
128    /// ```
129    /// use dynomite::cluster::gossip::GossipState;
130    /// let s = GossipState::new();
131    /// assert_eq!(s.node_count(), 0);
132    /// ```
133    #[must_use]
134    pub fn new() -> Self {
135        Self::default()
136    }
137
138    /// Number of distinct gossip nodes tracked.
139    #[must_use]
140    pub fn node_count(&self) -> usize {
141        self.node_count
142    }
143
144    /// Step result of [`GossipState::add_or_update`].
145    fn token_key(node: &GossipNode) -> (String, String, String) {
146        let primary = node
147            .tokens
148            .first()
149            .map(|t| format!("{}", t.get_int()))
150            .unwrap_or_default();
151        (node.dc.clone(), node.rack.clone(), primary)
152    }
153
154    fn name_key(node: &GossipNode) -> (String, String, String) {
155        (node.dc.clone(), node.rack.clone(), node.host.clone())
156    }
157
158    /// Add or update a [`GossipNode`].
159    ///
160    /// Mirrors the reference engine's `gossip_add_node_if_absent`
161    /// state machine:
162    ///
163    /// * brand-new (dc, rack, token) -> insert.
164    /// * known token but new host -> replace IP and re-index.
165    /// * known token + known host -> update timestamp / state if
166    ///   the supplied `ts_secs` is newer than the stored value.
167    ///
168    /// Returns the [`GossipStep`] that classifies the change for
169    /// the caller (handy in tests).
170    ///
171    /// # Examples
172    ///
173    /// ```
174    /// use dynomite::cluster::gossip::{GossipNode, GossipState, GossipStep};
175    /// use dynomite::cluster::peer::PeerState;
176    /// use dynomite::hashkit::DynToken;
177    /// let mut s = GossipState::new();
178    /// let n = GossipNode {
179    ///     dc: "d".into(), rack: "r".into(), host: "h".into(), port: 1,
180    ///     tokens: vec![DynToken::from_u32(7)], state: PeerState::Normal,
181    ///     ts_secs: 1, is_local: false,
182    /// };
183    /// assert_eq!(s.add_or_update(n.clone()), GossipStep::Added);
184    /// assert_eq!(s.add_or_update(n), GossipStep::Unchanged);
185    /// ```
186    pub fn add_or_update(&mut self, node: GossipNode) -> GossipStep {
187        let token_key = Self::token_key(&node);
188        let name_key = Self::name_key(&node);
189        if let Some(existing) = self.by_token.get_mut(&token_key) {
190            if existing.host == node.host {
191                if node.ts_secs > existing.ts_secs {
192                    let changed = existing.state != node.state;
193                    existing.state = node.state;
194                    existing.ts_secs = node.ts_secs;
195                    if changed {
196                        return GossipStep::StateChanged;
197                    }
198                    return GossipStep::TimestampUpdated;
199                }
200                GossipStep::Unchanged
201            } else {
202                // Replace IP.
203                let old_name_key = Self::name_key(existing);
204                self.by_name.remove(&old_name_key);
205                *existing = node.clone();
206                self.by_name.insert(name_key, node);
207                GossipStep::Replaced
208            }
209        } else {
210            self.by_token.insert(token_key, node.clone());
211            self.by_name.insert(name_key, node);
212            self.node_count += 1;
213            GossipStep::Added
214        }
215    }
216
217    /// Iterate over the live gossip nodes.
218    pub fn nodes(&self) -> impl Iterator<Item = &GossipNode> + '_ {
219        self.by_token.values()
220    }
221
222    /// Apply the failure detector to every non-local node.
223    ///
224    /// Mirrors `gossip_failure_detector`: a node whose
225    /// `now_secs - ts_secs` exceeds `(interval_ms / 1000) * 40`
226    /// is marked [`PeerState::Down`].
227    ///
228    /// # Examples
229    ///
230    /// ```
231    /// use dynomite::cluster::gossip::{GossipNode, GossipState};
232    /// use dynomite::cluster::peer::PeerState;
233    /// use dynomite::hashkit::DynToken;
234    /// let mut s = GossipState::new();
235    /// s.add_or_update(GossipNode {
236    ///     dc: "d".into(), rack: "r".into(), host: "h".into(), port: 1,
237    ///     tokens: vec![DynToken::from_u32(7)], state: PeerState::Normal,
238    ///     ts_secs: 0, is_local: false,
239    /// });
240    /// s.run_failure_detector(100, 1000);
241    /// assert_eq!(s.nodes().next().unwrap().state, PeerState::Down);
242    /// ```
243    pub fn run_failure_detector(&mut self, now_secs: u64, interval_ms: u64) {
244        let delta_secs = (interval_ms / 1000).saturating_mul(40);
245        for node in self.by_token.values_mut() {
246            if node.is_local {
247                continue;
248            }
249            if now_secs.saturating_sub(node.ts_secs) > delta_secs {
250                node.state = PeerState::Down;
251            }
252        }
253        // Mirror by_name.
254        for node in self.by_name.values_mut() {
255            if node.is_local {
256                continue;
257            }
258            if now_secs.saturating_sub(node.ts_secs) > delta_secs {
259                node.state = PeerState::Down;
260            }
261        }
262    }
263}
264
/// Outcome of [`GossipState::add_or_update`].
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum GossipStep {
    /// Node is brand new: no entry existed for its (dc, rack, token).
    Added,
    /// Same node, same host, newer state arrived.
    StateChanged,
    /// Same node, same host, only the timestamp moved forward.
    TimestampUpdated,
    /// Same token but different host: IP replacement.
    Replaced,
    /// Stale or duplicate ack: same host and a timestamp that is not
    /// newer than the stored one.
    Unchanged,
}
279
280/// Parse one `host:port:rack:dc:tokens` seed string.
281///
282/// Mirrors the reference engine's `parse_seeds` routine. The token
283/// list may be a single big-int or a comma-separated list.
284///
285/// # Examples
286///
287/// ```
288/// use dynomite::cluster::gossip::parse_seed_node;
289/// assert!(parse_seed_node("h:1:r:d:1,2,3").is_ok());
290/// assert!(parse_seed_node("h:1:r:d").is_err());
291/// ```
292pub fn parse_seed_node(raw: &str) -> Result<SeedRecord, String> {
293    let parts: Vec<&str> = raw.splitn(5, ':').collect();
294    if parts.len() != 5 {
295        return Err(format!("malformed seed entry '{raw}'"));
296    }
297    // The reference engine splits from the right (strrchr), so
298    // tokens get the rightmost field. To preserve that with hosts
299    // that may contain colons (rare; typically IPv4), we instead
300    // rsplit:
301    let mut iter = raw.rsplitn(5, ':');
302    let tokens_str = iter.next().ok_or("missing tokens")?;
303    let dc = iter.next().ok_or("missing dc")?;
304    let rack = iter.next().ok_or("missing rack")?;
305    let port_str = iter.next().ok_or("missing port")?;
306    let host = iter.next().ok_or("missing host")?;
307    if host.is_empty() {
308        return Err(format!("empty host in '{raw}'"));
309    }
310    if rack.is_empty() {
311        return Err(format!("empty rack in '{raw}'"));
312    }
313    if dc.is_empty() {
314        return Err(format!("empty dc in '{raw}'"));
315    }
316    let port: u16 = port_str
317        .parse()
318        .map_err(|e| format!("bad port '{port_str}': {e}"))?;
319    if port == 0 {
320        return Err(format!("zero port in '{raw}'"));
321    }
322    if tokens_str.is_empty() {
323        return Err(format!("empty tokens in '{raw}'"));
324    }
325    let mut tokens = Vec::new();
326    for t in tokens_str.split(',') {
327        let parsed = parse_token(t.as_bytes()).map_err(|e| format!("bad token '{t}': {e}"))?;
328        tokens.push(parsed);
329    }
330    Ok(SeedRecord {
331        host: host.to_string(),
332        port,
333        rack: rack.to_string(),
334        dc: dc.to_string(),
335        tokens,
336    })
337}
338
339/// Parse a multi-entry seeds blob (entries separated by `|`).
340///
341/// # Examples
342///
343/// ```
344/// use dynomite::cluster::gossip::parse_seed_blob;
345/// let v = parse_seed_blob("h1:8101:r:d:1|h2:8101:r:d:2").unwrap();
346/// assert_eq!(v.len(), 2);
347/// ```
348pub fn parse_seed_blob(raw: &str) -> Result<Vec<SeedRecord>, String> {
349    let mut out = Vec::new();
350    for piece in raw.split('|') {
351        if piece.is_empty() {
352            continue;
353        }
354        out.push(parse_seed_node(piece)?);
355    }
356    Ok(out)
357}
358
/// Authoritative owner of [`PeerState`] transitions for the
/// gossip plane.
///
/// The handler holds an `Arc<ServerPool>` and feeds the
/// per-peer phi-accrual failure detectors as gossip frames
/// arrive. A periodic tick re-evaluates phi for every non-local
/// peer and toggles `PeerState` between `Normal` and `Down` based
/// on the configured threshold:
///
/// * a peer is `Normal` once at least one heartbeat has been
///   recorded AND `phi(now) <= threshold`,
/// * a peer is `Down` when no heartbeat has ever been recorded
///   OR `phi(now) > threshold`.
///
/// The handler is the single place that mutates `peer.state`
/// once gossip is wired; the supervisor loop that owns the TCP
/// link no longer publishes peer-state transitions of its own.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use dynomite::cluster::gossip::GossipHandler;
/// use dynomite::cluster::peer::{Peer, PeerEndpoint};
/// use dynomite::cluster::pool::{PoolConfig, ServerPool};
/// use dynomite::hashkit::DynToken;
///
/// let cfg = PoolConfig::default();
/// let local = Peer::new(
///     0, PeerEndpoint::tcp("h".into(), 1), "r".into(), "d".into(),
///     vec![DynToken::from_u32(0)], true, true, false,
/// );
/// let pool = Arc::new(ServerPool::new(cfg, vec![local]));
/// let handler = GossipHandler::new(pool);
/// assert!((handler.threshold() - 8.0).abs() < f64::EPSILON);
/// ```
#[derive(Debug)]
pub struct GossipHandler {
    // Pool whose peers this handler reads and mutates.
    pool: Arc<ServerPool>,
    // Phi value above which a peer is considered `Down`.
    threshold: f64,
    // Gossip period reported by `interval()`; not consulted by the
    // handler's own methods.
    interval: Duration,
    /// Optional failure-cause metrics handle. When wired,
    /// every peer-state transition observed by
    /// [`Self::evaluate`] increments the matching
    /// `peer_state_transitions_total` counter and updates the
    /// `peer_state_current` and `gossip_phi_score` gauges.
    failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
    /// Optional structured-event publisher. When wired, every
    /// peer-state transition observed by [`Self::evaluate`] or
    /// [`Self::record_heartbeat_pname`] /
    /// [`Self::record_heartbeat_idx`] surfaces a
    /// [`ClusterEvent::PeerUp`] / [`ClusterEvent::PeerDown`]
    /// payload on the manager's broadcast.
    events: Option<Arc<EventManager>>,
}
414
415impl GossipHandler {
416    /// Build a fresh handler over `pool` using the default
417    /// phi-accrual threshold ([`crate::cluster::failure_detector::DEFAULT_THRESHOLD`]).
418    #[must_use]
419    pub fn new(pool: Arc<ServerPool>) -> Self {
420        Self {
421            pool,
422            threshold: DEFAULT_THRESHOLD,
423            interval: Duration::from_millis(DEFAULT_GOSSIP_INTERVAL_MS),
424            failure_metrics: None,
425            events: None,
426        }
427    }
428
429    /// Attach a [`crate::stats::FailureMetrics`] handle.
430    ///
431    /// When set, [`Self::evaluate`] emits a
432    /// `peer_state_transitions_total` counter tick and a
433    /// `peer_state_current` gauge update for every transition
434    /// it applies, plus a `gossip_phi_score` gauge update for
435    /// every non-local peer regardless of whether its state
436    /// changed. Default behaviour is unchanged when no metrics
437    /// handle is supplied.
438    #[must_use]
439    pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
440        self.failure_metrics = Some(metrics);
441        self
442    }
443
444    /// Attach an [`EventManager`] handle.
445    ///
446    /// When set, every peer-state transition the handler
447    /// applies surfaces a [`ClusterEvent::PeerUp`] or
448    /// [`ClusterEvent::PeerDown`] payload on the manager's
449    /// broadcast. Default behaviour is unchanged when no event
450    /// manager is supplied.
451    #[must_use]
452    pub fn with_events(mut self, events: Arc<EventManager>) -> Self {
453        self.events = Some(events);
454        self
455    }
456
457    /// Borrow the installed event manager, if any.
458    #[must_use]
459    pub fn events(&self) -> Option<&Arc<EventManager>> {
460        self.events.as_ref()
461    }
462
463    /// Override the phi threshold (default 8.0).
464    #[must_use]
465    pub fn with_threshold(mut self, threshold: f64) -> Self {
466        self.threshold = threshold;
467        self
468    }
469
470    /// Override the gossip interval used by the periodic tick
471    /// when the handler is driven by the binary's run loop. The
472    /// in-process tests do not depend on this value.
473    #[must_use]
474    pub fn with_interval(mut self, interval: Duration) -> Self {
475        self.interval = interval;
476        self
477    }
478
479    /// Phi threshold the handler is configured with.
480    #[must_use]
481    pub fn threshold(&self) -> f64 {
482        self.threshold
483    }
484
485    /// Configured gossip interval.
486    #[must_use]
487    pub fn interval(&self) -> Duration {
488        self.interval
489    }
490
491    /// Borrow the underlying pool.
492    #[must_use]
493    pub fn pool(&self) -> &Arc<ServerPool> {
494        &self.pool
495    }
496
497    /// Record an inbound gossip heartbeat from the peer
498    /// identified by `pname` (a `host:port` string matching the
499    /// peer's [`crate::cluster::peer::PeerEndpoint::pname`]).
500    ///
501    /// Mutates the peer's failure detector and immediately
502    /// promotes the peer's state to [`PeerState::Normal`] when
503    /// `phi(now)` is below the threshold; this gives gossip a
504    /// snappy first-contact transition without waiting for the
505    /// next periodic tick.
506    ///
507    /// Unknown pnames are ignored.
508    pub fn record_heartbeat_pname(&self, pname: &str, now: Instant) {
509        let mut peers = self.pool.peers().write();
510        for p in peers.iter_mut() {
511            if p.is_local() {
512                continue;
513            }
514            if p.endpoint().pname() == pname {
515                p.failure_detector_mut().record_heartbeat(now);
516                if p.failure_detector().phi(now) <= self.threshold && p.state() != PeerState::Normal
517                {
518                    let prev = p.state();
519                    p.set_state(PeerState::Normal, now_secs_wall());
520                    if let Some(m) = self.failure_metrics.as_ref() {
521                        m.record_peer_state_transition(
522                            p.idx(),
523                            p.dc(),
524                            p.rack(),
525                            prev,
526                            PeerState::Normal,
527                        );
528                    }
529                    if let Some(ev) = self.events.as_ref() {
530                        ev.publish(ClusterEvent::PeerUp {
531                            peer_id: p.idx(),
532                            dc: p.dc().to_string(),
533                            ts: std::time::SystemTime::now(),
534                        });
535                    }
536                }
537                return;
538            }
539        }
540    }
541
542    /// Record an inbound gossip heartbeat against a known peer
543    /// index. Used by tests and by callers that already resolved
544    /// the originating peer.
545    pub fn record_heartbeat_idx(&self, peer_idx: u32, now: Instant) {
546        let mut peers = self.pool.peers().write();
547        if let Some(p) = peers.iter_mut().find(|p| p.idx() == peer_idx) {
548            if p.is_local() {
549                return;
550            }
551            p.failure_detector_mut().record_heartbeat(now);
552            if p.failure_detector().phi(now) <= self.threshold && p.state() != PeerState::Normal {
553                let prev = p.state();
554                p.set_state(PeerState::Normal, now_secs_wall());
555                if let Some(m) = self.failure_metrics.as_ref() {
556                    m.record_peer_state_transition(
557                        p.idx(),
558                        p.dc(),
559                        p.rack(),
560                        prev,
561                        PeerState::Normal,
562                    );
563                }
564                if let Some(ev) = self.events.as_ref() {
565                    ev.publish(ClusterEvent::PeerUp {
566                        peer_id: p.idx(),
567                        dc: p.dc().to_string(),
568                        ts: std::time::SystemTime::now(),
569                    });
570                }
571            }
572        }
573    }
574
575    /// Walk every non-local peer and reconcile its `PeerState`
576    /// with the failure detector's current view of `phi(now)`.
577    /// Returns the list of `(peer_idx, new_state)` transitions
578    /// the call applied (handy in tests).
579    ///
580    /// This is the failure-detector tick the binary runs on a
581    /// periodic timer. Calling it never panics and it never
582    /// blocks on I/O.
583    pub fn evaluate(&self, now: Instant) -> Vec<(u32, PeerState)> {
584        let mut peers = self.pool.peers().write();
585        let mut transitions = Vec::new();
586        for p in peers.iter_mut() {
587            if p.is_local() {
588                continue;
589            }
590            let phi = p.failure_detector().phi(now);
591            if let Some(m) = self.failure_metrics.as_ref() {
592                m.observe_phi(p.idx(), p.dc(), p.rack(), phi);
593            }
594            let target = if p.failure_detector().last_heartbeat().is_some() && phi <= self.threshold
595            {
596                PeerState::Normal
597            } else {
598                PeerState::Down
599            };
600            let prev = p.state();
601            if prev != target {
602                p.set_state(target, now_secs_wall());
603                transitions.push((p.idx(), target));
604                if let Some(m) = self.failure_metrics.as_ref() {
605                    m.record_peer_state_transition(p.idx(), p.dc(), p.rack(), prev, target);
606                }
607                if let Some(ev) = self.events.as_ref() {
608                    let ts = std::time::SystemTime::now();
609                    match target {
610                        PeerState::Normal => ev.publish(ClusterEvent::PeerUp {
611                            peer_id: p.idx(),
612                            dc: p.dc().to_string(),
613                            ts,
614                        }),
615                        PeerState::Down => ev.publish(ClusterEvent::PeerDown {
616                            peer_id: p.idx(),
617                            dc: p.dc().to_string(),
618                            phi,
619                            ts,
620                        }),
621                        _ => {}
622                    }
623                }
624            } else if let Some(m) = self.failure_metrics.as_ref() {
625                m.observe_peer_state(p.idx(), p.dc(), p.rack(), target);
626            }
627        }
628        transitions
629    }
630
631    /// Mark the peer identified by `pname` as [`PeerState::Down`]
632    /// without consulting the failure detector. Used by the
633    /// gossip-shutdown path so the dispatcher can short-circuit
634    /// routing to a peer that announced its own departure.
635    pub fn mark_down_pname(&self, pname: &str) {
636        let mut peers = self.pool.peers().write();
637        for p in peers.iter_mut() {
638            if p.is_local() {
639                continue;
640            }
641            if p.endpoint().pname() == pname && p.state() != PeerState::Down {
642                let prev = p.state();
643                p.set_state(PeerState::Down, now_secs_wall());
644                if let Some(m) = self.failure_metrics.as_ref() {
645                    m.record_peer_state_transition(
646                        p.idx(),
647                        p.dc(),
648                        p.rack(),
649                        prev,
650                        PeerState::Down,
651                    );
652                }
653                if let Some(ev) = self.events.as_ref() {
654                    ev.publish(ClusterEvent::PeerDown {
655                        peer_id: p.idx(),
656                        dc: p.dc().to_string(),
657                        phi: p.failure_detector().phi(Instant::now()),
658                        ts: std::time::SystemTime::now(),
659                    });
660                }
661                return;
662            }
663        }
664    }
665
666    /// Reset the per-peer failure detector. Used when a peer is
667    /// removed and re-added so historical jitter does not bias
668    /// the new suspicion value.
669    pub fn reset_detector(&self, peer_idx: u32) {
670        let mut peers = self.pool.peers().write();
671        if let Some(p) = peers.iter_mut().find(|p| p.idx() == peer_idx) {
672            p.failure_detector_mut().reset();
673        }
674    }
675}
676
/// Current wall-clock time as whole seconds since the Unix epoch.
/// Yields 0 when the system clock reads earlier than the epoch.
fn now_secs_wall() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
682
683#[cfg(test)]
684mod tests {
685    use super::*;
686
687    fn node(dc: &str, rack: &str, host: &str, tok: u32, ts: u64) -> GossipNode {
688        GossipNode {
689            dc: dc.into(),
690            rack: rack.into(),
691            host: host.into(),
692            port: 8101,
693            tokens: vec![DynToken::from_u32(tok)],
694            state: PeerState::Normal,
695            ts_secs: ts,
696            is_local: false,
697        }
698    }
699
700    #[test]
701    fn add_then_update_state() {
702        let mut s = GossipState::new();
703        assert_eq!(
704            s.add_or_update(node("d", "r", "h", 7, 1)),
705            GossipStep::Added
706        );
707        let mut n2 = node("d", "r", "h", 7, 2);
708        n2.state = PeerState::Down;
709        assert_eq!(s.add_or_update(n2), GossipStep::StateChanged);
710    }
711
712    #[test]
713    fn ip_replacement() {
714        let mut s = GossipState::new();
715        s.add_or_update(node("d", "r", "h1", 7, 1));
716        let n2 = node("d", "r", "h2", 7, 2);
717        assert_eq!(s.add_or_update(n2), GossipStep::Replaced);
718    }
719
720    #[test]
721    fn stale_update_ignored() {
722        let mut s = GossipState::new();
723        s.add_or_update(node("d", "r", "h", 7, 5));
724        let stale = node("d", "r", "h", 7, 1);
725        assert_eq!(s.add_or_update(stale), GossipStep::Unchanged);
726    }
727
728    #[test]
729    fn parse_one_seed() {
730        let r = parse_seed_node("10.0.0.1:8101:rA:dc1:1383429731").unwrap();
731        assert_eq!(r.host, "10.0.0.1");
732        assert_eq!(r.port, 8101);
733        assert_eq!(r.rack, "rA");
734        assert_eq!(r.dc, "dc1");
735    }
736
737    #[test]
738    fn parse_multi_token_seed() {
739        let r = parse_seed_node("h:1:r:d:1,2,3").unwrap();
740        assert_eq!(r.tokens.len(), 3);
741    }
742
743    #[test]
744    fn parse_blob_with_pipe() {
745        let v = parse_seed_blob("h1:1:r:d:1|h2:2:r:d:2").unwrap();
746        assert_eq!(v.len(), 2);
747    }
748
749    #[test]
750    fn parse_seed_rejects_short() {
751        assert!(parse_seed_node("h:1:r:d").is_err());
752    }
753
754    #[test]
755    fn failure_detector_ages_node_to_down() {
756        let mut s = GossipState::new();
757        s.add_or_update(node("d", "r", "h", 7, 0));
758        s.run_failure_detector(1000, 1000); // delta = 40s, now > 40s
759        assert_eq!(s.nodes().next().unwrap().state, PeerState::Down);
760    }
761
762    /// Construction helper for the `GossipHandler` test suite.
763    /// The handler operates on a real `ServerPool`, so each test
764    /// builds a small two-peer pool (one local, one remote).
765    mod handler_helpers {
766        use std::sync::Arc;
767
768        use crate::cluster::peer::{Peer, PeerEndpoint};
769        use crate::cluster::pool::{PoolConfig, ServerPool};
770        use crate::hashkit::DynToken;
771
772        pub fn pool() -> Arc<ServerPool> {
773            let cfg = PoolConfig {
774                dc: "dc1".into(),
775                rack: "r1".into(),
776                enable_gossip: true,
777                ..PoolConfig::default()
778            };
779            let local = Peer::new(
780                0,
781                PeerEndpoint::tcp("127.0.0.1".into(), 8101),
782                "r1".into(),
783                "dc1".into(),
784                vec![DynToken::from_u32(0)],
785                true,
786                true,
787                false,
788            );
789            let remote = Peer::new(
790                1,
791                PeerEndpoint::tcp("127.0.0.1".into(), 8102),
792                "r1".into(),
793                "dc1".into(),
794                vec![DynToken::from_u32(2_147_483_648)],
795                false,
796                true,
797                false,
798            );
799            Arc::new(ServerPool::new(cfg, vec![local, remote]))
800        }
801    }
802
803    fn remote_state(pool: &super::ServerPool) -> PeerState {
804        pool.peers()
805            .read()
806            .iter()
807            .find(|p| !p.is_local())
808            .map_or(PeerState::Unknown, super::super::peer::Peer::state)
809    }
810
811    #[test]
812    fn handler_first_heartbeat_promotes_to_normal() {
813        let pool = handler_helpers::pool();
814        let handler = GossipHandler::new(pool.clone());
815        let t0 = std::time::Instant::now();
816        assert_eq!(remote_state(&pool), PeerState::Down);
817        handler.record_heartbeat_pname("127.0.0.1:8102", t0);
818        // After the first received heartbeat the remote peer is
819        // promoted out of the initial `Down` state.
820        assert_eq!(remote_state(&pool), PeerState::Normal);
821    }
822
823    #[test]
824    fn handler_steady_heartbeats_keep_peer_normal() {
825        // Drive 100 heartbeats at 1s intervals; phi must stay
826        // below 1.0 throughout and the peer must remain `Normal`.
827        let pool = handler_helpers::pool();
828        let handler = GossipHandler::new(pool.clone());
829        let t0 = std::time::Instant::now();
830        for i in 0..100 {
831            let now = t0 + std::time::Duration::from_secs(i);
832            handler.record_heartbeat_pname("127.0.0.1:8102", now);
833            handler.evaluate(now);
834        }
835        let after_last =
836            t0 + std::time::Duration::from_secs(99) + std::time::Duration::from_millis(10);
837        let phi = pool
838            .peers()
839            .read()
840            .iter()
841            .find(|p| !p.is_local())
842            .map_or(0.0, |p| p.failure_detector().phi(after_last));
843        assert!(
844            phi < 1.0,
845            "phi should be < 1.0 right after a heartbeat, got {phi}"
846        );
847        assert_eq!(remote_state(&pool), PeerState::Normal);
848    }
849
850    #[test]
851    fn handler_silence_transitions_peer_to_down() {
852        // Stop heartbeats; advance the clock 60s; assert the
853        // periodic evaluation transitions the peer to `Down`.
854        let pool = handler_helpers::pool();
855        let handler = GossipHandler::new(pool.clone());
856        let t0 = std::time::Instant::now();
857        for i in 0..100 {
858            let now = t0 + std::time::Duration::from_secs(i);
859            handler.record_heartbeat_pname("127.0.0.1:8102", now);
860        }
861        // Advance 60 seconds past the last heartbeat with no new
862        // gossip; phi crosses the default threshold of 8.0.
863        let later = t0 + std::time::Duration::from_secs(159);
864        let transitions = handler.evaluate(later);
865        assert_eq!(transitions, vec![(1, PeerState::Down)]);
866        assert_eq!(remote_state(&pool), PeerState::Down);
867    }
868
869    #[test]
870    fn handler_evaluate_no_data_keeps_peer_down() {
871        // A peer we have never heard from stays `Down`.
872        let pool = handler_helpers::pool();
873        let handler = GossipHandler::new(pool.clone());
874        let t0 = std::time::Instant::now();
875        let transitions = handler.evaluate(t0);
876        assert!(transitions.is_empty());
877        assert_eq!(remote_state(&pool), PeerState::Down);
878    }
879
880    #[test]
881    fn handler_unknown_pname_is_silent() {
882        let pool = handler_helpers::pool();
883        let handler = GossipHandler::new(pool.clone());
884        let t0 = std::time::Instant::now();
885        handler.record_heartbeat_pname("10.0.0.99:9999", t0);
886        assert_eq!(remote_state(&pool), PeerState::Down);
887    }
888
889    #[test]
890    fn handler_mark_down_overrides_normal() {
891        let pool = handler_helpers::pool();
892        let handler = GossipHandler::new(pool.clone());
893        let t0 = std::time::Instant::now();
894        handler.record_heartbeat_pname("127.0.0.1:8102", t0);
895        assert_eq!(remote_state(&pool), PeerState::Normal);
896        handler.mark_down_pname("127.0.0.1:8102");
897        assert_eq!(remote_state(&pool), PeerState::Down);
898    }
899
900    /// `evaluate` toggles a peer Normal->Down once gossip
901    /// quiesces. The wired `FailureMetrics` accumulator must
902    /// see exactly one `(from=Normal, to=Down)` transition
903    /// counter tick and the matching `peer_state_current`
904    /// gauge entry.
905    #[test]
906    fn handler_evaluate_records_normal_to_down_transition() {
907        let pool = handler_helpers::pool();
908        let metrics = std::sync::Arc::new(crate::stats::FailureMetrics::new());
909        let handler = GossipHandler::new(pool.clone()).with_failure_metrics(metrics.clone());
910        let t0 = std::time::Instant::now();
911        // Drive 100 heartbeats so the peer is firmly `Normal`.
912        for i in 0..100 {
913            let now = t0 + std::time::Duration::from_secs(i);
914            handler.record_heartbeat_pname("127.0.0.1:8102", now);
915            handler.evaluate(now);
916        }
917        let mid_snap = metrics.snapshot();
918        let normal_count = mid_snap
919            .peer_state_transitions
920            .iter()
921            .filter(|t| t.to == PeerState::Normal)
922            .map(|t| t.count)
923            .sum::<u64>();
924        // There should be exactly one Down->Normal flip from
925        // the very first heartbeat.
926        assert_eq!(
927            normal_count, 1,
928            "got transitions: {:?}",
929            mid_snap.peer_state_transitions
930        );
931
932        // Now stop heartbeats and skip 60 seconds of wall
933        // time. evaluate should flip the peer to Down once.
934        let later = t0 + std::time::Duration::from_secs(159);
935        let transitions = handler.evaluate(later);
936        assert_eq!(transitions, vec![(1, PeerState::Down)]);
937
938        let snap = metrics.snapshot();
939        let down_entry = snap
940            .peer_state_transitions
941            .iter()
942            .find(|t| t.from == PeerState::Normal && t.to == PeerState::Down)
943            .expect("normal->down transition should be recorded");
944        assert_eq!(down_entry.count, 1);
945        assert_eq!(down_entry.peer_idx, 1);
946
947        // The current-state gauge follows the latest
948        // observation.
949        let current = snap
950            .peer_state_current
951            .iter()
952            .find(|c| c.peer_idx == 1)
953            .expect("peer_state_current entry should be present");
954        assert_eq!(current.state, PeerState::Down);
955        assert_eq!(current.dc, "dc1");
956        assert_eq!(current.rack, "r1");
957
958        // Phi gauge must be populated for the remote peer.
959        let phi_entry = snap
960            .peer_phi
961            .iter()
962            .find(|p| p.peer_idx == 1)
963            .expect("gossip_phi_score gauge should be populated");
964        assert!(
965            phi_entry.phi >= 0.0,
966            "phi should be non-negative; got {}",
967            phi_entry.phi
968        );
969    }
970}