1use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use tokio::sync::mpsc;
15use tracing::{debug, info, warn};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ElectionConfig {
20 pub lease_duration_ms: u64,
22 pub renewal_interval_ms: u64,
24 pub election_timeout_ms: u64,
26 pub election_backoff_ms: u64,
28 pub auto_elect: bool,
30 pub priority: u32,
32}
33
34impl Default for ElectionConfig {
35 fn default() -> Self {
36 Self {
37 lease_duration_ms: 15000, renewal_interval_ms: 5000, election_timeout_ms: 10000, election_backoff_ms: 1000, auto_elect: true,
42 priority: 100,
43 }
44 }
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
49pub enum ElectionState {
50 NoLeader,
52 Electing,
54 Leader,
56 Follower,
58 LeaderExpired,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct LeaderInfo {
65 pub leader_id: String,
67 pub fencing_token: u64,
69 pub lease_acquired_at: u64,
71 pub lease_expires_at: u64,
73 pub term: u64,
75}
76
77impl LeaderInfo {
78 pub fn is_valid(&self) -> bool {
80 current_time_ms() < self.lease_expires_at
81 }
82
83 pub fn time_remaining_ms(&self) -> u64 {
85 let now = current_time_ms();
86 self.lease_expires_at.saturating_sub(now)
87 }
88}
89
/// Notifications sent on the optional event channel of a `LeaderElection`.
#[derive(Debug, Clone)]
pub enum ElectionEvent {
    /// The local node was promoted to leader for `term` and was issued
    /// `fencing_token`.
    BecameLeader { term: u64, fencing_token: u64 },
    /// The local node stopped being leader during `term`; `new_leader` names
    /// the successor when one is known.
    LostLeadership { term: u64, new_leader: Option<String> },
    /// Another node was accepted as leader for `term`.
    NewLeader { leader_id: String, term: u64 },
    /// The local leader's lease was renewed and now expires at `expires_at`
    /// (milliseconds since the Unix epoch).
    LeaseRenewed { expires_at: u64 },
    /// The local node started an election for `term`.
    ElectionStarted { term: u64 },
    /// The election for `term` did not make the local node leader.
    ElectionFailed { term: u64, reason: String },
}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct Vote {
113 pub voter_id: String,
115 pub candidate_id: String,
117 pub term: u64,
119 pub granted: bool,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct VoteRequest {
126 pub candidate_id: String,
128 pub term: u64,
130 pub priority: u32,
132 pub last_fencing_token: u64,
134}
135
136struct ElectionInner {
138 state: ElectionState,
140 leader: Option<LeaderInfo>,
142 current_term: u64,
144 voted_for: Option<String>,
146 votes_received: HashMap<String, bool>,
148 last_activity: u64,
150}
151
152pub struct LeaderElection {
154 node_id: String,
156 group_id: String,
158 config: ElectionConfig,
160 inner: Arc<RwLock<ElectionInner>>,
162 fencing_token: AtomicU64,
164 _running: AtomicBool,
166 event_tx: Option<mpsc::Sender<ElectionEvent>>,
168 members: Arc<RwLock<HashMap<String, MemberInfo>>>,
170}
171
/// Locally tracked view of another member of the election group.
#[derive(Debug, Clone)]
struct MemberInfo {
    /// The member's node id (the same value used as its map key).
    _node_id: String,
    /// The member's election priority.
    priority: u32,
    /// Whether the member is currently considered alive.
    is_alive: bool,
    /// Timestamp (ms since the Unix epoch) of registration or of the most
    /// recent liveness update.
    last_seen: u64,
}
180
181impl LeaderElection {
182 pub fn new(
184 node_id: String,
185 group_id: String,
186 config: ElectionConfig,
187 event_tx: Option<mpsc::Sender<ElectionEvent>>,
188 ) -> Self {
189 Self {
190 node_id,
191 group_id,
192 config,
193 inner: Arc::new(RwLock::new(ElectionInner {
194 state: ElectionState::NoLeader,
195 leader: None,
196 current_term: 0,
197 voted_for: None,
198 votes_received: HashMap::new(),
199 last_activity: current_time_ms(),
200 })),
201 fencing_token: AtomicU64::new(0),
202 _running: AtomicBool::new(false),
203 event_tx,
204 members: Arc::new(RwLock::new(HashMap::new())),
205 }
206 }
207
208 pub fn register_member(&self, node_id: String, priority: u32) {
210 self.members.write().insert(
211 node_id.clone(),
212 MemberInfo {
213 _node_id: node_id,
214 priority,
215 is_alive: true,
216 last_seen: current_time_ms(),
217 },
218 );
219 }
220
221 pub fn update_member_liveness(&self, node_id: &str, is_alive: bool) {
223 if let Some(member) = self.members.write().get_mut(node_id) {
224 member.is_alive = is_alive;
225 member.last_seen = current_time_ms();
226 }
227 }
228
229 pub fn remove_member(&self, node_id: &str) {
231 self.members.write().remove(node_id);
232 }
233
234 pub fn state(&self) -> ElectionState {
236 self.inner.read().state
237 }
238
239 pub fn is_leader(&self) -> bool {
241 let inner = self.inner.read();
242 inner.state == ElectionState::Leader
243 && inner.leader.as_ref().map(|l| l.is_valid()).unwrap_or(false)
244 }
245
246 pub fn leader(&self) -> Option<LeaderInfo> {
248 let inner = self.inner.read();
249 inner.leader.clone().filter(|l| l.is_valid())
250 }
251
252 pub fn current_term(&self) -> u64 {
254 self.inner.read().current_term
255 }
256
257 pub fn fencing_token(&self) -> u64 {
259 self.fencing_token.load(Ordering::SeqCst)
260 }
261
262 pub async fn start_election(&self) -> Result<bool, ElectionError> {
264 let term = {
265 let mut inner = self.inner.write();
266
267 inner.current_term += 1;
269 let term = inner.current_term;
270 inner.state = ElectionState::Electing;
271 inner.voted_for = Some(self.node_id.clone());
272 inner.votes_received.clear();
273 inner.votes_received.insert(self.node_id.clone(), true); inner.last_activity = current_time_ms();
275 term
276 };
277
278 info!(
279 node_id = %self.node_id,
280 group_id = %self.group_id,
281 term = term,
282 "Starting election"
283 );
284
285 self.emit_event(ElectionEvent::ElectionStarted { term })
286 .await;
287
288 let _vote_request = VoteRequest {
290 candidate_id: self.node_id.clone(),
291 term,
292 priority: self.config.priority,
293 last_fencing_token: self.fencing_token.load(Ordering::SeqCst),
294 };
295
296 let quorum = {
301 let members = self.members.read();
302 let alive_members: Vec<_> = members.values().filter(|m| m.is_alive).collect();
303 let total_members = alive_members.len() + 1; (total_members / 2) + 1
305 };
306
307 let votes = 1; if votes >= quorum {
312 return self.become_leader(term).await;
313 }
314
315 let should_become_leader = self.should_become_leader();
318
319 if should_become_leader {
320 self.become_leader(term).await
321 } else {
322 {
323 let mut inner = self.inner.write();
324 inner.state = ElectionState::Follower;
325 }
326 self.emit_event(ElectionEvent::ElectionFailed {
327 term,
328 reason: "Did not receive quorum".to_string(),
329 })
330 .await;
331 Ok(false)
332 }
333 }
334
335 fn should_become_leader(&self) -> bool {
337 let members = self.members.read();
338 let my_priority = self.config.priority;
339
340 for member in members.values() {
342 if member.is_alive && member.priority > my_priority {
343 return false;
344 }
345 }
346
347 true
348 }
349
350 async fn become_leader(&self, term: u64) -> Result<bool, ElectionError> {
352 let new_token = self.fencing_token.fetch_add(1, Ordering::SeqCst) + 1;
353 let now = current_time_ms();
354
355 let leader_info = LeaderInfo {
356 leader_id: self.node_id.clone(),
357 fencing_token: new_token,
358 lease_acquired_at: now,
359 lease_expires_at: now + self.config.lease_duration_ms,
360 term,
361 };
362
363 {
364 let mut inner = self.inner.write();
365 inner.state = ElectionState::Leader;
366 inner.leader = Some(leader_info.clone());
367 inner.last_activity = now;
368 }
369
370 info!(
371 node_id = %self.node_id,
372 group_id = %self.group_id,
373 term = term,
374 fencing_token = new_token,
375 "Became leader"
376 );
377
378 self.emit_event(ElectionEvent::BecameLeader {
379 term,
380 fencing_token: new_token,
381 })
382 .await;
383
384 Ok(true)
385 }
386
387 pub fn handle_vote_request(&self, request: &VoteRequest) -> Vote {
389 let mut inner = self.inner.write();
390
391 if request.term > inner.current_term {
393 inner.current_term = request.term;
394 inner.voted_for = None;
395 inner.state = ElectionState::Follower;
396 }
397
398 let granted = if request.term < inner.current_term {
400 false
402 } else if inner.voted_for.is_some()
403 && inner.voted_for.as_ref() != Some(&request.candidate_id)
404 {
405 false
407 } else {
408 inner.voted_for = Some(request.candidate_id.clone());
410 inner.last_activity = current_time_ms();
411 true
412 };
413
414 debug!(
415 node_id = %self.node_id,
416 candidate = %request.candidate_id,
417 term = request.term,
418 granted = granted,
419 "Processed vote request"
420 );
421
422 Vote {
423 voter_id: self.node_id.clone(),
424 candidate_id: request.candidate_id.clone(),
425 term: request.term,
426 granted,
427 }
428 }
429
430 pub async fn handle_vote(&self, vote: Vote) -> Result<bool, ElectionError> {
432 let (granted_votes, quorum) = {
433 let mut inner = self.inner.write();
434
435 if vote.term != inner.current_term {
437 return Ok(false);
438 }
439
440 if inner.state != ElectionState::Electing {
442 return Ok(false);
443 }
444
445 inner
447 .votes_received
448 .insert(vote.voter_id.clone(), vote.granted);
449
450 let granted_votes = inner.votes_received.values().filter(|&&v| v).count();
452 let total_members = self.members.read().len() + 1; let quorum = (total_members / 2) + 1;
454 (granted_votes, quorum)
455 };
456
457 if granted_votes >= quorum {
458 let term = self.inner.read().current_term;
459 return self.become_leader(term).await;
460 }
461
462 Ok(false)
463 }
464
465 pub async fn accept_leader(&self, leader_info: LeaderInfo) -> Result<(), ElectionError> {
467 {
468 let mut inner = self.inner.write();
469
470 if leader_info.term < inner.current_term {
472 return Err(ElectionError::StaleTerm {
473 received: leader_info.term,
474 current: inner.current_term,
475 });
476 }
477
478 inner.current_term = leader_info.term;
479 inner.state = ElectionState::Follower;
480 inner.leader = Some(leader_info.clone());
481 inner.voted_for = None;
482 inner.last_activity = current_time_ms();
483
484 let current_token = self.fencing_token.load(Ordering::SeqCst);
486 if leader_info.fencing_token > current_token {
487 self.fencing_token
488 .store(leader_info.fencing_token, Ordering::SeqCst);
489 }
490 }
491
492 info!(
493 node_id = %self.node_id,
494 leader = %leader_info.leader_id,
495 term = leader_info.term,
496 "Accepted new leader"
497 );
498
499 self.emit_event(ElectionEvent::NewLeader {
500 leader_id: leader_info.leader_id,
501 term: leader_info.term,
502 })
503 .await;
504
505 Ok(())
506 }
507
508 pub async fn renew_lease(&self) -> Result<(), ElectionError> {
510 let expires_at = {
511 let mut inner = self.inner.write();
512
513 if inner.state != ElectionState::Leader {
514 return Err(ElectionError::NotLeader);
515 }
516
517 let now = current_time_ms();
518 let expires_at = if let Some(ref mut leader) = inner.leader {
519 leader.lease_acquired_at = now;
520 leader.lease_expires_at = now + self.config.lease_duration_ms;
521
522 debug!(
523 node_id = %self.node_id,
524 expires_at = leader.lease_expires_at,
525 "Renewed leadership lease"
526 );
527
528 Some(leader.lease_expires_at)
529 } else {
530 None
531 };
532
533 inner.last_activity = now;
534 expires_at
535 };
536
537 if let Some(expires_at) = expires_at {
538 self.emit_event(ElectionEvent::LeaseRenewed { expires_at })
539 .await;
540 }
541
542 Ok(())
543 }
544
545 pub async fn step_down(&self) -> Result<(), ElectionError> {
547 let term = {
548 let mut inner = self.inner.write();
549
550 if inner.state != ElectionState::Leader {
551 return Err(ElectionError::NotLeader);
552 }
553
554 let term = inner.current_term;
555 inner.state = ElectionState::Follower;
556 inner.leader = None;
557 inner.last_activity = current_time_ms();
558 term
559 };
560
561 info!(
562 node_id = %self.node_id,
563 term = term,
564 "Stepped down from leadership"
565 );
566
567 self.emit_event(ElectionEvent::LostLeadership {
568 term,
569 new_leader: None,
570 })
571 .await;
572
573 Ok(())
574 }
575
576 pub fn needs_election(&self) -> bool {
578 let inner = self.inner.read();
579
580 match inner.state {
581 ElectionState::NoLeader => true,
582 ElectionState::LeaderExpired => true,
583 ElectionState::Follower => {
584 inner.leader.as_ref().map(|l| !l.is_valid()).unwrap_or(true)
586 }
587 _ => false,
588 }
589 }
590
591 pub async fn check_lease(&self) {
593 let lost_leadership_term = {
594 let mut inner = self.inner.write();
595
596 if let Some(ref leader) = inner.leader {
597 if !leader.is_valid() {
598 let was_leader = inner.state == ElectionState::Leader;
599 inner.state = ElectionState::LeaderExpired;
600
601 if was_leader {
602 Some(inner.current_term)
603 } else {
604 None
605 }
606 } else {
607 None
608 }
609 } else {
610 None
611 }
612 };
613
614 if let Some(term) = lost_leadership_term {
615 warn!(
616 node_id = %self.node_id,
617 "Leadership lease expired"
618 );
619
620 self.emit_event(ElectionEvent::LostLeadership {
621 term,
622 new_leader: None,
623 })
624 .await;
625 }
626 }
627
628 pub fn validate_fencing_token(&self, token: u64) -> bool {
630 let current = self.fencing_token.load(Ordering::SeqCst);
631 token >= current
632 }
633
634 pub fn stats(&self) -> ElectionStats {
636 let inner = self.inner.read();
637 let members = self.members.read();
638
639 ElectionStats {
640 state: inner.state,
641 current_term: inner.current_term,
642 leader_id: inner.leader.as_ref().map(|l| l.leader_id.clone()),
643 fencing_token: self.fencing_token.load(Ordering::SeqCst),
644 lease_remaining_ms: inner.leader.as_ref().map(|l| l.time_remaining_ms()),
645 member_count: members.len() + 1,
646 alive_members: members.values().filter(|m| m.is_alive).count() + 1,
647 }
648 }
649
650 async fn emit_event(&self, event: ElectionEvent) {
652 if let Some(ref tx) = self.event_tx {
653 let _ = tx.send(event).await;
654 }
655 }
656}
657
658#[derive(Debug, Clone, Serialize, Deserialize)]
660pub struct ElectionStats {
661 pub state: ElectionState,
662 pub current_term: u64,
663 pub leader_id: Option<String>,
664 pub fencing_token: u64,
665 pub lease_remaining_ms: Option<u64>,
666 pub member_count: usize,
667 pub alive_members: usize,
668}
669
670#[derive(Debug, Clone, thiserror::Error)]
672pub enum ElectionError {
673 #[error("Not the leader")]
674 NotLeader,
675
676 #[error("Election already in progress")]
677 ElectionInProgress,
678
679 #[error("Stale term: received {received}, current {current}")]
680 StaleTerm { received: u64, current: u64 },
681
682 #[error("Invalid fencing token: {received} < {current}")]
683 InvalidFencingToken { received: u64, current: u64 },
684
685 #[error("No quorum available")]
686 NoQuorum,
687
688 #[error("Election timeout")]
689 Timeout,
690}
691
/// Current wall-clock time in whole milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
698
699pub struct ElectionManager {
701 node_id: String,
703 default_config: ElectionConfig,
705 elections: Arc<RwLock<HashMap<String, Arc<LeaderElection>>>>,
707}
708
709impl ElectionManager {
710 pub fn new(node_id: String, config: ElectionConfig) -> Self {
712 Self {
713 node_id,
714 default_config: config,
715 elections: Arc::new(RwLock::new(HashMap::new())),
716 }
717 }
718
719 pub fn get_or_create_election(
721 &self,
722 group_id: &str,
723 event_tx: Option<mpsc::Sender<ElectionEvent>>,
724 ) -> Arc<LeaderElection> {
725 let mut elections = self.elections.write();
726
727 if let Some(election) = elections.get(group_id) {
728 return election.clone();
729 }
730
731 let election = Arc::new(LeaderElection::new(
732 self.node_id.clone(),
733 group_id.to_string(),
734 self.default_config.clone(),
735 event_tx,
736 ));
737
738 elections.insert(group_id.to_string(), election.clone());
739 election
740 }
741
742 pub fn get_election(&self, group_id: &str) -> Option<Arc<LeaderElection>> {
744 self.elections.read().get(group_id).cloned()
745 }
746
747 pub fn is_leader(&self, group_id: &str) -> bool {
749 self.elections
750 .read()
751 .get(group_id)
752 .map(|e| e.is_leader())
753 .unwrap_or(false)
754 }
755
756 pub fn fencing_token(&self, group_id: &str) -> Option<u64> {
758 self.elections
759 .read()
760 .get(group_id)
761 .map(|e| e.fencing_token())
762 }
763
764 pub fn led_groups(&self) -> Vec<String> {
766 self.elections
767 .read()
768 .iter()
769 .filter(|(_, e)| e.is_leader())
770 .map(|(id, _)| id.clone())
771 .collect()
772 }
773}
774
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds an election for `node_id` in "test-group" with the default
    /// configuration and no event channel.
    fn create_test_election(node_id: &str) -> LeaderElection {
        let config = ElectionConfig::default();
        LeaderElection::new(node_id.to_string(), "test-group".to_string(), config, None)
    }

    /// Builds a vote request from `candidate_id` for `term` with priority
    /// 100 and fencing token 0.
    fn vote_request(candidate_id: &str, term: u64) -> VoteRequest {
        VoteRequest {
            candidate_id: candidate_id.to_string(),
            term,
            priority: 100,
            last_fencing_token: 0,
        }
    }

    #[tokio::test]
    async fn test_single_node_election() {
        let election = create_test_election("node-1");

        // With no other members the self-vote is already a quorum.
        let result = election.start_election().await;
        assert!(result.is_ok());
        assert!(result.unwrap());
        assert!(election.is_leader());
        assert_eq!(election.state(), ElectionState::Leader);
    }

    #[tokio::test]
    async fn test_leader_info() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();

        let leader = election.leader();
        assert!(leader.is_some());

        let leader_info = leader.unwrap();
        assert_eq!(leader_info.leader_id, "node-1");
        assert!(leader_info.is_valid());
        assert!(leader_info.fencing_token > 0);
    }

    #[tokio::test]
    async fn test_fencing_token_increases() {
        let election = create_test_election("node-1");

        let token1 = election.fencing_token();
        election.start_election().await.unwrap();
        let token2 = election.fencing_token();

        assert!(token2 > token1);

        // Winning a second election must issue a strictly newer token.
        election.step_down().await.unwrap();
        election.start_election().await.unwrap();
        let token3 = election.fencing_token();

        assert!(token3 > token2);
    }

    #[tokio::test]
    async fn test_lease_renewal() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();

        let leader1 = election.leader().unwrap();
        let expires1 = leader1.lease_expires_at;

        // Let the clock advance so the renewed lease can end later.
        tokio::time::sleep(Duration::from_millis(10)).await;
        election.renew_lease().await.unwrap();

        let leader2 = election.leader().unwrap();
        assert!(leader2.lease_expires_at >= expires1);
    }

    #[tokio::test]
    async fn test_step_down() {
        let election = create_test_election("node-1");
        election.start_election().await.unwrap();
        assert!(election.is_leader());

        election.step_down().await.unwrap();
        assert!(!election.is_leader());
        assert_eq!(election.state(), ElectionState::Follower);
    }

    #[test]
    fn test_vote_request_handling() {
        let election = create_test_election("node-1");

        let vote = election.handle_vote_request(&vote_request("node-2", 1));
        assert!(vote.granted);
        assert_eq!(vote.voter_id, "node-1");
        assert_eq!(vote.candidate_id, "node-2");

        // A second candidate in the same term must be refused.
        let vote2 = election.handle_vote_request(&vote_request("node-3", 1));
        assert!(!vote2.granted);
    }

    #[test]
    fn test_vote_request_higher_term() {
        let election = create_test_election("node-1");

        election.handle_vote_request(&vote_request("node-2", 1));

        // A newer term resets the recorded vote, so this one is granted.
        let vote = election.handle_vote_request(&vote_request("node-3", 2));
        assert!(vote.granted);
    }

    #[tokio::test]
    async fn test_accept_leader() {
        let election = create_test_election("node-1");

        let leader_info = LeaderInfo {
            leader_id: "node-2".to_string(),
            fencing_token: 5,
            lease_acquired_at: current_time_ms(),
            lease_expires_at: current_time_ms() + 15000,
            term: 1,
        };

        election.accept_leader(leader_info.clone()).await.unwrap();

        assert_eq!(election.state(), ElectionState::Follower);
        assert!(!election.is_leader());

        let leader = election.leader().unwrap();
        assert_eq!(leader.leader_id, "node-2");
    }

    #[test]
    fn test_validate_fencing_token() {
        let election = create_test_election("node-1");

        assert!(election.validate_fencing_token(0));
        assert!(election.validate_fencing_token(100));
    }

    #[tokio::test]
    async fn test_needs_election() {
        let election = create_test_election("node-1");

        // No leader is known yet.
        assert!(election.needs_election());

        election.start_election().await.unwrap();

        // The local node now holds a valid lease.
        assert!(!election.needs_election());
    }

    #[test]
    fn test_election_stats() {
        let election = create_test_election("node-1");
        election.register_member("node-2".to_string(), 100);
        election.register_member("node-3".to_string(), 100);

        let stats = election.stats();
        assert_eq!(stats.state, ElectionState::NoLeader);
        // Both counts include the local node.
        assert_eq!(stats.member_count, 3);
        assert_eq!(stats.alive_members, 3);
    }

    #[test]
    fn test_member_management() {
        let election = create_test_election("node-1");

        election.register_member("node-2".to_string(), 100);
        let stats = election.stats();
        assert_eq!(stats.member_count, 2);

        election.update_member_liveness("node-2", false);
        let stats = election.stats();
        // Only the local node is still counted as alive.
        assert_eq!(stats.alive_members, 1);

        election.remove_member("node-2");
        let stats = election.stats();
        assert_eq!(stats.member_count, 1);
    }

    #[tokio::test]
    async fn test_election_manager() {
        let manager = ElectionManager::new("node-1".to_string(), ElectionConfig::default());

        let election1 = manager.get_or_create_election("group-1", None);
        let election2 = manager.get_or_create_election("group-1", None);

        // The same group always maps to the same election.
        assert!(Arc::ptr_eq(&election1, &election2));

        let election3 = manager.get_or_create_election("group-2", None);
        assert!(!Arc::ptr_eq(&election1, &election3));

        election1.start_election().await.unwrap();
        assert!(manager.is_leader("group-1"));
        assert!(!manager.is_leader("group-2"));

        let led = manager.led_groups();
        assert_eq!(led.len(), 1);
        assert_eq!(led[0], "group-1");
    }

    #[tokio::test]
    async fn test_priority_based_election() {
        let config = ElectionConfig {
            priority: 50,
            ..Default::default()
        };
        let election =
            LeaderElection::new("node-1".to_string(), "test-group".to_string(), config, None);

        election.register_member("node-2".to_string(), 100);
        election.update_member_liveness("node-2", true);

        // A live member with a higher priority exists and the self-vote is
        // not a quorum of two, so the election must fail.
        let won = election.start_election().await.unwrap();
        assert!(!won);
        assert!(!election.is_leader());
        assert_eq!(election.state(), ElectionState::Follower);
    }

    #[tokio::test]
    async fn test_priority_ignores_dead_members() {
        let config = ElectionConfig {
            priority: 50,
            ..Default::default()
        };
        let election =
            LeaderElection::new("node-1".to_string(), "test-group".to_string(), config, None);

        election.register_member("node-2".to_string(), 100);
        election.update_member_liveness("node-2", false);

        // The only higher-priority member is dead, so the local node wins.
        let won = election.start_election().await.unwrap();
        assert!(won);
        assert!(election.is_leader());
        assert_eq!(election.state(), ElectionState::Leader);
    }
}