use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::cluster::failure_detector::DEFAULT_THRESHOLD;
use crate::cluster::peer::PeerState;
use crate::cluster::pool::ServerPool;
use crate::events::{ClusterEvent, EventManager};
use crate::hashkit::{token::parse_token, DynToken};
/// Default gossip interval in milliseconds (1 second); used by
/// `GossipConfig::default` and `GossipHandler::new`.
pub const DEFAULT_GOSSIP_INTERVAL_MS: u64 = 1_000;
/// Default value, in milliseconds (30 seconds), of
/// `GossipConfig::seeds_check_interval`.
pub const DEFAULT_SEEDS_CHECK_INTERVAL_MS: u64 = 30_000;
/// Gossip settings: an on/off switch plus two timer periods.
#[derive(Clone, Debug)]
pub struct GossipConfig {
/// Whether gossip is enabled; `Default` sets this to `false`.
pub enabled: bool,
/// Gossip period; `Default` uses `DEFAULT_GOSSIP_INTERVAL_MS`.
pub interval: Duration,
/// Period of the seeds check; `Default` uses `DEFAULT_SEEDS_CHECK_INTERVAL_MS`.
// NOTE(review): what the seeds check actually does is not visible in this
// file - confirm against the consumer of this field.
pub seeds_check_interval: Duration,
}
impl Default for GossipConfig {
    /// Builds the stock configuration: gossip switched off, with both timer
    /// periods taken from the module-level `DEFAULT_*_MS` constants.
    fn default() -> Self {
        let interval = Duration::from_millis(DEFAULT_GOSSIP_INTERVAL_MS);
        let seeds_check_interval = Duration::from_millis(DEFAULT_SEEDS_CHECK_INTERVAL_MS);
        Self {
            enabled: false,
            interval,
            seeds_check_interval,
        }
    }
}
/// One parsed seed entry, as produced by `parse_seed_node` from a
/// `host:port:rack:dc:tokens` string.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SeedRecord {
/// Host part of the entry; `parse_seed_node` rejects an empty host.
pub host: String,
/// Port; `parse_seed_node` rejects `0`.
pub port: u16,
/// Rack name; never empty when produced by `parse_seed_node`.
pub rack: String,
/// Datacenter name; never empty when produced by `parse_seed_node`.
pub dc: String,
/// Tokens parsed from the comma-separated final field of the entry.
pub tokens: Vec<DynToken>,
}
/// A node as tracked by `GossipState`.
#[derive(Clone, Debug)]
pub struct GossipNode {
/// Datacenter name; first component of both `GossipState` index keys.
pub dc: String,
/// Rack name; second component of both `GossipState` index keys.
pub rack: String,
/// Host; third component of the by-name key, and what `add_or_update`
/// compares to tell an update from a replacement.
pub host: String,
// NOTE(review): `port` is carried along but never read by the code visible
// in this file.
pub port: u16,
/// Tokens; only the first one is used by `GossipState` (as the by-token key).
pub tokens: Vec<DynToken>,
/// Last known state; `run_failure_detector` sets it to `PeerState::Down`
/// for stale non-local nodes.
pub state: PeerState,
/// Timestamp in seconds; compared against incoming updates and against
/// `now_secs` in `run_failure_detector`.
pub ts_secs: u64,
/// Nodes with this flag set are skipped by `run_failure_detector`.
pub is_local: bool,
}
/// Gossip view of the cluster, holding each node in two indexes.
#[derive(Clone, Debug, Default)]
pub struct GossipState {
/// Nodes keyed by `(dc, rack, primary token rendered as a string)`;
/// this is the index `nodes()` iterates.
by_token: HashMap<(String, String, String), GossipNode>,
/// Nodes keyed by `(dc, rack, host)`.
by_name: HashMap<(String, String, String), GossipNode>,
/// Incremented each time `add_or_update` inserts a new by-token key.
node_count: usize,
}
impl GossipState {
    /// Creates an empty gossip view.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of distinct `(dc, rack, primary token)` entries inserted so far.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.node_count
    }

    /// Key for the token index: `(dc, rack, primary token)`.
    ///
    /// The primary token is the node's first token rendered as a string; a
    /// node without tokens gets the empty string.
    fn token_key(node: &GossipNode) -> (String, String, String) {
        let primary = node
            .tokens
            .first()
            .map(|t| t.get_int().to_string())
            .unwrap_or_default();
        (node.dc.clone(), node.rack.clone(), primary)
    }

    /// Key for the name index: `(dc, rack, host)`.
    fn name_key(node: &GossipNode) -> (String, String, String) {
        (node.dc.clone(), node.rack.clone(), node.host.clone())
    }

    /// Merges `node` into the view and reports what happened.
    ///
    /// * No entry for the node's token key: the node is inserted into both
    ///   indexes and `GossipStep::Added` is returned.
    /// * Entry exists with the same host: the update is applied only when
    ///   `node.ts_secs` is strictly newer, yielding `StateChanged` or
    ///   `TimestampUpdated`; otherwise `Unchanged`.
    /// * Entry exists with a different host: the old host is dropped from the
    ///   name index, the entry is overwritten, and `Replaced` is returned.
    pub fn add_or_update(&mut self, node: GossipNode) -> GossipStep {
        let token_key = Self::token_key(&node);
        let name_key = Self::name_key(&node);

        match self.by_token.get_mut(&token_key) {
            None => {
                self.by_token.insert(token_key, node.clone());
                self.by_name.insert(name_key, node);
                self.node_count += 1;
                GossipStep::Added
            }
            Some(existing) => {
                if existing.host != node.host {
                    // Same token, different host: the token has moved.
                    let old_name_key = Self::name_key(existing);
                    self.by_name.remove(&old_name_key);
                    *existing = node.clone();
                    self.by_name.insert(name_key, node);
                    return GossipStep::Replaced;
                }
                if node.ts_secs <= existing.ts_secs {
                    return GossipStep::Unchanged;
                }

                let changed = existing.state != node.state;
                existing.state = node.state;
                existing.ts_secs = node.ts_secs;

                // Keep the name index in step with the token index. Without
                // this the by-name copy keeps its old state and timestamp and
                // the failure detector ages it out on stale data.
                if let Some(mirror) = self.by_name.get_mut(&name_key) {
                    mirror.state = node.state;
                    mirror.ts_secs = node.ts_secs;
                }

                if changed {
                    GossipStep::StateChanged
                } else {
                    GossipStep::TimestampUpdated
                }
            }
        }
    }

    /// Iterates over the nodes held in the token index, in arbitrary order.
    pub fn nodes(&self) -> impl Iterator<Item = &GossipNode> + '_ {
        self.by_token.values()
    }

    /// Marks every non-local node `PeerState::Down` whose timestamp is more
    /// than 40 gossip intervals older than `now_secs`.
    ///
    /// `interval_ms` is the gossip interval in milliseconds. The window is
    /// `floor(interval_ms * 40 / 1000)` seconds, computed without overflow.
    /// For whole-second intervals this equals `(interval_ms / 1000) * 40`;
    /// for sub-second intervals it no longer collapses to zero.
    pub fn run_failure_detector(&mut self, now_secs: u64, interval_ms: u64) {
        let whole = (interval_ms / 1000).saturating_mul(40);
        // The remainder is < 1000, so this product cannot overflow.
        let fraction = (interval_ms % 1000) * 40 / 1000;
        let delta_secs = whole.saturating_add(fraction);

        let tracked = self
            .by_token
            .values_mut()
            .chain(self.by_name.values_mut());
        for node in tracked {
            if !node.is_local && now_secs.saturating_sub(node.ts_secs) > delta_secs {
                node.state = PeerState::Down;
            }
        }
    }
}
/// Outcome reported by `GossipState::add_or_update`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum GossipStep {
/// No entry existed for the node's token key; the node was inserted.
Added,
/// Same host with a newer timestamp, and the state differed.
StateChanged,
/// Same host with a newer timestamp, state unchanged.
TimestampUpdated,
/// The token key was held by a different host; the entry was overwritten.
Replaced,
/// Same host, but the incoming timestamp was not newer; nothing was changed.
Unchanged,
}
/// Parses a single seed entry of the form `host:port:rack:dc:tokens`.
///
/// The four right-most `:`-separated fields are peeled off from the end of
/// the string, so a host that itself contains `:` is kept intact. `tokens`
/// is a comma-separated list; each item is handed to `parse_token`.
///
/// # Errors
///
/// Returns a message when the entry has fewer than five fields, when host,
/// rack, dc or tokens is empty, when the port is not a non-zero `u16`, or
/// when any token fails to parse. Checks run in that order: host, rack, dc,
/// port, tokens.
pub fn parse_seed_node(raw: &str) -> Result<SeedRecord, String> {
    // Fewer than five `:`-separated fields can never form a record.
    if raw.splitn(5, ':').count() != 5 {
        return Err(format!("malformed seed entry '{raw}'"));
    }

    // Walk from the right: tokens, dc, rack, port, then whatever is left is
    // the host.
    let mut fields = raw.rsplitn(5, ':');
    let tokens_str = fields
        .next()
        .ok_or_else(|| String::from("missing tokens"))?;
    let dc = fields.next().ok_or_else(|| String::from("missing dc"))?;
    let rack = fields.next().ok_or_else(|| String::from("missing rack"))?;
    let port_str = fields.next().ok_or_else(|| String::from("missing port"))?;
    let host = fields.next().ok_or_else(|| String::from("missing host"))?;

    if host.is_empty() {
        return Err(format!("empty host in '{raw}'"));
    }
    if rack.is_empty() {
        return Err(format!("empty rack in '{raw}'"));
    }
    if dc.is_empty() {
        return Err(format!("empty dc in '{raw}'"));
    }

    let port = match port_str.parse::<u16>() {
        Err(e) => return Err(format!("bad port '{port_str}': {e}")),
        Ok(0) => return Err(format!("zero port in '{raw}'")),
        Ok(value) => value,
    };

    if tokens_str.is_empty() {
        return Err(format!("empty tokens in '{raw}'"));
    }
    // Stops at the first token that fails to parse.
    let tokens = tokens_str
        .split(',')
        .map(|t| parse_token(t.as_bytes()).map_err(|e| format!("bad token '{t}': {e}")))
        .collect::<Result<Vec<_>, String>>()?;

    Ok(SeedRecord {
        host: host.to_string(),
        port,
        rack: rack.to_string(),
        dc: dc.to_string(),
        tokens,
    })
}
pub fn parse_seed_blob(raw: &str) -> Result<Vec<SeedRecord>, String> {
let mut out = Vec::new();
for piece in raw.split('|') {
if piece.is_empty() {
continue;
}
out.push(parse_seed_node(piece)?);
}
Ok(out)
}
/// Applies failure-detector results to the peers of a `ServerPool`,
/// optionally reporting state transitions to a metrics sink and an event
/// manager.
#[derive(Debug)]
pub struct GossipHandler {
/// Pool whose peers are inspected and updated.
pool: Arc<ServerPool>,
/// A peer counts as healthy when its phi is `<= threshold`.
threshold: f64,
/// Stored interval; exposed through `interval()` and not otherwise read
/// by the code visible in this file.
interval: Duration,
/// Optional metrics sink for phi observations and state transitions.
failure_metrics: Option<Arc<crate::stats::FailureMetrics>>,
/// Optional event manager that receives `PeerUp` / `PeerDown` events.
events: Option<Arc<EventManager>>,
}
impl GossipHandler {
    /// Creates a handler over `pool` with `DEFAULT_THRESHOLD`, the default
    /// gossip interval, and neither metrics nor events attached.
    #[must_use]
    pub fn new(pool: Arc<ServerPool>) -> Self {
        let interval = Duration::from_millis(DEFAULT_GOSSIP_INTERVAL_MS);
        Self {
            pool,
            threshold: DEFAULT_THRESHOLD,
            interval,
            failure_metrics: None,
            events: None,
        }
    }

