reddb_server/replication/election.rs
1//! Term-based, quorum-gated automatic election (issue #834, PRD #819, ADR 0030).
2//!
3//! This is the consensus core that turns a primary loss into an automatic,
4//! *safe* promotion. It lives in the first-party-but-decoupled control-plane
5//! supervisor (ADR 0030) — distinct from the data path — and reuses the two
6//! pieces the rest of replication already built:
7//!
8//! * the **commit watermark** ([`super::commit_waiter`] / [`super::quorum`]) —
9//! the highest LSN durably replicated to a quorum that intersects every
10//! possible election majority. Nothing at or below it may ever be rolled
11//! back; and
12//! * the **FAILOVER handover machinery** ([`super::failover`]) — once a
13//! candidate wins, promotion is driven through the same coordinated
14//! role-swap, not a parallel state machine.
15//!
16//! ## The five hard requirements (ADR 0030, issue #834)
17//!
18//! 1. **Dry-run probe.** A candidate first asks "*would* you vote for me?"
19//! without bumping any term. Only a real election bumps the term. This
20//! keeps a flapping candidate from burning through terms and lets the
21//! supervisor probe liveness cheaply.
22//! 2. **Durable last-vote.** A voter persists `(term, voted_for)` *before*
23//! acknowledging a grant, so a voter that crashes and restarts mid-term
24//! never double-votes — the second request in the same term for a
25//! different candidate is refused from disk.
26//! 3. **Watermark vote rule (the safety core).** A voter MUST refuse any
27//! candidate whose log does not cover the commit watermark. An
28//! acknowledged synchronous write sits at or below the watermark, so a
29//! winner necessarily carries it — the write provably survives the
30//! failover. This is the one rule that may not be relaxed.
31//! 4. **Randomized election timeouts.** Candidates wait a randomized
32//! interval before standing, so split votes are rare and self-correcting.
33//! 5. **Membership rules.** A quorum is a *majority of voting members*.
34//! **Witness** members ([#836]) hold no data but vote, so `2 data + 1
35//! witness` is a valid HA shape. A **catching-up** replica is
36//! *non-voting* until it reaches a healthy state — it neither votes nor
37//! stands.
38//!
39//! ## No two primaries in a term
40//!
41//! This invariant is structural, not probabilistic:
42//!
43//! * a win requires a strict majority of voting members, and two strict
44//! majorities of the same set always intersect; and
45//! * the shared voter in any two majorities votes at most once per term
46//! (durable last-vote), so it cannot grant two different candidates the
47//! same term.
48//!
49//! Therefore at most one candidate can collect a majority in a given term,
50//! even under an arbitrary network partition. The partition tests exercise
51//! exactly this.
52//!
53//! ## Module shape
54//!
55//! Like [`super::failover`], the candidate-side [`ElectionCoordinator::run`]
56//! is a **pure state machine**: the clock, the per-peer vote RPC, the
57//! durable term bump, and the promotion are injected behind
58//! [`ElectionTransport`], so the whole election is exercised
59//! deterministically with a scripted fake — no clock, no network, no engine.
60//! The voter-side [`Voter`] wraps a [`LastVoteStore`] (durable on disk in
61//! production, in-memory in tests) and applies the vote rule.
62//!
63//! [#836]: https://github.com/reddb-io/reddb/issues/836
64
65use std::time::Duration;
66
67use crate::serde_json::{self, Value as JsonValue};
68
69// ---------------------------------------------------------------
70// Membership model
71// ---------------------------------------------------------------
72
/// Role of a cluster member: data-bearing (and therefore promotable to
/// primary) or a vote-only witness — ADR 0030 describes the latter as "a
/// node that runs only the supervisor module".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberKind {
    /// Carries data, so it may both vote and stand for election.
    Data,
    /// Vote-only witness ([#836]): it counts toward quorum but can never
    /// become primary.
    ///
    /// [#836]: https://github.com/reddb-io/reddb/issues/836
    Witness,
}
85
/// Voting participation of a member.
///
/// A data replica that has not yet reached a healthy, watermark-covering
/// state is [`VotingState::CatchingUp`]. It is left out of the voter set
/// completely: it casts no vote and is not counted in the majority
/// denominator. Once healthy it becomes [`VotingState::Voting`]. The
/// witness constructor always produces [`VotingState::Voting`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VotingState {
    /// Healthy; takes part in quorum.
    Voting,
    /// Still syncing; non-voting until healthy.
    CatchingUp,
}
100
101/// A cluster member as seen by the supervisor's membership view.
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Member {
104 /// Stable node identity (matches the replica registry / ack id).
105 pub id: String,
106 pub kind: MemberKind,
107 pub state: VotingState,
108}
109
110impl Member {
111 pub fn data_voting(id: impl Into<String>) -> Self {
112 Self {
113 id: id.into(),
114 kind: MemberKind::Data,
115 state: VotingState::Voting,
116 }
117 }
118
119 pub fn data_catching_up(id: impl Into<String>) -> Self {
120 Self {
121 id: id.into(),
122 kind: MemberKind::Data,
123 state: VotingState::CatchingUp,
124 }
125 }
126
127 pub fn witness(id: impl Into<String>) -> Self {
128 Self {
129 id: id.into(),
130 kind: MemberKind::Witness,
131 state: VotingState::Voting,
132 }
133 }
134
135 /// Does this member count toward quorum? Only healthy members vote;
136 /// a catching-up replica is non-voting (ADR 0030).
137 pub fn is_voter(&self) -> bool {
138 matches!(self.state, VotingState::Voting)
139 }
140
141 /// May this member stand for election? Only a healthy, data-bearing
142 /// member can become primary — a witness holds no data and a
143 /// catching-up replica is not healthy.
144 pub fn is_electable(&self) -> bool {
145 self.kind == MemberKind::Data && self.is_voter()
146 }
147}
148
149/// Quorum threshold for a set of members: a strict majority of the
150/// *voting* members. Witnesses count; catching-up replicas do not.
151///
152/// For `n` voting members the threshold is `floor(n/2) + 1`, the smallest
153/// count such that two qualifying sets always intersect — the structural
154/// basis for "no two primaries in a term".
155pub fn quorum_threshold(members: &[Member]) -> usize {
156 let voters = members.iter().filter(|m| m.is_voter()).count();
157 voters / 2 + 1
158}
159
160// ---------------------------------------------------------------
161// Durable last-vote
162// ---------------------------------------------------------------
163
/// A node's durable voting record: the highest term it has recorded and
/// whom, if anyone, it granted that term to. It is persisted so that a
/// restart cannot forget a vote that was already cast (requirement 2).
/// `Default` is `term 0`, no vote.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LastVote {
    /// Highest term recorded from a (real) vote grant.
    pub term: u64,
    /// Candidate granted `term`, or `None` if no grant has been made yet.
    pub voted_for: Option<String>,
}
174
175impl LastVote {
176 fn to_json(&self) -> JsonValue {
177 let mut obj = serde_json::Map::new();
178 obj.insert("term".to_string(), JsonValue::Number(self.term as f64));
179 obj.insert(
180 "voted_for".to_string(),
181 match &self.voted_for {
182 Some(id) => JsonValue::String(id.clone()),
183 None => JsonValue::Null,
184 },
185 );
186 JsonValue::Object(obj)
187 }
188
189 fn from_json(value: &JsonValue) -> Result<Self, LastVoteError> {
190 let obj = value.as_object().ok_or_else(|| {
191 LastVoteError::InvalidFormat("last-vote json is not an object".into())
192 })?;
193 let term = obj
194 .get("term")
195 .and_then(JsonValue::as_u64)
196 .ok_or_else(|| LastVoteError::InvalidFormat("missing term".into()))?;
197 let voted_for = match obj.get("voted_for") {
198 None | Some(JsonValue::Null) => None,
199 Some(JsonValue::String(s)) => Some(s.clone()),
200 Some(_) => {
201 return Err(LastVoteError::InvalidFormat(
202 "voted_for must be a string or null".into(),
203 ))
204 }
205 };
206 Ok(Self { term, voted_for })
207 }
208}
209
/// Failure modes of a last-vote store.
#[derive(Debug)]
pub enum LastVoteError {
    /// The underlying storage could not be read or written.
    Io(std::io::Error),
    /// The record could not be decoded (or encoded); the message says why.
    InvalidFormat(String),
}

