Skip to main content

noxu_rep/elections/
election.rs

//! Election state machine.
//!
//! [`Election`] orchestrates a single election round: proposing candidacy,
//! collecting votes, evaluating competing proposals, checking quorum, and
//! recording the outcome.
//!
//! The state machine progresses through [`ElectionState`] values:
//!
//! ```text
//! Idle -> Proposing -> Voting -> Complete | Failed
//! ```
//!
//! Thread safety is achieved via `noxu_sync::Mutex` on interior fields so
//! that vote recording can happen concurrently from multiple network threads.
15
16use hashbrown::HashMap;
17
18use noxu_sync::Mutex;
19
20use super::election_config::ElectionConfig;
21use super::proposal::Proposal;
22
/// Phase an election round is currently in.
///
/// The expected progression is
/// `Idle -> Proposing -> Voting -> Complete | Failed`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ElectionState {
    /// No round is running.
    Idle,
    /// A proposal has been put forward and promises are being collected.
    Proposing,
    /// Votes are arriving (phase 2 in Paxos terminology).
    Voting,
    /// The round finished with a decided master (this node won or lost).
    Complete,
    /// The round finished without a decision (timeout, too few votes, ...).
    Failed,
}
37
/// How an election round ended.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ElectionOutcome {
    /// This node (or the node named in `master`) won the election.
    Won {
        /// Name of the new master.
        master: String,
        /// Term in which the election was won.
        term: u64,
    },
    /// A different node won the election.
    Lost {
        /// Name of the new master.
        master: String,
        /// Term in which the election was won.
        term: u64,
    },
    /// Too few affirmative votes arrived to reach quorum.
    NoQuorum {
        /// Affirmative votes actually received.
        votes_received: u32,
        /// Affirmative votes that were required.
        votes_needed: u32,
    },
    /// The round timed out before a conclusion was reached.
    Timeout,
}
65
66/// Manages a single election round.
67///
68/// An `Election` is created for each election attempt. It holds the
69/// configuration, the proposal this node is running on, collected votes, and
70/// the final outcome.
71///
72/// All public methods are safe to call from multiple threads concurrently.
73pub struct Election {
74    config: ElectionConfig,
75    state: Mutex<ElectionState>,
76    term: Mutex<u64>,
77    /// Map of voter name -> whether they granted their vote.
78    votes: Mutex<HashMap<String, bool>>,
79    /// Our proposal for this election round.
80    proposal: Mutex<Option<Proposal>>,
81    /// Final outcome once the election completes.
82    outcome: Mutex<Option<ElectionOutcome>>,
83}
84
85impl Election {
86    /// Create a new election with the given configuration.
87    pub fn new(config: ElectionConfig) -> Self {
88        Self {
89            config,
90            state: Mutex::new(ElectionState::Idle),
91            term: Mutex::new(0),
92            votes: Mutex::new(HashMap::new()),
93            proposal: Mutex::new(None),
94            outcome: Mutex::new(None),
95        }
96    }
97
98    /// Returns the current election state.
99    pub fn get_state(&self) -> ElectionState {
100        *self.state.lock()
101    }
102
103    /// Returns the current term number.
104    pub fn get_term(&self) -> u64 {
105        *self.term.lock()
106    }
107
108    /// Increments the term number and returns the new value.
109    pub fn increment_term(&self) -> u64 {
110        let mut term = self.term.lock();
111        *term += 1;
112        *term
113    }
114
115    /// Start an election with the given proposal.
116    ///
117    /// Transitions state from `Idle` to `Proposing` and records the proposal.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if the election is not in the `Idle` state.
122    pub fn start_election(
123        &self,
124        proposal: Proposal,
125    ) -> crate::error::Result<()> {
126        let mut state = self.state.lock();
127        if *state != ElectionState::Idle {
128            return Err(crate::error::RepError::ElectionFailed(format!(
129                "cannot start election: state is {:?}, expected Idle",
130                *state
131            )));
132        }
133        *state = ElectionState::Proposing;
134
135        let mut term = self.term.lock();
136        *term = proposal.term;
137
138        *self.proposal.lock() = Some(proposal);
139        self.votes.lock().clear();
140        *self.outcome.lock() = None;
141
142        Ok(())
143    }
144
145    /// Record a vote from another node.
146    ///
147    /// The election must be in `Proposing` or `Voting` state. After recording
148    /// the first vote the state transitions to `Voting`.
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if the election is in `Idle`, `Complete`, or `Failed`
153    /// state.
154    pub fn record_vote(
155        &self,
156        voter: &str,
157        granted: bool,
158    ) -> crate::error::Result<()> {
159        let mut state = self.state.lock();
160        match *state {
161            ElectionState::Proposing | ElectionState::Voting => {
162                // Transition to Voting on first vote.
163                if *state == ElectionState::Proposing {
164                    *state = ElectionState::Voting;
165                }
166            }
167            other => {
168                return Err(crate::error::RepError::ElectionFailed(format!(
169                    "cannot record vote: state is {:?}",
170                    other
171                )));
172            }
173        }
174        drop(state);
175
176        self.votes.lock().insert(voter.to_string(), granted);
177        Ok(())
178    }
179
180    /// Check whether enough affirmative votes have been received to meet
181    /// quorum.
182    ///
183    /// Returns `Some(ElectionOutcome)` when a determination can be made:
184    /// - `Won` if affirmative votes >= `quorum_size`
185    /// - `NoQuorum` is NOT returned here (the caller should use this together
186    ///   with a timeout to decide when to give up).
187    ///
188    /// Returns `None` if quorum has not yet been reached.
189    pub fn check_quorum(&self, quorum_size: u32) -> Option<ElectionOutcome> {
190        let votes = self.votes.lock();
191        let yes_count = votes.values().filter(|&&v| v).count() as u32;
192
193        if yes_count >= quorum_size {
194            let proposal = self.proposal.lock();
195            if let Some(ref p) = *proposal {
196                return Some(ElectionOutcome::Won {
197                    master: p.node_name.clone(),
198                    term: p.term,
199                });
200            }
201        }
202
203        // Check if it's impossible to reach quorum (all votes in, not enough
204        // yes). This is an optimization  -  in practice the caller also uses a
205        // timeout.
206        let total = votes.len() as u32;
207        let no_count = total - yes_count;
208        // If remaining possible votes can't make up the deficit, report
209        // NoQuorum. But we don't know the total electorate size here, so we
210        // only report when we have enough yes votes. The caller handles
211        // timeout-based NoQuorum.
212        let _ = no_count; // acknowledge but don't act on it here
213
214        None
215    }
216
217    /// Evaluate an incoming proposal from another candidate.
218    ///
219    /// Returns `true` (vote "yes") if:
220    /// - We have no proposal of our own, OR
221    /// - The incoming proposal is better than ours according to the
222    ///   [`Proposal`] ordering.
223    ///
224    /// Returns `false` (vote "no") otherwise.
225    pub fn evaluate_proposal(&self, incoming: &Proposal) -> bool {
226        let proposal = self.proposal.lock();
227        match &*proposal {
228            None => true,
229            Some(ours) => incoming.is_better_than(ours),
230        }
231    }
232
233    /// Complete the election with the given outcome.
234    ///
235    /// Transitions state to `Complete` (for `Won`/`Lost`) or `Failed` (for
236    /// `NoQuorum`/`Timeout`).
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if the election is already in `Complete` or `Failed`
241    /// state.
242    pub fn complete(
243        &self,
244        outcome: ElectionOutcome,
245    ) -> crate::error::Result<()> {
246        let mut state = self.state.lock();
247        match *state {
248            ElectionState::Complete | ElectionState::Failed => {
249                return Err(crate::error::RepError::ElectionFailed(format!(
250                    "election already concluded: state is {:?}",
251                    *state
252                )));
253            }
254            _ => {}
255        }
256
257        *state = match &outcome {
258            ElectionOutcome::Won { .. } | ElectionOutcome::Lost { .. } => {
259                ElectionState::Complete
260            }
261            ElectionOutcome::NoQuorum { .. } | ElectionOutcome::Timeout => {
262                ElectionState::Failed
263            }
264        };
265
266        *self.outcome.lock() = Some(outcome);
267        Ok(())
268    }
269
270    /// Reset the election for a new round.
271    ///
272    /// Clears all state and returns to `Idle`.
273    pub fn reset(&self) {
274        *self.state.lock() = ElectionState::Idle;
275        self.votes.lock().clear();
276        *self.proposal.lock() = None;
277        *self.outcome.lock() = None;
278    }
279
280    /// Returns the election outcome, if the election has concluded.
281    pub fn get_outcome(&self) -> Option<ElectionOutcome> {
282        self.outcome.lock().clone()
283    }
284
285    /// Returns the election configuration.
286    pub fn config(&self) -> &ElectionConfig {
287        &self.config
288    }
289
290    /// Returns the number of affirmative votes received so far.
291    pub fn yes_votes(&self) -> u32 {
292        self.votes.lock().values().filter(|&&v| v).count() as u32
293    }
294
295    /// Returns the total number of votes received so far.
296    pub fn total_votes(&self) -> u32 {
297        self.votes.lock().len() as u32
298    }
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration produced by `ElectionConfig::new()`.
    fn default_config() -> ElectionConfig {
        ElectionConfig::new()
    }

    /// Builds a proposal through `Proposal::with_timestamp`, passing 0 as
    /// the final argument.
    fn make_proposal(
        name: &str,
        vlsn: u64,
        priority: u32,
        term: u64,
    ) -> Proposal {
        Proposal::with_timestamp(name.into(), vlsn, priority, term, 0)
    }

    /// Creates an election with the default configuration and starts it on
    /// `proposal`.
    fn started_election(proposal: Proposal) -> Election {
        let election = Election::new(default_config());
        election.start_election(proposal).unwrap();
        election
    }

    // --- State transitions ---

    #[test]
    fn test_initial_state_is_idle() {
        let election = Election::new(default_config());
        assert_eq!(election.get_state(), ElectionState::Idle);
        assert_eq!(election.get_term(), 0);
        assert_eq!(election.get_outcome(), None);
    }

    #[test]
    fn test_start_election_transitions_to_proposing() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        assert_eq!(election.get_state(), ElectionState::Proposing);
        assert_eq!(election.get_term(), 1);
    }

    #[test]
    fn test_start_election_fails_if_not_idle() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        let second_start =
            election.start_election(make_proposal("node1", 100, 1, 1));
        assert!(second_start.is_err());
    }

    #[test]
    fn test_record_vote_transitions_to_voting() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.record_vote("voter1", true).unwrap();
        assert_eq!(election.get_state(), ElectionState::Voting);
    }

    #[test]
    fn test_record_vote_fails_if_idle() {
        let election = Election::new(default_config());
        let vote = election.record_vote("voter1", true);
        assert!(vote.is_err());
    }

    // --- Vote counting ---

    #[test]
    fn test_yes_and_total_votes() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        for (voter, granted) in [("v1", true), ("v2", false), ("v3", true)] {
            election.record_vote(voter, granted).unwrap();
        }

        assert_eq!(election.yes_votes(), 2);
        assert_eq!(election.total_votes(), 3);
    }

    #[test]
    fn test_duplicate_voter_overwrites() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.record_vote("v1", false).unwrap();
        election.record_vote("v1", true).unwrap();

        assert_eq!(election.yes_votes(), 1);
        assert_eq!(election.total_votes(), 1);
    }

    // --- Quorum ---

    #[test]
    fn test_check_quorum_reached() {
        let election = started_election(make_proposal("node1", 100, 1, 5));
        election.record_vote("v1", true).unwrap();
        election.record_vote("v2", true).unwrap();

        assert_eq!(
            election.check_quorum(2),
            Some(ElectionOutcome::Won {
                master: String::from("node1"),
                term: 5,
            })
        );
    }

    #[test]
    fn test_check_quorum_not_yet_reached() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.record_vote("v1", true).unwrap();

        assert_eq!(election.check_quorum(3), None);
    }

    #[test]
    fn test_check_quorum_no_votes_false_dont_count() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.record_vote("v1", true).unwrap();
        election.record_vote("v2", false).unwrap();
        election.record_vote("v3", false).unwrap();

        // Three votes arrived, but only one of them is affirmative.
        assert_eq!(election.check_quorum(2), None);
    }

    // --- Evaluate proposals ---

    #[test]
    fn test_evaluate_no_own_proposal_votes_yes() {
        let election = Election::new(default_config());
        let incoming = make_proposal("other", 100, 1, 1);
        assert!(election.evaluate_proposal(&incoming));
    }

    #[test]
    fn test_evaluate_better_incoming_votes_yes() {
        let election = started_election(make_proposal("node1", 100, 1, 1));

        // Higher VLSN than ours.
        let better = make_proposal("node2", 200, 1, 1);
        assert!(election.evaluate_proposal(&better));
    }

    #[test]
    fn test_evaluate_worse_incoming_votes_no() {
        let election = started_election(make_proposal("node1", 200, 5, 2));

        // Lower VLSN than ours.
        let worse = make_proposal("node2", 100, 1, 1);
        assert!(!election.evaluate_proposal(&worse));
    }

    #[test]
    fn test_evaluate_equal_proposal_votes_no() {
        let election = started_election(make_proposal("node1", 100, 1, 1));

        let identical = make_proposal("node1", 100, 1, 1);
        assert!(!election.evaluate_proposal(&identical));
    }

    // --- Complete ---

    #[test]
    fn test_complete_won() {
        let election = started_election(make_proposal("node1", 100, 1, 1));

        let won =
            ElectionOutcome::Won { master: String::from("node1"), term: 1 };
        election.complete(won.clone()).unwrap();
        assert_eq!(election.get_state(), ElectionState::Complete);
        assert_eq!(election.get_outcome(), Some(won));
    }

    #[test]
    fn test_complete_lost() {
        let election = started_election(make_proposal("node1", 100, 1, 1));

        let lost =
            ElectionOutcome::Lost { master: String::from("node2"), term: 1 };
        election.complete(lost).unwrap();
        assert_eq!(election.get_state(), ElectionState::Complete);
    }

    #[test]
    fn test_complete_no_quorum() {
        let election = started_election(make_proposal("node1", 100, 1, 1));

        let no_quorum =
            ElectionOutcome::NoQuorum { votes_received: 1, votes_needed: 3 };
        election.complete(no_quorum).unwrap();
        assert_eq!(election.get_state(), ElectionState::Failed);
    }

    #[test]
    fn test_complete_timeout() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.complete(ElectionOutcome::Timeout).unwrap();
        assert_eq!(election.get_state(), ElectionState::Failed);
    }

    #[test]
    fn test_complete_twice_fails() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.complete(ElectionOutcome::Timeout).unwrap();

        let second = election.complete(ElectionOutcome::Timeout);
        assert!(second.is_err());
    }

    // --- Reset ---

    #[test]
    fn test_reset() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.record_vote("v1", true).unwrap();
        election
            .complete(ElectionOutcome::Won {
                master: String::from("node1"),
                term: 1,
            })
            .unwrap();

        election.reset();
        assert_eq!(election.get_state(), ElectionState::Idle);
        assert_eq!(election.get_outcome(), None);
        assert_eq!(election.total_votes(), 0);
    }

    #[test]
    fn test_reset_allows_new_election() {
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.complete(ElectionOutcome::Timeout).unwrap();

        election.reset();
        // A fresh round must be accepted after the reset.
        election.start_election(make_proposal("node1", 100, 1, 2)).unwrap();
        assert_eq!(election.get_state(), ElectionState::Proposing);
        assert_eq!(election.get_term(), 2);
    }

    // --- Term management ---

    #[test]
    fn test_increment_term() {
        let election = Election::new(default_config());
        assert_eq!(election.get_term(), 0);
        assert_eq!(election.increment_term(), 1);
        assert_eq!(election.increment_term(), 2);
        assert_eq!(election.get_term(), 2);
    }

    // --- Full election simulation ---

    #[test]
    fn test_full_election_three_node_cluster() {
        // Node1 starts an election in term 1.
        let election = started_election(make_proposal("node1", 150, 5, 1));

        // Node1 votes for itself, then node2 votes yes (its VLSN is lower).
        election.record_vote("node1", true).unwrap();
        election.record_vote("node2", true).unwrap();

        // Quorum is 2 out of 3.
        let won = election.check_quorum(2).unwrap();
        assert_eq!(
            won,
            ElectionOutcome::Won { master: String::from("node1"), term: 1 }
        );

        election.complete(won).unwrap();
        assert_eq!(election.get_state(), ElectionState::Complete);
    }

    #[test]
    fn test_full_election_lost() {
        // Our node has low VLSN.
        let election = started_election(make_proposal("node1", 50, 1, 1));

        // A competing proposal from node2 with higher VLSN arrives; we
        // would vote yes for it.
        let competing = make_proposal("node2", 200, 1, 1);
        assert!(election.evaluate_proposal(&competing));

        // The other voters reject us.
        election.record_vote("node2", false).unwrap();
        election.record_vote("node3", false).unwrap();

        // No quorum reached.
        assert_eq!(election.check_quorum(2), None);

        // Complete as lost.
        election
            .complete(ElectionOutcome::Lost {
                master: String::from("node2"),
                term: 1,
            })
            .unwrap();
        assert_eq!(election.get_state(), ElectionState::Complete);
    }

    #[test]
    fn test_designated_primary_self_election() {
        // In a 2-node group with designated_primary, the primary can
        // self-elect with just its own vote (quorum of 1).
        let election = Election::new(
            ElectionConfig::builder().designated_primary(true).build(),
        );

        election.start_election(make_proposal("primary", 100, 1, 1)).unwrap();
        election.record_vote("primary", true).unwrap();

        // Designated primary: quorum of 1 in a 2-node group.
        assert_eq!(
            election.check_quorum(1).unwrap(),
            ElectionOutcome::Won { master: String::from("primary"), term: 1 }
        );

        assert!(election.config().designated_primary());
    }

    #[test]
    fn test_multiple_rounds() {
        // Round 1: timeout.
        let election = started_election(make_proposal("node1", 100, 1, 1));
        election.complete(ElectionOutcome::Timeout).unwrap();
        assert_eq!(election.get_state(), ElectionState::Failed);

        // Round 2: success.
        election.reset();
        let next_term = election.increment_term();
        election
            .start_election(make_proposal("node1", 100, 1, next_term))
            .unwrap();
        election.record_vote("node1", true).unwrap();
        election.record_vote("node2", true).unwrap();

        let decided = election.check_quorum(2).unwrap();
        election.complete(decided).unwrap();
        assert_eq!(election.get_state(), ElectionState::Complete);
    }

    // --- Send + Sync ---

    #[test]
    fn test_send_sync() {
        // Compile-time check only: the call does nothing at runtime.
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<Election>();
    }
}