use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Identifier of a cluster member.
///
/// `ClusterNode::new` fills it with a random v4 UUID string, while
/// `ClusterNode::with_id` accepts any caller-supplied string.
pub type NodeId = String;
/// Membership state of a node as tracked by the registry.
///
/// Serialized in lowercase (`"alive"`, `"suspect"`, `"dead"`, `"leaving"`).
/// The `Default` value is `Alive`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum NodeState {
    /// Responsive node; the default state.
    #[default]
    Alive,
    /// Silent for longer than one timeout window, or explicitly marked via
    /// `NodeRegistry::mark_suspect`; still counted as available.
    Suspect,
    /// Considered failed; excluded from the available peer set.
    Dead,
    /// NOTE(review): nothing visible here assigns this state; presumably set for
    /// a node announcing a graceful departure — confirm with the gossip layer.
    Leaving,
}
/// Serializable membership record describing one cluster node.
///
/// `last_seen` is local bookkeeping only. It is skipped by serde, so any
/// deserialized record has `last_seen == None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Unique node identifier.
    pub id: NodeId,
    /// NOTE(review): presumably the client-facing API address ("host:port" in
    /// the tests) — confirm.
    pub api_addr: String,
    /// NOTE(review): presumably the address used for intra-cluster traffic —
    /// confirm.
    pub cluster_addr: String,
    /// Current membership state.
    pub state: NodeState,
    /// Version counter for this record. `NodeRegistry::upsert_peer` replaces a
    /// known peer only with a record whose incarnation is higher, or equal with
    /// state `Alive`.
    pub incarnation: u64,
    /// Local timestamp of the last contact, refreshed by `touch`.
    ///
    /// Not serialized (serde has no `Instant` support), so it deserializes as
    /// `None`. In that case `time_since_last_seen` returns `None` and the
    /// registry's health check does not time the node out.
    #[serde(skip)]
    pub last_seen: Option<Instant>,
    /// Free-form string key/value metadata.
    pub metadata: HashMap<String, String>,
}
impl NodeInfo {
    /// Builds a record for a freshly seen node.
    ///
    /// The record starts in state `Alive` with incarnation 0 and empty
    /// metadata, and `last_seen` is stamped with the current instant.
    pub fn new(id: NodeId, api_addr: String, cluster_addr: String) -> Self {
        let mut fresh = Self {
            id,
            api_addr,
            cluster_addr,
            state: NodeState::Alive,
            incarnation: 0,
            last_seen: None,
            metadata: HashMap::new(),
        };
        fresh.touch();
        fresh
    }

    /// Stamps `last_seen` with the current instant.
    pub fn touch(&mut self) {
        let now = Instant::now();
        self.last_seen = Some(now);
    }

    /// Returns `true` only when the node is in the `Alive` state.
    pub fn is_alive(&self) -> bool {
        matches!(self.state, NodeState::Alive)
    }

    /// Returns `true` for nodes that may still be used: `Alive` or `Suspect`.
    pub fn is_available(&self) -> bool {
        match self.state {
            NodeState::Alive | NodeState::Suspect => true,
            NodeState::Dead | NodeState::Leaving => false,
        }
    }

    /// Returns the time elapsed since `last_seen`.
    ///
    /// Returns `None` when no timestamp was ever recorded. This happens for
    /// deserialized records, because `last_seen` is `#[serde(skip)]`.
    pub fn time_since_last_seen(&self) -> Option<Duration> {
        self.last_seen.as_ref().map(Instant::elapsed)
    }
}
/// The local node: its own membership record plus the timeout it was configured with.
#[derive(Debug)]
pub struct ClusterNode {
    /// This node's own membership record (id, addresses, state, incarnation).
    pub info: NodeInfo,
    // Stored at construction but never read, hence the `dead_code` allowance.
    // `NodeRegistry` keeps its own copy of the timeout for health checks.
    #[allow(dead_code)]
    node_timeout: Duration,
}
impl ClusterNode {
    /// Creates the local node, identified by a freshly generated random v4 UUID.
    pub fn new(api_addr: String, cluster_addr: String, node_timeout: Duration) -> Self {
        let generated_id = Uuid::new_v4().to_string();
        info!(node_id = %generated_id, api_addr = %api_addr, cluster_addr = %cluster_addr, "Creating cluster node");
        Self::assemble(generated_id, api_addr, cluster_addr, node_timeout)
    }