impl std::fmt::Display for LastVoteError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "last-vote io error: {err}"),
            Self::InvalidFormat(msg) => write!(f, "invalid last-vote format: {msg}"),
        }
    }
}

impl std::error::Error for LastVoteError {
    /// Expose the wrapped I/O error so reporters that walk the `source()`
    /// chain can show the root cause; a format error has no underlying
    /// cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::InvalidFormat(_) => None,
        }
    }
}
226
227/// Durable store for a node's last vote. The contract is narrow on purpose:
228/// `load` returns the persisted record (or the default `term 0, voted_for
229/// None` when nothing was ever written), and `persist` makes a record
230/// durable *before* the caller acknowledges a grant.
231pub trait LastVoteStore {
232 fn load(&self) -> Result<LastVote, LastVoteError>;
233 fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError>;
234}
235
236/// In-memory last-vote store for tests and witnesses that do not need
237/// cross-restart durability. (A witness *should* still persist in
238/// production; the file store is used there.)
239#[derive(Debug, Default)]
240pub struct MemoryLastVoteStore {
241 inner: std::sync::Mutex<LastVote>,
242}
243
244impl MemoryLastVoteStore {
245 pub fn new() -> Self {
246 Self::default()
247 }
248
249 /// Seed an initial record — used by tests to simulate a node that
250 /// already voted before a restart.
251 pub fn seeded(vote: LastVote) -> Self {
252 Self {
253 inner: std::sync::Mutex::new(vote),
254 }
255 }
256}
257
258impl LastVoteStore for MemoryLastVoteStore {
259 fn load(&self) -> Result<LastVote, LastVoteError> {
260 Ok(self.inner.lock().expect("last-vote mutex").clone())
261 }
262
263 fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
264 *self.inner.lock().expect("last-vote mutex") = vote.clone();
265 Ok(())
266 }
267}
268
/// [`LastVoteStore`] backed by a single file, kept alongside the node's
/// other durable replication state. Records are published by writing a
/// temp file and renaming it over the live one, so a crash leaves either
/// the old record or the new one, never a mix.
pub struct FileLastVoteStore {
    path: std::path::PathBuf,
}

