use std::collections::HashMap;
/// Membership state recorded for a node in the gossip table.
///
/// `status_severity` ranks the variants in declaration order
/// (`Alive` lowest, `Left` highest).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NodeStatus {
/// State given to every freshly created record (see `NodeInfo::new`).
Alive,
/// Set by `GossipProtocol::update_suspicion` on an `Alive` peer that has
/// been silent for at least `suspicion_timeout_ms`.
Suspected,
/// Set by `GossipProtocol::promote_dead` on a `Suspected` peer that has
/// been silent for at least `dead_timeout_ms`.
Dead,
/// Set on the local record by `GossipProtocol::leave`.
Left,
}
/// One node's membership record, as stored in the local table and as carried
/// inside a `GossipMessage`.
#[derive(Clone, Debug)]
pub struct NodeInfo {
/// Node identifier; records are keyed by this value in the membership table.
pub id: String,
/// Address string supplied by the caller; stored verbatim and never parsed
/// in this file.
pub address: String,
/// Current membership state of the node.
pub status: NodeStatus,
/// Counter incremented by `GossipProtocol::heartbeat` for the local node and
/// compared in `GossipProtocol::receive_gossip` when deciding whether a
/// remote record replaces the stored one.
pub heartbeat: u64,
/// Timestamp (same clock as the `current_time_ms` arguments) at which this
/// record was last refreshed locally; timeouts are measured from it.
pub last_seen: u64,
/// Free-form key/value pairs; copied along with the record and not
/// interpreted anywhere in this file.
pub metadata: HashMap<String, String>,
}
impl NodeInfo {
fn new(id: String, address: String, now: u64) -> Self {
Self {
id,
address,
status: NodeStatus::Alive,
heartbeat: 0,
last_seen: now,
metadata: HashMap::new(),
}
}
}
/// Tunable parameters for `GossipProtocol`.
#[derive(Clone, Debug)]
pub struct GossipConfig {
/// Maximum number of peer ids returned by one call to `GossipProtocol::tick`.
pub fanout: usize,
/// Not read by any code in this file.
/// NOTE(review): presumably the period between gossip rounds, in
/// milliseconds, for whoever drives `tick` — confirm against the caller.
pub interval_ms: u64,
/// Silence, in the units of `current_time_ms`, after which an `Alive` peer
/// is marked `Suspected` by `update_suspicion`.
pub suspicion_timeout_ms: u64,
/// Silence after which a `Suspected` peer is marked `Dead` by
/// `promote_dead`; `remove_dead` drops `Dead` peers after twice this value.
pub dead_timeout_ms: u64,
}
impl Default for GossipConfig {
fn default() -> Self {
Self {
fanout: 3,
interval_ms: 200,
suspicion_timeout_ms: 1000,
dead_timeout_ms: 5000,
}
}
}
/// A batch of membership records exchanged between nodes.
#[derive(Clone, Debug)]
pub struct GossipMessage {
/// Id of the node that built the message (`create_gossip_message` fills in
/// the local id). `receive_gossip` does not read this field.
pub sender_id: String,
/// Membership records carried by the message.
pub nodes: Vec<NodeInfo>,
/// `create_gossip_message` fills this with the sender's own `last_seen`
/// (0 if the sender has no record of itself). `receive_gossip` does not
/// read this field.
pub timestamp: u64,
}
/// Local view of cluster membership, driven by explicit timestamps passed to
/// its methods.
pub struct GossipProtocol {
// Id of the node this instance represents; its record is never overwritten
// by incoming gossip and never aged out.
local_id: String,
// Membership table keyed by node id; always contains the local node after
// construction.
members: HashMap<String, NodeInfo>,
// Fanout and timeout settings.
config: GossipConfig,
// Incremented on every `heartbeat` call; `tick` uses it to rotate which
// peers are selected.
generation: u64,
}
impl GossipProtocol {
pub fn new(local_id: String, address: String, config: GossipConfig) -> Self {
let mut members = HashMap::new();
let local = NodeInfo::new(local_id.clone(), address, 0);
members.insert(local_id.clone(), local);
Self {
local_id,
members,
config,
generation: 0,
}
}
pub fn join(&mut self, seed_id: String, seed_address: String) {
self.members
.entry(seed_id.clone())
.or_insert_with(|| NodeInfo::new(seed_id, seed_address, 0));
}
pub fn leave(&mut self) {
if let Some(info) = self.members.get_mut(&self.local_id) {
info.status = NodeStatus::Left;
}
}
pub fn heartbeat(&mut self, current_time_ms: u64) {
if let Some(info) = self.members.get_mut(&self.local_id) {
info.heartbeat += 1;
info.last_seen = current_time_ms;
}
self.generation += 1;
}
pub fn tick(&mut self, current_time_ms: u64) -> Vec<String> {
self.heartbeat(current_time_ms);
let mut candidates: Vec<String> = self
.members
.values()
.filter(|n| {
n.id != self.local_id
&& (n.status == NodeStatus::Alive || n.status == NodeStatus::Suspected)
})
.map(|n| n.id.clone())
.collect();
candidates.sort();
if !candidates.is_empty() {
let offset = (self.generation as usize) % candidates.len();
candidates.rotate_left(offset);
}
candidates.truncate(self.config.fanout);
candidates
}
pub fn receive_gossip(&mut self, msg: GossipMessage, current_time_ms: u64) {
for remote in msg.nodes {
if remote.id == self.local_id {
continue;
}
let accept = match self.members.get(&remote.id) {
None => true,
Some(local) => {
remote.heartbeat > local.heartbeat
|| status_severity(&remote.status) > status_severity(&local.status)
}
};
if accept {
let mut info = remote.clone();
info.last_seen = current_time_ms;
self.members.insert(remote.id, info);
}
}
}
pub fn update_suspicion(&mut self, current_time_ms: u64) {
for info in self.members.values_mut() {
if info.id == self.local_id {
continue;
}
if info.status == NodeStatus::Alive {
let elapsed = current_time_ms.saturating_sub(info.last_seen);
if elapsed >= self.config.suspicion_timeout_ms {
info.status = NodeStatus::Suspected;
}
}
}
}
pub fn promote_dead(&mut self, current_time_ms: u64) {
for info in self.members.values_mut() {
if info.id == self.local_id {
continue;
}
if info.status == NodeStatus::Suspected {
let elapsed = current_time_ms.saturating_sub(info.last_seen);
if elapsed >= self.config.dead_timeout_ms {
info.status = NodeStatus::Dead;
}
}
}
}
pub fn remove_dead(&mut self, current_time_ms: u64) {
self.members.retain(|id, info| {
if *id == self.local_id {
return true;
}
if info.status == NodeStatus::Dead {
let elapsed = current_time_ms.saturating_sub(info.last_seen);
return elapsed < self.config.dead_timeout_ms * 2;
}
true
});
}
pub fn create_gossip_message(&self) -> GossipMessage {
GossipMessage {
sender_id: self.local_id.clone(),
nodes: self.members.values().cloned().collect(),
timestamp: self
.members
.get(&self.local_id)
.map_or(0, |n| n.last_seen),
}
}
pub fn members(&self) -> Vec<&NodeInfo> {
self.members.values().collect()
}
pub fn alive_members(&self) -> Vec<&NodeInfo> {
self.members
.values()
.filter(|n| n.status == NodeStatus::Alive)
.collect()
}
pub fn suspected_members(&self) -> Vec<&NodeInfo> {
self.members
.values()
.filter(|n| n.status == NodeStatus::Suspected)
.collect()
}
pub fn dead_members(&self) -> Vec<&NodeInfo> {
self.members
.values()
.filter(|n| n.status == NodeStatus::Dead)
.collect()
}
pub fn member_count(&self) -> usize {
self.members.len()
}
pub fn get_member(&self, id: &str) -> Option<&NodeInfo> {
self.members.get(id)
}
}
/// Ranks a status for conflict resolution: `Alive` is 0, `Suspected` 1,
/// `Dead` 2 and `Left` 3, so a larger value means a more severe state.
fn status_severity(s: &NodeStatus) -> u8 {
    match *s {
        NodeStatus::Left => 3,
        NodeStatus::Dead => 2,
        NodeStatus::Suspected => 1,
        NodeStatus::Alive => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration shared by the tests: fanout 2, suspicion after 1 000,
    /// dead after 5 000.
    fn default_config() -> GossipConfig {
        GossipConfig {
            fanout: 2,
            interval_ms: 100,
            suspicion_timeout_ms: 1_000,
            dead_timeout_ms: 5_000,
        }
    }

    /// Builds a protocol instance for `id` with the shared test configuration.
    fn node(id: &str) -> GossipProtocol {
        GossipProtocol::new(
            id.to_owned(),
            format!("127.0.0.1:700{}", id.len()),
            default_config(),
        )
    }

    /// Builds a record as it would arrive over the wire: `last_seen` 0 and no
    /// metadata.
    fn remote(id: &str, address: &str, status: NodeStatus, heartbeat: u64) -> NodeInfo {
        NodeInfo {
            id: id.to_owned(),
            address: address.to_owned(),
            status,
            heartbeat,
            last_seen: 0,
            metadata: HashMap::new(),
        }
    }

    /// Wraps a single record in a gossip message sent by `sender`.
    fn gossip_from(sender: &str, record: NodeInfo, timestamp: u64) -> GossipMessage {
        GossipMessage {
            sender_id: sender.to_owned(),
            nodes: vec![record],
            timestamp,
        }
    }

    /// Overrides the status of an existing member through the private table.
    /// Panics if the member is missing so a broken setup cannot pass silently.
    fn force_status(n: &mut GossipProtocol, id: &str, status: NodeStatus) {
        n.members
            .get_mut(id)
            .expect("member must exist before its status is forced")
            .status = status;
    }

    #[test]
    fn test_new_node_has_one_member() {
        let n = node("node1");
        assert_eq!(n.member_count(), 1);
        assert!(n.get_member("node1").is_some());
    }

    #[test]
    fn test_local_node_is_alive() {
        let n = node("node1");
        assert_eq!(n.get_member("node1").unwrap().status, NodeStatus::Alive);
    }

    #[test]
    fn test_join_adds_member() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        assert_eq!(n.member_count(), 2);
        assert!(n.get_member("node2").is_some());
    }

    #[test]
    fn test_join_idempotent() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        n.join("node2".into(), "127.0.0.1:7002".into());
        assert_eq!(n.member_count(), 2);
    }

    #[test]
    fn test_join_preserves_address() {
        let mut n = node("node1");
        n.join("node2".into(), "10.0.0.2:9000".into());
        assert_eq!(n.get_member("node2").unwrap().address, "10.0.0.2:9000");
    }

    #[test]
    fn test_leave_marks_left() {
        let mut n = node("node1");
        n.leave();
        assert_eq!(n.get_member("node1").unwrap().status, NodeStatus::Left);
    }

    #[test]
    fn test_heartbeat_increments_counter() {
        let mut n = node("node1");
        n.heartbeat(1000);
        assert_eq!(n.get_member("node1").unwrap().heartbeat, 1);
    }

    #[test]
    fn test_heartbeat_updates_last_seen() {
        let mut n = node("node1");
        n.heartbeat(5000);
        assert_eq!(n.get_member("node1").unwrap().last_seen, 5000);
    }

    #[test]
    fn test_tick_no_peers_returns_empty() {
        let mut n = node("node1");
        let peers = n.tick(1000);
        assert!(peers.is_empty());
    }

    #[test]
    fn test_tick_respects_fanout() {
        let mut n = node("node1");
        for i in 2..=6 {
            n.join(format!("node{}", i), format!("127.0.0.1:700{}", i));
        }
        let peers = n.tick(1000);
        // Five eligible peers and a fanout of 2: exactly two must be chosen.
        assert_eq!(peers.len(), 2);
    }

    #[test]
    fn test_tick_excludes_local_node() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        let peers = n.tick(1000);
        assert!(!peers.contains(&"node1".to_owned()));
    }

    #[test]
    fn test_tick_excludes_dead_nodes() {
        let mut n = node("node1");
        n.join("dead_node".into(), "127.0.0.1:7099".into());
        force_status(&mut n, "dead_node", NodeStatus::Dead);
        let peers = n.tick(1000);
        assert!(!peers.contains(&"dead_node".to_owned()));
    }

    #[test]
    fn test_tick_includes_suspected_nodes() {
        let mut n = node("node1");
        n.join("suspected_node".into(), "127.0.0.1:7050".into());
        force_status(&mut n, "suspected_node", NodeStatus::Suspected);
        let peers = n.tick(1000);
        assert!(peers.contains(&"suspected_node".to_owned()));
    }

    #[test]
    fn test_receive_gossip_adds_new_node() {
        let mut n = node("node1");
        let record = remote("node3", "127.0.0.1:7003", NodeStatus::Alive, 5);
        n.receive_gossip(gossip_from("node2", record, 0), 1000);
        assert!(n.get_member("node3").is_some());
    }

    #[test]
    fn test_receive_gossip_updates_higher_heartbeat() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        let record = remote("node2", "127.0.0.1:7002", NodeStatus::Alive, 99);
        n.receive_gossip(gossip_from("node3", record, 0), 2000);
        assert_eq!(n.get_member("node2").unwrap().heartbeat, 99);
    }

    #[test]
    fn test_receive_gossip_ignores_stale_heartbeat() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        n.members
            .get_mut("node2")
            .expect("node2 was just joined")
            .heartbeat = 50;
        let record = remote("node2", "127.0.0.1:7002", NodeStatus::Alive, 10);
        n.receive_gossip(gossip_from("node3", record, 0), 2000);
        assert_eq!(n.get_member("node2").unwrap().heartbeat, 50);
    }

    #[test]
    fn test_receive_gossip_never_overwrites_self() {
        let mut n = node("node1");
        n.heartbeat(1000);
        let hb_before = n.get_member("node1").unwrap().heartbeat;
        let record = remote("node1", "127.0.0.1:9999", NodeStatus::Dead, 999);
        n.receive_gossip(gossip_from("node2", record, 0), 2000);
        assert_eq!(n.get_member("node1").unwrap().heartbeat, hb_before);
        assert_eq!(n.get_member("node1").unwrap().status, NodeStatus::Alive);
    }

    #[test]
    fn test_update_suspicion_promotes_after_timeout() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        n.update_suspicion(1001);
        assert_eq!(
            n.get_member("node2").unwrap().status,
            NodeStatus::Suspected
        );
    }

    #[test]
    fn test_update_suspicion_not_before_timeout() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        n.update_suspicion(500);
        assert_eq!(n.get_member("node2").unwrap().status, NodeStatus::Alive);
    }

    #[test]
    fn test_update_suspicion_does_not_affect_self() {
        let mut n = node("node1");
        n.update_suspicion(99_999);
        assert_eq!(n.get_member("node1").unwrap().status, NodeStatus::Alive);
    }

    #[test]
    fn test_promote_dead_after_dead_timeout() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        force_status(&mut n, "node2", NodeStatus::Suspected);
        n.promote_dead(6_000);
        assert_eq!(n.get_member("node2").unwrap().status, NodeStatus::Dead);
    }

    #[test]
    fn test_promote_dead_not_before_timeout() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        force_status(&mut n, "node2", NodeStatus::Suspected);
        n.promote_dead(4_999);
        assert_eq!(
            n.get_member("node2").unwrap().status,
            NodeStatus::Suspected
        );
    }

    #[test]
    fn test_remove_dead_cleans_old_dead_nodes() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        force_status(&mut n, "node2", NodeStatus::Dead);
        n.members
            .get_mut("node2")
            .expect("node2 was just joined")
            .last_seen = 0;
        n.remove_dead(10_001);
        assert!(n.get_member("node2").is_none());
    }

    #[test]
    fn test_remove_dead_keeps_recent_dead() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        force_status(&mut n, "node2", NodeStatus::Dead);
        n.members
            .get_mut("node2")
            .expect("node2 was just joined")
            .last_seen = 8_000;
        n.remove_dead(9_000);
        assert!(n.get_member("node2").is_some());
    }

    #[test]
    fn test_create_gossip_message_includes_self() {
        let n = node("node1");
        let msg = n.create_gossip_message();
        assert!(msg.nodes.iter().any(|ni| ni.id == "node1"));
        assert_eq!(msg.sender_id, "node1");
    }

    #[test]
    fn test_create_gossip_message_includes_all_members() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        n.join("node3".into(), "127.0.0.1:7003".into());
        let msg = n.create_gossip_message();
        assert_eq!(msg.nodes.len(), 3);
    }

    #[test]
    fn test_alive_members_count() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        assert_eq!(n.alive_members().len(), 2);
    }

    #[test]
    fn test_suspected_members_count() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        force_status(&mut n, "node2", NodeStatus::Suspected);
        assert_eq!(n.suspected_members().len(), 1);
        assert_eq!(n.alive_members().len(), 1);
    }

    #[test]
    fn test_dead_members_count() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        force_status(&mut n, "node2", NodeStatus::Dead);
        assert_eq!(n.dead_members().len(), 1);
        assert_eq!(n.alive_members().len(), 1);
    }

    #[test]
    fn test_heartbeat_propagated_via_gossip() {
        let mut node1 = node("node1");
        let mut node2 = GossipProtocol::new(
            "node2".into(),
            "127.0.0.1:7002".into(),
            default_config(),
        );
        node2.heartbeat(500);
        node2.heartbeat(1000);
        let msg = node2.create_gossip_message();
        node1.receive_gossip(msg, 1500);
        let hb = node1.get_member("node2").unwrap().heartbeat;
        assert_eq!(hb, 2);
    }

    #[test]
    fn test_metadata_preserved_in_gossip() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        let mut node_with_meta = NodeInfo::new("node2".into(), "127.0.0.1:7002".into(), 0);
        node_with_meta.heartbeat = 1;
        node_with_meta
            .metadata
            .insert("dc".into(), "us-east".into());
        n.receive_gossip(gossip_from("node2", node_with_meta, 0), 1000);
        let dc = n
            .get_member("node2")
            .unwrap()
            .metadata
            .get("dc")
            .map(|s| s.as_str());
        assert_eq!(dc, Some("us-east"));
    }

    #[test]
    fn test_join_multiple_nodes() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        n.join("node3".into(), "127.0.0.1:7003".into());
        n.join("node4".into(), "127.0.0.1:7004".into());
        assert_eq!(n.member_count(), 4);
    }

    #[test]
    fn test_heartbeat_updates_last_seen_to_timestamp() {
        let mut n = node("node1");
        n.heartbeat(9000);
        assert_eq!(n.get_member("node1").unwrap().last_seen, 9000);
    }

    #[test]
    fn test_member_count_after_join() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        assert_eq!(n.member_count(), 2);
    }

    #[test]
    fn test_get_member_returns_none_for_unknown() {
        let n = node("node1");
        assert!(n.get_member("unknown").is_none());
    }

    #[test]
    fn test_node_status_equality() {
        assert_eq!(NodeStatus::Alive, NodeStatus::Alive);
        assert_ne!(NodeStatus::Alive, NodeStatus::Dead);
        assert_ne!(NodeStatus::Suspected, NodeStatus::Left);
    }

    #[test]
    fn test_gossip_config_default() {
        let cfg = GossipConfig::default();
        assert_eq!(cfg.fanout, 3);
        assert!(cfg.interval_ms > 0);
        assert!(cfg.suspicion_timeout_ms > 0);
        assert!(cfg.dead_timeout_ms > cfg.suspicion_timeout_ms);
    }

    #[test]
    fn test_receive_gossip_updates_last_seen() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        let record = remote("node2", "127.0.0.1:7002", NodeStatus::Alive, 10);
        n.receive_gossip(gossip_from("node3", record, 500), 3000);
        assert_eq!(n.get_member("node2").unwrap().last_seen, 3000);
    }

    #[test]
    fn test_tick_multiple_times_rotates_peers() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        n.join("node3".into(), "127.0.0.1:7003".into());
        n.join("node4".into(), "127.0.0.1:7004".into());
        let tick1 = n.tick(1000);
        let tick2 = n.tick(2000);
        assert!(tick1.len() <= 2);
        assert!(tick2.len() <= 2);
        // Three eligible peers and a fanout of 2: consecutive rounds must not
        // pick the same sequence of peers.
        assert_ne!(tick1, tick2);
    }

    #[test]
    fn test_alive_members_excludes_dead() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        n.join("node3".into(), "127.0.0.1:7003".into());
        force_status(&mut n, "node2", NodeStatus::Dead);
        assert_eq!(n.alive_members().len(), 2);
    }

    #[test]
    fn test_left_members_not_in_alive() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        force_status(&mut n, "node2", NodeStatus::Left);
        let alive: Vec<&NodeInfo> = n
            .alive_members()
            .into_iter()
            .filter(|ni| ni.id == "node2")
            .collect();
        assert!(alive.is_empty());
    }

    #[test]
    fn test_update_suspicion_only_alive_nodes() {
        let mut n = node("node1");
        n.join("node2".into(), "127.0.0.1:7002".into());
        force_status(&mut n, "node2", NodeStatus::Suspected);
        n.update_suspicion(99_999);
        assert_eq!(
            n.get_member("node2").unwrap().status,
            NodeStatus::Suspected
        );
    }

    #[test]
    fn test_gossip_message_timestamp() {
        let mut n = node("node1");
        n.heartbeat(7777);
        let msg = n.create_gossip_message();
        assert_eq!(msg.timestamp, 7777);
    }

    #[test]
    fn test_generation_increments_on_tick() {
        let mut n = node("node1");
        let gen_before = n.generation;
        n.tick(1000);
        assert!(n.generation > gen_before);
    }
}