reddb_server/replication/election.rs
1//! Term-based, quorum-gated automatic election (issue #834, PRD #819, ADR 0030).
2//!
3//! This is the consensus core that turns a primary loss into an automatic,
4//! *safe* promotion. It lives in the first-party-but-decoupled control-plane
5//! supervisor (ADR 0030) — distinct from the data path — and reuses the two
6//! pieces the rest of replication already built:
7//!
8//! * the **commit watermark** ([`super::commit_waiter`] / [`super::quorum`]) —
9//! the highest LSN durably replicated to a quorum that intersects every
10//! possible election majority. Nothing at or below it may ever be rolled
11//! back; and
12//! * the **FAILOVER handover machinery** ([`super::failover`]) — once a
13//! candidate wins, promotion is driven through the same coordinated
14//! role-swap, not a parallel state machine.
15//!
16//! ## The five hard requirements (ADR 0030, issue #834)
17//!
18//! 1. **Dry-run probe.** A candidate first asks "*would* you vote for me?"
19//! without bumping any term. Only a real election bumps the term. This
20//! keeps a flapping candidate from burning through terms and lets the
21//! supervisor probe liveness cheaply.
22//! 2. **Durable last-vote.** A voter persists `(term, voted_for)` *before*
23//! acknowledging a grant, so a voter that crashes and restarts mid-term
24//! never double-votes — the second request in the same term for a
25//! different candidate is refused from disk.
26//! 3. **Watermark vote rule (the safety core).** A voter MUST refuse any
27//! candidate whose log does not cover the commit watermark. An
28//! acknowledged synchronous write sits at or below the watermark, so a
29//! winner necessarily carries it — the write provably survives the
30//! failover. This is the one rule that may not be relaxed.
31//! 4. **Randomized election timeouts.** Candidates wait a randomized
32//! interval before standing, so split votes are rare and self-correcting.
33//! 5. **Membership rules.** A quorum is a *majority of voting members*.
34//! **Witness** members ([#836]) hold no data but vote, so `2 data + 1
35//! witness` is a valid HA shape. A **catching-up** replica is
36//! *non-voting* until it reaches a healthy state — it neither votes nor
37//! stands.
38//!
39//! ## No two primaries in a term
40//!
41//! This invariant is structural, not probabilistic:
42//!
43//! * a win requires a strict majority of voting members, and two strict
44//! majorities of the same set always intersect; and
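//! * the shared voter in any two majorities votes at most once per term
//!   (durable last-vote), so it cannot grant two different candidates the
//!   same term. The shared voter may itself be one of the candidates, so
//!   this covers a candidate's own self-vote too: it must be durably
//!   recorded as that node's last vote for the term, exactly like a ballot
//!   granted to a peer.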
48//!
49//! Therefore at most one candidate can collect a majority in a given term,
50//! even under an arbitrary network partition. The partition tests exercise
51//! exactly this.
52//!
53//! ## Module shape
54//!
55//! Like [`super::failover`], the candidate-side [`ElectionCoordinator::run`]
56//! is a **pure state machine**: the clock, the per-peer vote RPC, the
57//! durable term bump, and the promotion are injected behind
58//! [`ElectionTransport`], so the whole election is exercised
59//! deterministically with a scripted fake — no clock, no network, no engine.
60//! The voter-side [`Voter`] wraps a [`LastVoteStore`] (durable on disk in
61//! production, in-memory in tests) and applies the vote rule.
62//!
63//! [#836]: https://github.com/reddb-io/reddb/issues/836
64
65use std::time::Duration;
66
67pub use reddb_file::FileLastVoteStore;
68
69// ---------------------------------------------------------------
70// Membership model
71// ---------------------------------------------------------------
72
/// The role a cluster member plays with respect to data.
///
/// A [`MemberKind::Data`] member stores a replica and may therefore be
/// promoted to primary. A [`MemberKind::Witness`] is "a node that runs only
/// the supervisor module" (ADR 0030) and only contributes a vote.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MemberKind {
    /// Carries data: eligible both to vote and to stand for election.
    Data,
    /// Vote-only witness ([#836]): counted toward quorum, never promoted.
    ///
    /// [#836]: https://github.com/reddb-io/reddb/issues/836
    Witness,
}
85
/// Voting participation of a member.
///
/// A data replica that has not yet reached a healthy, watermark-covering
/// state is [`VotingState::CatchingUp`]. It is dropped from the voter set
/// entirely, contributing neither a ballot nor a seat in the majority
/// denominator. Once healthy it moves to [`VotingState::Voting`].
/// Witnesses are always [`VotingState::Voting`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VotingState {
    /// Healthy; counted toward quorum.
    Voting,
    /// Still syncing; excluded from voting until healthy.
    CatchingUp,
}
100
101/// A cluster member as seen by the supervisor's membership view.
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Member {
104 /// Stable node identity (matches the replica registry / ack id).
105 pub id: String,
106 pub kind: MemberKind,
107 pub state: VotingState,
108}
109
110impl Member {
111 pub fn data_voting(id: impl Into<String>) -> Self {
112 Self {
113 id: id.into(),
114 kind: MemberKind::Data,
115 state: VotingState::Voting,
116 }
117 }
118
119 pub fn data_catching_up(id: impl Into<String>) -> Self {
120 Self {
121 id: id.into(),
122 kind: MemberKind::Data,
123 state: VotingState::CatchingUp,
124 }
125 }
126
127 pub fn witness(id: impl Into<String>) -> Self {
128 Self {
129 id: id.into(),
130 kind: MemberKind::Witness,
131 state: VotingState::Voting,
132 }
133 }
134
135 /// Does this member count toward quorum? Only healthy members vote;
136 /// a catching-up replica is non-voting (ADR 0030).
137 pub fn is_voter(&self) -> bool {
138 matches!(self.state, VotingState::Voting)
139 }
140
141 /// May this member stand for election? Only a healthy, data-bearing
142 /// member can become primary — a witness holds no data and a
143 /// catching-up replica is not healthy.
144 pub fn is_electable(&self) -> bool {
145 self.kind == MemberKind::Data && self.is_voter()
146 }
147}
148
149/// Quorum threshold for a set of members: a strict majority of the
150/// *voting* members. Witnesses count; catching-up replicas do not.
151///
152/// For `n` voting members the threshold is `floor(n/2) + 1`, the smallest
153/// count such that two qualifying sets always intersect — the structural
154/// basis for "no two primaries in a term".
155pub fn quorum_threshold(members: &[Member]) -> usize {
156 let voters = members.iter().filter(|m| m.is_voter()).count();
157 voters / 2 + 1
158}
159
160// ---------------------------------------------------------------
161// Durable last-vote
162// ---------------------------------------------------------------
163
/// A node's durable voting record.
///
/// It holds the highest term the node has taken part in and the candidate,
/// if any, it granted in that term. The record is persisted so a restart
/// cannot forget a vote that was already cast (requirement 2). The default
/// value is term `0` with nobody voted for.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LastVote {
    /// Highest term observed in a real (non-dry-run) vote request.
    pub term: u64,
    /// The candidate granted `term`, or `None` if no grant has been made yet.
    pub voted_for: Option<String>,
}
174
175impl LastVote {
176 fn from_file(value: reddb_file::DurableLastVote) -> Self {
177 Self {
178 term: value.term,
179 voted_for: value.voted_for,
180 }
181 }
182
183 fn to_file(&self) -> reddb_file::DurableLastVote {
184 reddb_file::DurableLastVote::new(self.term, self.voted_for.clone())
185 }
186}
187
/// Failure to load or persist a [`LastVote`].
#[derive(Debug)]
pub enum LastVoteError {
    /// The underlying storage I/O failed.
    Io(std::io::Error),
    /// A non-I/O failure reported by the store, carrying its message.
    InvalidFormat(String),
}

impl std::fmt::Display for LastVoteError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "last-vote io error: {err}"),
            Self::InvalidFormat(msg) => write!(f, "invalid last-vote format: {msg}"),
        }
    }
}

impl std::error::Error for LastVoteError {
    /// Expose the wrapped [`std::io::Error`] for `Io` failures so callers
    /// walking the error chain (logging, `anyhow`-style reporters) can reach
    /// the OS-level cause. `InvalidFormat` has no underlying error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::InvalidFormat(_) => None,
        }
    }
}
204
205impl From<reddb_file::RdbFileError> for LastVoteError {
206 fn from(value: reddb_file::RdbFileError) -> Self {
207 match value {
208 reddb_file::RdbFileError::Io(err) => Self::Io(err),
209 reddb_file::RdbFileError::InvalidOperation(msg) => Self::InvalidFormat(msg),
210 }
211 }
212}
213
214/// Durable store for a node's last vote. The contract is narrow on purpose:
215/// `load` returns the persisted record (or the default `term 0, voted_for
216/// None` when nothing was ever written), and `persist` makes a record
217/// durable *before* the caller acknowledges a grant.
218pub trait LastVoteStore {
219 fn load(&self) -> Result<LastVote, LastVoteError>;
220 fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError>;
221}
222
223/// In-memory last-vote store for tests and witnesses that do not need
224/// cross-restart durability. (A witness *should* still persist in
225/// production; the file store is used there.)
226#[derive(Debug, Default)]
227pub struct MemoryLastVoteStore {
228 inner: std::sync::Mutex<LastVote>,
229}
230
231impl MemoryLastVoteStore {
232 pub fn new() -> Self {
233 Self::default()
234 }
235
236 /// Seed an initial record — used by tests to simulate a node that
237 /// already voted before a restart.
238 pub fn seeded(vote: LastVote) -> Self {
239 Self {
240 inner: std::sync::Mutex::new(vote),
241 }
242 }
243}
244
245impl LastVoteStore for MemoryLastVoteStore {
246 fn load(&self) -> Result<LastVote, LastVoteError> {
247 Ok(self.inner.lock().expect("last-vote mutex").clone())
248 }
249
250 fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
251 *self.inner.lock().expect("last-vote mutex") = vote.clone();
252 Ok(())
253 }
254}
255
256impl LastVoteStore for FileLastVoteStore {
257 fn load(&self) -> Result<LastVote, LastVoteError> {
258 self.load_file()
259 .map(LastVote::from_file)
260 .map_err(LastVoteError::from)
261 }
262
263 fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
264 self.persist_file(&vote.to_file())
265 .map_err(LastVoteError::from)
266 }
267}
268
269// ---------------------------------------------------------------
270// Vote request / decision
271// ---------------------------------------------------------------
272
/// A candidate's request for a voter's ballot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VoteRequest {
    /// Stable identity of the candidate asking for the vote.
    pub candidate_id: String,
    /// The term being stood for. In a real election this is
    /// `current_term + 1`. In a dry-run probe it is the *prospective* term,
    /// evaluated without committing to it.
    pub term: u64,
    /// The candidate's log frontier (highest LSN durably in its log). The
    /// watermark rule compares it against the commit watermark.
    pub last_log_lsn: u64,
    /// `true` for a dry-run probe, which neither persists a vote nor
    /// advances the voter's term (requirement 1). Only `false` (a real
    /// election) can lead to a persisted last-vote.
    pub dry_run: bool,
}

impl VoteRequest {
    /// Shared constructor for [`probe`](Self::probe) and [`real`](Self::real).
    fn with_mode(
        candidate_id: impl Into<String>,
        term: u64,
        last_log_lsn: u64,
        dry_run: bool,
    ) -> Self {
        VoteRequest {
            candidate_id: candidate_id.into(),
            term,
            last_log_lsn,
            dry_run,
        }
    }

    /// A dry-run probe ("would you vote for me?") that no voter persists.
    pub fn probe(candidate_id: impl Into<String>, term: u64, last_log_lsn: u64) -> Self {
        Self::with_mode(candidate_id, term, last_log_lsn, true)
    }

    /// A real ballot request; a granting voter persists it before answering.
    pub fn real(candidate_id: impl Into<String>, term: u64, last_log_lsn: u64) -> Self {
        Self::with_mode(candidate_id, term, last_log_lsn, false)
    }
}
310
/// Why a voter turned a candidate down.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RefusalReason {
    /// The candidate's log stops short of the commit watermark, so electing
    /// it could lose an acknowledged synchronous write. This is the safety
    /// core (requirement 3).
    WatermarkNotCovered { candidate_lsn: u64, watermark: u64 },
    /// The candidate asked for a term that this voter has already granted to
    /// a different candidate (durable double-vote guard, requirement 2).
    AlreadyVoted { term: u64, voted_for: String },
    /// The candidate's term is older than the voter's recorded term: a stale
    /// candidate from a superseded term.
    StaleTerm {
        candidate_term: u64,
        voter_term: u64,
    },
}

/// A voter's answer to a [`VoteRequest`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VoteDecision {
    /// The vote is granted. For a real (non-dry-run) request the grant was
    /// made durable before this value was produced.
    Granted,
    /// The vote is refused for the given reason.
    Refused(RefusalReason),
}

impl VoteDecision {
    /// `true` for [`VoteDecision::Granted`], `false` for any refusal.
    pub fn is_granted(&self) -> bool {
        match self {
            VoteDecision::Granted => true,
            VoteDecision::Refused(_) => false,
        }
    }
}
345
346// ---------------------------------------------------------------
347// Voter (voter-side vote rule)
348// ---------------------------------------------------------------
349
350/// A voting member. Wraps the durable [`LastVoteStore`] and applies the
351/// vote rule. The voter is the seat of correctness: the watermark rule and
352/// the durable double-vote guard both live here.
353pub struct Voter<S: LastVoteStore> {
354 id: String,
355 store: S,
356}
357
358impl<S: LastVoteStore> Voter<S> {
359 pub fn new(id: impl Into<String>, store: S) -> Self {
360 Self {
361 id: id.into(),
362 store,
363 }
364 }
365
366 pub fn id(&self) -> &str {
367 &self.id
368 }
369
370 /// This voter's current term — the highest term it has durably recorded.
371 pub fn current_term(&self) -> Result<u64, LastVoteError> {
372 Ok(self.store.load()?.term)
373 }
374
375 /// Consider a vote request against the current `commit_watermark`.
376 ///
377 /// The decision order is deliberate:
378 ///
379 /// 1. **Watermark first** — the safety core. A candidate that cannot
380 /// carry an acknowledged write is refused regardless of term, so the
381 /// durability guarantee can never be traded away for liveness.
382 /// 2. **Stale term** — reject candidates from a superseded term.
383 /// 3. **Double-vote guard** — within a term, a voter grants exactly one
384 /// candidate; a re-ask by the *same* candidate is idempotently
385 /// re-granted.
386 ///
387 /// For a real (non-dry-run) grant, the new `(term, candidate)` is
388 /// persisted **before** `Granted` is returned, so the durability holds
389 /// across a crash at any point after the caller observes the grant.
390 pub fn consider(
391 &self,
392 req: &VoteRequest,
393 commit_watermark: u64,
394 ) -> Result<VoteDecision, LastVoteError> {
395 // 1. Watermark rule — never relaxed, checked before anything else.
396 if req.last_log_lsn < commit_watermark {
397 return Ok(VoteDecision::Refused(RefusalReason::WatermarkNotCovered {
398 candidate_lsn: req.last_log_lsn,
399 watermark: commit_watermark,
400 }));
401 }
402
403 let last = self.store.load()?;
404
405 // 2. Stale term — the candidate is behind a term we already moved past.
406 if req.term < last.term {
407 return Ok(VoteDecision::Refused(RefusalReason::StaleTerm {
408 candidate_term: req.term,
409 voter_term: last.term,
410 }));
411 }
412
413 // 3. Double-vote guard within the *same* term.
414 if req.term == last.term {
415 match &last.voted_for {
416 // Already voted for someone else this term — refuse.
417 Some(other) if other != &req.candidate_id => {
418 return Ok(VoteDecision::Refused(RefusalReason::AlreadyVoted {
419 term: last.term,
420 voted_for: other.clone(),
421 }));
422 }
423 // Already voted for this same candidate — idempotent re-grant.
424 Some(_) => return Ok(VoteDecision::Granted),
425 // Same term observed but no vote cast yet — fall through to grant.
426 None => {}
427 }
428 }
429
430 // Grant. A dry-run probe must not persist or advance the term
431 // (requirement 1); a real grant persists durably before acking.
432 if !req.dry_run {
433 self.store.persist(&LastVote {
434 term: req.term,
435 voted_for: Some(req.candidate_id.clone()),
436 })?;
437 }
438 Ok(VoteDecision::Granted)
439 }
440}
441
442// ---------------------------------------------------------------
443// Randomized election timeout
444// ---------------------------------------------------------------
445
/// A randomized election timeout in `[base, base + jitter)`.
///
/// Randomization keeps candidates from standing in lockstep, which is what
/// makes split votes rare and self-correcting (requirement 4). The function
/// is pure in `seed` so tests pin a deterministic value; production passes
/// an entropy-derived seed.
///
/// The offset is `seed % jitter_ms`, so it has millisecond granularity. A
/// zero jitter, or one shorter than 1 ms, yields exactly `base`.
///
/// This function never panics:
///
/// * a jitter whose millisecond count does not fit in `u64` is clamped to
///   `u64::MAX` ms, instead of being truncated by an `as` cast (which could
///   wrap to `0` and then divide by zero);
/// * `base + offset` saturates at [`Duration::MAX`] instead of overflowing.
pub fn randomized_election_timeout(base: Duration, jitter: Duration, seed: u64) -> Duration {
    if jitter.is_zero() {
        return base;
    }
    // Clamp first, then floor at 1, so the modulus below can never be zero.
    let jitter_ms = u64::try_from(jitter.as_millis())
        .unwrap_or(u64::MAX)
        .max(1);
    base.saturating_add(Duration::from_millis(seed % jitter_ms))
}
459
460// ---------------------------------------------------------------
461// ElectionCoordinator (candidate-side state machine)
462// ---------------------------------------------------------------
463
464/// A request to run an election on behalf of `candidate`.
465#[derive(Debug, Clone, PartialEq, Eq)]
466pub struct ElectionRequest {
467 /// The candidate standing for election. Must be electable (a healthy,
468 /// data-bearing member) or [`ElectionCoordinator::run`] refuses up front.
469 pub candidate: Member,
470 /// The term the cluster is serving now. A real election stands for
471 /// `current_term + 1`.
472 pub current_term: u64,
473 /// The candidate's log frontier — the highest LSN durably in its log.
474 pub last_log_lsn: u64,
475 /// The commit watermark the candidate believes is in force. The
476 /// candidate must itself cover it to be electable; voters re-check
477 /// against their own watermark view.
478 pub commit_watermark: u64,
479}
480
481impl ElectionRequest {
482 /// The term a successful election produces.
483 pub fn new_term(&self) -> u64 {
484 self.current_term + 1
485 }
486}
487
/// How an election attempt ended.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ElectionOutcome {
    /// A majority was collected and the candidate was promoted under `term`.
    /// `votes` and `needed` record the tally, self-vote included.
    Elected {
        term: u64,
        votes: usize,
        needed: usize,
    },
    /// The dry-run probe fell short of a majority. No term was bumped and no
    /// real election followed (requirement 1).
    ProbeFailed { votes: usize, needed: usize },
    /// The real election ran (so the term was bumped) but fell short of a
    /// majority, e.g. votes split or peers came online between probe and
    /// election. The term has moved on; a later attempt stands for a higher
    /// term.
    Lost {
        term: u64,
        votes: usize,
        needed: usize,
    },
    /// The candidate may not stand: it is a witness, a catching-up replica,
    /// or its own log misses the watermark. No probe was sent.
    NotElectable,
    /// The randomized timeout expired before a majority was collected.
    /// Nothing was promoted.
    TimedOut { votes: usize, needed: usize },
}

impl ElectionOutcome {
    /// `true` only for [`ElectionOutcome::Elected`].
    pub fn is_elected(&self) -> bool {
        match self {
            ElectionOutcome::Elected { .. } => true,
            ElectionOutcome::ProbeFailed { .. }
            | ElectionOutcome::Lost { .. }
            | ElectionOutcome::NotElectable
            | ElectionOutcome::TimedOut { .. } => false,
        }
    }
}
523
524/// Cluster operations the candidate drives, injected so the state machine
525/// stays pure and deterministically testable. Production backs these onto
526/// the membership view, the per-peer vote RPC, the durable term store, and
527/// the FAILOVER handover; tests back them onto a scripted fake.
528pub trait ElectionTransport {
529 /// The candidate's current view of cluster membership. The denominator
530 /// for the majority is the *voting* members of this set.
531 fn members(&self) -> Vec<Member>;
532
533 /// Ask one peer for its vote. The candidate never asks itself (it always
534 /// self-grants). Implementors route this to the peer's [`Voter`].
535 fn request_vote(&mut self, peer_id: &str, req: &VoteRequest) -> VoteDecision;
536
537 /// Time elapsed since the election began, so the coordinator enforces
538 /// the randomized timeout without owning a clock.
539 fn elapsed(&self) -> Duration;
540
541 /// Durably advance this node's current term to `new_term`. Called once,
542 /// only when a real election begins (never for a dry-run). Persisted
543 /// alongside the node's other durable replication state.
544 fn bump_term(&mut self, new_term: u64);
545
546 /// Promote the candidate to primary under `new_term`, reusing the
547 /// FAILOVER handover machinery ([`super::failover`]). Called only after
548 /// a majority is collected in the real election.
549 fn promote(&mut self, new_term: u64);
550}
551
552/// The quorum-gated election state machine.
553pub struct ElectionCoordinator;
554
555impl ElectionCoordinator {
556 /// Run an election for `req`, driving the cluster through `tx`, bounded
557 /// by `timeout` (use [`randomized_election_timeout`]).
558 ///
559 /// The flow is: electability guard → dry-run probe (no term bump) →
560 /// real election (bump term, collect votes) → promote on majority. See
561 /// the module docs for the full contract.
562 pub fn run(
563 req: &ElectionRequest,
564 tx: &mut dyn ElectionTransport,
565 timeout: Duration,
566 ) -> ElectionOutcome {
567 // Electability guard. A witness or catching-up replica may not
568 // stand; nor may a candidate whose own log does not cover the
569 // watermark (it would violate the safety core the instant it won).
570 if !req.candidate.is_electable() || req.last_log_lsn < req.commit_watermark {
571 return ElectionOutcome::NotElectable;
572 }
573
574 let members = tx.members();
575 let needed = quorum_threshold(&members);
576 let new_term = req.new_term();
577
578 // The peers we ask: every *other* voting member. The candidate
579 // self-grants, so it is one vote without an RPC.
580 let peers: Vec<String> = members
581 .iter()
582 .filter(|m| m.is_voter() && m.id != req.candidate.id)
583 .map(|m| m.id.clone())
584 .collect();
585
586 // ---- Phase 1: dry-run probe (does NOT bump the term) ----
587 let probe = VoteRequest::probe(&req.candidate.id, new_term, req.last_log_lsn);
588 let probe_votes = match Self::collect(tx, &peers, &probe, needed, timeout) {
589 CollectResult::Reached(v) => v,
590 CollectResult::Exhausted(v) => {
591 return ElectionOutcome::ProbeFailed { votes: v, needed }
592 }
593 CollectResult::TimedOut(v) => return ElectionOutcome::TimedOut { votes: v, needed },
594 };
595 debug_assert!(probe_votes >= needed);
596
597 // ---- Phase 2: real election (bumps the term, then collects) ----
598 tx.bump_term(new_term);
599 let ballot = VoteRequest::real(&req.candidate.id, new_term, req.last_log_lsn);
600 match Self::collect(tx, &peers, &ballot, needed, timeout) {
601 CollectResult::Reached(votes) => {
602 tx.promote(new_term);
603 ElectionOutcome::Elected {
604 term: new_term,
605 votes,
606 needed,
607 }
608 }
609 CollectResult::Exhausted(votes) => ElectionOutcome::Lost {
610 term: new_term,
611 votes,
612 needed,
613 },
614 CollectResult::TimedOut(votes) => ElectionOutcome::TimedOut { votes, needed },
615 }
616 }
617
618 /// Collect votes from `peers`, starting at 1 for the candidate's
619 /// self-vote, until `needed` is reached, the peers are exhausted, or the
620 /// timeout elapses. Stops early on success — no need to ask the rest.
621 fn collect(
622 tx: &mut dyn ElectionTransport,
623 peers: &[String],
624 req: &VoteRequest,
625 needed: usize,
626 timeout: Duration,
627 ) -> CollectResult {
628 let mut votes = 1usize; // self-vote
629 if votes >= needed {
630 return CollectResult::Reached(votes);
631 }
632 for peer in peers {
633 if tx.elapsed() >= timeout {
634 return CollectResult::TimedOut(votes);
635 }
636 if tx.request_vote(peer, req).is_granted() {
637 votes += 1;
638 if votes >= needed {
639 return CollectResult::Reached(votes);
640 }
641 }
642 }
643 CollectResult::Exhausted(votes)
644 }
645}
646
647enum CollectResult {
648 Reached(usize),
649 Exhausted(usize),
650 TimedOut(usize),
651}
652
653#[cfg(test)]
654mod tests;