impl FileLastVoteStore {
    /// Store the record at `path`. Missing parent directories are created on
    /// first persist.
    pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
        let path: std::path::PathBuf = path.into();
        FileLastVoteStore { path }
    }
}
282
283impl LastVoteStore for FileLastVoteStore {
284 fn load(&self) -> Result<LastVote, LastVoteError> {
285 match std::fs::read(&self.path) {
286 Ok(bytes) => {
287 let json: JsonValue = serde_json::from_slice(&bytes)
288 .map_err(|err| LastVoteError::InvalidFormat(format!("parse: {err}")))?;
289 LastVote::from_json(&json)
290 }
291 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(LastVote::default()),
292 Err(err) => Err(LastVoteError::Io(err)),
293 }
294 }
295
296 fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
297 let bytes = serde_json::to_vec(&vote.to_json())
298 .map_err(|err| LastVoteError::InvalidFormat(format!("serialize: {err}")))?;
299 if let Some(parent) = self.path.parent() {
300 std::fs::create_dir_all(parent).map_err(LastVoteError::Io)?;
301 }
302 let tmp = self.path.with_extension("lastvote.tmp");
303 std::fs::write(&tmp, &bytes).map_err(LastVoteError::Io)?;
304 // Atomic publish: rename over the live file. fsync the file before
305 // rename so the bytes are durable, not just in the page cache.
306 if let Ok(f) = std::fs::File::open(&tmp) {
307 let _ = f.sync_all();
308 }
309 std::fs::rename(&tmp, &self.path).map_err(LastVoteError::Io)?;
310 // fsync the parent directory so the rename itself is durable. Without
311 // this, a crash after the rename could leave the directory entry still
312 // pointing at the old record — which would let a restarted voter
313 // double-vote, the exact failure the durable last-vote forbids.
314 if let Some(parent) = self.path.parent() {
315 if let Ok(dir) = std::fs::File::open(parent) {
316 let _ = dir.sync_all();
317 }
318 }
319 Ok(())
320 }
321}
322
323// ---------------------------------------------------------------
324// Vote request / decision
325// ---------------------------------------------------------------
326
/// A candidate's request for one voter's vote.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VoteRequest {
    /// Stable identity of the candidate.
    pub candidate_id: String,
    /// Term the candidate stands for: `current_term + 1` for a real
    /// election. For a dry-run probe it is the *prospective* term, evaluated
    /// without committing to it.
    pub term: u64,
    /// Highest LSN durably in the candidate's log. The watermark rule
    /// compares this against the commit watermark.
    pub last_log_lsn: u64,
    /// `true` for a dry-run probe. A probe gathers a would-be vote without
    /// persisting it or moving the voter's term (requirement 1). Only a
    /// `false` (real) request can persist a last-vote.
    pub dry_run: bool,
}

