Skip to main content

engine/distributed/
election.rs

1//! Leader Election for Distributed Write Coordination
2//!
3//! Implements leader election using a lease-based approach with:
4//! - Lease acquisition and renewal
5//! - Automatic failover on leader failure
6//! - Fencing tokens to prevent split-brain
7//! - Integration with gossip for failure detection
8
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::sync::mpsc;
15use tracing::{debug, info, warn};
16
17/// Configuration for leader election
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ElectionConfig {
20    /// Lease duration in milliseconds
21    pub lease_duration_ms: u64,
22    /// How often to renew the lease (ms before expiry)
23    pub renewal_interval_ms: u64,
24    /// Timeout for election attempts (ms)
25    pub election_timeout_ms: u64,
26    /// Minimum time between election attempts (ms)
27    pub election_backoff_ms: u64,
28    /// Enable automatic leader election
29    pub auto_elect: bool,
30    /// Priority boost for certain nodes (higher = more likely to become leader)
31    pub priority: u32,
32}
33
34impl Default for ElectionConfig {
35    fn default() -> Self {
36        Self {
37            lease_duration_ms: 15000,   // 15 second lease
38            renewal_interval_ms: 5000,  // Renew 5 seconds before expiry
39            election_timeout_ms: 10000, // 10 second election timeout
40            election_backoff_ms: 1000,  // 1 second between attempts
41            auto_elect: true,
42            priority: 100,
43        }
44    }
45}
46
47/// State of a leader election
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum ElectionState {
50    /// No leader elected yet
51    NoLeader,
52    /// Election in progress
53    Electing,
54    /// This node is the leader
55    Leader,
56    /// Another node is the leader
57    Follower,
58    /// Leader lease expired, election needed
59    LeaderExpired,
60}
61
62/// Information about the current leader
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct LeaderInfo {
65    /// Node ID of the leader
66    pub leader_id: String,
67    /// Fencing token (monotonically increasing)
68    pub fencing_token: u64,
69    /// When the lease was acquired
70    pub lease_acquired_at: u64,
71    /// When the lease expires
72    pub lease_expires_at: u64,
73    /// Term number (increments with each election)
74    pub term: u64,
75}
76
77impl LeaderInfo {
78    /// Check if the lease is still valid
79    pub fn is_valid(&self) -> bool {
80        current_time_ms() < self.lease_expires_at
81    }
82
83    /// Time remaining on lease in milliseconds
84    pub fn time_remaining_ms(&self) -> u64 {
85        let now = current_time_ms();
86        self.lease_expires_at.saturating_sub(now)
87    }
88}
89
/// Notifications sent on the optional event channel of a [`LeaderElection`].
#[derive(Debug, Clone)]
pub enum ElectionEvent {
    /// This node won the election for `term` and holds `fencing_token`.
    BecameLeader { term: u64, fencing_token: u64 },
    /// This node is no longer leader for `term`; `new_leader` names the
    /// successor when it is known.
    LostLeadership {
        term: u64,
        new_leader: Option<String>,
    },
    /// A (different) leader was accepted for `term`.
    NewLeader { leader_id: String, term: u64 },
    /// The leadership lease was extended until `expires_at` (epoch ms).
    LeaseRenewed { expires_at: u64 },
    /// This node started an election for `term`.
    ElectionStarted { term: u64 },
    /// The election for `term` did not make this node leader.
    ElectionFailed { term: u64, reason: String },
}
109
110/// Vote in an election
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct Vote {
113    /// Node casting the vote
114    pub voter_id: String,
115    /// Candidate being voted for
116    pub candidate_id: String,
117    /// Term of the election
118    pub term: u64,
119    /// Whether the vote is granted
120    pub granted: bool,
121}
122
123/// Request for votes in an election
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct VoteRequest {
126    /// Candidate requesting votes
127    pub candidate_id: String,
128    /// Term for this election
129    pub term: u64,
130    /// Priority of the candidate
131    pub priority: u32,
132    /// Last known fencing token
133    pub last_fencing_token: u64,
134}
135
136/// Internal election state
137struct ElectionInner {
138    /// Current election state
139    state: ElectionState,
140    /// Current leader info (if known)
141    leader: Option<LeaderInfo>,
142    /// Current term
143    current_term: u64,
144    /// Who we voted for in current term
145    voted_for: Option<String>,
146    /// Votes received (when candidate)
147    votes_received: HashMap<String, bool>,
148    /// Last activity timestamp
149    last_activity: u64,
150}
151
152/// Leader election coordinator
153pub struct LeaderElection {
154    /// This node's ID
155    node_id: String,
156    /// Election group/scope (e.g., shard ID)
157    group_id: String,
158    /// Configuration
159    config: ElectionConfig,
160    /// Internal state
161    inner: Arc<RwLock<ElectionInner>>,
162    /// Global fencing token counter
163    fencing_token: AtomicU64,
164    /// Whether currently running (reserved for background election loop)
165    _running: AtomicBool,
166    /// Event sender
167    event_tx: Option<mpsc::Sender<ElectionEvent>>,
168    /// Known cluster members
169    members: Arc<RwLock<HashMap<String, MemberInfo>>>,
170}
171
/// What this node tracks about another cluster member when deciding elections.
#[derive(Debug, Clone)]
struct MemberInfo {
    /// Member's node ID (mirrors the map key; currently unread).
    _node_id: String,
    /// Member's election priority.
    priority: u32,
    /// Last reported liveness.
    is_alive: bool,
    /// Epoch-ms time of registration or the last liveness update.
    last_seen: u64,
}
180
181impl LeaderElection {
182    /// Create a new leader election coordinator
183    pub fn new(
184        node_id: String,
185        group_id: String,
186        config: ElectionConfig,
187        event_tx: Option<mpsc::Sender<ElectionEvent>>,
188    ) -> Self {
189        Self {
190            node_id,
191            group_id,
192            config,
193            inner: Arc::new(RwLock::new(ElectionInner {
194                state: ElectionState::NoLeader,
195                leader: None,
196                current_term: 0,
197                voted_for: None,
198                votes_received: HashMap::new(),
199                last_activity: current_time_ms(),
200            })),
201            fencing_token: AtomicU64::new(0),
202            _running: AtomicBool::new(false),
203            event_tx,
204            members: Arc::new(RwLock::new(HashMap::new())),
205        }
206    }
207
208    /// Register a cluster member
209    pub fn register_member(&self, node_id: String, priority: u32) {
210        self.members.write().insert(
211            node_id.clone(),
212            MemberInfo {
213                _node_id: node_id,
214                priority,
215                is_alive: true,
216                last_seen: current_time_ms(),
217            },
218        );
219    }
220
221    /// Update member liveness
222    pub fn update_member_liveness(&self, node_id: &str, is_alive: bool) {
223        if let Some(member) = self.members.write().get_mut(node_id) {
224            member.is_alive = is_alive;
225            member.last_seen = current_time_ms();
226        }
227    }
228
229    /// Remove a member
230    pub fn remove_member(&self, node_id: &str) {
231        self.members.write().remove(node_id);
232    }
233
234    /// Get current election state
235    pub fn state(&self) -> ElectionState {
236        self.inner.read().state
237    }
238
239    /// Check if this node is the leader
240    pub fn is_leader(&self) -> bool {
241        let inner = self.inner.read();
242        inner.state == ElectionState::Leader
243            && inner.leader.as_ref().map(|l| l.is_valid()).unwrap_or(false)
244    }
245
246    /// Get current leader info
247    pub fn leader(&self) -> Option<LeaderInfo> {
248        let inner = self.inner.read();
249        inner.leader.clone().filter(|l| l.is_valid())
250    }
251
252    /// Get current term
253    pub fn current_term(&self) -> u64 {
254        self.inner.read().current_term
255    }
256
257    /// Get fencing token (for write operations)
258    pub fn fencing_token(&self) -> u64 {
259        self.fencing_token.load(Ordering::SeqCst)
260    }
261
262    /// Start an election
263    pub async fn start_election(&self) -> Result<bool, ElectionError> {
264        let term = {
265            let mut inner = self.inner.write();
266
267            // Increment term
268            inner.current_term += 1;
269            let term = inner.current_term;
270            inner.state = ElectionState::Electing;
271            inner.voted_for = Some(self.node_id.clone());
272            inner.votes_received.clear();
273            inner.votes_received.insert(self.node_id.clone(), true); // Vote for self
274            inner.last_activity = current_time_ms();
275            term
276        };
277
278        info!(
279            node_id = %self.node_id,
280            group_id = %self.group_id,
281            term = term,
282            "Starting election"
283        );
284
285        self.emit_event(ElectionEvent::ElectionStarted { term })
286            .await;
287
288        // Request votes from all members
289        let _vote_request = VoteRequest {
290            candidate_id: self.node_id.clone(),
291            term,
292            priority: self.config.priority,
293            last_fencing_token: self.fencing_token.load(Ordering::SeqCst),
294        };
295
296        // In a real implementation, this would send RPC to other nodes
297        // For now, we simulate immediate success if we're the only live member
298        // or if we have quorum of votes
299
300        let quorum = {
301            let members = self.members.read();
302            let alive_members: Vec<_> = members.values().filter(|m| m.is_alive).collect();
303            let total_members = alive_members.len() + 1; // +1 for self
304            (total_members / 2) + 1
305        };
306
307        // Simulate receiving our own vote
308        let votes = 1; // Self vote
309
310        // Check if we can win with just our vote (single node cluster)
311        if votes >= quorum {
312            return self.become_leader(term).await;
313        }
314
315        // In production, wait for vote responses here
316        // For now, if we're the highest priority live node, become leader
317        let should_become_leader = self.should_become_leader();
318
319        if should_become_leader {
320            self.become_leader(term).await
321        } else {
322            {
323                let mut inner = self.inner.write();
324                inner.state = ElectionState::Follower;
325            }
326            self.emit_event(ElectionEvent::ElectionFailed {
327                term,
328                reason: "Did not receive quorum".to_string(),
329            })
330            .await;
331            Ok(false)
332        }
333    }
334
335    /// Check if this node should become leader based on priority
336    fn should_become_leader(&self) -> bool {
337        let members = self.members.read();
338        let my_priority = self.config.priority;
339
340        // Check if any alive member has higher priority
341        for member in members.values() {
342            if member.is_alive && member.priority > my_priority {
343                return false;
344            }
345        }
346
347        true
348    }
349
350    /// Become the leader
351    async fn become_leader(&self, term: u64) -> Result<bool, ElectionError> {
352        let new_token = self.fencing_token.fetch_add(1, Ordering::SeqCst) + 1;
353        let now = current_time_ms();
354
355        let leader_info = LeaderInfo {
356            leader_id: self.node_id.clone(),
357            fencing_token: new_token,
358            lease_acquired_at: now,
359            lease_expires_at: now + self.config.lease_duration_ms,
360            term,
361        };
362
363        {
364            let mut inner = self.inner.write();
365            inner.state = ElectionState::Leader;
366            inner.leader = Some(leader_info.clone());
367            inner.last_activity = now;
368        }
369
370        info!(
371            node_id = %self.node_id,
372            group_id = %self.group_id,
373            term = term,
374            fencing_token = new_token,
375            "Became leader"
376        );
377
378        self.emit_event(ElectionEvent::BecameLeader {
379            term,
380            fencing_token: new_token,
381        })
382        .await;
383
384        Ok(true)
385    }
386
387    /// Process a vote request from another candidate
388    pub fn handle_vote_request(&self, request: &VoteRequest) -> Vote {
389        let mut inner = self.inner.write();
390
391        // If request is from a higher term, update our term
392        if request.term > inner.current_term {
393            inner.current_term = request.term;
394            inner.voted_for = None;
395            inner.state = ElectionState::Follower;
396        }
397
398        // Decide whether to grant vote
399        let granted = if request.term < inner.current_term {
400            // Old term, reject
401            false
402        } else if inner.voted_for.is_some()
403            && inner.voted_for.as_ref() != Some(&request.candidate_id)
404        {
405            // Already voted for someone else
406            false
407        } else {
408            // Grant vote
409            inner.voted_for = Some(request.candidate_id.clone());
410            inner.last_activity = current_time_ms();
411            true
412        };
413
414        debug!(
415            node_id = %self.node_id,
416            candidate = %request.candidate_id,
417            term = request.term,
418            granted = granted,
419            "Processed vote request"
420        );
421
422        Vote {
423            voter_id: self.node_id.clone(),
424            candidate_id: request.candidate_id.clone(),
425            term: request.term,
426            granted,
427        }
428    }
429
430    /// Process a vote response
431    pub async fn handle_vote(&self, vote: Vote) -> Result<bool, ElectionError> {
432        let (granted_votes, quorum) = {
433            let mut inner = self.inner.write();
434
435            // Ignore votes from old terms
436            if vote.term != inner.current_term {
437                return Ok(false);
438            }
439
440            // Only process if we're still electing
441            if inner.state != ElectionState::Electing {
442                return Ok(false);
443            }
444
445            // Record the vote
446            inner
447                .votes_received
448                .insert(vote.voter_id.clone(), vote.granted);
449
450            // Count votes
451            let granted_votes = inner.votes_received.values().filter(|&&v| v).count();
452            let total_members = self.members.read().len() + 1; // +1 for self
453            let quorum = (total_members / 2) + 1;
454            (granted_votes, quorum)
455        };
456
457        if granted_votes >= quorum {
458            let term = self.inner.read().current_term;
459            return self.become_leader(term).await;
460        }
461
462        Ok(false)
463    }
464
465    /// Accept a new leader (for followers)
466    pub async fn accept_leader(&self, leader_info: LeaderInfo) -> Result<(), ElectionError> {
467        {
468            let mut inner = self.inner.write();
469
470            // Only accept if term is >= our current term
471            if leader_info.term < inner.current_term {
472                return Err(ElectionError::StaleTerm {
473                    received: leader_info.term,
474                    current: inner.current_term,
475                });
476            }
477
478            inner.current_term = leader_info.term;
479            inner.state = ElectionState::Follower;
480            inner.leader = Some(leader_info.clone());
481            inner.voted_for = None;
482            inner.last_activity = current_time_ms();
483
484            // Update fencing token if higher
485            let current_token = self.fencing_token.load(Ordering::SeqCst);
486            if leader_info.fencing_token > current_token {
487                self.fencing_token
488                    .store(leader_info.fencing_token, Ordering::SeqCst);
489            }
490        }
491
492        info!(
493            node_id = %self.node_id,
494            leader = %leader_info.leader_id,
495            term = leader_info.term,
496            "Accepted new leader"
497        );
498
499        self.emit_event(ElectionEvent::NewLeader {
500            leader_id: leader_info.leader_id,
501            term: leader_info.term,
502        })
503        .await;
504
505        Ok(())
506    }
507
508    /// Renew leadership lease (call periodically when leader)
509    pub async fn renew_lease(&self) -> Result<(), ElectionError> {
510        let expires_at = {
511            let mut inner = self.inner.write();
512
513            if inner.state != ElectionState::Leader {
514                return Err(ElectionError::NotLeader);
515            }
516
517            let now = current_time_ms();
518            let expires_at = if let Some(ref mut leader) = inner.leader {
519                leader.lease_acquired_at = now;
520                leader.lease_expires_at = now + self.config.lease_duration_ms;
521
522                debug!(
523                    node_id = %self.node_id,
524                    expires_at = leader.lease_expires_at,
525                    "Renewed leadership lease"
526                );
527
528                Some(leader.lease_expires_at)
529            } else {
530                None
531            };
532
533            inner.last_activity = now;
534            expires_at
535        };
536
537        if let Some(expires_at) = expires_at {
538            self.emit_event(ElectionEvent::LeaseRenewed { expires_at })
539                .await;
540        }
541
542        Ok(())
543    }
544
545    /// Step down from leadership
546    pub async fn step_down(&self) -> Result<(), ElectionError> {
547        let term = {
548            let mut inner = self.inner.write();
549
550            if inner.state != ElectionState::Leader {
551                return Err(ElectionError::NotLeader);
552            }
553
554            let term = inner.current_term;
555            inner.state = ElectionState::Follower;
556            inner.leader = None;
557            inner.last_activity = current_time_ms();
558            term
559        };
560
561        info!(
562            node_id = %self.node_id,
563            term = term,
564            "Stepped down from leadership"
565        );
566
567        self.emit_event(ElectionEvent::LostLeadership {
568            term,
569            new_leader: None,
570        })
571        .await;
572
573        Ok(())
574    }
575
576    /// Check if election is needed (leader lease expired)
577    pub fn needs_election(&self) -> bool {
578        let inner = self.inner.read();
579
580        match inner.state {
581            ElectionState::NoLeader => true,
582            ElectionState::LeaderExpired => true,
583            ElectionState::Follower => {
584                // Check if leader lease expired
585                inner.leader.as_ref().map(|l| !l.is_valid()).unwrap_or(true)
586            }
587            _ => false,
588        }
589    }
590
591    /// Check lease and update state if expired
592    pub async fn check_lease(&self) {
593        let lost_leadership_term = {
594            let mut inner = self.inner.write();
595
596            if let Some(ref leader) = inner.leader {
597                if !leader.is_valid() {
598                    let was_leader = inner.state == ElectionState::Leader;
599                    inner.state = ElectionState::LeaderExpired;
600
601                    if was_leader {
602                        Some(inner.current_term)
603                    } else {
604                        None
605                    }
606                } else {
607                    None
608                }
609            } else {
610                None
611            }
612        };
613
614        if let Some(term) = lost_leadership_term {
615            warn!(
616                node_id = %self.node_id,
617                "Leadership lease expired"
618            );
619
620            self.emit_event(ElectionEvent::LostLeadership {
621                term,
622                new_leader: None,
623            })
624            .await;
625        }
626    }
627
628    /// Validate a fencing token for write operations
629    pub fn validate_fencing_token(&self, token: u64) -> bool {
630        let current = self.fencing_token.load(Ordering::SeqCst);
631        token >= current
632    }
633
634    /// Get election statistics
635    pub fn stats(&self) -> ElectionStats {
636        let inner = self.inner.read();
637        let members = self.members.read();
638
639        ElectionStats {
640            state: inner.state,
641            current_term: inner.current_term,
642            leader_id: inner.leader.as_ref().map(|l| l.leader_id.clone()),
643            fencing_token: self.fencing_token.load(Ordering::SeqCst),
644            lease_remaining_ms: inner.leader.as_ref().map(|l| l.time_remaining_ms()),
645            member_count: members.len() + 1,
646            alive_members: members.values().filter(|m| m.is_alive).count() + 1,
647        }
648    }
649
650    /// Emit an election event
651    async fn emit_event(&self, event: ElectionEvent) {
652        if let Some(ref tx) = self.event_tx {
653            let _ = tx.send(event).await;
654        }
655    }
656}
657
658/// Statistics about election state
659#[derive(Debug, Clone, Serialize, Deserialize)]
660pub struct ElectionStats {
661    pub state: ElectionState,
662    pub current_term: u64,
663    pub leader_id: Option<String>,
664    pub fencing_token: u64,
665    pub lease_remaining_ms: Option<u64>,
666    pub member_count: usize,
667    pub alive_members: usize,
668}
669
670/// Errors that can occur during election
671#[derive(Debug, Clone, thiserror::Error)]
672pub enum ElectionError {
673    #[error("Not the leader")]
674    NotLeader,
675
676    #[error("Election already in progress")]
677    ElectionInProgress,
678
679    #[error("Stale term: received {received}, current {current}")]
680    StaleTerm { received: u64, current: u64 },
681
682    #[error("Invalid fencing token: {received} < {current}")]
683    InvalidFencingToken { received: u64, current: u64 },
684
685    #[error("No quorum available")]
686    NoQuorum,
687
688    #[error("Election timeout")]
689    Timeout,
690}
691
/// Milliseconds since the Unix epoch according to the system wall clock.
///
/// A clock set before 1970 yields 0 instead of panicking. This is wall-clock
/// time, not monotonic time, so clock adjustments shift lease-expiry checks.
fn current_time_ms() -> u64 {
    std::time::UNIX_EPOCH
        .elapsed()
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}
698
699/// Leader election manager for multiple groups
700pub struct ElectionManager {
701    /// Node ID
702    node_id: String,
703    /// Default configuration
704    default_config: ElectionConfig,
705    /// Elections by group ID
706    elections: Arc<RwLock<HashMap<String, Arc<LeaderElection>>>>,
707}
708
709impl ElectionManager {
710    /// Create a new election manager
711    pub fn new(node_id: String, config: ElectionConfig) -> Self {
712        Self {
713            node_id,
714            default_config: config,
715            elections: Arc::new(RwLock::new(HashMap::new())),
716        }
717    }
718
719    /// Get or create election for a group
720    pub fn get_or_create_election(
721        &self,
722        group_id: &str,
723        event_tx: Option<mpsc::Sender<ElectionEvent>>,
724    ) -> Arc<LeaderElection> {
725        let mut elections = self.elections.write();
726
727        if let Some(election) = elections.get(group_id) {
728            return election.clone();
729        }
730
731        let election = Arc::new(LeaderElection::new(
732            self.node_id.clone(),
733            group_id.to_string(),
734            self.default_config.clone(),
735            event_tx,
736        ));
737
738        elections.insert(group_id.to_string(), election.clone());
739        election
740    }
741
742    /// Get election for a group
743    pub fn get_election(&self, group_id: &str) -> Option<Arc<LeaderElection>> {
744        self.elections.read().get(group_id).cloned()
745    }
746
747    /// Check if this node is leader for a group
748    pub fn is_leader(&self, group_id: &str) -> bool {
749        self.elections
750            .read()
751            .get(group_id)
752            .map(|e| e.is_leader())
753            .unwrap_or(false)
754    }
755
756    /// Get fencing token for a group
757    pub fn fencing_token(&self, group_id: &str) -> Option<u64> {
758        self.elections
759            .read()
760            .get(group_id)
761            .map(|e| e.fencing_token())
762    }
763
764    /// Get all groups where this node is leader
765    pub fn led_groups(&self) -> Vec<String> {
766        self.elections
767            .read()
768            .iter()
769            .filter(|(_, e)| e.is_leader())
770            .map(|(id, _)| id.clone())
771            .collect()
772    }
773}
774
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Build an election for `node_id` in "test-group" with default config and
    /// no event channel.
    fn create_test_election(node_id: &str) -> LeaderElection {
        let config = ElectionConfig::default();
        LeaderElection::new(node_id.to_string(), "test-group".to_string(), config, None)
    }

    #[tokio::test]
    async fn test_single_node_election() {
        let election = create_test_election("node-1");

        // Single node should win election immediately
        let result = election.start_election().await;
        assert!(result.is_ok());
        assert!(result.unwrap());
        assert!(election.is_leader());
        assert_eq!(election.state(), ElectionState::Leader);
    }

    #[tokio::test]
    async fn test_leader_info() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();

        let leader = election.leader();
        assert!(leader.is_some());

        let leader_info = leader.unwrap();
        assert_eq!(leader_info.leader_id, "node-1");
        assert!(leader_info.is_valid());
        assert!(leader_info.fencing_token > 0);
    }

    #[tokio::test]
    async fn test_fencing_token_increases() {
        let election = create_test_election("node-1");

        let token1 = election.fencing_token();
        election.start_election().await.unwrap();
        let token2 = election.fencing_token();

        assert!(token2 > token1);

        // Step down and re-elect
        election.step_down().await.unwrap();
        election.start_election().await.unwrap();
        let token3 = election.fencing_token();

        assert!(token3 > token2);
    }

    #[tokio::test]
    async fn test_lease_renewal() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();

        let leader1 = election.leader().unwrap();
        let expires1 = leader1.lease_expires_at;

        // Wait a bit and renew
        tokio::time::sleep(Duration::from_millis(10)).await;
        election.renew_lease().await.unwrap();

        let leader2 = election.leader().unwrap();
        assert!(leader2.lease_expires_at >= expires1);
    }

    #[tokio::test]
    async fn test_step_down() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();
        assert!(election.is_leader());

        election.step_down().await.unwrap();
        assert!(!election.is_leader());
        assert_eq!(election.state(), ElectionState::Follower);
    }

    #[test]
    fn test_vote_request_handling() {
        let election = create_test_election("node-1");

        let request = VoteRequest {
            candidate_id: "node-2".to_string(),
            term: 1,
            priority: 100,
            last_fencing_token: 0,
        };

        let vote = election.handle_vote_request(&request);
        assert!(vote.granted);
        assert_eq!(vote.voter_id, "node-1");
        assert_eq!(vote.candidate_id, "node-2");

        // Same term, already voted
        let request2 = VoteRequest {
            candidate_id: "node-3".to_string(),
            term: 1,
            priority: 100,
            last_fencing_token: 0,
        };

        let vote2 = election.handle_vote_request(&request2);
        assert!(!vote2.granted); // Already voted for node-2
    }

    #[test]
    fn test_vote_request_higher_term() {
        let election = create_test_election("node-1");

        // First vote at term 1
        let request1 = VoteRequest {
            candidate_id: "node-2".to_string(),
            term: 1,
            priority: 100,
            last_fencing_token: 0,
        };
        election.handle_vote_request(&request1);

        // Vote at higher term should be granted
        let request2 = VoteRequest {
            candidate_id: "node-3".to_string(),
            term: 2,
            priority: 100,
            last_fencing_token: 0,
        };

        let vote = election.handle_vote_request(&request2);
        assert!(vote.granted);
    }

    #[tokio::test]
    async fn test_accept_leader() {
        let election = create_test_election("node-1");

        let leader_info = LeaderInfo {
            leader_id: "node-2".to_string(),
            fencing_token: 5,
            lease_acquired_at: current_time_ms(),
            lease_expires_at: current_time_ms() + 15000,
            term: 1,
        };

        election.accept_leader(leader_info.clone()).await.unwrap();

        assert_eq!(election.state(), ElectionState::Follower);
        assert!(!election.is_leader());

        let leader = election.leader().unwrap();
        assert_eq!(leader.leader_id, "node-2");

        // The follower adopts the leader's (higher) fencing token.
        assert_eq!(election.fencing_token(), 5);
        assert!(!election.validate_fencing_token(4));

        // A leader from an older term is rejected.
        let stale = LeaderInfo {
            term: 0,
            ..leader_info
        };
        assert!(matches!(
            election.accept_leader(stale).await,
            Err(ElectionError::StaleTerm {
                received: 0,
                current: 1
            })
        ));
    }

    #[test]
    fn test_validate_fencing_token() {
        let election = create_test_election("node-1");

        assert!(election.validate_fencing_token(0));
        assert!(election.validate_fencing_token(100));
    }

    #[tokio::test]
    async fn test_needs_election() {
        let election = create_test_election("node-1");

        assert!(election.needs_election()); // No leader yet

        election.start_election().await.unwrap();
        assert!(!election.needs_election()); // We're the leader
    }

    #[test]
    fn test_election_stats() {
        let election = create_test_election("node-1");
        election.register_member("node-2".to_string(), 100);
        election.register_member("node-3".to_string(), 100);

        let stats = election.stats();
        assert_eq!(stats.state, ElectionState::NoLeader);
        assert_eq!(stats.member_count, 3); // 2 registered + self
        assert_eq!(stats.alive_members, 3);
    }

    #[test]
    fn test_member_management() {
        let election = create_test_election("node-1");

        election.register_member("node-2".to_string(), 100);
        let stats = election.stats();
        assert_eq!(stats.member_count, 2);

        election.update_member_liveness("node-2", false);
        let stats = election.stats();
        assert_eq!(stats.alive_members, 1); // Only self is alive

        election.remove_member("node-2");
        let stats = election.stats();
        assert_eq!(stats.member_count, 1);
    }

    #[tokio::test]
    async fn test_election_manager() {
        let manager = ElectionManager::new("node-1".to_string(), ElectionConfig::default());

        let election1 = manager.get_or_create_election("group-1", None);
        let election2 = manager.get_or_create_election("group-1", None);

        // Should return same instance
        assert!(Arc::ptr_eq(&election1, &election2));

        // Different group
        let election3 = manager.get_or_create_election("group-2", None);
        assert!(!Arc::ptr_eq(&election1, &election3));

        // Become leader for group-1
        election1.start_election().await.unwrap();
        assert!(manager.is_leader("group-1"));
        assert!(!manager.is_leader("group-2"));

        let led = manager.led_groups();
        assert_eq!(led.len(), 1);
        assert_eq!(led[0], "group-1");
    }

    #[tokio::test]
    async fn test_priority_based_election() {
        let config = ElectionConfig {
            priority: 50,
            ..Default::default()
        };
        let election =
            LeaderElection::new("node-1".to_string(), "test-group".to_string(), config, None);

        // Register higher priority node
        election.register_member("node-2".to_string(), 100);
        election.update_member_liveness("node-2", true);

        // node-2 is alive and has a higher priority. Our single vote is not a
        // quorum (2 of 2), so this lower-priority node must lose.
        let won = election.start_election().await.unwrap();
        assert!(!won);
        assert!(!election.is_leader());
        assert_eq!(election.state(), ElectionState::Follower);

        // Once the higher-priority node is reported dead, this node is the best
        // live candidate and takes over.
        election.update_member_liveness("node-2", false);
        let won = election.start_election().await.unwrap();
        assert!(won);
        assert!(election.is_leader());
    }
}