use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::time::Instant;
pub type NodeId = String;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum NodeState {
Alive,
Suspect,
Dead,
Leaving,
#[default]
Unknown,
}
impl NodeState {
pub fn is_healthy(&self) -> bool {
matches!(self, NodeState::Alive)
}
pub fn is_reachable(&self) -> bool {
matches!(self, NodeState::Alive | NodeState::Suspect)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct NodeCapabilities {
pub voter: bool,
pub leader_eligible: bool,
pub replica_eligible: bool,
}
impl NodeCapabilities {
pub fn full() -> Self {
Self {
voter: true,
leader_eligible: true,
replica_eligible: true,
}
}
pub fn observer() -> Self {
Self {
voter: false,
leader_eligible: false,
replica_eligible: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeInfo {
pub id: NodeId,
pub name: Option<String>,
pub rack: Option<String>,
pub client_addr: SocketAddr,
pub cluster_addr: SocketAddr,
pub capabilities: NodeCapabilities,
pub version: String,
pub tags: std::collections::HashMap<String, String>,
}
impl NodeInfo {
pub fn new(id: impl Into<String>, client_addr: SocketAddr, cluster_addr: SocketAddr) -> Self {
Self {
id: id.into(),
name: None,
rack: None,
client_addr,
cluster_addr,
capabilities: NodeCapabilities::full(),
version: env!("CARGO_PKG_VERSION").to_string(),
tags: std::collections::HashMap::new(),
}
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
self.rack = Some(rack.into());
self
}
pub fn with_capabilities(mut self, capabilities: NodeCapabilities) -> Self {
self.capabilities = capabilities;
self
}
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tags.insert(key.into(), value.into());
self
}
}
#[derive(Debug, Clone)]
pub struct Node {
pub info: NodeInfo,
pub state: NodeState,
pub incarnation: u64,
pub last_seen: Instant,
pub partition_leader_count: u32,
pub partition_replica_count: u32,
pub is_raft_leader: bool,
}
impl Node {
pub fn new(info: NodeInfo) -> Self {
Self {
info,
state: NodeState::Unknown,
incarnation: 0,
last_seen: Instant::now(),
partition_leader_count: 0,
partition_replica_count: 0,
is_raft_leader: false,
}
}
pub fn touch(&mut self) {
self.last_seen = Instant::now();
}
pub fn mark_alive(&mut self, incarnation: u64) {
self.state = NodeState::Alive;
self.incarnation = incarnation;
self.touch();
}
pub fn mark_suspect(&mut self) {
if self.state == NodeState::Alive || self.state == NodeState::Unknown {
self.state = NodeState::Suspect;
}
}
pub fn mark_dead(&mut self) {
self.state = NodeState::Dead;
}
pub fn mark_leaving(&mut self) {
self.state = NodeState::Leaving;
}
pub fn is_healthy(&self) -> bool {
self.state.is_healthy()
}
pub fn id(&self) -> &str {
&self.info.id
}
pub fn cluster_addr(&self) -> SocketAddr {
self.info.cluster_addr
}
pub fn client_addr(&self) -> SocketAddr {
self.info.client_addr
}
pub fn load_score(&self) -> u32 {
self.partition_leader_count * 3 + self.partition_replica_count
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeGossipState {
pub id: NodeId,
pub state: NodeState,
pub incarnation: u64,
pub cluster_addr: SocketAddr,
pub client_addr: SocketAddr,
pub rack: Option<String>,
pub capabilities: NodeCapabilities,
}
impl From<&Node> for NodeGossipState {
fn from(node: &Node) -> Self {
Self {
id: node.info.id.clone(),
state: node.state,
incarnation: node.incarnation,
cluster_addr: node.info.cluster_addr,
client_addr: node.info.client_addr,
rack: node.info.rack.clone(),
capabilities: node.info.capabilities,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_node_state_transitions() {
let info = NodeInfo::new(
"node-1",
"127.0.0.1:9092".parse().unwrap(),
"127.0.0.1:9093".parse().unwrap(),
);
let mut node = Node::new(info);
assert_eq!(node.state, NodeState::Unknown);
assert!(!node.is_healthy());
node.mark_alive(1);
assert_eq!(node.state, NodeState::Alive);
assert!(node.is_healthy());
node.mark_suspect();
assert_eq!(node.state, NodeState::Suspect);
assert!(!node.is_healthy());
assert!(node.state.is_reachable());
node.mark_dead();
assert_eq!(node.state, NodeState::Dead);
assert!(!node.state.is_reachable());
}
#[test]
fn test_load_score() {
let info = NodeInfo::new(
"node-1",
"127.0.0.1:9092".parse().unwrap(),
"127.0.0.1:9093".parse().unwrap(),
);
let mut node = Node::new(info);
node.partition_leader_count = 2;
node.partition_replica_count = 4;
assert_eq!(node.load_score(), 2 * 3 + 4);
}
#[test]
fn test_node_capabilities() {
let full = NodeCapabilities::full();
assert!(full.voter && full.leader_eligible && full.replica_eligible);
let observer = NodeCapabilities::observer();
assert!(!observer.voter && !observer.leader_eligible && observer.replica_eligible);
}
}