impl VoteRequest {
    /// A dry-run probe: same fields as [`VoteRequest::real`], but nothing is
    /// persisted by the voter.
    pub fn probe(candidate_id: impl Into<String>, term: u64, last_log_lsn: u64) -> Self {
        Self {
            dry_run: true,
            ..Self::real(candidate_id, term, last_log_lsn)
        }
    }

    /// A real ballot, whose grant the voter persists durably.
    pub fn real(candidate_id: impl Into<String>, term: u64, last_log_lsn: u64) -> Self {
        let candidate_id = candidate_id.into();
        Self {
            candidate_id,
            term,
            last_log_lsn,
            dry_run: false,
        }
    }
}
364
/// The reason a voter turned a candidate down.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RefusalReason {
    /// The candidate's log stops short of the commit watermark, so electing
    /// it could lose an acknowledged synchronous write. The safety core
    /// refuses (requirement 3).
    WatermarkNotCovered { candidate_lsn: u64, watermark: u64 },
    /// The candidate's term is no newer than one this voter already
    /// participated in, and the voter gave that term to someone else
    /// (durable double-vote guard, requirement 2).
    AlreadyVoted { term: u64, voted_for: String },
    /// The candidate's term is older than the voter's current term: a stale
    /// candidate from a superseded term.
    StaleTerm { candidate_term: u64, voter_term: u64 },
}

/// What a voter decided about a [`VoteRequest`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VoteDecision {
    /// Granted. For a real (non-dry-run) request the grant was persisted
    /// durably before this value was produced.
    Granted,
    /// Refused, together with the reason.
    Refused(RefusalReason),
}