    /// Creates the local node with a caller-chosen identifier.
    pub fn with_id(
        id: NodeId,
        api_addr: String,
        cluster_addr: String,
        node_timeout: Duration,
    ) -> Self {
        info!(node_id = %id, api_addr = %api_addr, cluster_addr = %cluster_addr, "Creating cluster node with ID");
        Self::assemble(id, api_addr, cluster_addr, node_timeout)
    }

    /// Shared constructor tail: wraps a fresh `NodeInfo` (state `Alive`, incarnation 0).
    fn assemble(
        id: NodeId,
        api_addr: String,
        cluster_addr: String,
        node_timeout: Duration,
    ) -> Self {
        let info = NodeInfo::new(id, api_addr, cluster_addr);
        Self { info, node_timeout }
    }

    /// Returns this node's identifier.
    pub fn id(&self) -> &NodeId {
        let record = &self.info;
        &record.id
    }

    /// Bumps this node's incarnation by one.
    pub fn increment_incarnation(&mut self) {
        let record = &mut self.info;
        record.incarnation += 1;
    }
}
/// Tracks the local node and the set of known remote peers, keyed by id.
///
/// The peer map sits behind an `Arc<RwLock<_>>` (tokio's async lock), which is
/// why every peer accessor on the registry is `async`.
#[derive(Debug)]
pub struct NodeRegistry {
    /// The local node. Records with its id are never inserted into `peers`.
    local: ClusterNode,
    /// Known remote nodes keyed by `NodeId`.
    peers: Arc<RwLock<HashMap<NodeId, NodeInfo>>>,
    /// Health-check window. A peer silent for more than one window becomes
    /// `Suspect`; a peer silent for more than two becomes `Dead`.
    node_timeout: Duration,
}
impl NodeRegistry {
    /// Creates a registry for `local` with an initially empty peer set.
    ///
    /// `node_timeout` drives [`check_health`](Self::check_health). A peer silent
    /// for longer than one timeout becomes `Suspect`; a peer silent for longer
    /// than two becomes `Dead`.
    pub fn new(local: ClusterNode, node_timeout: Duration) -> Self {
        Self {
            local,
            peers: Arc::new(RwLock::new(HashMap::new())),
            node_timeout,
        }
    }

    /// Returns the local node's id.
    pub fn local_id(&self) -> &NodeId {
        self.local.id()
    }

    /// Returns the local node's membership record.
    pub fn local_info(&self) -> &NodeInfo {
        &self.local.info
    }

    /// Returns mutable access to the local node (e.g. to bump its incarnation).
    pub fn local_mut(&mut self) -> &mut ClusterNode {
        &mut self.local
    }

    /// Inserts a newly discovered peer or applies an update to a known one.
    ///
    /// Records carrying the local node's id are ignored. An update to a known
    /// peer is applied only if it has a higher incarnation, or the same
    /// incarnation with state `Alive`. Otherwise it is dropped.
    ///
    /// `last_seen` is `#[serde(skip)]`, so deserialized records arrive with
    /// `last_seen == None`. Storing them as-is would hide the peer from
    /// [`check_health`](Self::check_health) forever, so a missing timestamp is
    /// filled in as follows:
    /// - Newly discovered peers start their timeout clock now.
    /// - Updates to known peers keep the locally tracked timestamp. Liveness is
    ///   refreshed by [`touch_peer`](Self::touch_peer), not by membership records.
    pub async fn upsert_peer(&self, mut info: NodeInfo) {
        // No need to take the write lock for our own record.
        if info.id == self.local.info.id {
            return;
        }
        let mut peers = self.peers.write().await;
        match peers.get(&info.id) {
            Some(existing) => {
                let supersedes = info.incarnation > existing.incarnation
                    || (info.incarnation == existing.incarnation
                        && info.state == NodeState::Alive);
                if !supersedes {
                    return;
                }
                if info.last_seen.is_none() {
                    info.last_seen = existing.last_seen;
                }
                debug!(node_id = %info.id, incarnation = info.incarnation, "Updating peer node");
                peers.insert(info.id.clone(), info);
            }
            None => {
                if info.last_seen.is_none() {
                    info.touch();
                }
                info!(node_id = %info.id, cluster_addr = %info.cluster_addr, "Discovered new peer node");
                peers.insert(info.id.clone(), info);
            }
        }
    }

