Skip to main content

reddb_server/replication/
election.rs

1//! Term-based, quorum-gated automatic election (issue #834, PRD #819, ADR 0030).
2//!
3//! This is the consensus core that turns a primary loss into an automatic,
4//! *safe* promotion. It lives in the first-party-but-decoupled control-plane
5//! supervisor (ADR 0030) — distinct from the data path — and reuses the two
6//! pieces the rest of replication already built:
7//!
8//! * the **commit watermark** ([`super::commit_waiter`] / [`super::quorum`]) —
9//!   the highest LSN durably replicated to a quorum that intersects every
10//!   possible election majority. Nothing at or below it may ever be rolled
11//!   back; and
12//! * the **FAILOVER handover machinery** ([`super::failover`]) — once a
13//!   candidate wins, promotion is driven through the same coordinated
14//!   role-swap, not a parallel state machine.
15//!
16//! ## The five hard requirements (ADR 0030, issue #834)
17//!
18//! 1. **Dry-run probe.** A candidate first asks "*would* you vote for me?"
19//!    without bumping any term. Only a real election bumps the term. This
20//!    keeps a flapping candidate from burning through terms and lets the
21//!    supervisor probe liveness cheaply.
22//! 2. **Durable last-vote.** A voter persists `(term, voted_for)` *before*
23//!    acknowledging a grant, so a voter that crashes and restarts mid-term
24//!    never double-votes — the second request in the same term for a
25//!    different candidate is refused from disk.
26//! 3. **Watermark vote rule (the safety core).** A voter MUST refuse any
27//!    candidate whose log does not cover the commit watermark. An
28//!    acknowledged synchronous write sits at or below the watermark, so a
29//!    winner necessarily carries it — the write provably survives the
30//!    failover. This is the one rule that may not be relaxed.
31//! 4. **Randomized election timeouts.** Candidates wait a randomized
32//!    interval before standing, so split votes are rare and self-correcting.
33//! 5. **Membership rules.** A quorum is a *majority of voting members*.
34//!    **Witness** members ([#836]) hold no data but vote, so `2 data + 1
35//!    witness` is a valid HA shape. A **catching-up** replica is
36//!    *non-voting* until it reaches a healthy state — it neither votes nor
37//!    stands.
38//!
39//! ## No two primaries in a term
40//!
41//! This invariant is structural, not probabilistic:
42//!
43//! * a win requires a strict majority of voting members, and two strict
44//!   majorities of the same set always intersect; and
45//! * the shared voter in any two majorities votes at most once per term
46//!   (durable last-vote), so it cannot grant two different candidates the
47//!   same term.
48//!
49//! Therefore at most one candidate can collect a majority in a given term,
50//! even under an arbitrary network partition. The partition tests exercise
51//! exactly this.
52//!
53//! ## Module shape
54//!
55//! Like [`super::failover`], the candidate-side [`ElectionCoordinator::run`]
56//! is a **pure state machine**: the clock, the per-peer vote RPC, the
57//! durable term bump, and the promotion are injected behind
58//! [`ElectionTransport`], so the whole election is exercised
59//! deterministically with a scripted fake — no clock, no network, no engine.
60//! The voter-side [`Voter`] wraps a [`LastVoteStore`] (durable on disk in
61//! production, in-memory in tests) and applies the vote rule.
62//!
63//! [#836]: https://github.com/reddb-io/reddb/issues/836
64
65use std::time::Duration;
66
67pub use reddb_file::FileLastVoteStore;
68
69// ---------------------------------------------------------------
70// Membership model
71// ---------------------------------------------------------------
72
/// The role a member plays with respect to data (ADR 0030).
///
/// A data member can be promoted to primary; a witness is "a node that
/// runs only the supervisor module" and exists purely to vote.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemberKind {
    /// Data-bearing member: it votes and it may stand for election.
    Data,
    /// Witness ([#836]): counts toward quorum but is never primary.
    ///
    /// [#836]: https://github.com/reddb-io/reddb/issues/836
    Witness,
}
85
/// Participation of a member in voting.
///
/// A data replica that has not yet reached a healthy, watermark-covering
/// state is [`VotingState::CatchingUp`]: it is left out of the voter set
/// altogether, so it neither casts a vote nor enlarges the majority
/// denominator. It becomes [`VotingState::Voting`] once healthy.
/// Witnesses are always [`VotingState::Voting`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VotingState {
    /// Healthy; takes part in quorum.
    Voting,
    /// Still syncing; excluded from voting until healthy.
    CatchingUp,
}
100
101/// A cluster member as seen by the supervisor's membership view.
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Member {
104    /// Stable node identity (matches the replica registry / ack id).
105    pub id: String,
106    pub kind: MemberKind,
107    pub state: VotingState,
108}
109
110impl Member {
111    pub fn data_voting(id: impl Into<String>) -> Self {
112        Self {
113            id: id.into(),
114            kind: MemberKind::Data,
115            state: VotingState::Voting,
116        }
117    }
118
119    pub fn data_catching_up(id: impl Into<String>) -> Self {
120        Self {
121            id: id.into(),
122            kind: MemberKind::Data,
123            state: VotingState::CatchingUp,
124        }
125    }
126
127    pub fn witness(id: impl Into<String>) -> Self {
128        Self {
129            id: id.into(),
130            kind: MemberKind::Witness,
131            state: VotingState::Voting,
132        }
133    }
134
135    /// Does this member count toward quorum? Only healthy members vote;
136    /// a catching-up replica is non-voting (ADR 0030).
137    pub fn is_voter(&self) -> bool {
138        matches!(self.state, VotingState::Voting)
139    }
140
141    /// May this member stand for election? Only a healthy, data-bearing
142    /// member can become primary — a witness holds no data and a
143    /// catching-up replica is not healthy.
144    pub fn is_electable(&self) -> bool {
145        self.kind == MemberKind::Data && self.is_voter()
146    }
147}
148
149/// Quorum threshold for a set of members: a strict majority of the
150/// *voting* members. Witnesses count; catching-up replicas do not.
151///
152/// For `n` voting members the threshold is `floor(n/2) + 1`, the smallest
153/// count such that two qualifying sets always intersect — the structural
154/// basis for "no two primaries in a term".
155pub fn quorum_threshold(members: &[Member]) -> usize {
156    let voters = members.iter().filter(|m| m.is_voter()).count();
157    voters / 2 + 1
158}
159
160// ---------------------------------------------------------------
161// Durable last-vote
162// ---------------------------------------------------------------
163
/// The voting record a node keeps durably: the highest term it has taken
/// part in, plus the candidate (if any) it granted that term to. It is
/// persisted so that a restart cannot erase an already-cast vote
/// (requirement 2).
///
/// The default value is `term 0` with no vote cast.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LastVote {
    /// Highest term this node has observed in a (real) vote request.
    pub term: u64,
    /// The candidate granted `term`, or `None` if no grant was made yet.
    pub voted_for: Option<String>,
}
174
175impl LastVote {
176    fn from_file(value: reddb_file::DurableLastVote) -> Self {
177        Self {
178            term: value.term,
179            voted_for: value.voted_for,
180        }
181    }
182
183    fn to_file(&self) -> reddb_file::DurableLastVote {
184        reddb_file::DurableLastVote::new(self.term, self.voted_for.clone())
185    }
186}
187
/// Failure while loading or persisting a last-vote record.
#[derive(Debug)]
pub enum LastVoteError {
    /// An I/O error, carried as-is.
    Io(std::io::Error),
    /// The record was rejected as invalid; carries a description.
    InvalidFormat(String),
}