    /// Attaches a metrics sink; builder style.
    #[must_use]
    pub fn with_failure_metrics(mut self, metrics: Arc<crate::stats::FailureMetrics>) -> Self {
        self.failure_metrics = Some(metrics);
        self
    }

    /// Attaches an event manager; builder style.
    #[must_use]
    pub fn with_events(mut self, events: Arc<EventManager>) -> Self {
        self.events = Some(events);
        self
    }

    /// The attached event manager, if any.
    #[must_use]
    pub fn events(&self) -> Option<&Arc<EventManager>> {
        self.events.as_ref()
    }

    /// Overrides the phi threshold; builder style.
    #[must_use]
    pub fn with_threshold(mut self, threshold: f64) -> Self {
        self.threshold = threshold;
        self
    }

    /// Overrides the stored interval; builder style.
    #[must_use]
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Current phi threshold.
    #[must_use]
    pub fn threshold(&self) -> f64 {
        self.threshold
    }

    /// Currently stored interval.
    #[must_use]
    pub fn interval(&self) -> Duration {
        self.interval
    }

    /// The pool this handler operates on.
    #[must_use]
    pub fn pool(&self) -> &Arc<ServerPool> {
        &self.pool
    }

    /// Records a heartbeat at `now` for the first non-local peer whose
    /// endpoint `pname()` equals `pname`.
    ///
    /// If the peer's phi is then within the threshold and the peer is not
    /// already `Normal`, it is moved to `Normal` and the transition is
    /// reported to the metrics sink and as a `PeerUp` event. An unknown
    /// `pname` is ignored.
    pub fn record_heartbeat_pname(&self, pname: &str, now: Instant) {
        let mut guard = self.pool.peers().write();
        let found = guard
            .iter_mut()
            .find(|candidate| !candidate.is_local() && candidate.endpoint().pname() == pname);
        let peer = match found {
            Some(peer) => peer,
            None => return,
        };

        peer.failure_detector_mut().record_heartbeat(now);
        let healthy = peer.failure_detector().phi(now) <= self.threshold;
        if !healthy || peer.state() == PeerState::Normal {
            return;
        }

        let previous = peer.state();
        peer.set_state(PeerState::Normal, now_secs_wall());
        if let Some(metrics) = &self.failure_metrics {
            metrics.record_peer_state_transition(
                peer.idx(),
                peer.dc(),
                peer.rack(),
                previous,
                PeerState::Normal,
            );
        }
        if let Some(bus) = &self.events {
            bus.publish(ClusterEvent::PeerUp {
                peer_id: peer.idx(),
                dc: peer.dc().to_string(),
                ts: std::time::SystemTime::now(),
            });
        }
    }

