1use hashbrown::HashMap;
17
18use noxu_sync::Mutex;
19
20use super::election_config::ElectionConfig;
21use super::proposal::Proposal;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ElectionState {
26 Idle,
28 Proposing,
30 Voting,
32 Complete,
34 Failed,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum ElectionOutcome {
41 Won {
43 master: String,
45 term: u64,
47 },
48 Lost {
50 master: String,
52 term: u64,
54 },
55 NoQuorum {
57 votes_received: u32,
59 votes_needed: u32,
61 },
62 Timeout,
64}
65
66pub struct Election {
74 config: ElectionConfig,
75 state: Mutex<ElectionState>,
76 term: Mutex<u64>,
77 votes: Mutex<HashMap<String, bool>>,
79 proposal: Mutex<Option<Proposal>>,
81 outcome: Mutex<Option<ElectionOutcome>>,
83}
84
85impl Election {
86 pub fn new(config: ElectionConfig) -> Self {
88 Self {
89 config,
90 state: Mutex::new(ElectionState::Idle),
91 term: Mutex::new(0),
92 votes: Mutex::new(HashMap::new()),
93 proposal: Mutex::new(None),
94 outcome: Mutex::new(None),
95 }
96 }
97
98 pub fn get_state(&self) -> ElectionState {
100 *self.state.lock()
101 }
102
103 pub fn get_term(&self) -> u64 {
105 *self.term.lock()
106 }
107
108 pub fn increment_term(&self) -> u64 {
110 let mut term = self.term.lock();
111 *term += 1;
112 *term
113 }
114
115 pub fn start_election(
123 &self,
124 proposal: Proposal,
125 ) -> crate::error::Result<()> {
126 let mut state = self.state.lock();
127 if *state != ElectionState::Idle {
128 return Err(crate::error::RepError::ElectionFailed(format!(
129 "cannot start election: state is {:?}, expected Idle",
130 *state
131 )));
132 }
133 *state = ElectionState::Proposing;
134
135 let mut term = self.term.lock();
136 *term = proposal.term;
137
138 *self.proposal.lock() = Some(proposal);
139 self.votes.lock().clear();
140 *self.outcome.lock() = None;
141
142 Ok(())
143 }
144
145 pub fn record_vote(
155 &self,
156 voter: &str,
157 granted: bool,
158 ) -> crate::error::Result<()> {
159 let mut state = self.state.lock();
160 match *state {
161 ElectionState::Proposing | ElectionState::Voting => {
162 if *state == ElectionState::Proposing {
164 *state = ElectionState::Voting;
165 }
166 }
167 other => {
168 return Err(crate::error::RepError::ElectionFailed(format!(
169 "cannot record vote: state is {:?}",
170 other
171 )));
172 }
173 }
174 drop(state);
175
176 self.votes.lock().insert(voter.to_string(), granted);
177 Ok(())
178 }
179
180 pub fn check_quorum(&self, quorum_size: u32) -> Option<ElectionOutcome> {
190 let votes = self.votes.lock();
191 let yes_count = votes.values().filter(|&&v| v).count() as u32;
192
193 if yes_count >= quorum_size {
194 let proposal = self.proposal.lock();
195 if let Some(ref p) = *proposal {
196 return Some(ElectionOutcome::Won {
197 master: p.node_name.clone(),
198 term: p.term,
199 });
200 }
201 }
202
203 let total = votes.len() as u32;
207 let no_count = total - yes_count;
208 let _ = no_count; None
215 }
216
217 pub fn evaluate_proposal(&self, incoming: &Proposal) -> bool {
226 let proposal = self.proposal.lock();
227 match &*proposal {
228 None => true,
229 Some(ours) => incoming.is_better_than(ours),
230 }
231 }
232
233 pub fn complete(
243 &self,
244 outcome: ElectionOutcome,
245 ) -> crate::error::Result<()> {
246 let mut state = self.state.lock();
247 match *state {
248 ElectionState::Complete | ElectionState::Failed => {
249 return Err(crate::error::RepError::ElectionFailed(format!(
250 "election already concluded: state is {:?}",
251 *state
252 )));
253 }
254 _ => {}
255 }
256
257 *state = match &outcome {
258 ElectionOutcome::Won { .. } | ElectionOutcome::Lost { .. } => {
259 ElectionState::Complete
260 }
261 ElectionOutcome::NoQuorum { .. } | ElectionOutcome::Timeout => {
262 ElectionState::Failed
263 }
264 };
265
266 *self.outcome.lock() = Some(outcome);
267 Ok(())
268 }
269
270 pub fn reset(&self) {
274 *self.state.lock() = ElectionState::Idle;
275 self.votes.lock().clear();
276 *self.proposal.lock() = None;
277 *self.outcome.lock() = None;
278 }
279
280 pub fn get_outcome(&self) -> Option<ElectionOutcome> {
282 self.outcome.lock().clone()
283 }
284
285 pub fn config(&self) -> &ElectionConfig {
287 &self.config
288 }
289
290 pub fn yes_votes(&self) -> u32 {
292 self.votes.lock().values().filter(|&&v| v).count() as u32
293 }
294
295 pub fn total_votes(&self) -> u32 {
297 self.votes.lock().len() as u32
298 }
299}
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304
305 fn default_config() -> ElectionConfig {
306 ElectionConfig::new()
307 }
308
309 fn make_proposal(
310 name: &str,
311 vlsn: u64,
312 priority: u32,
313 term: u64,
314 ) -> Proposal {
315 Proposal::with_timestamp(name.into(), vlsn, priority, term, 0)
316 }
317
318 #[test]
321 fn test_initial_state_is_idle() {
322 let e = Election::new(default_config());
323 assert_eq!(e.get_state(), ElectionState::Idle);
324 assert_eq!(e.get_term(), 0);
325 assert!(e.get_outcome().is_none());
326 }
327
328 #[test]
329 fn test_start_election_transitions_to_proposing() {
330 let e = Election::new(default_config());
331 let p = make_proposal("node1", 100, 1, 1);
332 e.start_election(p).unwrap();
333 assert_eq!(e.get_state(), ElectionState::Proposing);
334 assert_eq!(e.get_term(), 1);
335 }
336
337 #[test]
338 fn test_start_election_fails_if_not_idle() {
339 let e = Election::new(default_config());
340 let p = make_proposal("node1", 100, 1, 1);
341 e.start_election(p.clone()).unwrap();
342 assert!(e.start_election(p).is_err());
343 }
344
345 #[test]
346 fn test_record_vote_transitions_to_voting() {
347 let e = Election::new(default_config());
348 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
349 e.record_vote("voter1", true).unwrap();
350 assert_eq!(e.get_state(), ElectionState::Voting);
351 }
352
353 #[test]
354 fn test_record_vote_fails_if_idle() {
355 let e = Election::new(default_config());
356 assert!(e.record_vote("voter1", true).is_err());
357 }
358
359 #[test]
362 fn test_yes_and_total_votes() {
363 let e = Election::new(default_config());
364 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
365 e.record_vote("v1", true).unwrap();
366 e.record_vote("v2", false).unwrap();
367 e.record_vote("v3", true).unwrap();
368
369 assert_eq!(e.yes_votes(), 2);
370 assert_eq!(e.total_votes(), 3);
371 }
372
373 #[test]
374 fn test_duplicate_voter_overwrites() {
375 let e = Election::new(default_config());
376 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
377 e.record_vote("v1", false).unwrap();
378 e.record_vote("v1", true).unwrap();
379
380 assert_eq!(e.yes_votes(), 1);
381 assert_eq!(e.total_votes(), 1);
382 }
383
384 #[test]
387 fn test_check_quorum_reached() {
388 let e = Election::new(default_config());
389 e.start_election(make_proposal("node1", 100, 1, 5)).unwrap();
390 e.record_vote("v1", true).unwrap();
391 e.record_vote("v2", true).unwrap();
392
393 let result = e.check_quorum(2);
394 assert!(result.is_some());
395 match result.unwrap() {
396 ElectionOutcome::Won { master, term } => {
397 assert_eq!(master, "node1");
398 assert_eq!(term, 5);
399 }
400 other => panic!("expected Won, got {:?}", other),
401 }
402 }
403
404 #[test]
405 fn test_check_quorum_not_yet_reached() {
406 let e = Election::new(default_config());
407 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
408 e.record_vote("v1", true).unwrap();
409
410 assert!(e.check_quorum(3).is_none());
411 }
412
413 #[test]
414 fn test_check_quorum_no_votes_false_dont_count() {
415 let e = Election::new(default_config());
416 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
417 e.record_vote("v1", true).unwrap();
418 e.record_vote("v2", false).unwrap();
419 e.record_vote("v3", false).unwrap();
420
421 assert!(e.check_quorum(2).is_none());
422 }
423
424 #[test]
427 fn test_evaluate_no_own_proposal_votes_yes() {
428 let e = Election::new(default_config());
429 let incoming = make_proposal("other", 100, 1, 1);
430 assert!(e.evaluate_proposal(&incoming));
431 }
432
433 #[test]
434 fn test_evaluate_better_incoming_votes_yes() {
435 let e = Election::new(default_config());
436 let ours = make_proposal("node1", 100, 1, 1);
437 e.start_election(ours).unwrap();
438
439 let better = make_proposal("node2", 200, 1, 1); assert!(e.evaluate_proposal(&better));
441 }
442
443 #[test]
444 fn test_evaluate_worse_incoming_votes_no() {
445 let e = Election::new(default_config());
446 let ours = make_proposal("node1", 200, 5, 2);
447 e.start_election(ours).unwrap();
448
449 let worse = make_proposal("node2", 100, 1, 1); assert!(!e.evaluate_proposal(&worse));
451 }
452
453 #[test]
454 fn test_evaluate_equal_proposal_votes_no() {
455 let e = Election::new(default_config());
456 let ours = make_proposal("node1", 100, 1, 1);
457 e.start_election(ours).unwrap();
458
459 let same = make_proposal("node1", 100, 1, 1);
460 assert!(!e.evaluate_proposal(&same));
461 }
462
463 #[test]
466 fn test_complete_won() {
467 let e = Election::new(default_config());
468 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
469
470 let outcome = ElectionOutcome::Won { master: "node1".into(), term: 1 };
471 e.complete(outcome.clone()).unwrap();
472 assert_eq!(e.get_state(), ElectionState::Complete);
473 assert_eq!(e.get_outcome(), Some(outcome));
474 }
475
476 #[test]
477 fn test_complete_lost() {
478 let e = Election::new(default_config());
479 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
480
481 let outcome = ElectionOutcome::Lost { master: "node2".into(), term: 1 };
482 e.complete(outcome).unwrap();
483 assert_eq!(e.get_state(), ElectionState::Complete);
484 }
485
486 #[test]
487 fn test_complete_no_quorum() {
488 let e = Election::new(default_config());
489 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
490
491 let outcome =
492 ElectionOutcome::NoQuorum { votes_received: 1, votes_needed: 3 };
493 e.complete(outcome).unwrap();
494 assert_eq!(e.get_state(), ElectionState::Failed);
495 }
496
497 #[test]
498 fn test_complete_timeout() {
499 let e = Election::new(default_config());
500 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
501 e.complete(ElectionOutcome::Timeout).unwrap();
502 assert_eq!(e.get_state(), ElectionState::Failed);
503 }
504
505 #[test]
506 fn test_complete_twice_fails() {
507 let e = Election::new(default_config());
508 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
509 e.complete(ElectionOutcome::Timeout).unwrap();
510 assert!(e.complete(ElectionOutcome::Timeout).is_err());
511 }
512
513 #[test]
516 fn test_reset() {
517 let e = Election::new(default_config());
518 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
519 e.record_vote("v1", true).unwrap();
520 e.complete(ElectionOutcome::Won { master: "node1".into(), term: 1 })
521 .unwrap();
522
523 e.reset();
524 assert_eq!(e.get_state(), ElectionState::Idle);
525 assert!(e.get_outcome().is_none());
526 assert_eq!(e.total_votes(), 0);
527 }
528
529 #[test]
530 fn test_reset_allows_new_election() {
531 let e = Election::new(default_config());
532 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
533 e.complete(ElectionOutcome::Timeout).unwrap();
534
535 e.reset();
536 e.start_election(make_proposal("node1", 100, 1, 2)).unwrap();
538 assert_eq!(e.get_state(), ElectionState::Proposing);
539 assert_eq!(e.get_term(), 2);
540 }
541
542 #[test]
545 fn test_increment_term() {
546 let e = Election::new(default_config());
547 assert_eq!(e.get_term(), 0);
548 assert_eq!(e.increment_term(), 1);
549 assert_eq!(e.increment_term(), 2);
550 assert_eq!(e.get_term(), 2);
551 }
552
553 #[test]
556 fn test_full_election_three_node_cluster() {
557 let e = Election::new(default_config());
558
559 let proposal = make_proposal("node1", 150, 5, 1);
561 e.start_election(proposal).unwrap();
562
563 e.record_vote("node1", true).unwrap();
565
566 e.record_vote("node2", true).unwrap();
568
569 let result = e.check_quorum(2).unwrap();
571 match &result {
572 ElectionOutcome::Won { master, term } => {
573 assert_eq!(master, "node1");
574 assert_eq!(*term, 1);
575 }
576 other => panic!("expected Won, got {:?}", other),
577 }
578
579 e.complete(result).unwrap();
580 assert_eq!(e.get_state(), ElectionState::Complete);
581 }
582
583 #[test]
584 fn test_full_election_lost() {
585 let e = Election::new(default_config());
586
587 let our_proposal = make_proposal("node1", 50, 1, 1);
589 e.start_election(our_proposal).unwrap();
590
591 let competing = make_proposal("node2", 200, 1, 1);
593 assert!(e.evaluate_proposal(&competing)); e.record_vote("node2", false).unwrap();
597 e.record_vote("node3", false).unwrap();
598
599 assert!(e.check_quorum(2).is_none());
601
602 e.complete(ElectionOutcome::Lost { master: "node2".into(), term: 1 })
604 .unwrap();
605 assert_eq!(e.get_state(), ElectionState::Complete);
606 }
607
608 #[test]
609 fn test_designated_primary_self_election() {
610 let config = ElectionConfig::builder().designated_primary(true).build();
613 let e = Election::new(config);
614
615 let proposal = make_proposal("primary", 100, 1, 1);
616 e.start_election(proposal).unwrap();
617 e.record_vote("primary", true).unwrap();
618
619 let result = e.check_quorum(1).unwrap();
621 match result {
622 ElectionOutcome::Won { master, term } => {
623 assert_eq!(master, "primary");
624 assert_eq!(term, 1);
625 }
626 other => panic!("expected Won, got {:?}", other),
627 }
628
629 assert!(e.config().designated_primary());
630 }
631
632 #[test]
633 fn test_multiple_rounds() {
634 let e = Election::new(default_config());
635
636 e.start_election(make_proposal("node1", 100, 1, 1)).unwrap();
638 e.complete(ElectionOutcome::Timeout).unwrap();
639 assert_eq!(e.get_state(), ElectionState::Failed);
640
641 e.reset();
643 let new_term = e.increment_term();
644 e.start_election(make_proposal("node1", 100, 1, new_term)).unwrap();
645 e.record_vote("node1", true).unwrap();
646 e.record_vote("node2", true).unwrap();
647
648 let result = e.check_quorum(2).unwrap();
649 e.complete(result).unwrap();
650 assert_eq!(e.get_state(), ElectionState::Complete);
651 }
652
653 #[test]
656 fn test_send_sync() {
657 fn assert_send_sync<T: Send + Sync>() {}
658 assert_send_sync::<Election>();
659 }
660}