use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, error, info, warn};
use super::{NodeHealth, NodeInfo, NodeRole, NodeStatus};
/// Tunable parameters for the gossip membership protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipConfig {
/// Time, in milliseconds, the protocol loop sleeps at the start of every round.
pub protocol_period_ms: u64,
/// NOTE(review): not read anywhere in the visible code — confirm whether it is still needed.
pub fanout: usize,
/// Time, in milliseconds, the protocol loop waits after sending a direct ping
/// before treating still-pending pings as timed out.
pub ping_timeout_ms: u64,
/// Maximum number of other alive members asked to relay a ping to an unresponsive target.
pub indirect_probes: usize,
/// Time, in milliseconds, to wait after sending ping requests before the
/// target is marked suspect.
pub indirect_ping_timeout_ms: u64,
/// Multiplier used when computing the suspicion timeout.
pub suspicion_mult: u32,
/// Lower bound, in milliseconds, applied to the computed suspicion timeout.
pub min_suspicion_timeout_ms: u64,
/// Every this many rounds a Join is re-sent to the seed nodes if fewer alive
/// peers than seeds are known. `0` disables the periodic re-join.
pub rejoin_interval_rounds: u64,
/// Dead/left members whose last update is older than this many milliseconds
/// are removed from the member list. `0` disables garbage collection.
pub dead_node_gc_timeout_ms: u64,
/// Maximum number of queued state updates attached to a single message.
pub max_gossip_messages: usize,
/// UDP port the gossip socket binds to (on `0.0.0.0`).
pub gossip_port: u16,
/// Size, in bytes, of the receive buffer used for incoming datagrams.
pub max_packet_size: usize,
/// Seed addresses that receive a Join request; each entry is parsed as a
/// socket address first and resolved via host lookup otherwise.
pub seed_nodes: Vec<String>,
}
impl Default for GossipConfig {
fn default() -> Self {
Self {
protocol_period_ms: 1000,
fanout: 3,
ping_timeout_ms: 500,
indirect_probes: 3,
indirect_ping_timeout_ms: 1000,
suspicion_mult: 6,
min_suspicion_timeout_ms: 30_000, rejoin_interval_rounds: 30, dead_node_gc_timeout_ms: 300_000, max_gossip_messages: 10,
gossip_port: 7947,
max_packet_size: 65507, seed_nodes: Vec::new(),
}
}
}
/// Liveness state of a cluster member as tracked by the gossip protocol.
///
/// The declaration order matters: `apply_updates` compares states via
/// `as u8` to break ties between updates with the same incarnation, so a
/// later variant overrides an earlier one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MemberState {
/// The member is believed to be reachable.
Alive,
/// The member failed to answer probes and is awaiting the suspicion timeout.
Suspect,
/// The member stayed suspect past the suspicion timeout.
Dead,
/// The member announced its departure with a Leave message.
Left,
}
/// Everything the gossip layer knows about a single cluster member.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipMember {
/// Identifier used as the key in the member map.
pub node_id: String,
/// Address gossip datagrams for this member are sent to.
pub address: SocketAddr,
/// Address copied into `NodeInfo` by `to_node_info`.
pub api_address: String,
/// Role of the node within the cluster.
pub role: NodeRole,
/// Current liveness state.
pub state: MemberState,
/// Version counter used to order state updates about this member.
pub incarnation: u64,
/// Wall-clock time (ms since the Unix epoch) of the last recorded update.
pub last_updated_ms: u64,
/// Wall-clock time (ms since the Unix epoch) at which the member was last
/// marked suspect, if any.
pub suspect_time_ms: Option<u64>,
/// Free-form key/value metadata attached to the member.
pub metadata: HashMap<String, String>,
}
impl GossipMember {
pub fn new(node_id: String, address: SocketAddr, api_address: String, role: NodeRole) -> Self {
Self {
node_id,
address,
api_address,
role,
state: MemberState::Alive,
incarnation: 0,
last_updated_ms: current_time_ms(),
suspect_time_ms: None,
metadata: HashMap::new(),
}
}
pub fn to_node_info(&self) -> NodeInfo {
let mut info = NodeInfo::new(self.node_id.clone(), self.api_address.clone(), self.role);
info.health = NodeHealth {
status: match self.state {
MemberState::Alive => NodeStatus::Healthy,
MemberState::Suspect => NodeStatus::Suspect,
MemberState::Dead | MemberState::Left => NodeStatus::Offline,
},
last_healthy_ms: self.last_updated_ms,
..Default::default()
};
info.metadata = self.metadata.clone();
info
}
}
/// Messages exchanged between gossip nodes (serialized as JSON datagrams).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipMessage {
/// Liveness probe; the receiver answers with an `Ack` carrying the same `seq_no`.
Ping {
seq_no: u64,
from: String,
updates: Vec<MemberStateUpdate>,
},
/// Reply to a `Ping`; clears the pending ping registered under `seq_no`.
Ack {
seq_no: u64,
from: String,
updates: Vec<MemberStateUpdate>,
},
/// Asks the receiver to ping `target` at `target_addr`.
PingReq {
seq_no: u64,
from: String,
target: String,
target_addr: SocketAddr,
updates: Vec<MemberStateUpdate>,
},
/// Clears the pending ping registered under `seq_no` on the receiver.
/// NOTE(review): no visible code sends this variant — confirm the relay path.
IndirectAck {
seq_no: u64,
from: String,
target: String,
updates: Vec<MemberStateUpdate>,
},
/// Announces `member` to the receiver, which replies with a `JoinAck`.
Join { member: GossipMember },
/// Reply to `Join` containing the sender's full member list.
JoinAck { members: Vec<GossipMember> },
/// Announces that `node_id` is leaving the cluster.
Leave { node_id: String, incarnation: u64 },
/// Requests the receiver's full member list.
SyncRequest { from: String },
/// Reply to `SyncRequest` containing the sender's full member list.
SyncResponse { members: Vec<GossipMember> },
}
/// A state change about one member, piggybacked on ping/ack traffic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemberStateUpdate {
/// Member the update is about.
pub node_id: String,
/// State being reported.
pub state: MemberState,
/// Incarnation the report applies to; used to order competing updates.
pub incarnation: u64,
/// Gossip address of the member, when known.
pub address: Option<SocketAddr>,
/// API address of the member, when known.
pub api_address: Option<String>,
/// Role of the member, when known.
pub role: Option<NodeRole>,
}
/// Membership events published on the protocol's event channel.
#[derive(Debug, Clone)]
pub enum GossipEvent {
/// A previously unknown member was added to the member list.
NodeJoined(GossipMember),
/// The member with this id left gracefully.
NodeLeft(String),
/// The member with this id was declared dead.
NodeFailed(String),
/// The member with this id became alive again after being suspect, dead or left.
NodeRecovered(String),
/// NOTE(review): never emitted by the visible code — confirm intended use.
NodeUpdated(GossipMember),
}
/// Handle for the gossip membership protocol of the local node.
pub struct GossipProtocol {
// Protocol tuning parameters.
config: GossipConfig,
// Description of the local node as supplied at construction. The copy stored
// in `members` is the one mutated at runtime (e.g. by `update_metadata`).
local_member: GossipMember,
// Known members keyed by node id (includes the local node once started).
members: Arc<RwLock<HashMap<String, GossipMember>>>,
// NOTE(review): initialised to 0 but not read in the visible code.
_seq_no: AtomicU64,
// Outstanding direct pings keyed by sequence number.
pending_pings: Arc<RwLock<HashMap<u64, PendingPing>>>,
// State updates waiting to be piggybacked on outgoing messages.
update_queue: Arc<RwLock<VecDeque<MemberStateUpdate>>>,
// Cursor used by the protocol loop to pick the next probe target.
probe_index: Arc<RwLock<usize>>,
// Set while the protocol loop should keep running.
running: Arc<AtomicBool>,
// Channel on which membership events are published.
event_tx: mpsc::Sender<GossipEvent>,
}
/// Bundle of shared state handed to the background tasks spawned by `start`.
struct GossipContext {
// Shared member map (same allocation as `GossipProtocol::members`).
members: Arc<RwLock<HashMap<String, GossipMember>>>,
// Shared table of outstanding pings.
pending_pings: Arc<RwLock<HashMap<u64, PendingPing>>>,
// Shared queue of updates to piggyback on outgoing messages.
update_queue: Arc<RwLock<VecDeque<MemberStateUpdate>>>,
// Copy of the protocol configuration taken when the task was spawned.
config: GossipConfig,
// Copy of the local member description taken when the task was spawned.
local_member: GossipMember,
// Sender used to publish membership events.
event_tx: mpsc::Sender<GossipEvent>,
}
/// Bookkeeping for a ping that has been sent but not yet acknowledged.
struct PendingPing {
// Node id of the member that was pinged.
target: String,
// Monotonic timestamp of when the ping was sent; used for timeout checks.
sent_at: Instant,
// Always written as `false` in the visible code and never read.
_is_indirect: bool,
}
impl GossipProtocol {
pub fn new(
config: GossipConfig,
local_member: GossipMember,
event_tx: mpsc::Sender<GossipEvent>,
) -> Self {
Self {
config,
local_member,
members: Arc::new(RwLock::new(HashMap::new())),
_seq_no: AtomicU64::new(0),
pending_pings: Arc::new(RwLock::new(HashMap::new())),
update_queue: Arc::new(RwLock::new(VecDeque::new())),
probe_index: Arc::new(RwLock::new(0)),
running: Arc::new(AtomicBool::new(false)),
event_tx,
}
}
/// Starts the gossip protocol.
///
/// Binds the UDP socket on `0.0.0.0:<gossip_port>`, registers the local
/// member in the member list, sends Join requests to the seed nodes and
/// spawns the receiver and protocol-loop background tasks.
///
/// # Errors
/// * [`GossipError::AlreadyRunning`] if the protocol is already running.
/// * [`GossipError::BindError`] if the socket cannot be bound. The running
///   flag is rolled back in that case so `start` can be retried.
///
/// NOTE(review): the receiver task does not observe the running flag, so it
/// (and the bound socket) outlives `stop()`.
pub async fn start(&self) -> Result<(), GossipError> {
    if self.running.swap(true, Ordering::SeqCst) {
        return Err(GossipError::AlreadyRunning);
    }

    let bind_addr = format!("0.0.0.0:{}", self.config.gossip_port);
    let socket = match UdpSocket::bind(&bind_addr).await {
        Ok(socket) => Arc::new(socket),
        Err(e) => {
            // Without this rollback a failed bind would leave the flag set
            // and every later `start()` would report `AlreadyRunning`.
            self.running.store(false, Ordering::SeqCst);
            return Err(GossipError::BindError(e.to_string()));
        }
    };
    info!(
        node_id = %self.local_member.node_id,
        address = %bind_addr,
        "Gossip protocol started"
    );

    {
        let mut members = self.members.write().await;
        members.insert(self.local_member.node_id.clone(), self.local_member.clone());
    }
    self.join_seeds(&socket).await;

    // Both background tasks get their own handle on the same shared state.
    let make_ctx = || GossipContext {
        members: Arc::clone(&self.members),
        pending_pings: Arc::clone(&self.pending_pings),
        update_queue: Arc::clone(&self.update_queue),
        config: self.config.clone(),
        local_member: self.local_member.clone(),
        event_tx: self.event_tx.clone(),
    };
    let recv_ctx = make_ctx();
    let loop_ctx = make_ctx();

    let recv_socket = Arc::clone(&socket);
    tokio::spawn(async move {
        Self::receiver_loop(recv_socket, recv_ctx).await;
    });

    let running = Arc::clone(&self.running);
    let probe_index = Arc::clone(&self.probe_index);
    tokio::spawn(async move {
        Self::protocol_loop(socket, running, loop_ctx, probe_index).await;
    });
    Ok(())
}
/// Clears the running flag so the protocol loop exits at the start of its
/// next iteration, then logs the shutdown.
pub fn stop(&self) {
    self.running.store(false, Ordering::SeqCst);
    let node_id = &self.local_member.node_id;
    info!(node_id = %node_id, "Gossip protocol stopped");
}
/// Returns a snapshot of every known member, regardless of state.
pub async fn get_members(&self) -> Vec<GossipMember> {
    let guard = self.members.read().await;
    let mut snapshot = Vec::with_capacity(guard.len());
    snapshot.extend(guard.values().cloned());
    snapshot
}
/// Returns a snapshot of the members currently in the `Alive` state.
pub async fn get_alive_members(&self) -> Vec<GossipMember> {
    let guard = self.members.read().await;
    let mut alive = Vec::new();
    for member in guard.values() {
        if matches!(member.state, MemberState::Alive) {
            alive.push(member.clone());
        }
    }
    alive
}
/// Looks up a member by node id, returning a copy of its record if known.
pub async fn get_member(&self, node_id: &str) -> Option<GossipMember> {
    let guard = self.members.read().await;
    match guard.get(node_id) {
        Some(found) => Some(found.clone()),
        None => None,
    }
}
/// Sets a metadata entry on the local member's record in the member list,
/// bumping its incarnation and refreshing its last-updated timestamp.
///
/// Does nothing if the local member is not in the member list (it is
/// inserted by `start`).
pub async fn update_metadata(&self, key: String, value: String) {
    let local_id = &self.local_member.node_id;
    let mut guard = self.members.write().await;
    let Some(local) = guard.get_mut(local_id) else {
        return;
    };
    local.metadata.insert(key, value);
    local.incarnation += 1;
    local.last_updated_ms = current_time_ms();
}
pub async fn leave(&self) -> Result<(), GossipError> {
let members = self.members.read().await;
let local = members
.get(&self.local_member.node_id)
.ok_or(GossipError::NotFound)?;
let leave_msg = GossipMessage::Leave {
node_id: self.local_member.node_id.clone(),
incarnation: local.incarnation + 1,
};
let msg_bytes = serialize_message(&leave_msg)?;
let socket = UdpSocket::bind("0.0.0.0:0")
.await
.map_err(|e| GossipError::BindError(e.to_string()))?;
for member in members.values() {
if member.node_id != self.local_member.node_id {
let _ = socket.send_to(&msg_bytes, member.address).await;
}
}
self.stop();
Ok(())
}
async fn join_seeds(&self, socket: &UdpSocket) {
for seed in &self.config.seed_nodes {
let resolved = if let Ok(addr) = seed.parse::<SocketAddr>() {
Some(addr)
} else if let Ok(mut addrs) = tokio::net::lookup_host(seed.as_str()).await {
addrs.next()
} else {
warn!(seed = %seed, "Failed to resolve seed node address");
None
};
if let Some(addr) = resolved {
let join_msg = GossipMessage::Join {
member: self.local_member.clone(),
};
if let Ok(bytes) = serialize_message(&join_msg) {
info!(seed = %seed, resolved = %addr, "Sending join request to seed node");
let _ = socket.send_to(&bytes, addr).await;
}
}
}
}
async fn receiver_loop(socket: Arc<UdpSocket>, ctx: GossipContext) {
let mut buf = vec![0u8; ctx.config.max_packet_size];
loop {
match socket.recv_from(&mut buf).await {
Ok((len, src)) => {
if let Ok(msg) = deserialize_message(&buf[..len]) {
Self::handle_message(&socket, msg, src, &ctx).await;
}
}
Err(e) => {
error!(error = %e, "Error receiving gossip message");
}
}
}
}
async fn handle_message(
socket: &UdpSocket,
msg: GossipMessage,
src: SocketAddr,
ctx: &GossipContext,
) {
match msg {
GossipMessage::Ping {
seq_no,
from,
updates,
} => {
Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
{
let mut members_guard = ctx.members.write().await;
if let Some(member) = members_guard.get_mut(&from) {
member.last_updated_ms = current_time_ms();
}
}
let reply_updates =
Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
let ack = GossipMessage::Ack {
seq_no,
from: ctx.local_member.node_id.clone(),
updates: reply_updates,
};
if let Ok(bytes) = serialize_message(&ack) {
let _ = socket.send_to(&bytes, src).await;
}
debug!(from = %from, seq = seq_no, "Received ping, sent ack");
}
GossipMessage::Ack {
seq_no,
from,
updates,
} => {
Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
{
let mut members_guard = ctx.members.write().await;
if let Some(member) = members_guard.get_mut(&from) {
member.last_updated_ms = current_time_ms();
}
}
let mut pending = ctx.pending_pings.write().await;
if pending.remove(&seq_no).is_some() {
debug!(from = %from, seq = seq_no, "Received ack");
}
}
GossipMessage::PingReq {
seq_no,
from,
target,
target_addr,
updates,
} => {
Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
let ping = GossipMessage::Ping {
seq_no,
from: ctx.local_member.node_id.clone(),
updates: Vec::new(),
};
if let Ok(bytes) = serialize_message(&ping) {
let _ = socket.send_to(&bytes, target_addr).await;
}
debug!(from = %from, target = %target, "Forwarding indirect ping");
}
GossipMessage::IndirectAck {
seq_no,
from,
target,
updates,
} => {
Self::apply_updates(&ctx.members, &updates, &ctx.config, &ctx.event_tx).await;
let mut pending = ctx.pending_pings.write().await;
pending.remove(&seq_no);
debug!(from = %from, target = %target, "Received indirect ack");
}
GossipMessage::Join { member } => {
info!(node_id = %member.node_id, address = %member.address, "Node joining cluster");
let mut members_guard = ctx.members.write().await;
let was_dead_or_left = members_guard
.get(&member.node_id)
.map(|m| matches!(m.state, MemberState::Dead | MemberState::Left))
.unwrap_or(false);
let is_new = !members_guard.contains_key(&member.node_id);
members_guard.insert(member.node_id.clone(), member.clone());
let all_members: Vec<GossipMember> = members_guard.values().cloned().collect();
drop(members_guard);
let join_ack = GossipMessage::JoinAck {
members: all_members,
};
if let Ok(bytes) = serialize_message(&join_ack) {
let _ = socket.send_to(&bytes, src).await;
}
if is_new {
let _ = ctx.event_tx.send(GossipEvent::NodeJoined(member)).await;
} else if was_dead_or_left {
info!(node_id = %member.node_id, "Dead/left node rejoined cluster");
let _ = ctx
.event_tx
.send(GossipEvent::NodeRecovered(member.node_id.clone()))
.await;
}
}
GossipMessage::JoinAck {
members: new_members,
} => {
let mut members_guard = ctx.members.write().await;
for member in new_members {
if !members_guard.contains_key(&member.node_id) {
members_guard.insert(member.node_id.clone(), member.clone());
let _ = ctx.event_tx.send(GossipEvent::NodeJoined(member)).await;
}
}
info!(count = members_guard.len(), "Received cluster membership");
}
GossipMessage::Leave {
node_id,
incarnation,
} => {
let mut members_guard = ctx.members.write().await;
if let Some(member) = members_guard.get_mut(&node_id) {
if incarnation >= member.incarnation {
member.state = MemberState::Left;
member.incarnation = incarnation;
info!(node_id = %node_id, "Node left cluster gracefully");
let _ = ctx.event_tx.send(GossipEvent::NodeLeft(node_id)).await;
}
}
}
GossipMessage::SyncRequest { from } => {
let members_guard = ctx.members.read().await;
let all_members: Vec<GossipMember> = members_guard.values().cloned().collect();
let sync_response = GossipMessage::SyncResponse {
members: all_members,
};
if let Ok(bytes) = serialize_message(&sync_response) {
let _ = socket.send_to(&bytes, src).await;
}
debug!(from = %from, "Handled sync request");
}
GossipMessage::SyncResponse {
members: new_members,
} => {
let mut members_guard = ctx.members.write().await;
for member in new_members {
members_guard
.entry(member.node_id.clone())
.and_modify(|existing| {
if member.incarnation > existing.incarnation {
*existing = member.clone();
}
})
.or_insert(member);
}
}
}
}
/// Periodic driver of the protocol; runs until `running` is cleared.
///
/// Each round, after sleeping for the protocol period, it:
/// 1. re-sends Join requests to the seeds every `rejoin_interval_rounds`
///    rounds when fewer alive peers than seeds are known,
/// 2. garbage-collects old dead/left members every 60 rounds,
/// 3. pings the next alive/suspect peer and waits `ping_timeout_ms`,
/// 4. runs indirect probes for every ping that is still pending,
/// 5. promotes expired suspects to dead, and
/// 6. refutes a suspect/dead mark on the local node by bumping its
///    incarnation and queueing an `Alive` update.
async fn protocol_loop(
socket: Arc<UdpSocket>,
running: Arc<AtomicBool>,
ctx: GossipContext,
probe_index: Arc<RwLock<usize>>,
) {
let protocol_period = Duration::from_millis(ctx.config.protocol_period_ms);
// Sequence numbers for direct pings are local to this loop.
let mut seq_counter = 0u64;
let mut round_counter = 0u64;
while running.load(Ordering::SeqCst) {
tokio::time::sleep(protocol_period).await;
round_counter += 1;
// --- Periodic seed re-join ---
if ctx.config.rejoin_interval_rounds > 0
&& round_counter.is_multiple_of(ctx.config.rejoin_interval_rounds)
{
// Count alive peers, excluding the local node.
let alive_count = {
let members_guard = ctx.members.read().await;
members_guard
.values()
.filter(|m| {
m.node_id != ctx.local_member.node_id && m.state == MemberState::Alive
})
.count()
};
// Only re-join when fewer alive peers than configured seeds are known.
if alive_count < ctx.config.seed_nodes.len() {
for seed in &ctx.config.seed_nodes {
// Same resolution strategy as `join_seeds`: literal address first, then host lookup.
let resolved = if let Ok(addr) = seed.parse::<SocketAddr>() {
Some(addr)
} else if let Ok(mut addrs) = tokio::net::lookup_host(seed.as_str()).await {
addrs.next()
} else {
None
};
if let Some(addr) = resolved {
let join_msg = GossipMessage::Join {
member: ctx.local_member.clone(),
};
if let Ok(bytes) = serialize_message(&join_msg) {
debug!(seed = %seed, "Periodic seed re-join attempt");
let _ = socket.send_to(&bytes, addr).await;
}
}
}
}
}
// --- Garbage collection of dead/left members (every 60 rounds) ---
if ctx.config.dead_node_gc_timeout_ms > 0 && round_counter.is_multiple_of(60) {
let now = current_time_ms();
let mut members_guard = ctx.members.write().await;
// Collect ids first; entries cannot be removed while iterating the map.
let gc_candidates: Vec<String> = members_guard
.values()
.filter(|m| {
matches!(m.state, MemberState::Dead | MemberState::Left)
&& now.saturating_sub(m.last_updated_ms)
> ctx.config.dead_node_gc_timeout_ms
})
.map(|m| m.node_id.clone())
.collect();
for node_id in &gc_candidates {
members_guard.remove(node_id);
info!(node_id = %node_id, "Garbage collected dead/left node from member list");
}
drop(members_guard);
}
// --- Pick the next probe target among alive/suspect peers ---
let target = {
let members_guard = ctx.members.read().await;
let other_members: Vec<_> = members_guard
.values()
.filter(|m| {
m.node_id != ctx.local_member.node_id
&& matches!(m.state, MemberState::Alive | MemberState::Suspect)
})
.collect();
if other_members.is_empty() {
// Nothing to probe; the rest of the round (including suspicion checks) is skipped.
continue;
}
// Advance the cursor; the modulo keeps it valid when the peer count changes.
// NOTE(review): candidates come from HashMap iteration order, which is not
// guaranteed to be stable between rounds — confirm this is acceptable.
let mut idx = probe_index.write().await;
*idx = (*idx + 1) % other_members.len();
other_members[*idx].clone()
};
// --- Direct ping ---
seq_counter += 1;
let updates =
Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
let ping = GossipMessage::Ping {
seq_no: seq_counter,
from: ctx.local_member.node_id.clone(),
updates,
};
if let Ok(bytes) = serialize_message(&ping) {
let _ = socket.send_to(&bytes, target.address).await;
// Register the ping so a matching Ack can clear it.
let mut pending = ctx.pending_pings.write().await;
pending.insert(
seq_counter,
PendingPing {
target: target.node_id.clone(),
sent_at: Instant::now(),
_is_indirect: false,
},
);
}
// Give the target time to answer before looking for timeouts.
tokio::time::sleep(Duration::from_millis(ctx.config.ping_timeout_ms)).await;
// Every pending ping older than the timeout is considered unanswered.
let timed_out = {
let pending = ctx.pending_pings.read().await;
pending
.iter()
.filter(|(_, p)| {
p.sent_at.elapsed() > Duration::from_millis(ctx.config.ping_timeout_ms)
})
.map(|(seq, p)| (*seq, p.target.clone()))
.collect::<Vec<_>>()
};
// Indirect probes run one target at a time; each call waits for the indirect timeout.
for (seq, target_id) in timed_out {
Self::try_indirect_probes(&socket, &ctx, &target_id, seq).await;
}
Self::check_suspicions(&ctx.members, &ctx.config, &ctx.event_tx).await;
// --- Self-refutation ---
{
let mut members_guard = ctx.members.write().await;
if let Some(local) = members_guard.get_mut(&ctx.local_member.node_id) {
if matches!(local.state, MemberState::Suspect | MemberState::Dead) {
let old_state = local.state;
// A higher incarnation lets the Alive update override the suspect/dead report.
local.incarnation += 1;
local.state = MemberState::Alive;
local.suspect_time_ms = None;
local.last_updated_ms = current_time_ms();
warn!(
old_state = ?old_state,
new_incarnation = local.incarnation,
"Self-refutation: local node was marked {:?}, reasserting Alive", old_state
);
// Queue the refutation so it is piggybacked on upcoming pings/acks.
let mut queue = ctx.update_queue.write().await;
queue.push_back(MemberStateUpdate {
node_id: ctx.local_member.node_id.clone(),
state: MemberState::Alive,
incarnation: local.incarnation,
address: Some(local.address),
api_address: Some(local.api_address.clone()),
role: Some(local.role),
});
}
}
}
}
}
async fn try_indirect_probes(
socket: &UdpSocket,
ctx: &GossipContext,
target_id: &str,
_seq: u64,
) {
let members_guard = ctx.members.read().await;
let target = match members_guard.get(target_id) {
Some(t) => t.clone(),
None => return,
};
let indirect_nodes: Vec<_> = members_guard
.values()
.filter(|m| {
m.node_id != ctx.local_member.node_id
&& m.node_id != target_id
&& m.state == MemberState::Alive
})
.take(ctx.config.indirect_probes)
.cloned()
.collect();
drop(members_guard);
for node in indirect_nodes {
let updates =
Self::get_updates(&ctx.update_queue, ctx.config.max_gossip_messages).await;
let ping_req = GossipMessage::PingReq {
seq_no: rand::random(),
from: ctx.local_member.node_id.clone(),
target: target_id.to_string(),
target_addr: target.address,
updates,
};
if let Ok(bytes) = serialize_message(&ping_req) {
let _ = socket.send_to(&bytes, node.address).await;
}
}
tokio::time::sleep(Duration::from_millis(ctx.config.indirect_ping_timeout_ms)).await;
let still_pending = {
let pending = ctx.pending_pings.read().await;
pending.values().any(|p| p.target == target_id)
};
if still_pending {
let mut members_guard = ctx.members.write().await;
if let Some(member) = members_guard.get_mut(target_id) {
if member.state == MemberState::Alive {
member.state = MemberState::Suspect;
member.suspect_time_ms = Some(current_time_ms());
warn!(node_id = %target_id, "Node marked as suspect");
}
}
}
let mut pending = ctx.pending_pings.write().await;
pending.retain(|_, p| p.target != target_id);
}
/// Promotes members that have been `Suspect` for longer than the suspicion
/// timeout to `Dead` and publishes a `NodeFailed` event for each.
///
/// The timeout is `suspicion_mult * protocol_period_ms * ceil(ln(n + 1))`
/// (the logarithmic factor is at least 1), but never less than
/// `min_suspicion_timeout_ms`, where `n` is the current member count.
async fn check_suspicions(
    members: &RwLock<HashMap<String, GossipMember>>,
    config: &GossipConfig,
    event_tx: &mpsc::Sender<GossipEvent>,
) {
    let now = current_time_ms();

    // Cheap pass under the read lock: most rounds have nothing to promote.
    let (suspicion_timeout_ms, candidates) = {
        let members_guard = members.read().await;
        let member_count = members_guard.len().max(1);
        let log_factor = ((member_count as f64 + 1.0).ln().ceil() as u64).max(1);
        // Saturating arithmetic keeps extreme configurations from overflowing.
        let calculated_timeout = u64::from(config.suspicion_mult)
            .saturating_mul(config.protocol_period_ms)
            .saturating_mul(log_factor);
        let timeout = calculated_timeout.max(config.min_suspicion_timeout_ms);
        let candidates: Vec<String> = members_guard
            .values()
            .filter(|m| {
                m.state == MemberState::Suspect
                    // `saturating_sub` tolerates a wall clock that stepped
                    // backwards after the member was marked suspect.
                    && m.suspect_time_ms
                        .is_some_and(|t| now.saturating_sub(t) > timeout)
            })
            .map(|m| m.node_id.clone())
            .collect();
        (timeout, candidates)
    };
    if candidates.is_empty() {
        return;
    }

    let mut newly_dead = Vec::with_capacity(candidates.len());
    {
        let mut members_guard = members.write().await;
        for node_id in candidates {
            let Some(member) = members_guard.get_mut(&node_id) else {
                continue;
            };
            // Re-check under the write lock: the member may have been
            // refuted (or re-suspected) since the read pass.
            let still_expired = member.state == MemberState::Suspect
                && member
                    .suspect_time_ms
                    .is_some_and(|t| now.saturating_sub(t) > suspicion_timeout_ms);
            if still_expired {
                member.state = MemberState::Dead;
                error!(node_id = %node_id, "Node marked as dead");
                newly_dead.push(node_id);
            }
        }
    }

    // Publish after releasing the lock so a full event channel cannot block
    // other tasks that need the member list.
    for node_id in newly_dead {
        let _ = event_tx.send(GossipEvent::NodeFailed(node_id)).await;
    }
}
async fn apply_updates(
members: &RwLock<HashMap<String, GossipMember>>,
updates: &[MemberStateUpdate],
_config: &GossipConfig,
event_tx: &mpsc::Sender<GossipEvent>,
) {
let mut members_guard = members.write().await;
for update in updates {
if let Some(member) = members_guard.get_mut(&update.node_id) {
if update.incarnation > member.incarnation
|| (update.incarnation == member.incarnation
&& update.state as u8 > member.state as u8)
{
let old_state = member.state;
member.state = update.state;
member.incarnation = update.incarnation;
member.last_updated_ms = current_time_ms();
if update.state == MemberState::Suspect {
member.suspect_time_ms = Some(current_time_ms());
}
if let Some(addr) = update.address {
member.address = addr;
}
if let Some(ref api_addr) = update.api_address {
member.api_address = api_addr.clone();
}
if let Some(role) = update.role {
member.role = role;
}
if old_state != update.state {
match update.state {
MemberState::Dead => {
let _ = event_tx
.send(GossipEvent::NodeFailed(update.node_id.clone()))
.await;
}
MemberState::Left => {
let _ = event_tx
.send(GossipEvent::NodeLeft(update.node_id.clone()))
.await;
}
MemberState::Alive
if matches!(
old_state,
MemberState::Suspect | MemberState::Dead
) =>
{
let _ = event_tx
.send(GossipEvent::NodeRecovered(update.node_id.clone()))
.await;
}
_ => {}
}
}
}
} else if update.state == MemberState::Alive {
let member = GossipMember {
node_id: update.node_id.clone(),
address: update.address.unwrap_or_else(|| {
"0.0.0.0:0"
.parse()
.unwrap_or_else(|_| std::net::SocketAddr::from(([0, 0, 0, 0], 0)))
}),
api_address: update.api_address.clone().unwrap_or_default(),
role: update.role.unwrap_or(NodeRole::Replica),
state: MemberState::Alive,
incarnation: update.incarnation,
last_updated_ms: current_time_ms(),
suspect_time_ms: None,
metadata: HashMap::new(),
};
members_guard.insert(update.node_id.clone(), member.clone());
let _ = event_tx.send(GossipEvent::NodeJoined(member)).await;
}
}
}
/// Removes and returns up to `max_count` updates from the front of the
/// queue, oldest first.
async fn get_updates(
    queue: &RwLock<VecDeque<MemberStateUpdate>>,
    max_count: usize,
) -> Vec<MemberStateUpdate> {
    let mut pending = queue.write().await;
    let take = max_count.min(pending.len());
    let batch: Vec<MemberStateUpdate> = pending.drain(..take).collect();
    batch
}
}
/// Errors returned by the gossip protocol API.
#[derive(Debug, thiserror::Error)]
pub enum GossipError {
/// `start` was called while the protocol was already running.
#[error("Gossip protocol already running")]
AlreadyRunning,
/// A UDP socket could not be bound; carries the underlying error text.
#[error("Failed to bind socket: {0}")]
BindError(String),
/// A message could not be encoded or decoded; carries the underlying error text.
#[error("Serialization error: {0}")]
SerializationError(String),
/// The requested member (e.g. the local node in `leave`) is not in the member list.
#[error("Not found")]
NotFound,
}
/// Returns the current wall-clock time in milliseconds since the Unix
/// epoch, or 0 if the system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Encodes a gossip message as JSON bytes, mapping encoder failures to
/// [`GossipError::SerializationError`].
fn serialize_message(msg: &GossipMessage) -> Result<Vec<u8>, GossipError> {
    match serde_json::to_vec(msg) {
        Ok(encoded) => Ok(encoded),
        Err(cause) => Err(GossipError::SerializationError(cause.to_string())),
    }
}
/// Decodes a gossip message from JSON bytes, mapping decoder failures to
/// [`GossipError::SerializationError`].
fn deserialize_message(data: &[u8]) -> Result<GossipMessage, GossipError> {
    match serde_json::from_slice(data) {
        Ok(decoded) => Ok(decoded),
        Err(cause) => Err(GossipError::SerializationError(cause.to_string())),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the member fixture shared by most tests.
    fn sample_member() -> GossipMember {
        GossipMember::new(
            "node-1".to_string(),
            "127.0.0.1:7947".parse().unwrap(),
            "127.0.0.1:3000".to_string(),
            NodeRole::Primary,
        )
    }

    /// Serializes and deserializes a message, panicking on any failure.
    fn round_trip(msg: &GossipMessage) -> GossipMessage {
        let bytes = serialize_message(msg).unwrap();
        deserialize_message(&bytes).unwrap()
    }

    #[test]
    fn test_gossip_member_creation() {
        let member = sample_member();
        assert_eq!(member.node_id, "node-1");
        assert_eq!(member.state, MemberState::Alive);
        assert_eq!(member.incarnation, 0);
    }

    #[test]
    fn test_member_to_node_info() {
        let node_info = sample_member().to_node_info();
        assert_eq!(node_info.node_id, "node-1");
        assert_eq!(node_info.address, "127.0.0.1:3000");
        assert_eq!(node_info.role, NodeRole::Primary);
        assert_eq!(node_info.health.status, NodeStatus::Healthy);
    }

    #[test]
    fn test_message_serialization() {
        let msg = GossipMessage::Ping {
            seq_no: 1,
            from: "node-1".to_string(),
            updates: vec![],
        };
        match round_trip(&msg) {
            GossipMessage::Ping { seq_no, from, .. } => {
                assert_eq!(seq_no, 1);
                assert_eq!(from, "node-1");
            }
            _ => panic!("Wrong message type"),
        }
    }

    #[test]
    fn test_gossip_config_defaults() {
        let config = GossipConfig::default();
        assert_eq!(config.protocol_period_ms, 1000);
        assert_eq!(config.fanout, 3);
        assert_eq!(config.ping_timeout_ms, 500);
        assert_eq!(config.indirect_probes, 3);
        assert_eq!(config.gossip_port, 7947);
    }

    #[test]
    fn test_member_state_update() {
        let update = MemberStateUpdate {
            node_id: "node-1".to_string(),
            state: MemberState::Suspect,
            incarnation: 5,
            address: Some("127.0.0.1:7947".parse().unwrap()),
            api_address: Some("127.0.0.1:3000".to_string()),
            role: Some(NodeRole::Replica),
        };
        assert_eq!(update.node_id, "node-1");
        assert_eq!(update.state, MemberState::Suspect);
        assert_eq!(update.incarnation, 5);
    }

    #[tokio::test]
    async fn test_gossip_protocol_creation() {
        let (tx, _rx) = mpsc::channel(100);
        let protocol = GossipProtocol::new(GossipConfig::default(), sample_member(), tx);
        assert!(protocol.get_members().await.is_empty());
    }

    #[test]
    fn test_gossip_error_display() {
        let err = GossipError::AlreadyRunning;
        assert!(err.to_string().contains("already running"));
        let err = GossipError::BindError("address in use".to_string());
        assert!(err.to_string().contains("address in use"));
        let err = GossipError::SerializationError("invalid json".to_string());
        assert!(err.to_string().contains("invalid json"));
        let err = GossipError::NotFound;
        assert!(err.to_string().contains("Not found"));
    }

    #[test]
    fn test_member_state_transitions() {
        // `apply_updates` relies on this ordering to break incarnation ties.
        assert_eq!(MemberState::Alive as u8, 0);
        assert_eq!(MemberState::Suspect as u8, 1);
        assert_eq!(MemberState::Dead as u8, 2);
        assert_eq!(MemberState::Left as u8, 3);
    }

    #[test]
    fn test_gossip_message_variants() {
        let ping = GossipMessage::Ping {
            seq_no: 42,
            from: "node-1".to_string(),
            updates: vec![],
        };
        match round_trip(&ping) {
            GossipMessage::Ping { seq_no, from, .. } => {
                assert_eq!(seq_no, 42);
                assert_eq!(from, "node-1");
            }
            _ => panic!("Expected Ping"),
        }

        let ack = GossipMessage::Ack {
            seq_no: 42,
            from: "node-2".to_string(),
            updates: vec![],
        };
        match round_trip(&ack) {
            GossipMessage::Ack { seq_no, from, .. } => {
                assert_eq!(seq_no, 42);
                assert_eq!(from, "node-2");
            }
            _ => panic!("Expected Ack"),
        }

        let ping_req = GossipMessage::PingReq {
            seq_no: 100,
            from: "node-1".to_string(),
            target: "node-3".to_string(),
            target_addr: "127.0.0.1:7949".parse().unwrap(),
            updates: vec![],
        };
        match round_trip(&ping_req) {
            GossipMessage::PingReq {
                seq_no,
                from,
                target,
                ..
            } => {
                assert_eq!(seq_no, 100);
                assert_eq!(from, "node-1");
                assert_eq!(target, "node-3");
            }
            _ => panic!("Expected PingReq"),
        }
    }

    #[test]
    fn test_gossip_message_with_updates() {
        let update = MemberStateUpdate {
            node_id: "node-2".to_string(),
            state: MemberState::Alive,
            incarnation: 1,
            address: Some("127.0.0.1:7948".parse().unwrap()),
            api_address: Some("127.0.0.1:3001".to_string()),
            role: Some(NodeRole::Replica),
        };
        let ping = GossipMessage::Ping {
            seq_no: 1,
            from: "node-1".to_string(),
            updates: vec![update],
        };
        match round_trip(&ping) {
            GossipMessage::Ping { updates, .. } => {
                assert_eq!(updates.len(), 1);
                assert_eq!(updates[0].node_id, "node-2");
                assert_eq!(updates[0].state, MemberState::Alive);
                assert_eq!(updates[0].incarnation, 1);
            }
            _ => panic!("Expected Ping"),
        }
    }

    #[test]
    fn test_gossip_config_custom() {
        let config = GossipConfig {
            protocol_period_ms: 500,
            fanout: 5,
            ping_timeout_ms: 250,
            indirect_probes: 2,
            indirect_ping_timeout_ms: 750,
            suspicion_mult: 3,
            min_suspicion_timeout_ms: 15_000,
            rejoin_interval_rounds: 20,
            dead_node_gc_timeout_ms: 120_000,
            max_gossip_messages: 15,
            gossip_port: 8000,
            max_packet_size: 32768,
            seed_nodes: vec!["127.0.0.1:8001".to_string(), "127.0.0.1:8002".to_string()],
        };
        assert_eq!(config.protocol_period_ms, 500);
        assert_eq!(config.fanout, 5);
        assert_eq!(config.ping_timeout_ms, 250);
        assert_eq!(config.indirect_probes, 2);
        assert_eq!(config.indirect_ping_timeout_ms, 750);
        assert_eq!(config.suspicion_mult, 3);
        assert_eq!(config.max_gossip_messages, 15);
        assert_eq!(config.gossip_port, 8000);
        assert_eq!(config.max_packet_size, 32768);
        assert_eq!(config.seed_nodes.len(), 2);
    }

    #[tokio::test]
    async fn test_gossip_protocol_metadata() {
        let (tx, _rx) = mpsc::channel(100);
        let protocol = GossipProtocol::new(GossipConfig::default(), sample_member(), tx);
        protocol
            .update_metadata("key1".to_string(), "value1".to_string())
            .await;
        protocol
            .update_metadata("key2".to_string(), "value2".to_string())
            .await;
        // The local member is only registered by `start`, so metadata
        // updates on a never-started protocol must not create an entry.
        assert!(protocol.get_member("node-1").await.is_none());
        assert!(protocol.get_members().await.is_empty());
    }

    #[test]
    fn test_gossip_member_clone() {
        let member = sample_member();
        let cloned = member.clone();
        assert_eq!(cloned.node_id, member.node_id);
        assert_eq!(cloned.address, member.address);
        assert_eq!(cloned.api_address, member.api_address);
        assert_eq!(cloned.role, member.role);
        assert_eq!(cloned.state, member.state);
        assert_eq!(cloned.incarnation, member.incarnation);
    }

    #[test]
    fn test_gossip_event_variants() {
        let event = GossipEvent::NodeJoined(sample_member());
        match event {
            GossipEvent::NodeJoined(m) => assert_eq!(m.node_id, "node-1"),
            _ => panic!("Expected NodeJoined"),
        }
        let event = GossipEvent::NodeLeft("node-2".to_string());
        match event {
            GossipEvent::NodeLeft(id) => assert_eq!(id, "node-2"),
            _ => panic!("Expected NodeLeft"),
        }
        let event = GossipEvent::NodeFailed("node-3".to_string());
        match event {
            GossipEvent::NodeFailed(id) => assert_eq!(id, "node-3"),
            _ => panic!("Expected NodeFailed"),
        }
        let event = GossipEvent::NodeRecovered("node-4".to_string());
        match event {
            GossipEvent::NodeRecovered(id) => assert_eq!(id, "node-4"),
            _ => panic!("Expected NodeRecovered"),
        }
    }
}