    /// Same as `record_heartbeat_pname`, but the peer is looked up by its
    /// index. Nothing happens when the index is unknown or the first peer
    /// with that index is local.
    pub fn record_heartbeat_idx(&self, peer_idx: u32, now: Instant) {
        let mut guard = self.pool.peers().write();
        let found = guard
            .iter_mut()
            .find(|candidate| candidate.idx() == peer_idx);
        let peer = match found {
            Some(peer) => peer,
            None => return,
        };
        if peer.is_local() {
            return;
        }

        peer.failure_detector_mut().record_heartbeat(now);
        let healthy = peer.failure_detector().phi(now) <= self.threshold;
        if !healthy || peer.state() == PeerState::Normal {
            return;
        }

        let previous = peer.state();
        peer.set_state(PeerState::Normal, now_secs_wall());
        if let Some(metrics) = &self.failure_metrics {
            metrics.record_peer_state_transition(
                peer.idx(),
                peer.dc(),
                peer.rack(),
                previous,
                PeerState::Normal,
            );
        }
        if let Some(bus) = &self.events {
            bus.publish(ClusterEvent::PeerUp {
                peer_id: peer.idx(),
                dc: peer.dc().to_string(),
                ts: std::time::SystemTime::now(),
            });
        }
    }

    /// Re-evaluates every non-local peer at `now` and returns the list of
    /// `(peer index, new state)` transitions that were applied.
    ///
    /// A peer is `Normal` when it has received at least one heartbeat and
    /// its phi is within the threshold; otherwise it is `Down`. Phi is
    /// always reported to the metrics sink. Transitions are reported to
    /// metrics and events; peers whose state did not change only get their
    /// current state observed.
    pub fn evaluate(&self, now: Instant) -> Vec<(u32, PeerState)> {
        let mut guard = self.pool.peers().write();
        let mut applied = Vec::new();

        for peer in guard.iter_mut().filter(|candidate| !candidate.is_local()) {
            let phi = peer.failure_detector().phi(now);
            if let Some(metrics) = &self.failure_metrics {
                metrics.observe_phi(peer.idx(), peer.dc(), peer.rack(), phi);
            }

            let alive =
                peer.failure_detector().last_heartbeat().is_some() && phi <= self.threshold;
            let target = if alive {
                PeerState::Normal
            } else {
                PeerState::Down
            };

            let previous = peer.state();
            if previous == target {
                // No transition: just refresh the current-state gauge.
                if let Some(metrics) = &self.failure_metrics {
                    metrics.observe_peer_state(peer.idx(), peer.dc(), peer.rack(), target);
                }
                continue;
            }

            peer.set_state(target, now_secs_wall());
            applied.push((peer.idx(), target));
            if let Some(metrics) = &self.failure_metrics {
                metrics.record_peer_state_transition(
                    peer.idx(),
                    peer.dc(),
                    peer.rack(),
                    previous,
                    target,
                );
            }
            if let Some(bus) = &self.events {
                let ts = std::time::SystemTime::now();
                // `target` is `Normal` exactly when `alive` holds, `Down` otherwise.
                if alive {
                    bus.publish(ClusterEvent::PeerUp {
                        peer_id: peer.idx(),
                        dc: peer.dc().to_string(),
                        ts,
                    });
                } else {
                    bus.publish(ClusterEvent::PeerDown {
                        peer_id: peer.idx(),
                        dc: peer.dc().to_string(),
                        phi,
                        ts,
                    });
                }
            }
        }

        applied
    }

