use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
/// Timing and ranking knobs for a single [`LeaderElection`].
///
/// Every `*_ms` field is a duration in milliseconds (it is added to wall-clock
/// millisecond timestamps).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ElectionConfig {
    /// Lifetime of a leadership lease, counted from acquisition or the latest renewal.
    pub lease_duration_ms: u64,
    /// How often a leader is expected to renew its lease.
    /// NOTE(review): not read by `LeaderElection` itself; presumably drives the caller's renewal loop.
    pub renewal_interval_ms: u64,
    /// NOTE(review): not read by `LeaderElection` itself; presumably bounds one election campaign.
    pub election_timeout_ms: u64,
    /// NOTE(review): not read by `LeaderElection` itself; presumably the pause before retrying an election.
    pub election_backoff_ms: u64,
    /// NOTE(review): not read by `LeaderElection` itself; presumably lets a driver start elections on its own.
    pub auto_elect: bool,
    /// Leadership rank; higher values outrank lower ones when a node considers taking
    /// leadership without a quorum of votes.
    pub priority: u32,
}
impl Default for ElectionConfig {
fn default() -> Self {
Self {
lease_duration_ms: 15000, renewal_interval_ms: 5000, election_timeout_ms: 10000, election_backoff_ms: 1000, auto_elect: true,
priority: 100,
}
}
}
/// Where this node currently stands in the election protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ElectionState {
    /// Initial state: no leader is known yet.
    NoLeader,
    /// This node is campaigning for the current term.
    Electing,
    /// This node holds the leadership lease.
    Leader,
    /// Another node leads, or this node stepped back / lost a campaign.
    Follower,
    /// The recorded leader's lease ran out (set by `check_lease`).
    LeaderExpired,
}
/// Snapshot of a leader and its lease.
///
/// Timestamps are wall-clock milliseconds since the Unix epoch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaderInfo {
    /// Node id of the leader.
    pub leader_id: String,
    /// Fencing token minted when this leader was elected.
    pub fencing_token: u64,
    /// When the lease was last acquired or renewed.
    pub lease_acquired_at: u64,
    /// When the lease lapses unless renewed.
    pub lease_expires_at: u64,
    /// Election term in which this leader won.
    pub term: u64,
}
impl LeaderInfo {
    /// `true` while the wall clock is strictly before `lease_expires_at`.
    ///
    /// Based on `SystemTime`, so wall-clock adjustments shift the result.
    pub fn is_valid(&self) -> bool {
        self.lease_expires_at > current_time_ms()
    }

