use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use tokio::sync::RwLock;
use super::node_registry::{Node, NodeRegistry};
/// A member of the cluster as tracked by `ClusterManager`.
///
/// Fields are serialized in declaration order by the serde derives, so do not
/// reorder them casually.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterMember {
/// Node id; also the key of this entry in the manager's member table.
pub node_id: u32,
/// Address that is registered with the node registry for this node.
pub api_address: String,
/// Replication address (named as such; not used by `ClusterManager`'s methods).
pub replication_address: String,
/// Role last recorded for this member.
pub role: MemberRole,
/// Latest WAL offset recorded for this member; compared during vote handling
/// and leader-candidate selection.
pub last_wal_offset: u64,
/// Time of the last recorded heartbeat in milliseconds since the Unix epoch,
/// as stamped by `ClusterManager::update_member_heartbeat`.
pub last_heartbeat_ms: u64,
/// Whether the member is currently considered healthy.
pub healthy: bool,
}
/// Role of a node in the term-based leader election.
///
/// Serialized in lowercase: `"leader"`, `"follower"`, `"candidate"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MemberRole {
/// The node acting as leader for the current term.
Leader,
/// A node following a leader; the initial role of a new manager.
Follower,
/// A node that has started an election and is requesting votes.
Candidate,
}
/// A candidate's request for this node's vote in `term`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VoteRequest {
/// Election term the candidate is running in.
pub term: u64,
/// Node id of the requesting candidate.
pub candidate_id: u32,
/// Candidate's latest WAL offset; a vote is only granted when this is at
/// least the voter's own recorded offset.
pub last_wal_offset: u64,
}
/// Reply to a [`VoteRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VoteResponse {
/// The responder's term after handling the request; higher than the request's
/// term when the request was rejected as stale.
pub term: u64,
/// Whether the responder granted its vote.
pub vote_granted: bool,
}
/// Summary of the cluster as seen by one node, produced by
/// `ClusterManager::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStatus {
/// Current election term of the reporting node.
pub term: u64,
/// Leader the reporting node recognizes, if any.
pub leader_id: Option<u32>,
/// Id of the reporting node.
pub self_id: u32,
/// Role of the reporting node.
pub self_role: MemberRole,
/// Number of entries in the member table.
pub member_count: usize,
/// Healthy node count as reported by the node registry.
pub healthy_count: usize,
/// Total number of partition entries across the registry's partition
/// distribution.
pub partition_count: u32,
/// Copies of all member-table entries.
pub members: Vec<ClusterMember>,
}
/// Tracks cluster membership and the local node's election state (term, vote,
/// role and recognized leader).
pub struct ClusterManager {
/// Id of the local node.
self_id: u32,
/// Local node's current role.
role: RwLock<MemberRole>,
/// Current election term; starts at 0.
term: AtomicU64,
/// Candidate the local node voted for in the current term, if any; reset when
/// a newer term is adopted.
voted_for: RwLock<Option<u32>>,
/// Leader the local node currently recognizes, if any.
leader_id: RwLock<Option<u32>>,
/// Member table keyed by node id.
members: DashMap<u32, ClusterMember>,
/// Node registry updated alongside `members` on add, remove and heartbeat.
registry: Arc<NodeRegistry>,
}
// Concurrency notes for the methods below:
// * `term` is atomic so `current_term()` stays lock-free. Every method that
//   *changes* the term does so while holding the `voted_for` write guard, and
//   `become_leader` holds its read guard while checking the term. That makes
//   "compare term -> maybe adopt a newer term -> maybe grant a vote" a single
//   atomic step. A node therefore grants at most one vote per term, and its term
//   never moves backwards, even with concurrent RPC handlers.
// * `voted_for` is the only lock held while another lock is acquired, so the
//   effective order is `voted_for` -> `role` / `leader_id`. The `role` and
//   `leader_id` guards are statement temporaries.
// * DashMap guards are never held across an `.await`.
impl ClusterManager {
    /// Creates a manager for node `self_id` in its initial state: follower,
    /// term 0, no vote, no known leader, and an empty member table.
    /// `partition_count` is forwarded to the owned [`NodeRegistry`].
    pub fn new(self_id: u32, partition_count: u32) -> Self {
        Self {
            self_id,
            role: RwLock::new(MemberRole::Follower),
            term: AtomicU64::new(0),
            voted_for: RwLock::new(None),
            leader_id: RwLock::new(None),
            members: DashMap::new(),
            registry: Arc::new(NodeRegistry::new(partition_count)),
        }
    }