impl std::fmt::Display for LastVoteError {
    /// Render a one-line, human-readable message for the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            LastVoteError::InvalidFormat(msg) => write!(f, "invalid last-vote format: {msg}"),
            LastVoteError::Io(err) => write!(f, "last-vote io error: {err}"),
        }
    }
}

impl std::error::Error for LastVoteError {}
204
205impl From<reddb_file::RdbFileError> for LastVoteError {
206    fn from(value: reddb_file::RdbFileError) -> Self {
207        match value {
208            reddb_file::RdbFileError::Io(err) => Self::Io(err),
209            reddb_file::RdbFileError::InvalidOperation(msg) => Self::InvalidFormat(msg),
210        }
211    }
212}
213
214/// Durable store for a node's last vote. The contract is narrow on purpose:
215/// `load` returns the persisted record (or the default `term 0, voted_for
216/// None` when nothing was ever written), and `persist` makes a record
217/// durable *before* the caller acknowledges a grant.
218pub trait LastVoteStore {
219    fn load(&self) -> Result<LastVote, LastVoteError>;
220    fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError>;
221}
222
223/// In-memory last-vote store for tests and witnesses that do not need
224/// cross-restart durability. (A witness *should* still persist in
225/// production; the file store is used there.)
226#[derive(Debug, Default)]
227pub struct MemoryLastVoteStore {
228    inner: std::sync::Mutex<LastVote>,
229}
230
231impl MemoryLastVoteStore {
232    pub fn new() -> Self {
233        Self::default()
234    }
235
236    /// Seed an initial record — used by tests to simulate a node that
237    /// already voted before a restart.
238    pub fn seeded(vote: LastVote) -> Self {
239        Self {
240            inner: std::sync::Mutex::new(vote),
241        }
242    }
243}
244
245impl LastVoteStore for MemoryLastVoteStore {
246    fn load(&self) -> Result<LastVote, LastVoteError> {
247        Ok(self.inner.lock().expect("last-vote mutex").clone())
248    }
249
250    fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
251        *self.inner.lock().expect("last-vote mutex") = vote.clone();
252        Ok(())
253    }
254}
255
256impl LastVoteStore for FileLastVoteStore {
257    fn load(&self) -> Result<LastVote, LastVoteError> {
258        self.load_file()
259            .map(LastVote::from_file)
260            .map_err(LastVoteError::from)
261    }
262
263    fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
264        self.persist_file(&vote.to_file())
265            .map_err(LastVoteError::from)
266    }
267}
268
269// ---------------------------------------------------------------
270// Vote request / decision
271// ---------------------------------------------------------------
272
/// What a candidate sends to a voter when asking for its vote.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VoteRequest {
    /// Stable identity of the candidate.
    pub candidate_id: String,
    /// Term being stood for. In a real election this is
    /// `current_term + 1`; in a dry-run probe it is the *prospective* term
    /// the candidate would stand for, evaluated without committing to it.
    pub term: u64,
    /// The candidate's log frontier — the highest LSN durably in its log.
    /// This is what the watermark rule compares to the commit watermark.
    pub last_log_lsn: u64,
    /// `true` for a dry-run probe, which collects a would-be vote without
    /// persisting it or advancing the voter's term (requirement 1).
    /// `false` for a real election — the only path that persists a
    /// last-vote.
    pub dry_run: bool,
}