    /// Milliseconds left on the lease; `0` once it has lapsed.
    pub fn time_remaining_ms(&self) -> u64 {
        self.lease_expires_at.saturating_sub(current_time_ms())
    }
}
/// Notifications published on the optional event channel of a [`LeaderElection`].
#[derive(Debug, Clone)]
pub enum ElectionEvent {
    /// This node won `term` and was issued `fencing_token`.
    BecameLeader { term: u64, fencing_token: u64 },
    /// This node stopped leading in `term`; `new_leader` is `None` when the
    /// successor is not known.
    LostLeadership {
        term: u64,
        new_leader: Option<String>,
    },
    /// A leader for `term` was accepted.
    NewLeader { leader_id: String, term: u64 },
    /// The lease was extended to `expires_at` (wall-clock ms).
    LeaseRenewed { expires_at: u64 },
    /// A campaign for `term` began.
    ElectionStarted { term: u64 },
    /// The campaign for `term` did not produce leadership.
    ElectionFailed { term: u64, reason: String },
}
/// A ballot cast by `voter_id` for `candidate_id` in `term`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Vote {
    /// Node that cast the ballot.
    pub voter_id: String,
    /// Node the ballot was requested by.
    pub candidate_id: String,
    /// Term the ballot applies to.
    pub term: u64,
    /// Whether the vote was granted.
    pub granted: bool,
}
/// A candidate's request for a vote in `term`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VoteRequest {
    /// Node asking for the vote.
    pub candidate_id: String,
    /// Term the candidate is campaigning in.
    pub term: u64,
    /// Candidate's configured priority. Not consulted by `handle_vote_request`.
    pub priority: u32,
    /// Highest fencing token the candidate has seen. Not consulted by `handle_vote_request`.
    pub last_fencing_token: u64,
}
/// Mutable election state, guarded by one lock inside [`LeaderElection`].
struct ElectionInner {
    /// Current protocol state of this node.
    state: ElectionState,
    /// Most recently known leader (possibly with an expired lease).
    leader: Option<LeaderInfo>,
    /// Highest term this node has observed or started.
    current_term: u64,
    /// Candidate this node voted for in `current_term`, if any.
    voted_for: Option<String>,
    /// Ballots collected during the current campaign, keyed by voter id (`true` = granted).
    votes_received: HashMap<String, bool>,
    /// Wall-clock ms of the latest state change or vote; not read by the code shown here.
    last_activity: u64,
}
/// Lease-based leader election for one group, as seen from one node.
pub struct LeaderElection {
    /// This node's id.
    node_id: String,
    /// Group this election decides the leader of.
    group_id: String,
    /// Timing and priority settings.
    config: ElectionConfig,
    /// Term, vote and leader bookkeeping.
    inner: Arc<RwLock<ElectionInner>>,
    /// Highest fencing token minted or adopted by this node.
    fencing_token: AtomicU64,
    /// Currently unused flag.
    _running: AtomicBool,
    /// Optional sink for [`ElectionEvent`]s.
    event_tx: Option<mpsc::Sender<ElectionEvent>>,
    /// Known peers (excluding this node), keyed by node id.
    members: Arc<RwLock<HashMap<String, MemberInfo>>>,
}
/// What this node knows about one peer.
#[derive(Debug, Clone)]
struct MemberInfo {
    /// Peer id (duplicates the map key; unused).
    _node_id: String,
    /// Peer's leadership rank.
    priority: u32,
    /// Latest liveness verdict reported for the peer.
    is_alive: bool,
    /// Wall-clock ms of the latest registration or liveness update.
    last_seen: u64,
}
impl LeaderElection {
    /// Creates an election participant for `group_id`, identified as `node_id`.
    ///
    /// The node starts in [`ElectionState::NoLeader`] at term 0 with fencing token 0 and
    /// no registered peers. When `event_tx` is `Some`, state changes are reported on it.
    pub fn new(
        node_id: String,
        group_id: String,
        config: ElectionConfig,
        event_tx: Option<mpsc::Sender<ElectionEvent>>,
    ) -> Self {
        Self {
            node_id,
            group_id,
            config,
            inner: Arc::new(RwLock::new(ElectionInner {
                state: ElectionState::NoLeader,
                leader: None,
                current_term: 0,
                voted_for: None,
                votes_received: HashMap::new(),
                last_activity: current_time_ms(),
            })),
            fencing_token: AtomicU64::new(0),
            _running: AtomicBool::new(false),
            event_tx,
            members: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers (or replaces) peer `node_id` with `priority`, marking it alive and seen now.
    pub fn register_member(&self, node_id: String, priority: u32) {
        self.members.write().insert(
            node_id.clone(),
            MemberInfo {
                _node_id: node_id,
                priority,
                is_alive: true,
                last_seen: current_time_ms(),
            },
        );
    }

    /// Records a liveness observation for a registered peer; unknown ids are ignored.
    pub fn update_member_liveness(&self, node_id: &str, is_alive: bool) {
        if let Some(member) = self.members.write().get_mut(node_id) {
            member.is_alive = is_alive;
            member.last_seen = current_time_ms();
        }
    }

    /// Forgets a peer; it no longer counts toward the quorum or priority comparison.
    pub fn remove_member(&self, node_id: &str) {
        self.members.write().remove(node_id);
    }

    /// Current protocol state of this node.
    pub fn state(&self) -> ElectionState {
        self.inner.read().state
    }

    /// `true` only while this node is in [`ElectionState::Leader`] *and* its lease is valid.
    pub fn is_leader(&self) -> bool {
        let inner = self.inner.read();
        inner.state == ElectionState::Leader
            && inner.leader.as_ref().map(|l| l.is_valid()).unwrap_or(false)
    }

    /// The known leader, or `None` if there is none or its lease has lapsed.
    pub fn leader(&self) -> Option<LeaderInfo> {
        let inner = self.inner.read();
        inner.leader.clone().filter(|l| l.is_valid())
    }

    /// Highest term this node has observed or started.
    pub fn current_term(&self) -> u64 {
        self.inner.read().current_term
    }

    /// Highest fencing token minted or adopted by this node.
    pub fn fencing_token(&self) -> u64 {
        self.fencing_token.load(Ordering::SeqCst)
    }

    /// Votes needed to win: a strict majority of the whole configured group
    /// (every registered peer plus this node).
    ///
    /// All registered peers are counted, not only those marked alive. This keeps the
    /// threshold identical in `start_election` and `handle_vote`, and it stops a partitioned
    /// node that has marked its peers dead from shrinking the quorum down to itself.
    fn quorum_size(&self) -> usize {
        let group_size = self.members.read().len() + 1;
        group_size / 2 + 1
    }

    /// Starts a campaign for a new term, voting for this node.
    ///
    /// Leadership is taken immediately when this node's own vote already forms a quorum,
    /// or when no live peer outranks it (see `should_become_leader`). Otherwise the node
    /// reverts to [`ElectionState::Follower`] and emits [`ElectionEvent::ElectionFailed`].
    ///
    /// Returns `Ok(true)` if this node ended up leading the new term.
    pub async fn start_election(&self) -> Result<bool, ElectionError> {
        let term = {
            let mut inner = self.inner.write();
            inner.current_term += 1;
            let term = inner.current_term;
            inner.state = ElectionState::Electing;
            inner.voted_for = Some(self.node_id.clone());
            inner.votes_received.clear();
            // Our own ballot.
            inner.votes_received.insert(self.node_id.clone(), true);
            inner.last_activity = current_time_ms();
            term
        };
        info!(
            node_id = %self.node_id,
            group_id = %self.group_id,
            term = term,
            "Starting election"
        );
        self.emit_event(ElectionEvent::ElectionStarted { term }).await;

        // Only this node's own ballot is known here; peer ballots arrive via `handle_vote`.
        let own_votes = 1;
        if own_votes >= self.quorum_size() || self.should_become_leader() {
            return self.become_leader(term).await;
        }

        // Revert to Follower only if this campaign is still the live one. A higher term
        // may have been observed during the await above, or `handle_vote` may already have
        // promoted us. Neither outcome may be overwritten.
        let promoted_concurrently = {
            let mut inner = self.inner.write();
            if inner.current_term == term && inner.state == ElectionState::Electing {
                inner.state = ElectionState::Follower;
                false
            } else {
                inner.current_term == term && inner.state == ElectionState::Leader
            }
        };
        if promoted_concurrently {
            return Ok(true);
        }
        self.emit_event(ElectionEvent::ElectionFailed {
            term,
            reason: "Did not receive quorum".to_string(),
        })
        .await;
        Ok(false)
    }

    /// Whether this node may take leadership without a vote quorum.
    ///
    /// Returns `false` when some live peer outranks this node: it has a higher priority,
    /// or an equal priority and a lexicographically smaller node id. The id tie-break
    /// matters because without it, every node at the same (e.g. default) priority would
    /// grant itself leadership at once.
    fn should_become_leader(&self) -> bool {
        let my_priority = self.config.priority;
        let my_id = self.node_id.as_str();
        !self.members.read().iter().any(|(peer_id, member)| {
            member.is_alive
                && (member.priority > my_priority
                    || (member.priority == my_priority && peer_id.as_str() < my_id))
        })
    }

    /// Promotes this node to leader of `term`, minting a new fencing token.
    ///
    /// Returns `Ok(false)` without side effects if the campaign for `term` is no longer
    /// live. That happens when a higher term was seen, when the node left `Electing`, or
    /// when a concurrent caller already promoted it. The check and the token bump happen
    /// under the same lock, so one win never mints two tokens.
    async fn become_leader(&self, term: u64) -> Result<bool, ElectionError> {
        let now = current_time_ms();
        let new_token = {
            let mut inner = self.inner.write();
            if inner.current_term != term || inner.state != ElectionState::Electing {
                return Ok(false);
            }
            let new_token = self.fencing_token.fetch_add(1, Ordering::SeqCst) + 1;
            inner.state = ElectionState::Leader;
            inner.leader = Some(LeaderInfo {
                leader_id: self.node_id.clone(),
                fencing_token: new_token,
                lease_acquired_at: now,
                lease_expires_at: now + self.config.lease_duration_ms,
                term,
            });
            inner.last_activity = now;
            new_token
        };
        info!(
            node_id = %self.node_id,
            group_id = %self.group_id,
            term = term,
            fencing_token = new_token,
            "Became leader"
        );
        self.emit_event(ElectionEvent::BecameLeader {
            term,
            fencing_token: new_token,
        })
        .await;
        Ok(true)
    }

    /// Decides whether to grant `request`, recording the vote if granted.
    ///
    /// A higher term makes this node a Follower of that term with a fresh ballot. Stale
    /// terms are refused, and at most one candidate is granted per term.
    pub fn handle_vote_request(&self, request: &VoteRequest) -> Vote {
        let mut inner = self.inner.write();
        if request.term > inner.current_term {
            inner.current_term = request.term;
            inner.voted_for = None;
            // Any leadership we held ended with the old term; drop our own leader record so
            // `leader()` stops advertising this node.
            // NOTE(review): this fn is sync, so no LostLeadership event is emitted here.
            if inner.state == ElectionState::Leader {
                inner.leader = None;
            }
            inner.state = ElectionState::Follower;
        }
        let granted = if request.term < inner.current_term {
            false
        } else if matches!(
            inner.voted_for.as_deref(),
            Some(chosen) if chosen != request.candidate_id.as_str()
        ) {
            false
        } else {
            inner.voted_for = Some(request.candidate_id.clone());
            inner.last_activity = current_time_ms();
            true
        };
        debug!(
            node_id = %self.node_id,
            candidate = %request.candidate_id,
            term = request.term,
            granted = granted,
            "Processed vote request"
        );
        Vote {
            voter_id: self.node_id.clone(),
            candidate_id: request.candidate_id.clone(),
            term: request.term,
            granted,
        }
    }

    /// Records a ballot for the current campaign and becomes leader once a quorum grants.
    ///
    /// Ballots for other terms, or received while not `Electing`, are ignored (`Ok(false)`).
    pub async fn handle_vote(&self, vote: Vote) -> Result<bool, ElectionError> {
        let quorum = self.quorum_size();
        let (term, granted_votes) = {
            let mut inner = self.inner.write();
            if vote.term != inner.current_term || inner.state != ElectionState::Electing {
                return Ok(false);
            }
            inner.votes_received.insert(vote.voter_id, vote.granted);
            let granted_votes = inner.votes_received.values().filter(|&&g| g).count();
            // Capture the term under the same lock that validated it.
            (inner.current_term, granted_votes)
        };
        if granted_votes >= quorum {
            // `become_leader` re-validates term and state under its lock, so two ballots that
            // both complete the quorum promote this node only once.
            return self.become_leader(term).await;
        }
        Ok(false)
    }

    /// Adopts `leader_info` as the leader and becomes a Follower.
    ///
    /// # Errors
    /// [`ElectionError::StaleTerm`] if `leader_info.term` is older than the current term.
    pub async fn accept_leader(&self, leader_info: LeaderInfo) -> Result<(), ElectionError> {
        let deposed_term = {
            let mut inner = self.inner.write();
            if leader_info.term < inner.current_term {
                return Err(ElectionError::StaleTerm {
                    received: leader_info.term,
                    current: inner.current_term,
                });
            }
            let deposed_term = if inner.state == ElectionState::Leader {
                Some(inner.current_term)
            } else {
                None
            };
            if leader_info.term > inner.current_term {
                inner.current_term = leader_info.term;
                // A new term opens a fresh ballot. Within the same term the vote already cast
                // must stand; clearing it would let this node vote twice in one term.
                inner.voted_for = None;
            }
            inner.state = ElectionState::Follower;
            inner.leader = Some(leader_info.clone());
            inner.last_activity = current_time_ms();
            // Adopt the leader's token when newer; the token never moves backwards.
            self.fencing_token
                .fetch_max(leader_info.fencing_token, Ordering::SeqCst);
            deposed_term
        };
        info!(
            node_id = %self.node_id,
            leader = %leader_info.leader_id,
            term = leader_info.term,
            "Accepted new leader"
        );
        if let Some(term) = deposed_term {
            self.emit_event(ElectionEvent::LostLeadership {
                term,
                new_leader: Some(leader_info.leader_id.clone()),
            })
            .await;
        }
        self.emit_event(ElectionEvent::NewLeader {
            leader_id: leader_info.leader_id,
            term: leader_info.term,
        })
        .await;
        Ok(())
    }

    /// Extends this node's lease by `lease_duration_ms` from now.
    ///
    /// # Errors
    /// [`ElectionError::NotLeader`] if this node is not Leader, or if its lease has already
    /// lapsed. Another node may have been elected after the lapse, so the lease is not
    /// revived; `check_lease` performs the demotion.
    pub async fn renew_lease(&self) -> Result<(), ElectionError> {
        let expires_at = {
            let mut inner = self.inner.write();
            if inner.state != ElectionState::Leader {
                return Err(ElectionError::NotLeader);
            }
            let now = current_time_ms();
            let expires_at = match inner.leader.as_mut() {
                Some(leader) => {
                    if !leader.is_valid() {
                        return Err(ElectionError::NotLeader);
                    }
                    leader.lease_acquired_at = now;
                    leader.lease_expires_at = now + self.config.lease_duration_ms;
                    debug!(
                        node_id = %self.node_id,
                        expires_at = leader.lease_expires_at,
                        "Renewed leadership lease"
                    );
                    Some(leader.lease_expires_at)
                }
                None => None,
            };
            inner.last_activity = now;
            expires_at
        };
        if let Some(expires_at) = expires_at {
            self.emit_event(ElectionEvent::LeaseRenewed { expires_at })
                .await;
        }
        Ok(())
    }

    /// Voluntarily gives up leadership and becomes a Follower with no known leader.
    ///
    /// # Errors
    /// [`ElectionError::NotLeader`] if this node is not in the Leader state.
    pub async fn step_down(&self) -> Result<(), ElectionError> {
        let term = {
            let mut inner = self.inner.write();
            if inner.state != ElectionState::Leader {
                return Err(ElectionError::NotLeader);
            }
            let term = inner.current_term;
            inner.state = ElectionState::Follower;
            inner.leader = None;
            inner.last_activity = current_time_ms();
            term
        };
        info!(
            node_id = %self.node_id,
            term = term,
            "Stepped down from leadership"
        );
        self.emit_event(ElectionEvent::LostLeadership {
            term,
            new_leader: None,
        })
        .await;
        Ok(())
    }

    /// `true` when a campaign should be started: no leader, an expired leader, or a
    /// Follower whose known leader is missing or past its lease.
    pub fn needs_election(&self) -> bool {
        let inner = self.inner.read();
        match inner.state {
            ElectionState::NoLeader => true,
            ElectionState::LeaderExpired => true,
            ElectionState::Follower => {
                inner.leader.as_ref().map(|l| !l.is_valid()).unwrap_or(true)
            }
            _ => false,
        }
    }

    /// Moves to [`ElectionState::LeaderExpired`] when the recorded lease has lapsed, and
    /// emits [`ElectionEvent::LostLeadership`] if this node was the one leading.
    ///
    /// A campaign in progress is left alone. Overwriting `Electing` would make
    /// `handle_vote` reject every later ballot.
    pub async fn check_lease(&self) {
        let lost_leadership_term = {
            let mut inner = self.inner.write();
            let lease_expired = inner
                .leader
                .as_ref()
                .map(|l| !l.is_valid())
                .unwrap_or(false);
            if lease_expired && inner.state != ElectionState::Electing {
                let was_leader = inner.state == ElectionState::Leader;
                inner.state = ElectionState::LeaderExpired;
                if was_leader {
                    Some(inner.current_term)
                } else {
                    None
                }
            } else {
                None
            }
        };
        if let Some(term) = lost_leadership_term {
            warn!(
                node_id = %self.node_id,
                term = term,
                "Leadership lease expired"
            );
            self.emit_event(ElectionEvent::LostLeadership {
                term,
                new_leader: None,
            })
            .await;
        }
    }

    /// `true` if `token` is at least the highest fencing token this node knows of.
    pub fn validate_fencing_token(&self, token: u64) -> bool {
        let current = self.fencing_token.load(Ordering::SeqCst);
        token >= current
    }

    /// Point-in-time summary. Member counts include this node; the leader fields reflect
    /// the recorded leader even if its lease has lapsed.
    pub fn stats(&self) -> ElectionStats {
        let inner = self.inner.read();
        let members = self.members.read();
        ElectionStats {
            state: inner.state,
            current_term: inner.current_term,
            leader_id: inner.leader.as_ref().map(|l| l.leader_id.clone()),
            fencing_token: self.fencing_token.load(Ordering::SeqCst),
            lease_remaining_ms: inner.leader.as_ref().map(|l| l.time_remaining_ms()),
            member_count: members.len() + 1,
            alive_members: members.values().filter(|m| m.is_alive).count() + 1,
        }
    }

    /// Publishes `event` if a channel was configured.
    ///
    /// Send failures (receiver dropped) are deliberately ignored. `send` waits for
    /// capacity, so a consumer that never drains a full channel stalls the caller.
    async fn emit_event(&self, event: ElectionEvent) {
        if let Some(ref tx) = self.event_tx {
            let _ = tx.send(event).await;
        }
    }
}
/// Snapshot returned by `LeaderElection::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ElectionStats {
    /// Protocol state at snapshot time.
    pub state: ElectionState,
    /// Highest term observed or started.
    pub current_term: u64,
    /// Id in the recorded leader entry, even if that lease has lapsed.
    pub leader_id: Option<String>,
    /// Highest fencing token known locally.
    pub fencing_token: u64,
    /// Milliseconds left on the recorded lease (`0` once lapsed); `None` without a leader entry.
    pub lease_remaining_ms: Option<u64>,
    /// Registered peers plus this node.
    pub member_count: usize,
    /// Peers marked alive plus this node (always counted).
    pub alive_members: usize,
}
/// Failures reported by election operations.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ElectionError {
    /// The operation requires this node to hold leadership.
    #[error("Not the leader")]
    NotLeader,
    /// A campaign is already running.
    #[error("Election already in progress")]
    ElectionInProgress,
    /// A message carried a term older than the local one.
    #[error("Stale term: received {received}, current {current}")]
    StaleTerm { received: u64, current: u64 },
    /// A fencing token older than the newest known one was presented.
    #[error("Invalid fencing token: {received} < {current}")]
    InvalidFencingToken { received: u64, current: u64 },
    /// Not enough members are available to form a quorum.
    #[error("No quorum available")]
    NoQuorum,
    /// The operation did not finish in time.
    #[error("Election timeout")]
    Timeout,
}
/// Wall-clock milliseconds since the Unix epoch; `0` if the clock reads before the epoch.
fn current_time_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
/// Owns one [`LeaderElection`] per group for a single local node.
pub struct ElectionManager {
    /// Local node id given to every election this manager creates.
    node_id: String,
    /// Configuration cloned into each newly created election.
    default_config: ElectionConfig,
    /// Elections keyed by group id.
    elections: Arc<RwLock<HashMap<String, Arc<LeaderElection>>>>,
}
impl ElectionManager {
pub fn new(node_id: String, config: ElectionConfig) -> Self {
Self {
node_id,
default_config: config,
elections: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn get_or_create_election(
&self,
group_id: &str,
event_tx: Option<mpsc::Sender<ElectionEvent>>,
) -> Arc<LeaderElection> {
let mut elections = self.elections.write();
if let Some(election) = elections.get(group_id) {
return election.clone();
}
let election = Arc::new(LeaderElection::new(
self.node_id.clone(),
group_id.to_string(),
self.default_config.clone(),
event_tx,
));
elections.insert(group_id.to_string(), election.clone());
election
}
pub fn get_election(&self, group_id: &str) -> Option<Arc<LeaderElection>> {
self.elections.read().get(group_id).cloned()
}
pub fn is_leader(&self, group_id: &str) -> bool {
self.elections
.read()
.get(group_id)
.map(|e| e.is_leader())
.unwrap_or(false)
}
pub fn fencing_token(&self, group_id: &str) -> Option<u64> {
self.elections
.read()
.get(group_id)
.map(|e| e.fencing_token())
}
pub fn led_groups(&self) -> Vec<String> {
self.elections
.read()
.iter()
.filter(|(_, e)| e.is_leader())
.map(|(id, _)| id.clone())
.collect()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a default-configured election for `node_id` in "test-group".
    fn create_test_election(node_id: &str) -> LeaderElection {
        let config = ElectionConfig::default();
        LeaderElection::new(node_id.to_string(), "test-group".to_string(), config, None)
    }

    #[tokio::test]
    async fn test_single_node_election() {
        let election = create_test_election("node-1");
        let result = election.start_election().await;
        assert!(result.is_ok());
        assert!(result.unwrap());
        assert!(election.is_leader());
        assert_eq!(election.state(), ElectionState::Leader);
    }

    #[tokio::test]
    async fn test_leader_info() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();
        let leader = election.leader();
        assert!(leader.is_some());
        let leader_info = leader.unwrap();
        assert_eq!(leader_info.leader_id, "node-1");
        assert!(leader_info.is_valid());
        assert!(leader_info.fencing_token > 0);
    }

    #[tokio::test]
    async fn test_fencing_token_increases() {
        let election = create_test_election("node-1");
        let token1 = election.fencing_token();
        election.start_election().await.unwrap();
        let token2 = election.fencing_token();
        assert!(token2 > token1);
        election.step_down().await.unwrap();
        election.start_election().await.unwrap();
        let token3 = election.fencing_token();
        assert!(token3 > token2);
    }

    #[tokio::test]
    async fn test_lease_renewal() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();
        let leader1 = election.leader().unwrap();
        let expires1 = leader1.lease_expires_at;
        tokio::time::sleep(Duration::from_millis(10)).await;
        election.renew_lease().await.unwrap();
        let leader2 = election.leader().unwrap();
        assert!(leader2.lease_expires_at >= expires1);
    }

    #[tokio::test]
    async fn test_step_down() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();
        assert!(election.is_leader());
        election.step_down().await.unwrap();
        assert!(!election.is_leader());
        assert_eq!(election.state(), ElectionState::Follower);
    }