    /// Removes a peer entirely; logs only if it was present.
    pub async fn remove_peer(&self, node_id: &NodeId) {
        let mut peers = self.peers.write().await;
        if peers.remove(node_id).is_some() {
            info!(node_id = %node_id, "Removed peer node");
        }
    }

    /// Marks a known peer `Dead`. No-op if the peer is unknown or already dead.
    pub async fn mark_dead(&self, node_id: &NodeId) {
        let mut peers = self.peers.write().await;
        if let Some(peer) = peers.get_mut(node_id) {
            if peer.state != NodeState::Dead {
                warn!(node_id = %node_id, "Marking peer as dead");
                peer.state = NodeState::Dead;
            }
        }
    }

    /// Demotes a known peer from `Alive` to `Suspect`; other states are left unchanged.
    pub async fn mark_suspect(&self, node_id: &NodeId) {
        let mut peers = self.peers.write().await;
        if let Some(peer) = peers.get_mut(node_id) {
            if peer.state == NodeState::Alive {
                debug!(node_id = %node_id, "Marking peer as suspect");
                peer.state = NodeState::Suspect;
            }
        }
    }

    /// Records contact with a peer.
    ///
    /// Refreshes `last_seen` and promotes `Suspect` back to `Alive`. `Dead` and
    /// `Leaving` peers keep their state.
    pub async fn touch_peer(&self, node_id: &NodeId) {
        let mut peers = self.peers.write().await;
        if let Some(peer) = peers.get_mut(node_id) {
            peer.touch();
            if peer.state == NodeState::Suspect {
                debug!(node_id = %node_id, "Peer recovered from suspect state");
                peer.state = NodeState::Alive;
            }
        }
    }

    /// Returns a snapshot copy of one peer, if known.
    pub async fn get_peer(&self, node_id: &NodeId) -> Option<NodeInfo> {
        let peers = self.peers.read().await;
        peers.get(node_id).cloned()
    }

    /// Returns a snapshot of all peers currently `Alive`.
    pub async fn alive_peers(&self) -> Vec<NodeInfo> {
        let peers = self.peers.read().await;
        peers.values().filter(|p| p.is_alive()).cloned().collect()
    }

    /// Returns a snapshot of the peers that are `Alive` or `Suspect`.
    pub async fn available_peers(&self) -> Vec<NodeInfo> {
        let peers = self.peers.read().await;
        peers
            .values()
            .filter(|p| p.is_available())
            .cloned()
            .collect()
    }

    /// Returns a snapshot of every known peer, regardless of state.
    pub async fn all_peers(&self) -> Vec<NodeInfo> {
        let peers = self.peers.read().await;
        peers.values().cloned().collect()
    }

    /// Returns the number of peers currently `Alive`. The local node is not counted.
    pub async fn alive_count(&self) -> usize {
        let peers = self.peers.read().await;
        peers.values().filter(|p| p.is_alive()).count()
    }

    /// Applies timeout-based state transitions to every peer.
    ///
    /// - A peer silent for more than `2 * node_timeout` is marked `Dead`, from
    ///   any non-dead state including `Leaving`.
    /// - A peer silent for more than `node_timeout` is demoted from `Alive` to
    ///   `Suspect`.
    /// - Peers without a `last_seen` timestamp are skipped.
    pub async fn check_health(&self) {
        let suspect_after = self.node_timeout;
        // `Duration * u32` panics on overflow. Saturate instead, so that a huge
        // timeout (e.g. `Duration::MAX` meaning "never") simply never fires.
        let dead_after = self.node_timeout.saturating_mul(2);
        let mut peers = self.peers.write().await;
        for peer in peers.values_mut() {
            if let Some(elapsed) = peer.time_since_last_seen() {
                if elapsed > dead_after && peer.state != NodeState::Dead {
                    warn!(node_id = %peer.id, elapsed = ?elapsed, "Peer timed out, marking as dead");
                    peer.state = NodeState::Dead;
                } else if elapsed > suspect_after && peer.state == NodeState::Alive {
                    debug!(node_id = %peer.id, elapsed = ?elapsed, "Peer missed heartbeat, marking as suspect");
                    peer.state = NodeState::Suspect;
                }
            }
        }
    }