impl VoteRequest {
    /// A dry-run probe: same fields as a real ballot, with `dry_run` set.
    pub fn probe(candidate_id: impl Into<String>, term: u64, last_log_lsn: u64) -> Self {
        Self {
            dry_run: true,
            ..Self::real(candidate_id, term, last_log_lsn)
        }
    }

    /// A real ballot (`dry_run` is `false`).
    pub fn real(candidate_id: impl Into<String>, term: u64, last_log_lsn: u64) -> Self {
        let candidate_id = candidate_id.into();
        Self {
            candidate_id,
            term,
            last_log_lsn,
            dry_run: false,
        }
    }
}
310
/// The reason a voter gave for turning a candidate down.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RefusalReason {
    /// The candidate's log stops short of the commit watermark, so
    /// electing it could lose an acknowledged synchronous write. This is
    /// the safety core (requirement 3).
    WatermarkNotCovered { candidate_lsn: u64, watermark: u64 },
    /// The voter has already granted this term to a different candidate
    /// (durable double-vote guard, requirement 2).
    AlreadyVoted { term: u64, voted_for: String },
    /// The candidate is standing for a term older than the voter's
    /// current term — it belongs to a superseded term.
    StaleTerm {
        candidate_term: u64,
        voter_term: u64,
    },
}
329
330/// The outcome of a voter considering a [`VoteRequest`].
331#[derive(Debug, Clone, PartialEq, Eq)]
332pub enum VoteDecision {
333    /// Vote granted. For a real (non-dry-run) request the grant has already
334    /// been persisted durably before this value was produced.
335    Granted,
336    /// Vote refused, with the reason.
337    Refused(RefusalReason),
338}
339
340impl VoteDecision {
341    pub fn is_granted(&self) -> bool {
342        matches!(self, VoteDecision::Granted)
343    }
344}
345
346// ---------------------------------------------------------------
347// Voter (voter-side vote rule)
348// ---------------------------------------------------------------
349
350/// A voting member. Wraps the durable [`LastVoteStore`] and applies the
351/// vote rule. The voter is the seat of correctness: the watermark rule and
352/// the durable double-vote guard both live here.
353pub struct Voter<S: LastVoteStore> {
354    id: String,
355    store: S,
356}
357
358impl<S: LastVoteStore> Voter<S> {
359    pub fn new(id: impl Into<String>, store: S) -> Self {
360        Self {
361            id: id.into(),
362            store,
363        }
364    }
365
366    pub fn id(&self) -> &str {
367        &self.id
368    }
369
370    /// This voter's current term — the highest term it has durably recorded.
371    pub fn current_term(&self) -> Result<u64, LastVoteError> {
372        Ok(self.store.load()?.term)
373    }
374
375    /// Consider a vote request against the current `commit_watermark`.
376    ///
377    /// The decision order is deliberate:
378    ///
379    /// 1. **Watermark first** — the safety core. A candidate that cannot
380    ///    carry an acknowledged write is refused regardless of term, so the
381    ///    durability guarantee can never be traded away for liveness.
382    /// 2. **Stale term** — reject candidates from a superseded term.
383    /// 3. **Double-vote guard** — within a term, a voter grants exactly one
384    ///    candidate; a re-ask by the *same* candidate is idempotently
385    ///    re-granted.
386    ///
387    /// For a real (non-dry-run) grant, the new `(term, candidate)` is
388    /// persisted **before** `Granted` is returned, so the durability holds
389    /// across a crash at any point after the caller observes the grant.
390    pub fn consider(
391        &self,
392        req: &VoteRequest,
393        commit_watermark: u64,
394    ) -> Result<VoteDecision, LastVoteError> {
395        // 1. Watermark rule — never relaxed, checked before anything else.
396        if req.last_log_lsn < commit_watermark {
397            return Ok(VoteDecision::Refused(RefusalReason::WatermarkNotCovered {
398                candidate_lsn: req.last_log_lsn,
399                watermark: commit_watermark,
400            }));
401        }
402
403        let last = self.store.load()?;
404
405        // 2. Stale term — the candidate is behind a term we already moved past.
406        if req.term < last.term {
407            return Ok(VoteDecision::Refused(RefusalReason::StaleTerm {
408                candidate_term: req.term,
409                voter_term: last.term,
410            }));
411        }
412
413        // 3. Double-vote guard within the *same* term.
414        if req.term == last.term {
415            match &last.voted_for {
416                // Already voted for someone else this term — refuse.
417                Some(other) if other != &req.candidate_id => {
418                    return Ok(VoteDecision::Refused(RefusalReason::AlreadyVoted {
419                        term: last.term,
420                        voted_for: other.clone(),
421                    }));
422                }
423                // Already voted for this same candidate — idempotent re-grant.
424                Some(_) => return Ok(VoteDecision::Granted),
425                // Same term observed but no vote cast yet — fall through to grant.
426                None => {}
427            }
428        }
429
430        // Grant. A dry-run probe must not persist or advance the term
431        // (requirement 1); a real grant persists durably before acking.
432        if !req.dry_run {
433            self.store.persist(&LastVote {
434                term: req.term,
435                voted_for: Some(req.candidate_id.clone()),
436            })?;
437        }
438        Ok(VoteDecision::Granted)
439    }
440}
441
442// ---------------------------------------------------------------
443// Randomized election timeout
444// ---------------------------------------------------------------
445
/// A randomized election timeout in `[base, base + jitter)`.
///
/// Randomization keeps candidates from standing in lockstep, which is what
/// makes split votes rare and self-correcting (requirement 4). The function
/// is pure in `seed` so tests pin a deterministic value; production passes
/// an entropy-derived seed.
///
/// The offset added to `base` is `seed % jitter_ms` milliseconds, where
/// `jitter_ms` is `jitter` in whole milliseconds:
///
/// * a zero `jitter` returns `base` unchanged;
/// * a non-zero `jitter` shorter than one millisecond is treated as one
///   millisecond, so the offset is always zero;
/// * a `jitter` longer than `u64::MAX` milliseconds is treated as
///   `u64::MAX` milliseconds; and
/// * if `base` plus the offset would overflow a [`Duration`], the result
///   saturates at [`Duration::MAX`] instead of panicking.
pub fn randomized_election_timeout(base: Duration, jitter: Duration, seed: u64) -> Duration {
    if jitter.is_zero() {
        return base;
    }
    // `as_millis` is a u128. Clamp it into `1..=u64::MAX` *before*
    // narrowing: a bare `as u64` keeps only the low 64 bits, which can be
    // zero for a huge jitter and would make the modulo below panic.
    let jitter_ms = jitter.as_millis().clamp(1, u128::from(u64::MAX)) as u64;
    base.saturating_add(Duration::from_millis(seed % jitter_ms))
}
459
460// ---------------------------------------------------------------
461// ElectionCoordinator (candidate-side state machine)
462// ---------------------------------------------------------------
463
464/// A request to run an election on behalf of `candidate`.
465#[derive(Debug, Clone, PartialEq, Eq)]
466pub struct ElectionRequest {
467    /// The candidate standing for election. Must be electable (a healthy,
468    /// data-bearing member) or [`ElectionCoordinator::run`] refuses up front.
469    pub candidate: Member,
470    /// The term the cluster is serving now. A real election stands for
471    /// `current_term + 1`.
472    pub current_term: u64,
473    /// The candidate's log frontier — the highest LSN durably in its log.
474    pub last_log_lsn: u64,
475    /// The commit watermark the candidate believes is in force. The
476    /// candidate must itself cover it to be electable; voters re-check
477    /// against their own watermark view.
478    pub commit_watermark: u64,
479}
480
481impl ElectionRequest {
482    /// The term a successful election produces.
483    pub fn new_term(&self) -> u64 {
484        self.current_term + 1
485    }
486}
487
/// How an election attempt ended.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ElectionOutcome {
    /// A majority was collected and the candidate was promoted under
    /// `term`. `votes` / `needed` give the tally, self-vote included.
    Elected {
        term: u64,
        votes: usize,
        needed: usize,
    },
    /// The dry-run probe fell short of a majority. No term was bumped and
    /// no real election was attempted (requirement 1).
    ProbeFailed { votes: usize, needed: usize },
    /// The real election was attempted — so the term was bumped — but it
    /// fell short of a majority, e.g. because the vote split or conditions
    /// changed between probe and election. Since the term has advanced, a
    /// later attempt stands for a higher term.
    Lost {
        term: u64,
        votes: usize,
        needed: usize,
    },
    /// The candidate may not stand (a witness, a catching-up replica, or
    /// a log that does not cover the watermark). No probe was sent.
    NotElectable,
    /// The randomized timeout ran out before a majority was collected.
    /// No promotion happened.
    TimedOut { votes: usize, needed: usize },
}