    #[test]
    fn test_vote_request_handling() {
        let election = create_test_election("node-1");
        let request = VoteRequest {
            candidate_id: "node-2".to_string(),
            term: 1,
            priority: 100,
            last_fencing_token: 0,
        };
        let vote = election.handle_vote_request(&request);
        assert!(vote.granted);
        assert_eq!(vote.voter_id, "node-1");
        assert_eq!(vote.candidate_id, "node-2");
        let request2 = VoteRequest {
            candidate_id: "node-3".to_string(),
            term: 1,
            priority: 100,
            last_fencing_token: 0,
        };
        let vote2 = election.handle_vote_request(&request2);
        // Already voted for node-2 in term 1.
        assert!(!vote2.granted);
    }

    #[test]
    fn test_vote_request_higher_term() {
        let election = create_test_election("node-1");
        let request1 = VoteRequest {
            candidate_id: "node-2".to_string(),
            term: 1,
            priority: 100,
            last_fencing_token: 0,
        };
        election.handle_vote_request(&request1);
        let request2 = VoteRequest {
            candidate_id: "node-3".to_string(),
            term: 2,
            priority: 100,
            last_fencing_token: 0,
        };
        let vote = election.handle_vote_request(&request2);
        assert!(vote.granted);
    }

    #[tokio::test]
    async fn test_accept_leader() {
        let election = create_test_election("node-1");
        let leader_info = LeaderInfo {
            leader_id: "node-2".to_string(),
            fencing_token: 5,
            lease_acquired_at: current_time_ms(),
            lease_expires_at: current_time_ms() + 15000,
            term: 1,
        };
        election.accept_leader(leader_info.clone()).await.unwrap();
        assert_eq!(election.state(), ElectionState::Follower);
        assert!(!election.is_leader());
        let leader = election.leader().unwrap();
        assert_eq!(leader.leader_id, "node-2");
    }

