Skip to main content

reddb_server/replication/
election.rs

1//! Term-based, quorum-gated automatic election (issue #834, PRD #819, ADR 0030).
2//!
3//! This is the consensus core that turns a primary loss into an automatic,
4//! *safe* promotion. It lives in the first-party-but-decoupled control-plane
5//! supervisor (ADR 0030) — distinct from the data path — and reuses the two
6//! pieces the rest of replication already built:
7//!
8//! * the **commit watermark** ([`super::commit_waiter`] / [`super::quorum`]) —
9//!   the highest LSN durably replicated to a quorum that intersects every
10//!   possible election majority. Nothing at or below it may ever be rolled
11//!   back; and
12//! * the **FAILOVER handover machinery** ([`super::failover`]) — once a
13//!   candidate wins, promotion is driven through the same coordinated
14//!   role-swap, not a parallel state machine.
15//!
16//! ## The five hard requirements (ADR 0030, issue #834)
17//!
18//! 1. **Dry-run probe.** A candidate first asks "*would* you vote for me?"
19//!    without bumping any term. Only a real election bumps the term. This
20//!    keeps a flapping candidate from burning through terms and lets the
21//!    supervisor probe liveness cheaply.
22//! 2. **Durable last-vote.** A voter persists `(term, voted_for)` *before*
23//!    acknowledging a grant, so a voter that crashes and restarts mid-term
24//!    never double-votes — the second request in the same term for a
25//!    different candidate is refused from disk.
26//! 3. **Watermark vote rule (the safety core).** A voter MUST refuse any
27//!    candidate whose log does not cover the commit watermark. An
28//!    acknowledged synchronous write sits at or below the watermark, so a
29//!    winner necessarily carries it — the write provably survives the
30//!    failover. This is the one rule that may not be relaxed.
31//! 4. **Randomized election timeouts.** Candidates wait a randomized
32//!    interval before standing, so split votes are rare and self-correcting.
33//! 5. **Membership rules.** A quorum is a *majority of voting members*.
34//!    **Witness** members ([#836]) hold no data but vote, so `2 data + 1
35//!    witness` is a valid HA shape. A **catching-up** replica is
36//!    *non-voting* until it reaches a healthy state — it neither votes nor
37//!    stands.
38//!
39//! ## No two primaries in a term
40//!
41//! This invariant is structural, not probabilistic:
42//!
43//! * a win requires a strict majority of voting members, and two strict
44//!   majorities of the same set always intersect; and
45//! * the shared voter in any two majorities votes at most once per term
46//!   (durable last-vote), so it cannot grant two different candidates the
47//!   same term.
48//!
49//! Therefore at most one candidate can collect a majority in a given term,
50//! even under an arbitrary network partition. The partition tests exercise
51//! exactly this.
52//!
53//! ## Module shape
54//!
55//! Like [`super::failover`], the candidate-side [`ElectionCoordinator::run`]
56//! is a **pure state machine**: the clock, the per-peer vote RPC, the
57//! durable term bump, and the promotion are injected behind
58//! [`ElectionTransport`], so the whole election is exercised
59//! deterministically with a scripted fake — no clock, no network, no engine.
60//! The voter-side [`Voter`] wraps a [`LastVoteStore`] (durable on disk in
61//! production, in-memory in tests) and applies the vote rule.
62//!
63//! [#836]: https://github.com/reddb-io/reddb/issues/836
64
65use std::time::Duration;
66
67use crate::serde_json::{self, Value as JsonValue};
68
69// ---------------------------------------------------------------
70// Membership model
71// ---------------------------------------------------------------
72
/// The role a member plays in the cluster: a data-bearing node that can be
/// promoted to primary, or a vote-only witness (ADR 0030 — "a node that
/// runs only the supervisor module").
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemberKind {
    /// Data-bearing member; it can vote and can stand for election.
    Data,
    /// Vote-only witness ([#836]); it counts toward quorum but is never
    /// promoted to primary.
    ///
    /// [#836]: https://github.com/reddb-io/reddb/issues/836
    Witness,
}
85
/// A member's current participation in quorum.
///
/// [`VotingState::CatchingUp`] marks a data replica that is still syncing
/// and has not yet reached a healthy, watermark-covering state. Such a
/// replica is left out of the voter set altogether: it casts no vote and is
/// not part of the majority denominator. Once healthy it becomes
/// [`VotingState::Voting`]. Witnesses are always [`VotingState::Voting`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VotingState {
    /// Healthy member; counted in quorum.
    Voting,
    /// Replica that is still syncing; non-voting until it is healthy.
    CatchingUp,
}
100
101/// A cluster member as seen by the supervisor's membership view.
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Member {
104    /// Stable node identity (matches the replica registry / ack id).
105    pub id: String,
106    pub kind: MemberKind,
107    pub state: VotingState,
108}
109
110impl Member {
111    pub fn data_voting(id: impl Into<String>) -> Self {
112        Self {
113            id: id.into(),
114            kind: MemberKind::Data,
115            state: VotingState::Voting,
116        }
117    }
118
119    pub fn data_catching_up(id: impl Into<String>) -> Self {
120        Self {
121            id: id.into(),
122            kind: MemberKind::Data,
123            state: VotingState::CatchingUp,
124        }
125    }
126
127    pub fn witness(id: impl Into<String>) -> Self {
128        Self {
129            id: id.into(),
130            kind: MemberKind::Witness,
131            state: VotingState::Voting,
132        }
133    }
134
135    /// Does this member count toward quorum? Only healthy members vote;
136    /// a catching-up replica is non-voting (ADR 0030).
137    pub fn is_voter(&self) -> bool {
138        matches!(self.state, VotingState::Voting)
139    }
140
141    /// May this member stand for election? Only a healthy, data-bearing
142    /// member can become primary — a witness holds no data and a
143    /// catching-up replica is not healthy.
144    pub fn is_electable(&self) -> bool {
145        self.kind == MemberKind::Data && self.is_voter()
146    }
147}
148
149/// Quorum threshold for a set of members: a strict majority of the
150/// *voting* members. Witnesses count; catching-up replicas do not.
151///
152/// For `n` voting members the threshold is `floor(n/2) + 1`, the smallest
153/// count such that two qualifying sets always intersect — the structural
154/// basis for "no two primaries in a term".
155pub fn quorum_threshold(members: &[Member]) -> usize {
156    let voters = members.iter().filter(|m| m.is_voter()).count();
157    voters / 2 + 1
158}
159
160// ---------------------------------------------------------------
161// Durable last-vote
162// ---------------------------------------------------------------
163
/// The durable voting record of one node: the highest term it has taken
/// part in and, if it granted that term, the candidate it granted it to.
/// The record is persisted so that a restart cannot erase the fact that a
/// vote was already cast (requirement 2). The `Default` value is term `0`
/// with no vote.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LastVote {
    /// Highest term this node has observed in a (real) vote request.
    pub term: u64,
    /// The candidate that was granted `term`, or `None` if no grant has
    /// been made yet.
    pub voted_for: Option<String>,
}
174
175impl LastVote {
176    fn to_json(&self) -> JsonValue {
177        let mut obj = serde_json::Map::new();
178        obj.insert("term".to_string(), JsonValue::Number(self.term as f64));
179        obj.insert(
180            "voted_for".to_string(),
181            match &self.voted_for {
182                Some(id) => JsonValue::String(id.clone()),
183                None => JsonValue::Null,
184            },
185        );
186        JsonValue::Object(obj)
187    }
188
189    fn from_json(value: &JsonValue) -> Result<Self, LastVoteError> {
190        let obj = value.as_object().ok_or_else(|| {
191            LastVoteError::InvalidFormat("last-vote json is not an object".into())
192        })?;
193        let term = obj
194            .get("term")
195            .and_then(JsonValue::as_u64)
196            .ok_or_else(|| LastVoteError::InvalidFormat("missing term".into()))?;
197        let voted_for = match obj.get("voted_for") {
198            None | Some(JsonValue::Null) => None,
199            Some(JsonValue::String(s)) => Some(s.clone()),
200            Some(_) => {
201                return Err(LastVoteError::InvalidFormat(
202                    "voted_for must be a string or null".into(),
203                ))
204            }
205        };
206        Ok(Self { term, voted_for })
207    }
208}
209
/// Failure while loading or persisting a last-vote record.
#[derive(Debug)]
pub enum LastVoteError {
    /// The underlying filesystem operation failed.
    Io(std::io::Error),
    /// The stored bytes could not be parsed, or a record could not be
    /// serialized; the string describes what was wrong.
    InvalidFormat(String),
}