impl ElectionOutcome {
    /// `true` only for [`ElectionOutcome::Elected`].
    pub fn is_elected(&self) -> bool {
        match self {
            ElectionOutcome::Elected { .. } => true,
            ElectionOutcome::ProbeFailed { .. }
            | ElectionOutcome::Lost { .. }
            | ElectionOutcome::NotElectable
            | ElectionOutcome::TimedOut { .. } => false,
        }
    }
}
523
524/// Cluster operations the candidate drives, injected so the state machine
525/// stays pure and deterministically testable. Production backs these onto
526/// the membership view, the per-peer vote RPC, the durable term store, and
527/// the FAILOVER handover; tests back them onto a scripted fake.
528pub trait ElectionTransport {
529    /// The candidate's current view of cluster membership. The denominator
530    /// for the majority is the *voting* members of this set.
531    fn members(&self) -> Vec<Member>;
532
533    /// Ask one peer for its vote. The candidate never asks itself (it always
534    /// self-grants). Implementors route this to the peer's [`Voter`].
535    fn request_vote(&mut self, peer_id: &str, req: &VoteRequest) -> VoteDecision;
536
537    /// Time elapsed since the election began, so the coordinator enforces
538    /// the randomized timeout without owning a clock.
539    fn elapsed(&self) -> Duration;
540
541    /// Durably advance this node's current term to `new_term`. Called once,
542    /// only when a real election begins (never for a dry-run). Persisted
543    /// alongside the node's other durable replication state.
544    fn bump_term(&mut self, new_term: u64);
545
546    /// Promote the candidate to primary under `new_term`, reusing the
547    /// FAILOVER handover machinery ([`super::failover`]). Called only after
548    /// a majority is collected in the real election.
549    fn promote(&mut self, new_term: u64);
550}
551
552/// The quorum-gated election state machine.
553pub struct ElectionCoordinator;
554
555impl ElectionCoordinator {
556    /// Run an election for `req`, driving the cluster through `tx`, bounded
557    /// by `timeout` (use [`randomized_election_timeout`]).
558    ///
559    /// The flow is: electability guard → dry-run probe (no term bump) →
560    /// real election (bump term, collect votes) → promote on majority. See
561    /// the module docs for the full contract.
562    pub fn run(
563        req: &ElectionRequest,
564        tx: &mut dyn ElectionTransport,
565        timeout: Duration,
566    ) -> ElectionOutcome {
567        // Electability guard. A witness or catching-up replica may not
568        // stand; nor may a candidate whose own log does not cover the
569        // watermark (it would violate the safety core the instant it won).
570        if !req.candidate.is_electable() || req.last_log_lsn < req.commit_watermark {
571            return ElectionOutcome::NotElectable;
572        }
573
574        let members = tx.members();
575        let needed = quorum_threshold(&members);
576        let new_term = req.new_term();
577
578        // The peers we ask: every *other* voting member. The candidate
579        // self-grants, so it is one vote without an RPC.
580        let peers: Vec<String> = members
581            .iter()
582            .filter(|m| m.is_voter() && m.id != req.candidate.id)
583            .map(|m| m.id.clone())
584            .collect();
585
586        // ---- Phase 1: dry-run probe (does NOT bump the term) ----
587        let probe = VoteRequest::probe(&req.candidate.id, new_term, req.last_log_lsn);
588        let probe_votes = match Self::collect(tx, &peers, &probe, needed, timeout) {
589            CollectResult::Reached(v) => v,
590            CollectResult::Exhausted(v) => {
591                return ElectionOutcome::ProbeFailed { votes: v, needed }
592            }
593            CollectResult::TimedOut(v) => return ElectionOutcome::TimedOut { votes: v, needed },
594        };
595        debug_assert!(probe_votes >= needed);
596
597        // ---- Phase 2: real election (bumps the term, then collects) ----
598        tx.bump_term(new_term);
599        let ballot = VoteRequest::real(&req.candidate.id, new_term, req.last_log_lsn);
600        match Self::collect(tx, &peers, &ballot, needed, timeout) {
601            CollectResult::Reached(votes) => {
602                tx.promote(new_term);
603                ElectionOutcome::Elected {
604                    term: new_term,
605                    votes,
606                    needed,
607                }
608            }
609            CollectResult::Exhausted(votes) => ElectionOutcome::Lost {
610                term: new_term,
611                votes,
612                needed,
613            },
614            CollectResult::TimedOut(votes) => ElectionOutcome::TimedOut { votes, needed },
615        }
616    }
617
618    /// Collect votes from `peers`, starting at 1 for the candidate's
619    /// self-vote, until `needed` is reached, the peers are exhausted, or the
620    /// timeout elapses. Stops early on success — no need to ask the rest.
621    fn collect(
622        tx: &mut dyn ElectionTransport,
623        peers: &[String],
624        req: &VoteRequest,
625        needed: usize,
626        timeout: Duration,
627    ) -> CollectResult {
628        let mut votes = 1usize; // self-vote
629        if votes >= needed {
630            return CollectResult::Reached(votes);
631        }
632        for peer in peers {
633            if tx.elapsed() >= timeout {
634                return CollectResult::TimedOut(votes);
635            }
636            if tx.request_vote(peer, req).is_granted() {
637                votes += 1;
638                if votes >= needed {
639                    return CollectResult::Reached(votes);
640                }
641            }
642        }
643        CollectResult::Exhausted(votes)
644    }
645}
646
/// How one round of vote collection ended. Every variant carries the
/// number of votes held when collection stopped.
enum CollectResult {
    /// The required number of votes was reached.
    Reached(usize),
    /// Every peer was asked and the required number was not reached.
    Exhausted(usize),
    /// The timeout elapsed before the required number was reached.
    TimedOut(usize),
}
652
653#[cfg(test)]
654mod tests;