    #[test]
    fn test_validate_fencing_token() {
        let election = create_test_election("node-1");
        assert!(election.validate_fencing_token(0));
        assert!(election.validate_fencing_token(100));
    }

    #[tokio::test]
    async fn test_needs_election() {
        let election = create_test_election("node-1");
        assert!(election.needs_election());
        election.start_election().await.unwrap();
        assert!(!election.needs_election());
    }

    #[test]
    fn test_election_stats() {
        let election = create_test_election("node-1");
        election.register_member("node-2".to_string(), 100);
        election.register_member("node-3".to_string(), 100);
        let stats = election.stats();
        assert_eq!(stats.state, ElectionState::NoLeader);
        // Counts include the local node.
        assert_eq!(stats.member_count, 3);
        assert_eq!(stats.alive_members, 3);
    }

    #[test]
    fn test_member_management() {
        let election = create_test_election("node-1");
        election.register_member("node-2".to_string(), 100);
        let stats = election.stats();
        assert_eq!(stats.member_count, 2);
        election.update_member_liveness("node-2", false);
        let stats = election.stats();
        assert_eq!(stats.alive_members, 1);
        election.remove_member("node-2");
        let stats = election.stats();
        assert_eq!(stats.member_count, 1);
    }

    #[tokio::test]
    async fn test_election_manager() {
        let manager = ElectionManager::new("node-1".to_string(), ElectionConfig::default());
        let election1 = manager.get_or_create_election("group-1", None);
        let election2 = manager.get_or_create_election("group-1", None);
        assert!(Arc::ptr_eq(&election1, &election2));
        let election3 = manager.get_or_create_election("group-2", None);
        assert!(!Arc::ptr_eq(&election1, &election3));
        election1.start_election().await.unwrap();
        assert!(manager.is_leader("group-1"));
        assert!(!manager.is_leader("group-2"));
        let led = manager.led_groups();
        assert_eq!(led.len(), 1);
        assert_eq!(led[0], "group-1");
    }

    #[tokio::test]
    async fn test_priority_based_election() {
        let config = ElectionConfig {
            priority: 50,
            ..Default::default()
        };
        let election =
            LeaderElection::new("node-1".to_string(), "test-group".to_string(), config, None);
        election.register_member("node-2".to_string(), 100);
        election.update_member_liveness("node-2", true);

        // A live, higher-priority peer outranks this node, and its own vote is no quorum.
        let won = election.start_election().await.unwrap();
        assert!(!won);
        assert!(!election.is_leader());
        assert_eq!(election.state(), ElectionState::Follower);
        assert_eq!(election.current_term(), 1);

        // Once that peer is reported dead, nothing outranks this node.
        election.update_member_liveness("node-2", false);
        let won = election.start_election().await.unwrap();
        assert!(won);
        assert!(election.is_leader());
        assert_eq!(election.current_term(), 2);
    }
}