impl std::fmt::Display for LastVoteError {
    /// Render a one-line, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            LastVoteError::Io(err) => write!(f, "last-vote io error: {err}"),
            LastVoteError::InvalidFormat(msg) => {
                write!(f, "invalid last-vote format: {msg}")
            }
        }
    }
}

// No `source()` override: the wrapped I/O error is only exposed through the
// `Display` text above.
impl std::error::Error for LastVoteError {}
226
227/// Durable store for a node's last vote. The contract is narrow on purpose:
228/// `load` returns the persisted record (or the default `term 0, voted_for
229/// None` when nothing was ever written), and `persist` makes a record
230/// durable *before* the caller acknowledges a grant.
231pub trait LastVoteStore {
232    fn load(&self) -> Result<LastVote, LastVoteError>;
233    fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError>;
234}
235
236/// In-memory last-vote store for tests and witnesses that do not need
237/// cross-restart durability. (A witness *should* still persist in
238/// production; the file store is used there.)
239#[derive(Debug, Default)]
240pub struct MemoryLastVoteStore {
241    inner: std::sync::Mutex<LastVote>,
242}
243
244impl MemoryLastVoteStore {
245    pub fn new() -> Self {
246        Self::default()
247    }
248
249    /// Seed an initial record — used by tests to simulate a node that
250    /// already voted before a restart.
251    pub fn seeded(vote: LastVote) -> Self {
252        Self {
253            inner: std::sync::Mutex::new(vote),
254        }
255    }
256}
257
258impl LastVoteStore for MemoryLastVoteStore {
259    fn load(&self) -> Result<LastVote, LastVoteError> {
260        Ok(self.inner.lock().expect("last-vote mutex").clone())
261    }
262
263    fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
264        *self.inner.lock().expect("last-vote mutex") = vote.clone();
265        Ok(())
266    }
267}
268
/// Last-vote store backed by a single file, kept alongside the node's other
/// durable replication state. A new record is published atomically (written
/// to a temp file, then renamed over the live one), so a crash mid-write
/// never leaves a torn record: either the old vote or the new one survives,
/// never half of each.
pub struct FileLastVoteStore {
    /// Location of the live record file.
    path: std::path::PathBuf,
}
276
277impl FileLastVoteStore {
278    pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
279        Self { path: path.into() }
280    }
281}
282
283impl LastVoteStore for FileLastVoteStore {
284    fn load(&self) -> Result<LastVote, LastVoteError> {
285        match std::fs::read(&self.path) {
286            Ok(bytes) => {
287                let json: JsonValue = serde_json::from_slice(&bytes)
288                    .map_err(|err| LastVoteError::InvalidFormat(format!("parse: {err}")))?;
289                LastVote::from_json(&json)
290            }
291            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(LastVote::default()),
292            Err(err) => Err(LastVoteError::Io(err)),
293        }
294    }
295
296    fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
297        let bytes = serde_json::to_vec(&vote.to_json())
298            .map_err(|err| LastVoteError::InvalidFormat(format!("serialize: {err}")))?;
299        if let Some(parent) = self.path.parent() {
300            std::fs::create_dir_all(parent).map_err(LastVoteError::Io)?;
301        }
302        let tmp = self.path.with_extension("lastvote.tmp");
303        std::fs::write(&tmp, &bytes).map_err(LastVoteError::Io)?;
304        // Atomic publish: rename over the live file. fsync the file before
305        // rename so the bytes are durable, not just in the page cache.
306        if let Ok(f) = std::fs::File::open(&tmp) {
307            let _ = f.sync_all();
308        }
309        std::fs::rename(&tmp, &self.path).map_err(LastVoteError::Io)?;
310        // fsync the parent directory so the rename itself is durable. Without
311        // this, a crash after the rename could leave the directory entry still
312        // pointing at the old record — which would let a restarted voter
313        // double-vote, the exact failure the durable last-vote forbids.
314        if let Some(parent) = self.path.parent() {
315            if let Ok(dir) = std::fs::File::open(parent) {
316                let _ = dir.sync_all();
317            }
318        }
319        Ok(())
320    }
321}
322
323// ---------------------------------------------------------------
324// Vote request / decision
325// ---------------------------------------------------------------
326
/// What a candidate sends to a voter when asking for its vote.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VoteRequest {
    /// Stable identity of the candidate asking for the vote.
    pub candidate_id: String,
    /// The term being stood for. In a real election this is
    /// `current_term + 1`; in a dry-run probe it is the *prospective* term
    /// the candidate would stand for, evaluated without committing to it.
    pub term: u64,
    /// The candidate's log frontier — the highest LSN durably in its log.
    /// The watermark rule compares this against the commit watermark.
    pub last_log_lsn: u64,
    /// `true` for a dry-run probe, which gathers a would-be vote without
    /// persisting it or advancing the voter's term (requirement 1).
    /// `false` for a real election — the only path that persists a
    /// last-vote.
    pub dry_run: bool,
}
344
345impl VoteRequest {
346    pub fn probe(candidate_id: impl Into<String>, term: u64, last_log_lsn: u64) -> Self {
347        Self {
348            candidate_id: candidate_id.into(),
349            term,
350            last_log_lsn,
351            dry_run: true,
352        }
353    }
354
355    pub fn real(candidate_id: impl Into<String>, term: u64, last_log_lsn: u64) -> Self {
356        Self {
357            candidate_id: candidate_id.into(),
358            term,
359            last_log_lsn,
360            dry_run: false,
361        }
362    }
363}
364
/// The reason a voter gives for refusing a candidate.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RefusalReason {
    /// The candidate's log frontier is below the commit watermark, so
    /// electing it could lose an acknowledged synchronous write — the
    /// safety core refuses (requirement 3).
    WatermarkNotCovered { candidate_lsn: u64, watermark: u64 },
    /// The request is for the term this voter has already recorded, and the
    /// voter granted that term to a different candidate (durable
    /// double-vote guard, requirement 2).
    AlreadyVoted { term: u64, voted_for: String },
    /// The candidate's term is older than the voter's recorded term — a
    /// stale candidate from a superseded term.
    StaleTerm {
        candidate_term: u64,
        voter_term: u64,
    },
}
383
384/// The outcome of a voter considering a [`VoteRequest`].
385#[derive(Debug, Clone, PartialEq, Eq)]
386pub enum VoteDecision {
387    /// Vote granted. For a real (non-dry-run) request the grant has already
388    /// been persisted durably before this value was produced.
389    Granted,
390    /// Vote refused, with the reason.
391    Refused(RefusalReason),
392}
393
394impl VoteDecision {
395    pub fn is_granted(&self) -> bool {
396        matches!(self, VoteDecision::Granted)
397    }
398}
399
400// ---------------------------------------------------------------
401// Voter (voter-side vote rule)
402// ---------------------------------------------------------------
403
404/// A voting member. Wraps the durable [`LastVoteStore`] and applies the
405/// vote rule. The voter is the seat of correctness: the watermark rule and
406/// the durable double-vote guard both live here.
407pub struct Voter<S: LastVoteStore> {
408    id: String,
409    store: S,
410}
411
412impl<S: LastVoteStore> Voter<S> {
413    pub fn new(id: impl Into<String>, store: S) -> Self {
414        Self {
415            id: id.into(),
416            store,
417        }
418    }
419
420    pub fn id(&self) -> &str {
421        &self.id
422    }
423
424    /// This voter's current term — the highest term it has durably recorded.
425    pub fn current_term(&self) -> Result<u64, LastVoteError> {
426        Ok(self.store.load()?.term)
427    }
428
429    /// Consider a vote request against the current `commit_watermark`.
430    ///
431    /// The decision order is deliberate:
432    ///
433    /// 1. **Watermark first** — the safety core. A candidate that cannot
434    ///    carry an acknowledged write is refused regardless of term, so the
435    ///    durability guarantee can never be traded away for liveness.
436    /// 2. **Stale term** — reject candidates from a superseded term.
437    /// 3. **Double-vote guard** — within a term, a voter grants exactly one
438    ///    candidate; a re-ask by the *same* candidate is idempotently
439    ///    re-granted.
440    ///
441    /// For a real (non-dry-run) grant, the new `(term, candidate)` is
442    /// persisted **before** `Granted` is returned, so the durability holds
443    /// across a crash at any point after the caller observes the grant.
444    pub fn consider(
445        &self,
446        req: &VoteRequest,
447        commit_watermark: u64,
448    ) -> Result<VoteDecision, LastVoteError> {
449        // 1. Watermark rule — never relaxed, checked before anything else.
450        if req.last_log_lsn < commit_watermark {
451            return Ok(VoteDecision::Refused(RefusalReason::WatermarkNotCovered {
452                candidate_lsn: req.last_log_lsn,
453                watermark: commit_watermark,
454            }));
455        }
456
457        let last = self.store.load()?;
458
459        // 2. Stale term — the candidate is behind a term we already moved past.
460        if req.term < last.term {
461            return Ok(VoteDecision::Refused(RefusalReason::StaleTerm {
462                candidate_term: req.term,
463                voter_term: last.term,
464            }));
465        }
466
467        // 3. Double-vote guard within the *same* term.
468        if req.term == last.term {
469            match &last.voted_for {
470                // Already voted for someone else this term — refuse.
471                Some(other) if other != &req.candidate_id => {
472                    return Ok(VoteDecision::Refused(RefusalReason::AlreadyVoted {
473                        term: last.term,
474                        voted_for: other.clone(),
475                    }));
476                }
477                // Already voted for this same candidate — idempotent re-grant.
478                Some(_) => return Ok(VoteDecision::Granted),
479                // Same term observed but no vote cast yet — fall through to grant.
480                None => {}
481            }
482        }
483
484        // Grant. A dry-run probe must not persist or advance the term
485        // (requirement 1); a real grant persists durably before acking.
486        if !req.dry_run {
487            self.store.persist(&LastVote {
488                term: req.term,
489                voted_for: Some(req.candidate_id.clone()),
490            })?;
491        }
492        Ok(VoteDecision::Granted)
493    }
494}
495
496// ---------------------------------------------------------------
497// Randomized election timeout
498// ---------------------------------------------------------------
499
/// A randomized election timeout in `[base, base + jitter)`, at millisecond
/// granularity.
///
/// Randomization keeps candidates from standing in lockstep, which is what
/// makes split votes rare and self-correcting (requirement 4). The function
/// is pure in `seed` so tests pin a deterministic value; production passes
/// an entropy-derived seed.
///
/// The offset added to `base` is `seed % jitter_ms` milliseconds, where
/// `jitter_ms` is `jitter` in whole milliseconds (at least 1). Consequently:
///
/// * a zero `jitter` returns `base` unchanged;
/// * a non-zero `jitter` shorter than two milliseconds also returns `base`,
///   because the only possible offset is 0 ms;
/// * a `jitter` too large to express as `u64` milliseconds is clamped to
///   `u64::MAX` ms instead of being truncated; and
/// * the sum saturates at [`Duration::MAX`] instead of panicking on
///   overflow.
pub fn randomized_election_timeout(base: Duration, jitter: Duration, seed: u64) -> Duration {
    if jitter.is_zero() {
        return base;
    }
    // `as_millis` is a `u128`. Clamp before narrowing so the cast is
    // lossless: a plain `as u64` would wrap, and a jitter of exactly 2^64 ms
    // would wrap to 0 and divide by zero below.
    let clamped = jitter.as_millis().min(u128::from(u64::MAX));
    let jitter_ms = (clamped as u64).max(1);
    base.saturating_add(Duration::from_millis(seed % jitter_ms))
}
513
514// ---------------------------------------------------------------
515// ElectionCoordinator (candidate-side state machine)
516// ---------------------------------------------------------------
517
518/// A request to run an election on behalf of `candidate`.
519#[derive(Debug, Clone, PartialEq, Eq)]
520pub struct ElectionRequest {
521    /// The candidate standing for election. Must be electable (a healthy,
522    /// data-bearing member) or [`ElectionCoordinator::run`] refuses up front.
523    pub candidate: Member,
524    /// The term the cluster is serving now. A real election stands for
525    /// `current_term + 1`.
526    pub current_term: u64,
527    /// The candidate's log frontier — the highest LSN durably in its log.
528    pub last_log_lsn: u64,
529    /// The commit watermark the candidate believes is in force. The
530    /// candidate must itself cover it to be electable; voters re-check
531    /// against their own watermark view.
532    pub commit_watermark: u64,
533}
534
535impl ElectionRequest {
536    /// The term a successful election produces.
537    pub fn new_term(&self) -> u64 {
538        self.current_term + 1
539    }
540}
541
/// How an election attempt ended.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ElectionOutcome {
    /// The candidate collected a majority and was promoted under `term`.
    /// `votes` and `needed` record the tally; `votes` includes the
    /// candidate's own vote.
    Elected {
        term: u64,
        votes: usize,
        needed: usize,
    },
    /// The dry-run probe fell short of a majority, so no term was bumped
    /// and no real election was attempted (requirement 1).
    ProbeFailed { votes: usize, needed: usize },
    /// A real election was attempted — the term was bumped — but it did not
    /// reach a majority, e.g. because the votes split. The term has
    /// advanced, so a later attempt stands for a higher term.
    Lost {
        term: u64,
        votes: usize,
        needed: usize,
    },
    /// The candidate may not stand: it is a witness, a catching-up replica,
    /// or its own log does not cover the watermark. No probe was sent.
    NotElectable,
    /// The timeout elapsed before a majority was collected. No promotion
    /// happened.
    TimedOut { votes: usize, needed: usize },
}