impl VoteDecision {
    /// `true` only for [`VoteDecision::Granted`].
    pub fn is_granted(&self) -> bool {
        match self {
            VoteDecision::Granted => true,
            VoteDecision::Refused(_) => false,
        }
    }
}
399
400// ---------------------------------------------------------------
401// Voter (voter-side vote rule)
402// ---------------------------------------------------------------
403
404/// A voting member. Wraps the durable [`LastVoteStore`] and applies the
405/// vote rule. The voter is the seat of correctness: the watermark rule and
406/// the durable double-vote guard both live here.
407pub struct Voter<S: LastVoteStore> {
408 id: String,
409 store: S,
410}
411
412impl<S: LastVoteStore> Voter<S> {
413 pub fn new(id: impl Into<String>, store: S) -> Self {
414 Self {
415 id: id.into(),
416 store,
417 }
418 }
419
420 pub fn id(&self) -> &str {
421 &self.id
422 }
423
424 /// This voter's current term — the highest term it has durably recorded.
425 pub fn current_term(&self) -> Result<u64, LastVoteError> {
426 Ok(self.store.load()?.term)
427 }
428
429 /// Consider a vote request against the current `commit_watermark`.
430 ///
431 /// The decision order is deliberate:
432 ///
433 /// 1. **Watermark first** — the safety core. A candidate that cannot
434 /// carry an acknowledged write is refused regardless of term, so the
435 /// durability guarantee can never be traded away for liveness.
436 /// 2. **Stale term** — reject candidates from a superseded term.
437 /// 3. **Double-vote guard** — within a term, a voter grants exactly one
438 /// candidate; a re-ask by the *same* candidate is idempotently
439 /// re-granted.
440 ///
441 /// For a real (non-dry-run) grant, the new `(term, candidate)` is
442 /// persisted **before** `Granted` is returned, so the durability holds
443 /// across a crash at any point after the caller observes the grant.
444 pub fn consider(
445 &self,
446 req: &VoteRequest,
447 commit_watermark: u64,
448 ) -> Result<VoteDecision, LastVoteError> {
449 // 1. Watermark rule — never relaxed, checked before anything else.
450 if req.last_log_lsn < commit_watermark {
451 return Ok(VoteDecision::Refused(RefusalReason::WatermarkNotCovered {
452 candidate_lsn: req.last_log_lsn,
453 watermark: commit_watermark,
454 }));
455 }
456
457 let last = self.store.load()?;
458
459 // 2. Stale term — the candidate is behind a term we already moved past.
460 if req.term < last.term {
461 return Ok(VoteDecision::Refused(RefusalReason::StaleTerm {
462 candidate_term: req.term,
463 voter_term: last.term,
464 }));
465 }
466
467 // 3. Double-vote guard within the *same* term.
468 if req.term == last.term {
469 match &last.voted_for {
470 // Already voted for someone else this term — refuse.
471 Some(other) if other != &req.candidate_id => {
472 return Ok(VoteDecision::Refused(RefusalReason::AlreadyVoted {
473 term: last.term,
474 voted_for: other.clone(),
475 }));
476 }
477 // Already voted for this same candidate — idempotent re-grant.
478 Some(_) => return Ok(VoteDecision::Granted),
479 // Same term observed but no vote cast yet — fall through to grant.
480 None => {}
481 }
482 }
483
484 // Grant. A dry-run probe must not persist or advance the term
485 // (requirement 1); a real grant persists durably before acking.
486 if !req.dry_run {
487 self.store.persist(&LastVote {
488 term: req.term,
489 voted_for: Some(req.candidate_id.clone()),
490 })?;
491 }
492 Ok(VoteDecision::Granted)
493 }
494}
495
496// ---------------------------------------------------------------
497// Randomized election timeout
498// ---------------------------------------------------------------
499
/// A randomized election timeout in `[base, base + jitter)`, with
/// millisecond granularity.
///
/// Randomization keeps candidates from standing in lockstep, which makes
/// split votes rare and self-correcting (requirement 4). The function is
/// pure in `seed`, so tests can pin a deterministic value; production
/// passes an entropy-derived seed.
///
/// Edge cases:
/// * a zero `jitter` returns `base` unchanged;
/// * a sub-millisecond `jitter` also yields `base`;
/// * an offset that would overflow `Duration` saturates at `Duration::MAX`
///   instead of panicking.
pub fn randomized_election_timeout(base: Duration, jitter: Duration, seed: u64) -> Duration {
    if jitter.is_zero() {
        return base;
    }
    // Clamp before narrowing. `as_millis` is a u128, and a bare `as u64`
    // cast can truncate to 0, which would make the modulo below divide by
    // zero.
    let jitter_ms = jitter.as_millis().clamp(1, u128::from(u64::MAX)) as u64;
    base.saturating_add(Duration::from_millis(seed % jitter_ms))
}
513
514// ---------------------------------------------------------------
515// ElectionCoordinator (candidate-side state machine)
516// ---------------------------------------------------------------
517
518/// A request to run an election on behalf of `candidate`.
519#[derive(Debug, Clone, PartialEq, Eq)]
520pub struct ElectionRequest {
521 /// The candidate standing for election. Must be electable (a healthy,
522 /// data-bearing member) or [`ElectionCoordinator::run`] refuses up front.
523 pub candidate: Member,
524 /// The term the cluster is serving now. A real election stands for
525 /// `current_term + 1`.
526 pub current_term: u64,
527 /// The candidate's log frontier — the highest LSN durably in its log.
528 pub last_log_lsn: u64,
529 /// The commit watermark the candidate believes is in force. The
530 /// candidate must itself cover it to be electable; voters re-check
531 /// against their own watermark view.
532 pub commit_watermark: u64,
533}
534
535impl ElectionRequest {
536 /// The term a successful election produces.
537 pub fn new_term(&self) -> u64 {
538 self.current_term + 1
539 }
540}
541
/// How an election attempt ended.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ElectionOutcome {
    /// A majority was collected and the candidate was promoted under
    /// `term`. `votes`/`needed` record the tally, self-vote included.
    Elected {
        term: u64,
        votes: usize,
        needed: usize,
    },
    /// The dry-run probe fell short of a majority. No term was bumped and
    /// no real election was run (requirement 1).
    ProbeFailed { votes: usize, needed: usize },
    /// The real election ran (so the term was bumped) but fell short of a
    /// majority — e.g. a split vote, or peers changing their minds between
    /// probe and ballot. The term has advanced; a later attempt stands for
    /// a higher one.
    Lost {
        term: u64,
        votes: usize,
        needed: usize,
    },
    /// The candidate may not stand: it is a witness or a catching-up
    /// replica, or its own log does not cover the watermark. No probe was
    /// sent.
    NotElectable,
    /// The randomized timeout expired before a majority was collected. No
    /// promotion happened.
    TimedOut { votes: usize, needed: usize },
}