    /// Returns the node registry kept alongside the member table.
    pub fn registry(&self) -> &Arc<NodeRegistry> {
        &self.registry
    }

    /// Returns the local node's id.
    pub fn self_id(&self) -> u32 {
        self.self_id
    }

    /// Returns the current election term.
    pub fn current_term(&self) -> u64 {
        self.term.load(Ordering::SeqCst)
    }

    /// Returns the local node's current role.
    pub async fn current_role(&self) -> MemberRole {
        *self.role.read().await
    }

    /// Returns the leader the local node currently recognizes, if any.
    pub async fn leader_id(&self) -> Option<u32> {
        *self.leader_id.read().await
    }

    /// Adds `member` to the member table, replacing any entry with the same id,
    /// and registers it with the node registry.
    ///
    /// When `member` describes the local node, the local role is set to
    /// `member.role`.
    pub async fn add_member(&self, member: ClusterMember) {
        let node_id = member.node_id;
        // Capture the role before the insert. Otherwise the entry would have to be
        // looked up again, and a concurrent `remove_member` could make that lookup fail.
        let member_role = member.role;
        self.registry.register_node(Node {
            id: node_id,
            address: member.api_address.clone(),
            healthy: member.healthy,
            assigned_partitions: vec![],
        });
        self.members.insert(node_id, member);
        if node_id == self.self_id {
            *self.role.write().await = member_role;
        }
    }

    /// Removes `node_id` from the node registry and the member table and
    /// returns the removed entry, if there was one.
    ///
    /// If the removed node was the recognized leader, the leader is cleared.
    pub async fn remove_member(&self, node_id: u32) -> Option<ClusterMember> {
        self.registry.unregister_node(node_id);
        let removed = self.members.remove(&node_id).map(|(_, m)| m);
        // Check and clear under one write guard. A separate read-then-write could
        // erase a different leader that was accepted in between.
        let mut leader = self.leader_id.write().await;
        if *leader == Some(node_id) {
            *leader = None;
        }
        removed
    }