impl ElectionOutcome {
    /// `true` only for [`ElectionOutcome::Elected`].
    pub fn is_elected(&self) -> bool {
        match self {
            ElectionOutcome::Elected { .. } => true,
            ElectionOutcome::ProbeFailed { .. }
            | ElectionOutcome::Lost { .. }
            | ElectionOutcome::NotElectable
            | ElectionOutcome::TimedOut { .. } => false,
        }
    }
}
577
578/// Cluster operations the candidate drives, injected so the state machine
579/// stays pure and deterministically testable. Production backs these onto
580/// the membership view, the per-peer vote RPC, the durable term store, and
581/// the FAILOVER handover; tests back them onto a scripted fake.
582pub trait ElectionTransport {
583    /// The candidate's current view of cluster membership. The denominator
584    /// for the majority is the *voting* members of this set.
585    fn members(&self) -> Vec<Member>;
586
587    /// Ask one peer for its vote. The candidate never asks itself (it always
588    /// self-grants). Implementors route this to the peer's [`Voter`].
589    fn request_vote(&mut self, peer_id: &str, req: &VoteRequest) -> VoteDecision;
590
591    /// Time elapsed since the election began, so the coordinator enforces
592    /// the randomized timeout without owning a clock.
593    fn elapsed(&self) -> Duration;
594
595    /// Durably advance this node's current term to `new_term`. Called once,
596    /// only when a real election begins (never for a dry-run). Persisted
597    /// alongside the node's other durable replication state.
598    fn bump_term(&mut self, new_term: u64);
599
600    /// Promote the candidate to primary under `new_term`, reusing the
601    /// FAILOVER handover machinery ([`super::failover`]). Called only after
602    /// a majority is collected in the real election.
603    fn promote(&mut self, new_term: u64);
604}
605
606/// The quorum-gated election state machine.
607pub struct ElectionCoordinator;
608
609impl ElectionCoordinator {
610    /// Run an election for `req`, driving the cluster through `tx`, bounded
611    /// by `timeout` (use [`randomized_election_timeout`]).
612    ///
613    /// The flow is: electability guard → dry-run probe (no term bump) →
614    /// real election (bump term, collect votes) → promote on majority. See
615    /// the module docs for the full contract.
616    pub fn run(
617        req: &ElectionRequest,
618        tx: &mut dyn ElectionTransport,
619        timeout: Duration,
620    ) -> ElectionOutcome {
621        // Electability guard. A witness or catching-up replica may not
622        // stand; nor may a candidate whose own log does not cover the
623        // watermark (it would violate the safety core the instant it won).
624        if !req.candidate.is_electable() || req.last_log_lsn < req.commit_watermark {
625            return ElectionOutcome::NotElectable;
626        }
627
628        let members = tx.members();
629        let needed = quorum_threshold(&members);
630        let new_term = req.new_term();
631
632        // The peers we ask: every *other* voting member. The candidate
633        // self-grants, so it is one vote without an RPC.
634        let peers: Vec<String> = members
635            .iter()
636            .filter(|m| m.is_voter() && m.id != req.candidate.id)
637            .map(|m| m.id.clone())
638            .collect();
639
640        // ---- Phase 1: dry-run probe (does NOT bump the term) ----
641        let probe = VoteRequest::probe(&req.candidate.id, new_term, req.last_log_lsn);
642        let probe_votes = match Self::collect(tx, &peers, &probe, needed, timeout) {
643            CollectResult::Reached(v) => v,
644            CollectResult::Exhausted(v) => {
645                return ElectionOutcome::ProbeFailed { votes: v, needed }
646            }
647            CollectResult::TimedOut(v) => return ElectionOutcome::TimedOut { votes: v, needed },
648        };
649        debug_assert!(probe_votes >= needed);
650
651        // ---- Phase 2: real election (bumps the term, then collects) ----
652        tx.bump_term(new_term);
653        let ballot = VoteRequest::real(&req.candidate.id, new_term, req.last_log_lsn);
654        match Self::collect(tx, &peers, &ballot, needed, timeout) {
655            CollectResult::Reached(votes) => {
656                tx.promote(new_term);
657                ElectionOutcome::Elected {
658                    term: new_term,
659                    votes,
660                    needed,
661                }
662            }
663            CollectResult::Exhausted(votes) => ElectionOutcome::Lost {
664                term: new_term,
665                votes,
666                needed,
667            },
668            CollectResult::TimedOut(votes) => ElectionOutcome::TimedOut { votes, needed },
669        }
670    }
671
672    /// Collect votes from `peers`, starting at 1 for the candidate's
673    /// self-vote, until `needed` is reached, the peers are exhausted, or the
674    /// timeout elapses. Stops early on success — no need to ask the rest.
675    fn collect(
676        tx: &mut dyn ElectionTransport,
677        peers: &[String],
678        req: &VoteRequest,
679        needed: usize,
680        timeout: Duration,
681    ) -> CollectResult {
682        let mut votes = 1usize; // self-vote
683        if votes >= needed {
684            return CollectResult::Reached(votes);
685        }
686        for peer in peers {
687            if tx.elapsed() >= timeout {
688                return CollectResult::TimedOut(votes);
689            }
690            if tx.request_vote(peer, req).is_granted() {
691                votes += 1;
692                if votes >= needed {
693                    return CollectResult::Reached(votes);
694                }
695            }
696        }
697        CollectResult::Exhausted(votes)
698    }
699}
700
/// How one round of vote collection ended. Every variant carries the tally
/// at the moment collection stopped, including the candidate's self-vote.
enum CollectResult {
    /// The tally reached the required threshold.
    Reached(usize),
    /// Every peer was asked and the tally is still short.
    Exhausted(usize),
    /// The timeout elapsed before the threshold was reached.
    TimedOut(usize),
}
706
707#[cfg(test)]
708mod tests;