impl ElectionOutcome {
    /// `true` only for [`ElectionOutcome::Elected`].
    pub fn is_elected(&self) -> bool {
        match self {
            ElectionOutcome::Elected { .. } => true,
            ElectionOutcome::ProbeFailed { .. }
            | ElectionOutcome::Lost { .. }
            | ElectionOutcome::NotElectable
            | ElectionOutcome::TimedOut { .. } => false,
        }
    }
}
577
578/// Cluster operations the candidate drives, injected so the state machine
579/// stays pure and deterministically testable. Production backs these onto
580/// the membership view, the per-peer vote RPC, the durable term store, and
581/// the FAILOVER handover; tests back them onto a scripted fake.
582pub trait ElectionTransport {
583 /// The candidate's current view of cluster membership. The denominator
584 /// for the majority is the *voting* members of this set.
585 fn members(&self) -> Vec<Member>;
586
587 /// Ask one peer for its vote. The candidate never asks itself (it always
588 /// self-grants). Implementors route this to the peer's [`Voter`].
589 fn request_vote(&mut self, peer_id: &str, req: &VoteRequest) -> VoteDecision;
590
591 /// Time elapsed since the election began, so the coordinator enforces
592 /// the randomized timeout without owning a clock.
593 fn elapsed(&self) -> Duration;
594
595 /// Durably advance this node's current term to `new_term`. Called once,
596 /// only when a real election begins (never for a dry-run). Persisted
597 /// alongside the node's other durable replication state.
598 fn bump_term(&mut self, new_term: u64);
599
600 /// Promote the candidate to primary under `new_term`, reusing the
601 /// FAILOVER handover machinery ([`super::failover`]). Called only after
602 /// a majority is collected in the real election.
603 fn promote(&mut self, new_term: u64);
604}
605
/// The quorum-gated election state machine. It is a unit struct with no
/// state of its own; every input arrives through
/// [`ElectionCoordinator::run`]'s arguments.
pub struct ElectionCoordinator;
608
609impl ElectionCoordinator {
610 /// Run an election for `req`, driving the cluster through `tx`, bounded
611 /// by `timeout` (use [`randomized_election_timeout`]).
612 ///
613 /// The flow is: electability guard → dry-run probe (no term bump) →
614 /// real election (bump term, collect votes) → promote on majority. See
615 /// the module docs for the full contract.
616 pub fn run(
617 req: &ElectionRequest,
618 tx: &mut dyn ElectionTransport,
619 timeout: Duration,
620 ) -> ElectionOutcome {
621 // Electability guard. A witness or catching-up replica may not
622 // stand; nor may a candidate whose own log does not cover the
623 // watermark (it would violate the safety core the instant it won).
624 if !req.candidate.is_electable() || req.last_log_lsn < req.commit_watermark {
625 return ElectionOutcome::NotElectable;
626 }
627
628 let members = tx.members();
629 let needed = quorum_threshold(&members);
630 let new_term = req.new_term();
631
632 // The peers we ask: every *other* voting member. The candidate
633 // self-grants, so it is one vote without an RPC.
634 let peers: Vec<String> = members
635 .iter()
636 .filter(|m| m.is_voter() && m.id != req.candidate.id)
637 .map(|m| m.id.clone())
638 .collect();
639
640 // ---- Phase 1: dry-run probe (does NOT bump the term) ----
641 let probe = VoteRequest::probe(&req.candidate.id, new_term, req.last_log_lsn);
642 let probe_votes = match Self::collect(tx, &peers, &probe, needed, timeout) {
643 CollectResult::Reached(v) => v,
644 CollectResult::Exhausted(v) => {
645 return ElectionOutcome::ProbeFailed { votes: v, needed }
646 }
647 CollectResult::TimedOut(v) => return ElectionOutcome::TimedOut { votes: v, needed },
648 };
649 debug_assert!(probe_votes >= needed);
650
651 // ---- Phase 2: real election (bumps the term, then collects) ----
652 tx.bump_term(new_term);
653 let ballot = VoteRequest::real(&req.candidate.id, new_term, req.last_log_lsn);
654 match Self::collect(tx, &peers, &ballot, needed, timeout) {
655 CollectResult::Reached(votes) => {
656 tx.promote(new_term);
657 ElectionOutcome::Elected {
658 term: new_term,
659 votes,
660 needed,
661 }
662 }
663 CollectResult::Exhausted(votes) => ElectionOutcome::Lost {
664 term: new_term,
665 votes,
666 needed,
667 },
668 CollectResult::TimedOut(votes) => ElectionOutcome::TimedOut { votes, needed },
669 }
670 }
671
672 /// Collect votes from `peers`, starting at 1 for the candidate's
673 /// self-vote, until `needed` is reached, the peers are exhausted, or the
674 /// timeout elapses. Stops early on success — no need to ask the rest.
675 fn collect(
676 tx: &mut dyn ElectionTransport,
677 peers: &[String],
678 req: &VoteRequest,
679 needed: usize,
680 timeout: Duration,
681 ) -> CollectResult {
682 let mut votes = 1usize; // self-vote
683 if votes >= needed {
684 return CollectResult::Reached(votes);
685 }
686 for peer in peers {
687 if tx.elapsed() >= timeout {
688 return CollectResult::TimedOut(votes);
689 }
690 if tx.request_vote(peer, req).is_granted() {
691 votes += 1;
692 if votes >= needed {
693 return CollectResult::Reached(votes);
694 }
695 }
696 }
697 CollectResult::Exhausted(votes)
698 }
699}
700
/// How one vote-collection round ended; each variant carries the vote
/// count at that point, self-vote included.
enum CollectResult {
    /// The majority threshold was met.
    Reached(usize),
    /// Every peer was asked without reaching the threshold.
    Exhausted(usize),
    /// The timeout expired before the threshold was met.
    TimedOut(usize),
}
706
// Unit tests live in a sibling file and are compiled only for test builds.
#[cfg(test)]
mod tests;