    /// Summarizes membership as per-state counts over all peers plus the local node.
    ///
    /// The local node is counted under its own state, which is normally `Alive`
    /// (previously it was always counted as alive, even while `Leaving`).
    /// `Leaving` nodes are included in `total_nodes` but in no per-state count.
    pub async fn cluster_state(&self) -> ClusterState {
        let peers = self.peers.read().await;
        let (mut alive, mut suspect, mut dead) = (0, 0, 0);
        let local_state = std::iter::once(self.local.info.state);
        for state in local_state.chain(peers.values().map(|p| p.state)) {
            match state {
                NodeState::Alive => alive += 1,
                NodeState::Suspect => suspect += 1,
                NodeState::Dead => dead += 1,
                // Neither up nor failed; only reflected in `total_nodes`.
                NodeState::Leaving => {}
            }
        }
        ClusterState {
            local_id: self.local.info.id.clone(),
            total_nodes: peers.len() + 1,
            alive_nodes: alive,
            suspect_nodes: suspect,
            dead_nodes: dead,
        }
    }
}
/// Point-in-time membership summary produced by `NodeRegistry::cluster_state`.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterState {
    /// Id of the node that produced this snapshot.
    pub local_id: NodeId,
    /// Known peers plus the local node.
    pub total_nodes: usize,
    /// Nodes counted as alive. `NodeRegistry::cluster_state` includes the local
    /// node in this tally.
    pub alive_nodes: usize,
    /// Peers in the `Suspect` state.
    pub suspect_nodes: usize,
    /// Peers in the `Dead` state.
    pub dead_nodes: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a peer record on `host`, using the fixed API/cluster ports the tests share.
    fn make_info(id: &str, host: &str) -> NodeInfo {
        NodeInfo::new(id.to_string(), format!("{host}:9000"), format!("{host}:9001"))
    }

    /// Builds a registry whose local node is "local" on 127.0.0.1 with a 10s timeout.
    fn local_registry() -> NodeRegistry {
        let timeout = Duration::from_secs(10);
        let local = ClusterNode::with_id(
            "local".to_string(),
            "127.0.0.1:9000".to_string(),
            "127.0.0.1:9001".to_string(),
            timeout,
        );
        NodeRegistry::new(local, timeout)
    }

    #[test]
    fn test_node_info_creation() {
        let info = make_info("node-1", "127.0.0.1");
        assert_eq!(info.id, "node-1");
        assert!(info.is_alive());
        assert!(info.is_available());
    }

    #[test]
    fn test_node_state_transitions() {
        let mut info = make_info("node-1", "127.0.0.1");
        // Each case is (state, expected is_alive, expected is_available).
        let cases = [
            (NodeState::Alive, true, true),
            (NodeState::Suspect, false, true),
            (NodeState::Dead, false, false),
        ];
        for (state, alive, available) in cases {
            info.state = state;
            assert_eq!(info.is_alive(), alive, "is_alive for {state:?}");
            assert_eq!(info.is_available(), available, "is_available for {state:?}");
        }
    }

    #[tokio::test]
    async fn test_node_registry() {
        let registry = local_registry();
        let peer_id: NodeId = "peer-1".to_string();

        registry.upsert_peer(make_info(&peer_id, "127.0.0.2")).await;
        assert_eq!(registry.alive_count().await, 1);

        registry.mark_suspect(&peer_id).await;
        let available = registry.available_peers().await;
        assert_eq!(available.len(), 1);
        assert_eq!(available[0].state, NodeState::Suspect);

        registry.mark_dead(&peer_id).await;
        assert_eq!(registry.alive_count().await, 0);
    }

    #[tokio::test]
    async fn test_registry_ignores_local() {
        let registry = local_registry();
        registry.upsert_peer(make_info("local", "127.0.0.1")).await;
        assert_eq!(registry.alive_count().await, 0);
    }
}