use std::collections::BTreeSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use chitchat::transport::UdpTransport;
use chitchat::{
spawn_chitchat, ChitchatConfig, ChitchatHandle, ChitchatId, FailureDetectorConfig,
};
use engenho_types::primitives::Quantity;
use serde::{Deserialize, Serialize};
use tokio::sync::{watch, Mutex};
use crate::NodeId;
/// Settings used by [`GossipMesh::start`] to configure chitchat.
///
/// Build one with [`GossipConfig::new`] plus the `with_*` helpers; every field
/// is public, so the remaining values can also be overridden directly.
#[derive(Clone, Debug)]
pub struct GossipConfig {
/// Identity of this node; its hex form is used as the chitchat node id.
pub node_id: NodeId,
/// Address chitchat listens on and advertises to peers.
pub gossip_addr: SocketAddr,
/// Seed addresses handed to chitchat for bootstrapping
/// (presumably `host:port` strings — confirm against chitchat's expectations).
pub seed_nodes: Vec<String>,
/// Chitchat cluster id; `GossipConfig::new` sets `"engenho-default"`.
pub cluster_id: String,
/// Interval between gossip rounds; also used as the failure detector's
/// initial interval. `GossipConfig::new` sets 500 ms.
pub gossip_interval: Duration,
/// Failure-detector phi threshold passed to chitchat. `GossipConfig::new` sets 8.0.
pub phi_threshold: f64,
/// Passed through to chitchat's `marked_for_deletion_grace_period`.
/// `GossipConfig::new` sets 30 s.
pub marked_for_deletion_grace_period: Duration,
/// State JSON-encoded and published under the mesh's state key at startup.
pub initial_state: NodeState,
}
impl GossipConfig {
    /// Creates a configuration with the crate's default tuning.
    ///
    /// Defaults:
    /// - no seed nodes;
    /// - cluster id `"engenho-default"`;
    /// - 500 ms gossip interval;
    /// - phi threshold of 8.0;
    /// - 30 s deletion grace period.
    #[must_use]
    pub fn new(node_id: NodeId, gossip_addr: SocketAddr, initial_state: NodeState) -> Self {
        let gossip_interval = Duration::from_millis(500);
        let marked_for_deletion_grace_period = Duration::from_secs(30);
        Self {
            node_id,
            gossip_addr,
            initial_state,
            seed_nodes: vec![],
            cluster_id: String::from("engenho-default"),
            gossip_interval,
            phi_threshold: 8.0,
            marked_for_deletion_grace_period,
        }
    }

    /// Replaces the seed-node list with `seeds`.
    #[must_use]
    pub fn with_seeds(self, seeds: Vec<String>) -> Self {
        Self {
            seed_nodes: seeds,
            ..self
        }
    }