    /// Records a heartbeat for `node_id`. Stores the reported WAL offset and
    /// health, and stamps `last_heartbeat_ms` with the current wall-clock time
    /// in milliseconds since the Unix epoch (0 if the clock reads before the
    /// epoch).
    ///
    /// Unknown ids are not added to the member table, but the registry's health
    /// flag is updated regardless.
    pub fn update_member_heartbeat(&self, node_id: u32, wal_offset: u64, healthy: bool) {
        // Read the clock before taking the shard lock to keep the critical section short.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        if let Some(mut member) = self.members.get_mut(&node_id) {
            member.last_wal_offset = wal_offset;
            member.healthy = healthy;
            member.last_heartbeat_ms = now_ms;
        }
        self.registry.set_node_health(node_id, healthy);
    }

    /// Returns a copy of the member entry for `node_id`, if present.
    pub fn get_member(&self, node_id: u32) -> Option<ClusterMember> {
        self.members.get(&node_id).map(|m| m.clone())
    }

    /// Returns copies of all member entries, in the member table's iteration
    /// order.
    pub fn all_members(&self) -> Vec<ClusterMember> {
        self.members.iter().map(|m| m.value().clone()).collect()
    }

    /// Returns copies of the member entries whose `healthy` flag is set.
    pub fn healthy_members(&self) -> Vec<ClusterMember> {
        self.members
            .iter()
            .filter(|m| m.value().healthy)
            .map(|m| m.value().clone())
            .collect()
    }

    /// Returns the number of entries in the member table.
    pub fn member_count(&self) -> usize {
        self.members.len()
    }

    /// Handles a candidate's vote request.
    ///
    /// * A request with a term below the current term is rejected.
    /// * A higher term is adopted first: the local node steps down to follower,
    ///   forgets its previous vote, and forgets the old leader.
    /// * The vote is granted when the local node has not already voted for a
    ///   different candidate in this term, and the candidate's `last_wal_offset`
    ///   is at least the local node's own recorded offset (0 if the local node
    ///   is not in the member table).
    ///
    /// The response carries the local term after processing.
    pub async fn handle_vote_request(&self, request: &VoteRequest) -> VoteResponse {
        // Held for the whole request, so the term comparison, the term update
        // and the vote are one atomic step. See the notes on this impl.
        let mut voted_for = self.voted_for.write().await;
        let current_term = self.term.load(Ordering::SeqCst);
        if request.term < current_term {
            return VoteResponse {
                term: current_term,
                vote_granted: false,
            };
        }
        if request.term > current_term {
            self.adopt_newer_term(&mut voted_for, request.term).await;
        }
        // Cannot change while we hold the guard.
        let term = self.term.load(Ordering::SeqCst);
        let voted_for_other = matches!(*voted_for, Some(id) if id != request.candidate_id);
        // Never vote for a candidate whose log is behind ours.
        let self_offset = self
            .members
            .get(&self.self_id)
            .map_or(0, |m| m.last_wal_offset);
        let vote_granted = !voted_for_other && request.last_wal_offset >= self_offset;
        if vote_granted {
            *voted_for = Some(request.candidate_id);
        }
        VoteResponse { term, vote_granted }
    }

    /// Starts an election. Advances to the next term, becomes a candidate,
    /// votes for itself and forgets the previous leader. Returns the new term.
    pub async fn start_election(&self) -> u64 {
        let mut voted_for = self.voted_for.write().await;
        let new_term = self.term.fetch_add(1, Ordering::SeqCst) + 1;
        *voted_for = Some(self.self_id);
        *self.role.write().await = MemberRole::Candidate;
        *self.leader_id.write().await = None;
        new_term
    }

    /// Promotes the local node to leader for `term`.
    ///
    /// Ignored if `term` is no longer the current term, for example because
    /// [`accept_leader`](Self::accept_leader) or
    /// [`handle_vote_request`](Self::handle_vote_request) adopted a newer term
    /// in the meantime.
    pub async fn become_leader(&self, term: u64) {
        // A read guard is enough: every term change needs the write guard, so
        // the term cannot move between this check and the role update.
        let _term_guard = self.voted_for.read().await;
        if term != self.term.load(Ordering::SeqCst) {
            return;
        }
        *self.role.write().await = MemberRole::Leader;
        *self.leader_id.write().await = Some(self.self_id);
        if let Some(mut member) = self.members.get_mut(&self.self_id) {
            member.role = MemberRole::Leader;
        }
    }

    /// Accepts `leader_id` as the leader for `term`.
    ///
    /// A term below the current term is ignored. A newer term is adopted first,
    /// which clears the local vote. The local node then becomes a follower. Each
    /// member's recorded role becomes leader for `leader_id` and follower for
    /// everyone else.
    pub async fn accept_leader(&self, leader_id: u32, term: u64) {
        // Held for the whole call so a concurrent handler cannot interleave a
        // different term change (which could otherwise move the term backwards).
        let mut voted_for = self.voted_for.write().await;
        let current_term = self.term.load(Ordering::SeqCst);
        if term < current_term {
            return;
        }
        if term > current_term {
            self.adopt_newer_term(&mut voted_for, term).await;
        }
        *self.role.write().await = MemberRole::Follower;
        *self.leader_id.write().await = Some(leader_id);
        for mut member in self.members.iter_mut() {
            member.role = if member.node_id == leader_id {
                MemberRole::Leader
            } else {
                MemberRole::Follower
            };
        }
    }

    /// Picks the healthy member with the highest `last_wal_offset`. Ties are
    /// broken by the lowest node id, so the result does not depend on iteration
    /// order. Returns `None` when no member is healthy.
    pub fn select_leader_candidate(&self) -> Option<u32> {
        self.members
            .iter()
            .filter(|m| m.healthy)
            // `Reverse` turns "max offset, then min id" into a single `max`.
            .map(|m| (m.last_wal_offset, std::cmp::Reverse(m.node_id)))
            .max()
            .map(|(_, std::cmp::Reverse(id))| id)
    }

    /// Returns a summary of the cluster as seen by the local node.
    ///
    /// Fields are read one after another, so the summary is not atomic with
    /// respect to concurrent updates. `partition_count` is the number of
    /// partition entries across the registry's partition distribution.
    /// NOTE(review): this equals the configured partition count only if the
    /// registry assigns each partition exactly once; confirm in `NodeRegistry`.
    pub async fn status(&self) -> ClusterStatus {
        // Copy out of each guard immediately instead of keeping guards alive
        // for the whole struct expression.
        let leader_id = *self.leader_id.read().await;
        let self_role = *self.role.read().await;
        let assigned = self
            .registry
            .partition_distribution()
            .values()
            .flat_map(|v| v.iter())
            .count();
        ClusterStatus {
            term: self.term.load(Ordering::SeqCst),
            leader_id,
            self_id: self.self_id,
            self_role,
            member_count: self.members.len(),
            healthy_count: self.registry.healthy_node_count(),
            // Saturate rather than silently truncate.
            partition_count: u32::try_from(assigned).unwrap_or(u32::MAX),
            members: self.all_members(),
        }
    }

    /// Moves the local node into a strictly newer `term`. Stores the term,
    /// clears the vote, steps down to follower, forgets the old term's leader,
    /// and sets the local member-table entry's role to follower.
    ///
    /// `voted_for` must be the contents of the `self.voted_for` write guard the
    /// caller is holding, so that the whole transition is serialized with every
    /// other term/vote change.
    async fn adopt_newer_term(&self, voted_for: &mut Option<u32>, term: u64) {
        self.term.store(term, Ordering::SeqCst);
        *voted_for = None;
        *self.role.write().await = MemberRole::Follower;
        *self.leader_id.write().await = None;
        if let Some(mut me) = self.members.get_mut(&self.self_id) {
            me.role = MemberRole::Follower;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a healthy member whose addresses are derived from its id.
    fn make_member(id: u32, role: MemberRole, offset: u64) -> ClusterMember {
        ClusterMember {
            node_id: id,
            api_address: format!("node-{id}:3900"),
            replication_address: format!("node-{id}:3910"),
            role,
            last_wal_offset: offset,
            last_heartbeat_ms: 0,
            healthy: true,
        }
    }

    /// Shorthand for building a `VoteRequest`.
    fn vote_request(term: u64, candidate_id: u32, last_wal_offset: u64) -> VoteRequest {
        VoteRequest {
            term,
            candidate_id,
            last_wal_offset,
        }
    }

    #[tokio::test]
    async fn test_create_cluster_manager() {
        let cm = ClusterManager::new(0, 32);
        assert_eq!(cm.self_id(), 0);
        assert_eq!(cm.current_term(), 0);
        assert_eq!(cm.current_role().await, MemberRole::Follower);
        assert_eq!(cm.leader_id().await, None);
        assert_eq!(cm.member_count(), 0);
    }

    #[tokio::test]
    async fn test_add_and_remove_members() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Leader, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 90)).await;
        cm.add_member(make_member(2, MemberRole::Follower, 80)).await;
        assert_eq!(cm.member_count(), 3);
        assert_eq!(cm.healthy_members().len(), 3);
        let dist = cm.registry().partition_distribution();
        assert_eq!(dist.len(), 3);
        let removed = cm.remove_member(2).await;
        assert!(removed.is_some());
        assert_eq!(cm.member_count(), 2);
    }

    #[tokio::test]
    async fn test_heartbeat_update() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(1, MemberRole::Follower, 50)).await;
        cm.update_member_heartbeat(1, 100, true);
        let member = cm.get_member(1).unwrap();
        assert_eq!(member.last_wal_offset, 100);
        assert!(member.last_heartbeat_ms > 0);
        cm.update_member_heartbeat(1, 100, false);
        let member = cm.get_member(1).unwrap();
        assert!(!member.healthy);
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 200)).await;
        cm.add_member(make_member(2, MemberRole::Follower, 150)).await;
        assert_eq!(cm.select_leader_candidate(), Some(1));
    }

    #[tokio::test]
    async fn test_deterministic_leader_selection_tiebreak() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 100)).await;
        cm.add_member(make_member(2, MemberRole::Follower, 100)).await;
        assert_eq!(cm.select_leader_candidate(), Some(0));
    }

    #[tokio::test]
    async fn test_leader_selection_skips_unhealthy() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 200)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 100)).await;
        cm.update_member_heartbeat(0, 200, false);
        assert_eq!(cm.select_leader_candidate(), Some(1));
    }

    #[tokio::test]
    async fn test_select_leader_candidate_empty_cluster() {
        let cm = ClusterManager::new(0, 32);
        assert_eq!(cm.select_leader_candidate(), None);
    }

    #[tokio::test]
    async fn test_vote_request_grants_vote() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 50)).await;
        let response = cm.handle_vote_request(&vote_request(1, 1, 100)).await;
        assert!(response.vote_granted);
        assert_eq!(response.term, 1);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_stale_term() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 50)).await;
        cm.term.store(5, Ordering::SeqCst);
        let response = cm.handle_vote_request(&vote_request(3, 1, 100)).await;
        assert!(!response.vote_granted);
        assert_eq!(response.term, 5);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_duplicate_vote() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 50)).await;
        let response1 = cm.handle_vote_request(&vote_request(1, 1, 100)).await;
        assert!(response1.vote_granted);
        let response2 = cm.handle_vote_request(&vote_request(1, 2, 100)).await;
        assert!(!response2.vote_granted);
        // Re-asking on behalf of the candidate we already voted for is granted again.
        let response3 = cm.handle_vote_request(&vote_request(1, 1, 100)).await;
        assert!(response3.vote_granted);
    }

    #[tokio::test]
    async fn test_vote_request_rejects_candidate_with_stale_log() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100)).await;
        let behind = cm.handle_vote_request(&vote_request(1, 1, 50)).await;
        assert!(!behind.vote_granted);
        assert_eq!(behind.term, 1);
        // A rejected request must not use up this term's vote.
        let caught_up = cm.handle_vote_request(&vote_request(1, 2, 100)).await;
        assert!(caught_up.vote_granted);
    }

    #[tokio::test]
    async fn test_higher_term_vote_request_demotes_leader() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100)).await;
        let term = cm.start_election().await;
        cm.become_leader(term).await;
        let response = cm
            .handle_vote_request(&vote_request(term + 1, 1, 100))
            .await;
        assert!(response.vote_granted);
        assert_eq!(response.term, term + 1);
        assert_eq!(cm.current_role().await, MemberRole::Follower);
    }

    #[tokio::test]
    async fn test_start_election() {
        let cm = ClusterManager::new(0, 32);
        let new_term = cm.start_election().await;
        assert_eq!(new_term, 1);
        assert_eq!(cm.current_term(), 1);
        assert_eq!(cm.current_role().await, MemberRole::Candidate);
    }

    #[tokio::test]
    async fn test_become_leader() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100)).await;
        let term = cm.start_election().await;
        cm.become_leader(term).await;
        assert_eq!(cm.current_role().await, MemberRole::Leader);
        assert_eq!(cm.leader_id().await, Some(0));
    }

    #[tokio::test]
    async fn test_accept_leader() {
        let cm = ClusterManager::new(1, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 90)).await;
        cm.accept_leader(0, 1).await;
        assert_eq!(cm.current_role().await, MemberRole::Follower);
        assert_eq!(cm.leader_id().await, Some(0));
        assert_eq!(cm.current_term(), 1);
        let leader = cm.get_member(0).unwrap();
        assert_eq!(leader.role, MemberRole::Leader);
    }

    #[tokio::test]
    async fn test_accept_leader_ignores_stale_term() {
        let cm = ClusterManager::new(1, 32);
        cm.term.store(5, Ordering::SeqCst);
        cm.accept_leader(0, 3).await;
        assert_eq!(cm.current_term(), 5);
        assert_eq!(cm.leader_id().await, None);
        assert_eq!(cm.current_role().await, MemberRole::Follower);
    }

    #[tokio::test]
    async fn test_stale_become_leader_ignored() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Follower, 100)).await;
        let _term = cm.start_election().await;
        cm.accept_leader(1, 2).await;
        cm.become_leader(1).await;
        assert_eq!(cm.current_role().await, MemberRole::Follower);
        assert_eq!(cm.leader_id().await, Some(1));
    }

    #[tokio::test]
    async fn test_cluster_status() {
        let cm = ClusterManager::new(0, 32);
        cm.add_member(make_member(0, MemberRole::Leader, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 90)).await;
        let status = cm.status().await;
        assert_eq!(status.self_id, 0);
        assert_eq!(status.member_count, 2);
        assert_eq!(status.healthy_count, 2);
        assert_eq!(status.members.len(), 2);
    }

    #[tokio::test]
    async fn test_remove_leader_clears_leader_id() {
        let cm = ClusterManager::new(1, 32);
        cm.add_member(make_member(0, MemberRole::Leader, 100)).await;
        cm.add_member(make_member(1, MemberRole::Follower, 90)).await;
        *cm.leader_id.write().await = Some(0);
        cm.remove_member(0).await;
        assert_eq!(cm.leader_id().await, None);
    }
}