    /// Forces the first non-local peer named `pname` that is not already
    /// `Down` into the `Down` state, reporting the transition to metrics and
    /// as a `PeerDown` event (with phi sampled at the current instant).
    pub fn mark_down_pname(&self, pname: &str) {
        let mut guard = self.pool.peers().write();
        let found = guard.iter_mut().find(|candidate| {
            !candidate.is_local()
                && candidate.endpoint().pname() == pname
                && candidate.state() != PeerState::Down
        });
        let peer = match found {
            Some(peer) => peer,
            None => return,
        };

        let previous = peer.state();
        peer.set_state(PeerState::Down, now_secs_wall());
        if let Some(metrics) = &self.failure_metrics {
            metrics.record_peer_state_transition(
                peer.idx(),
                peer.dc(),
                peer.rack(),
                previous,
                PeerState::Down,
            );
        }
        if let Some(bus) = &self.events {
            bus.publish(ClusterEvent::PeerDown {
                peer_id: peer.idx(),
                dc: peer.dc().to_string(),
                phi: peer.failure_detector().phi(Instant::now()),
                ts: std::time::SystemTime::now(),
            });
        }
    }

    /// Resets the failure detector of the first peer with index `peer_idx`;
    /// an unknown index is ignored.
    pub fn reset_detector(&self, peer_idx: u32) {
        let mut guard = self.pool.peers().write();
        let found = guard
            .iter_mut()
            .find(|candidate| candidate.idx() == peer_idx);
        if let Some(peer) = found {
            peer.failure_detector_mut().reset();
        }
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch, or `0`
/// when the system clock reads earlier than the epoch.
fn now_secs_wall() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
// Unit tests for the gossip state, the seed parsers, and the handler.
#[cfg(test)]
mod tests {
use super::*;
// Builds a non-local `Normal` node on port 8101 with a single token.
fn node(dc: &str, rack: &str, host: &str, tok: u32, ts: u64) -> GossipNode {
GossipNode {
dc: dc.into(),
rack: rack.into(),
host: host.into(),
port: 8101,
tokens: vec![DynToken::from_u32(tok)],
state: PeerState::Normal,
ts_secs: ts,
is_local: false,
}
}
// A first insert reports `Added`; a newer update with a different state
// reports `StateChanged`.
#[test]
fn add_then_update_state() {
let mut s = GossipState::new();
assert_eq!(
s.add_or_update(node("d", "r", "h", 7, 1)),
GossipStep::Added
);
let mut n2 = node("d", "r", "h", 7, 2);
n2.state = PeerState::Down;
assert_eq!(s.add_or_update(n2), GossipStep::StateChanged);
}
// Same token arriving from a different host reports `Replaced`.
#[test]
fn ip_replacement() {
let mut s = GossipState::new();
s.add_or_update(node("d", "r", "h1", 7, 1));
let n2 = node("d", "r", "h2", 7, 2);
assert_eq!(s.add_or_update(n2), GossipStep::Replaced);
}
// An update with an older timestamp is reported as `Unchanged`.
#[test]
fn stale_update_ignored() {
let mut s = GossipState::new();
s.add_or_update(node("d", "r", "h", 7, 5));
let stale = node("d", "r", "h", 7, 1);
assert_eq!(s.add_or_update(stale), GossipStep::Unchanged);
}
// A well-formed `host:port:rack:dc:tokens` entry yields the expected fields.
#[test]
fn parse_one_seed() {
let r = parse_seed_node("10.0.0.1:8101:rA:dc1:1383429731").unwrap();
assert_eq!(r.host, "10.0.0.1");
assert_eq!(r.port, 8101);
assert_eq!(r.rack, "rA");
assert_eq!(r.dc, "dc1");
}
// Comma-separated tokens are all parsed.
#[test]
fn parse_multi_token_seed() {
let r = parse_seed_node("h:1:r:d:1,2,3").unwrap();
assert_eq!(r.tokens.len(), 3);
}
// `|` separates entries in a blob.
#[test]
fn parse_blob_with_pipe() {
let v = parse_seed_blob("h1:1:r:d:1|h2:2:r:d:2").unwrap();
assert_eq!(v.len(), 2);
}
// An entry with only four fields is rejected.
#[test]
fn parse_seed_rejects_short() {
assert!(parse_seed_node("h:1:r:d").is_err());
}
// A node whose timestamp is 1000 s old at a 1000 ms interval is marked Down.
#[test]
fn failure_detector_ages_node_to_down() {
let mut s = GossipState::new();
s.add_or_update(node("d", "r", "h", 7, 0));
s.run_failure_detector(1000, 1000); assert_eq!(s.nodes().next().unwrap().state, PeerState::Down);
}
// Fixtures shared by the handler tests.
mod handler_helpers {
use std::sync::Arc;
use crate::cluster::peer::{Peer, PeerEndpoint};
use crate::cluster::pool::{PoolConfig, ServerPool};
use crate::hashkit::DynToken;
// Builds a two-peer pool in dc1/r1: peer 0 on 127.0.0.1:8101 and peer 1 on
// 127.0.0.1:8102. The handler tests treat peer 1 as the remote peer.
pub fn pool() -> Arc<ServerPool> {
let cfg = PoolConfig {
dc: "dc1".into(),
rack: "r1".into(),
enable_gossip: true,
..PoolConfig::default()
};
let local = Peer::new(
0,
PeerEndpoint::tcp("127.0.0.1".into(), 8101),
"r1".into(),
"dc1".into(),
vec![DynToken::from_u32(0)],
true,
true,
false,
);
let remote = Peer::new(
1,
PeerEndpoint::tcp("127.0.0.1".into(), 8102),
"r1".into(),
"dc1".into(),
vec![DynToken::from_u32(2_147_483_648)],
false,
true,
false,
);
Arc::new(ServerPool::new(cfg, vec![local, remote]))
}
}
// State of the first non-local peer in `pool`, or `Unknown` if there is none.
fn remote_state(pool: &super::ServerPool) -> PeerState {
pool.peers()
.read()
.iter()
.find(|p| !p.is_local())
.map_or(PeerState::Unknown, super::super::peer::Peer::state)
}
// The remote peer starts Down and a single heartbeat promotes it to Normal.
#[test]
fn handler_first_heartbeat_promotes_to_normal() {
let pool = handler_helpers::pool();
let handler = GossipHandler::new(pool.clone());
let t0 = std::time::Instant::now();
assert_eq!(remote_state(&pool), PeerState::Down);
handler.record_heartbeat_pname("127.0.0.1:8102", t0);
assert_eq!(remote_state(&pool), PeerState::Normal);
}
// One heartbeat per second for 100 s keeps phi below 1.0 and the peer Normal.
#[test]
fn handler_steady_heartbeats_keep_peer_normal() {
let pool = handler_helpers::pool();
let handler = GossipHandler::new(pool.clone());
let t0 = std::time::Instant::now();
for i in 0..100 {
let now = t0 + std::time::Duration::from_secs(i);
handler.record_heartbeat_pname("127.0.0.1:8102", now);
handler.evaluate(now);
}
let after_last =
t0 + std::time::Duration::from_secs(99) + std::time::Duration::from_millis(10);
let phi = pool
.peers()
.read()
.iter()
.find(|p| !p.is_local())
.map_or(0.0, |p| p.failure_detector().phi(after_last));
assert!(
phi < 1.0,
"phi should be < 1.0 right after a heartbeat, got {phi}"
);
assert_eq!(remote_state(&pool), PeerState::Normal);
}
// After 100 steady heartbeats, 60 s of silence makes `evaluate` report Down.
#[test]
fn handler_silence_transitions_peer_to_down() {
let pool = handler_helpers::pool();
let handler = GossipHandler::new(pool.clone());
let t0 = std::time::Instant::now();
for i in 0..100 {
let now = t0 + std::time::Duration::from_secs(i);
handler.record_heartbeat_pname("127.0.0.1:8102", now);
}
let later = t0 + std::time::Duration::from_secs(159);
let transitions = handler.evaluate(later);
assert_eq!(transitions, vec![(1, PeerState::Down)]);
assert_eq!(remote_state(&pool), PeerState::Down);
}
// With no heartbeat ever recorded, `evaluate` reports nothing and the peer stays Down.
#[test]
fn handler_evaluate_no_data_keeps_peer_down() {
let pool = handler_helpers::pool();
let handler = GossipHandler::new(pool.clone());
let t0 = std::time::Instant::now();
let transitions = handler.evaluate(t0);
assert!(transitions.is_empty());
assert_eq!(remote_state(&pool), PeerState::Down);
}
// A heartbeat for a name not in the pool changes nothing.
#[test]
fn handler_unknown_pname_is_silent() {
let pool = handler_helpers::pool();
let handler = GossipHandler::new(pool.clone());
let t0 = std::time::Instant::now();
handler.record_heartbeat_pname("10.0.0.99:9999", t0);
assert_eq!(remote_state(&pool), PeerState::Down);
}
// `mark_down_pname` forces a Normal peer back to Down.
#[test]
fn handler_mark_down_overrides_normal() {
let pool = handler_helpers::pool();
let handler = GossipHandler::new(pool.clone());
let t0 = std::time::Instant::now();
handler.record_heartbeat_pname("127.0.0.1:8102", t0);
assert_eq!(remote_state(&pool), PeerState::Normal);
handler.mark_down_pname("127.0.0.1:8102");
assert_eq!(remote_state(&pool), PeerState::Down);
}
// Metrics record exactly one ->Normal transition during steady heartbeats,
// then one Normal->Down transition, the current state, and a phi sample.
#[test]
fn handler_evaluate_records_normal_to_down_transition() {
let pool = handler_helpers::pool();
let metrics = std::sync::Arc::new(crate::stats::FailureMetrics::new());
let handler = GossipHandler::new(pool.clone()).with_failure_metrics(metrics.clone());
let t0 = std::time::Instant::now();
for i in 0..100 {
let now = t0 + std::time::Duration::from_secs(i);
handler.record_heartbeat_pname("127.0.0.1:8102", now);
handler.evaluate(now);
}
let mid_snap = metrics.snapshot();
let normal_count = mid_snap
.peer_state_transitions
.iter()
.filter(|t| t.to == PeerState::Normal)
.map(|t| t.count)
.sum::<u64>();
assert_eq!(
normal_count, 1,
"got transitions: {:?}",
mid_snap.peer_state_transitions
);
let later = t0 + std::time::Duration::from_secs(159);
let transitions = handler.evaluate(later);
assert_eq!(transitions, vec![(1, PeerState::Down)]);
let snap = metrics.snapshot();
let down_entry = snap
.peer_state_transitions
.iter()
.find(|t| t.from == PeerState::Normal && t.to == PeerState::Down)
.expect("normal->down transition should be recorded");
assert_eq!(down_entry.count, 1);
assert_eq!(down_entry.peer_idx, 1);
let current = snap
.peer_state_current
.iter()
.find(|c| c.peer_idx == 1)
.expect("peer_state_current entry should be present");
assert_eq!(current.state, PeerState::Down);
assert_eq!(current.dc, "dc1");
assert_eq!(current.rack, "r1");
let phi_entry = snap
.peer_phi
.iter()
.find(|p| p.peer_idx == 1)
.expect("gossip_phi_score gauge should be populated");
assert!(
phi_entry.phi >= 0.0,
"phi should be non-negative; got {}",
phi_entry.phi
);
}
}