    /// Replaces the chitchat cluster id.
    #[must_use]
    pub fn with_cluster_id(self, cluster_id: impl Into<String>) -> Self {
        Self {
            cluster_id: cluster_id.into(),
            ..self
        }
    }
}
/// Per-node metadata that each member gossips as a JSON blob under the mesh's
/// state key.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeState {
/// Node identity; membership views take their `node_id` from this field.
pub node_id: NodeId,
/// Gossip address as self-reported by the node.
pub gossip_addr: String,
/// Optional Raft address; omitted from the JSON when `None` and decoded as
/// `None` when the key is missing.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub raft_addr: Option<String>,
/// Roles the node advertises, kept as an ordered set.
pub roles: BTreeSet<NodeRole>,
/// Resources the node advertises.
pub capacity: NodeCapacity,
/// Kubernetes version string (this module's tests use values like `"v1.34.0"`).
pub k8s_version: String,
/// Uptime in seconds as reported by the node; not computed in this file.
pub uptime_sec: u64,
/// Caller-managed counter, not interpreted in this file
/// (presumably bumped on membership changes — TODO confirm).
pub membership_generation: u64,
}
/// A role a node can advertise, serialized in `snake_case`
/// (e.g. `ApiServer` -> `"api_server"`).
///
/// The derived `Ord` follows declaration order, which is therefore the
/// iteration order of `NodeState::roles`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NodeRole {
ApiServer,
Etcd,
Scheduler,
ControllerManager,
Worker,
// NOTE(review): the scheduling/visibility meaning of the next two roles is
// not defined in this file — confirm with their consumers.
Quarantined,
Observer,
}
/// Resource capacity advertised by a node.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeCapacity {
/// CPU quantity (tests use literals such as `"4"`).
pub cpu: Quantity,
/// Memory quantity (tests use literals such as `"8Gi"`).
pub memory: Quantity,
/// Storage quantity (tests use literals such as `"50Gi"`).
pub storage: Quantity,
/// Pod capacity (presumably the maximum number of schedulable pods — confirm).
pub pods: u32,
}
/// Coarse health classification for a node, serialized in `snake_case`.
///
/// NOTE(review): not referenced elsewhere in this section of the file;
/// confirm where it is produced and consumed.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NodeHealth {
Healthy,
Suspect,
Dead,
}
/// Snapshot of the live gossip members whose state blob decoded successfully.
///
/// The local node is included. `Default` is the empty view the mesh publishes
/// before its first real snapshot.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct MembershipView {
/// Members ordered by `node_id`.
pub members: Vec<MembershipEntry>,
}
/// One member of a [`MembershipView`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MembershipEntry {
/// Copied from the decoded `state.node_id`.
pub node_id: NodeId,
/// Chitchat's advertised gossip address for this member, rendered as a
/// string (this is not `state.gossip_addr`).
pub gossip_addr: String,
/// The member's decoded state blob.
pub state: NodeState,
}
/// Errors returned by [`GossipMesh`] operations.
///
/// NOTE(review): each message embeds `{0}` while the same value is also the
/// error `source`, so chain-printing reporters will show the cause twice.
#[derive(Debug, thiserror::Error)]
pub enum MembershipError {
/// Chitchat returned an error. `GossipMesh::shutdown` also reports its
/// failure through this variant, so the "startup" wording can mislead.
#[error("chitchat startup failed: {0}")]
StartFailed(#[source] anyhow::Error),
/// JSON (de)serialization of a `NodeState` failed. Despite the message,
/// decode failures in `GossipMesh::local_state` also land here.
#[error("could not encode NodeState: {0}")]
Encode(#[from] serde_json::Error),
/// Free-form failure. `GossipMesh::wait_for_members` uses it for timeouts
/// and for a closed view channel, not only for configuration problems.
#[error("invalid config: {0}")]
Invalid(String),
}
/// A running chitchat node plus a watch channel carrying derived
/// [`MembershipView`]s.
///
/// Created by [`GossipMesh::start`]; stopped by [`GossipMesh::shutdown`].
pub struct GossipMesh {
/// Handle to the running chitchat instance.
handle: ChitchatHandle,
/// Sender shared with the background task that publishes views.
/// NOTE(review): `watch::Sender::send` takes `&self`, so the `Mutex` looks
/// unnecessary — confirm before removing.
view_tx: Arc<Mutex<watch::Sender<MembershipView>>>,
/// Receiver that `subscribe`/`current_view` clone or read from.
view_rx: watch::Receiver<MembershipView>,
/// This node's id; `peers` filters it out of the view.
local_node_id: NodeId,
}
/// Chitchat key under which every node publishes its JSON-encoded [`NodeState`].
const STATE_KEY: &str = "engenho.revoada.node";
impl GossipMesh {
    /// Starts chitchat for this node and spawns a task that republishes the
    /// live-node set as a [`MembershipView`] on the mesh's watch channel.
    ///
    /// `config.initial_state` is JSON-encoded and published under
    /// [`STATE_KEY`] as part of chitchat's initial key-value set. The chitchat
    /// generation id is the current Unix time in whole seconds.
    ///
    /// The forwarding task stops when either of these happens:
    /// - chitchat closes its live-node watch;
    /// - every view receiver (the mesh's own and all subscribers) has been
    ///   dropped.
    ///
    /// # Errors
    ///
    /// * [`MembershipError::Encode`] if `config.initial_state` cannot be
    ///   serialized.
    /// * [`MembershipError::StartFailed`] if `spawn_chitchat` fails.
    pub async fn start(config: GossipConfig) -> Result<Self, MembershipError> {
        // NOTE(review): second granularity means two incarnations started
        // within the same second share a generation id; confirm that is
        // acceptable for restart detection.
        let chitchat_id = ChitchatId::new(
            config.node_id.to_hex(),
            unix_seconds(),
            config.gossip_addr,
        );
        // Owned fields are moved into chitchat's config instead of cloned;
        // only `initial_state` (borrowed) and `node_id` are used afterwards.
        let cc_config = ChitchatConfig {
            chitchat_id,
            cluster_id: config.cluster_id,
            gossip_interval: config.gossip_interval,
            listen_addr: config.gossip_addr,
            seed_nodes: config.seed_nodes,
            failure_detector_config: FailureDetectorConfig {
                phi_threshold: config.phi_threshold,
                initial_interval: config.gossip_interval,
                ..Default::default()
            },
            marked_for_deletion_grace_period: config.marked_for_deletion_grace_period,
            catchup_callback: None,
            extra_liveness_predicate: None,
        };
        let initial_blob = serde_json::to_string(&config.initial_state)?;
        let initial_kvs = vec![(STATE_KEY.to_string(), initial_blob)];
        let transport = UdpTransport;
        let handle = spawn_chitchat(cc_config, initial_kvs, &transport)
            .await
            .map_err(MembershipError::StartFailed)?;

        let live_nodes = {
            let chitchat = handle.chitchat();
            let chitchat = chitchat.lock().await;
            chitchat.live_nodes_watcher()
        };

        let (view_tx, view_rx) = watch::channel(MembershipView::default());
        let view_tx = Arc::new(Mutex::new(view_tx));
        let publisher = Arc::clone(&view_tx);
        tokio::spawn(async move {
            let mut live_nodes = live_nodes;
            loop {
                // Clone so the watch's read lock is released before parsing;
                // `borrow_and_update` marks this version as seen, so the
                // `changed()` below only wakes for newer snapshots.
                let snapshot = live_nodes.borrow_and_update().clone();
                let view = build_view(&snapshot);
                let delivered = publisher.lock().await.send(view).is_ok();
                if !delivered {
                    // `send` fails only once every receiver is gone. The
                    // mesh's own receiver is among them, so the mesh has been
                    // dropped and no new subscriber can appear: stop instead
                    // of rebuilding views nobody will read.
                    break;
                }
                if live_nodes.changed().await.is_err() {
                    // Chitchat dropped its side of the live-node watch.
                    break;
                }
            }
        });

        Ok(Self {
            handle,
            view_tx,
            view_rx,
            local_node_id: config.node_id,
        })
    }

    /// Returns a receiver that always holds the latest published view and is
    /// notified each time the background task republishes.
    #[must_use]
    pub fn subscribe(&self) -> watch::Receiver<MembershipView> {
        self.view_rx.clone()
    }

    /// Returns a clone of the most recently published view (empty until the
    /// background task's first publish).
    #[must_use]
    pub fn current_view(&self) -> MembershipView {
        self.view_rx.borrow().clone()
    }

    /// Reads and decodes this node's own [`STATE_KEY`] entry from chitchat.
    ///
    /// Returns `Ok(None)` when the local node has no such key.
    ///
    /// # Errors
    ///
    /// [`MembershipError::Encode`] if the stored blob is not valid
    /// [`NodeState`] JSON.
    pub async fn local_state(&self) -> Result<Option<NodeState>, MembershipError> {
        let chitchat = self.handle.chitchat();
        let chitchat = chitchat.lock().await;
        let Some(blob) = chitchat
            .node_state(chitchat.self_chitchat_id())
            .and_then(|node| node.get(STATE_KEY))
        else {
            return Ok(None);
        };
        Ok(Some(serde_json::from_str::<NodeState>(blob)?))
    }

    /// Serializes `state` and stores it under [`STATE_KEY`] on the local
    /// chitchat node, for chitchat to gossip to peers.
    ///
    /// # Errors
    ///
    /// [`MembershipError::Encode`] if `state` cannot be serialized.
    pub async fn update_local_state(&self, state: &NodeState) -> Result<(), MembershipError> {
        let blob = serde_json::to_string(state)?;
        let chitchat = self.handle.chitchat();
        let mut chitchat = chitchat.lock().await;
        chitchat.self_node_state().set(STATE_KEY, blob);
        Ok(())
    }

    /// Members of the current view other than this node (matched by `node_id`).
    #[must_use]
    pub async fn peers(&self) -> Vec<MembershipEntry> {
        // Clone only the surviving entries rather than the whole view.
        self.view_rx
            .borrow()
            .members
            .iter()
            .filter(|member| member.node_id != self.local_node_id)
            .cloned()
            .collect()
    }

    /// This node's id.
    #[must_use]
    pub fn local_node_id(&self) -> NodeId {
        self.local_node_id
    }

    /// Waits until the published view holds at least `target` members
    /// (the local node counts), returning that view.
    ///
    /// # Errors
    ///
    /// [`MembershipError::Invalid`] if `timeout` elapses first, or if the view
    /// channel closes.
    pub async fn wait_for_members(
        &self,
        target: usize,
        timeout: Duration,
    ) -> Result<MembershipView, MembershipError> {
        let timed_out = |live: usize| {
            MembershipError::Invalid(format!(
                "wait_for_members({target}) timed out at {live} live members"
            ))
        };
        let mut rx = self.view_rx.clone();
        let deadline = tokio::time::Instant::now() + timeout;
        loop {
            // Mark the version as seen so `changed()` waits for a newer one;
            // an update racing in after this line still wakes it.
            let view = rx.borrow_and_update().clone();
            if view.members.len() >= target {
                return Ok(view);
            }
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            if remaining.is_zero() {
                return Err(timed_out(view.members.len()));
            }
            match tokio::time::timeout(remaining, rx.changed()).await {
                Ok(Ok(())) => {}
                Ok(Err(_)) => return Err(MembershipError::Invalid("watch closed".into())),
                Err(_) => return Err(timed_out(view.members.len())),
            }
        }
    }

    /// Shuts down the chitchat instance.
    ///
    /// Once chitchat drops its live-node watch, the background forwarding
    /// task exits on its own.
    ///
    /// # Errors
    ///
    /// Chitchat's shutdown error, reported as [`MembershipError::StartFailed`].
    pub async fn shutdown(self) -> Result<(), MembershipError> {
        // Wait for any in-flight publish by the forwarding task; the guard is
        // released immediately.
        drop(self.view_tx.lock().await);
        self.handle
            .shutdown()
            .await
            .map_err(MembershipError::StartFailed)?;
        Ok(())
    }
}
fn build_view(
snapshot: &std::collections::BTreeMap<ChitchatId, chitchat::NodeState>,
) -> MembershipView {
let mut members = Vec::with_capacity(snapshot.len());
for (cid, node) in snapshot {
let Some(blob) = node.get(STATE_KEY) else {
continue;
};
let Ok(state) = serde_json::from_str::<NodeState>(blob) else {
continue;
};
members.push(MembershipEntry {
node_id: state.node_id,
gossip_addr: cid.gossip_advertise_addr.to_string(),
state,
});
}
members.sort_by(|a, b| a.node_id.cmp(&b.node_id));
MembershipView { members }
}
/// Whole seconds elapsed since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn unix_seconds() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Parses a quantity literal; fixtures only ever pass valid literals.
    fn quantity(literal: &str) -> Quantity {
        Quantity::from_str(literal).unwrap()
    }

    /// The capacity every fixture in this module advertises.
    fn fixture_capacity() -> NodeCapacity {
        NodeCapacity {
            cpu: quantity("4"),
            memory: quantity("8Gi"),
            storage: quantity("50Gi"),
            pods: 32,
        }
    }

    #[test]
    fn node_role_serializes_snake_case() {
        let cases = [
            (NodeRole::ApiServer, "\"api_server\""),
            (NodeRole::ControllerManager, "\"controller_manager\""),
        ];
        for (role, wire) in cases {
            assert_eq!(serde_json::to_string(&role).unwrap(), wire);
        }
    }

    #[test]
    fn node_state_round_trips() {
        let original = NodeState {
            node_id: NodeId::new([1; 32]),
            gossip_addr: "192.168.64.10:7800".into(),
            raft_addr: Some("192.168.64.10:7900".into()),
            roles: BTreeSet::from([NodeRole::ApiServer, NodeRole::Etcd, NodeRole::Worker]),
            capacity: fixture_capacity(),
            k8s_version: "v1.34.0".into(),
            uptime_sec: 3600,
            membership_generation: 7,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: NodeState = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn gossip_config_builder() {
        let id = NodeId::new([1; 32]);
        let cfg = GossipConfig::new(id, "127.0.0.1:7800".parse().unwrap(), test_state(id))
            .with_seeds(vec!["127.0.0.1:7801".into()])
            .with_cluster_id("test-cluster");
        assert_eq!(cfg.seed_nodes, vec!["127.0.0.1:7801".to_string()]);
        assert_eq!(cfg.cluster_id, "test-cluster");
        assert_eq!(cfg.phi_threshold, 8.0);
    }

    /// Minimal worker-only [`NodeState`] for `node_id`, shared by tests.
    pub(crate) fn test_state(node_id: NodeId) -> NodeState {
        NodeState {
            node_id,
            gossip_addr: "127.0.0.1:0".into(),
            raft_addr: None,
            roles: BTreeSet::from([NodeRole::Worker]),
            capacity: fixture_capacity(),
            k8s_version: "v1.34.0".into(),
            uptime_sec: 0,
            membership_generation: 0,
        }
    }
}