Skip to main content

reddb_server/replication/
failover.rs

//! Coordinated zero-RPO failover (issue #833, PRD #819).
//!
//! Drives a *planned* primary handover so that no acknowledged write is
//! lost. The flow is the classic coordinated switchover:
//!
//! 1. **Freeze writes** on the current primary and capture its frontier
//!    LSN at the instant writes stopped. No new LSN is minted after the
//!    freeze, so the frontier is a *fixed* catch-up target.
//! 2. **Wait for the target replica to reach the frontier** — poll the
//!    target's acknowledged (durable) frontier until it covers the
//!    frozen LSN.
//! 3. **Hand over the term** — mint `current_term + 1` and stamp it on
//!    the target, promoting it to primary.
//! 4. **Demote the old primary** to a replica that streams from the new
//!    primary under the new term.
//!
//! ## Two modes
//!
//! * [`FailoverMode::Coordinated`] is the zero-RPO path. If the target
//!   cannot reach the frontier before `catch_up_deadline`, the handover
//!   **aborts**: writes resume on the old primary and nothing is
//!   committed on the target, so the cluster keeps serving and no
//!   acknowledged write is lost (issue #833 criterion 1).
//! * [`FailoverMode::Force`] is the emergency path. It still tries to
//!   reach the frontier, but on `timeout` it completes the handover
//!   anyway, surfacing the *skipped catch-up* — the un-replicated LSN
//!   gap between the frozen frontier and the target's reached frontier
//!   (issue #833 criterion 2).
//!
//! ## Module shape
//!
//! [`FailoverCoordinator::run`] is a pure state machine. The clock and
//! the cluster mutations (freeze, resume, poll, commit) are injected
//! behind [`FailoverTransport`], so the whole flow is exercised
//! deterministically with a scripted fake — no clock, no network, no
//! engine dependency. The post-handover roles are returned in the
//! outcome ([`RoleAssignment`]) so a caller can assert that the new
//! primary advertises the new term and the old primary streams as a
//! replica (issue #833 criterion 3). Wiring the transport to the real
//! WAL frontier and the gRPC role-swap is left to the transport layer.
40
41use std::time::Duration;
42
/// Replication role a node holds once a failover step has run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeRole {
    /// Write-accepting node serving under `term`; streams WAL out to replicas.
    Primary { term: u64 },
    /// Read-only node that streams WAL from `primary_addr` under `term`.
    Replica { primary_addr: String, term: u64 },
}
51
/// One of the nodes taking part in a failover.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailoverNode {
    /// Stable identifier of the node (the same id the replica registry uses).
    pub id: String,
    /// Address other nodes dial to reach this node.
    pub addr: String,
    /// Identifier of the node's region / fault domain.
    pub region: String,
}

impl FailoverNode {
    /// Build a node from any values convertible into owned `String`s.
    pub fn new(id: impl Into<String>, addr: impl Into<String>, region: impl Into<String>) -> Self {
        let (id, addr, region) = (id.into(), addr.into(), region.into());
        Self { id, addr, region }
    }
}
72
/// Execution strategy for a failover.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailoverMode {
    /// Zero-RPO coordinated handover. The target MUST reach the frozen
    /// frontier within `catch_up_deadline`; if it does not, the handover
    /// aborts and writes resume on the old primary, so no acknowledged
    /// write is ever lost.
    Coordinated { catch_up_deadline: Duration },
    /// Emergency handover. Still tries to reach the frontier, but
    /// completes within `timeout` regardless and surfaces the skipped
    /// catch-up.
    Force { timeout: Duration },
}

impl FailoverMode {
    /// Longest the catch-up wait may run before the mode's terminal
    /// behaviour (abort for coordinated, force-through for forced)
    /// takes over.
    fn deadline(self) -> Duration {
        // Both variants carry exactly one duration; bind whichever is present.
        let (FailoverMode::Coordinated {
            catch_up_deadline: limit,
        }
        | FailoverMode::Force { timeout: limit }) = self;
        limit
    }

    /// `true` only for [`FailoverMode::Force`].
    fn is_force(self) -> bool {
        match self {
            FailoverMode::Force { .. } => true,
            FailoverMode::Coordinated { .. } => false,
        }
    }
}
100
101/// A request to hand the primary role from `old_primary` to `target`.
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct FailoverRequest {
104    /// The node currently serving as primary.
105    pub old_primary: FailoverNode,
106    /// The replica being promoted.
107    pub target: FailoverNode,
108    /// The replication term the cluster is serving *now*. The handover
109    /// mints `current_term + 1`.
110    pub current_term: u64,
111    /// Last known acknowledged frontier of the target (from the replica
112    /// registry). Lets the coordinator take a no-wait fast path when the
113    /// target is already caught up at freeze time.
114    pub target_frontier_hint: u64,
115    /// Coordinated vs. forced execution.
116    pub mode: FailoverMode,
117}
118
119impl FailoverRequest {
120    /// The term the cluster serves *after* a successful handover.
121    pub fn new_term(&self) -> u64 {
122        self.current_term + 1
123    }
124}
125
126/// Post-handover roles of the two nodes, used to assert that the new
127/// primary advertises the new term and the old primary streams as a
128/// replica (issue #833 criterion 3).
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct RoleAssignment {
131    /// The promoted target — now primary under the new term.
132    pub new_primary: NodeRole,
133    /// The demoted old primary — now a replica of the new primary.
134    pub old_primary: NodeRole,
135}
136
137impl RoleAssignment {
138    fn swap(req: &FailoverRequest) -> Self {
139        let new_term = req.new_term();
140        Self {
141            new_primary: NodeRole::Primary { term: new_term },
142            old_primary: NodeRole::Replica {
143                primary_addr: req.target.addr.clone(),
144                term: new_term,
145            },
146        }
147    }
148}
149
150/// The result of a completed handover.
151#[derive(Debug, Clone, PartialEq, Eq)]
152pub struct FailoverOutcome {
153    /// The term the new primary now serves.
154    pub new_term: u64,
155    /// The primary frontier frozen at the moment writes were paused —
156    /// the catch-up target.
157    pub frontier_lsn: u64,
158    /// The target's acknowledged frontier at the moment the term was
159    /// handed over. Equals `frontier_lsn` for a clean handover; may be
160    /// below it for a forced one.
161    pub reached_lsn: u64,
162    /// `frontier_lsn - reached_lsn` — acknowledged-but-not-yet-replicated
163    /// LSNs skipped by a forced handover. Always `0` for a clean one.
164    pub skipped_lsn: u64,
165    /// Whether the handover had to be forced past an un-caught-up target.
166    pub forced: bool,
167    /// How long the catch-up wait ran before the term was handed over.
168    pub waited: Duration,
169    /// Post-handover roles of the two nodes.
170    pub roles: RoleAssignment,
171}
172
173impl FailoverOutcome {
174    /// True when the target reached the full frontier — a true zero-RPO
175    /// handover with nothing skipped.
176    pub fn is_zero_rpo(&self) -> bool {
177        self.skipped_lsn == 0
178    }
179}
180
/// Reason a coordinated failover could not finish without losing writes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailoverError {
    /// The coordinated handover was aborted: the target did not reach
    /// the frozen frontier before the deadline. Writes have been resumed
    /// on the old primary and no acknowledged write was lost. Use
    /// [`FailoverMode::Force`] to hand over anyway.
    CatchUpTimedOut {
        frontier_lsn: u64,
        reached_lsn: u64,
        waited: Duration,
    },
}

impl std::fmt::Display for FailoverError {
    /// Renders a one-line, human-readable description of the abort.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum, so the destructuring is irrefutable; adding
        // a variant later turns this into a compile error rather than a
        // silently wrong message.
        let FailoverError::CatchUpTimedOut {
            frontier_lsn,
            reached_lsn,
            waited,
        } = self;
        f.write_fmt(format_args!(
            "coordinated failover aborted: target reached LSN {reached_lsn} of {frontier_lsn} \
             after {waited:?}; writes resumed on the old primary, no write lost",
        ))
    }
}

impl std::error::Error for FailoverError {}
212
/// The clock and the cluster mutations the coordinator drives. They are
/// injected so the state machine itself stays pure and can be tested
/// deterministically.
///
/// Production implementors back these onto the real WAL frontier, the
/// replica registry, and the gRPC role-swap; tests back them onto a
/// scripted fake.
pub trait FailoverTransport {
    /// Pause writes on the current primary and report the frontier LSN
    /// (`current_lsn`) as frozen at the instant writes stopped. No new
    /// LSN is minted once this returns, which makes the returned value a
    /// fixed catch-up target.
    fn freeze_primary(&mut self) -> u64;

    /// Let the old primary accept writes again. Invoked only when a
    /// coordinated handover aborts, so the cluster keeps serving and no
    /// write is lost.
    fn resume_primary(&mut self);

    /// Time that has passed since the failover began. Lets the
    /// coordinator enforce its deadline without owning a clock.
    fn elapsed(&self) -> Duration;

    /// Block for one poll interval (clamped, in spirit, by the caller's
    /// remaining deadline) and then report the target replica's current
    /// acknowledged (durable) frontier LSN.
    fn poll_target_frontier(&mut self) -> u64;

    /// Commit the role swap: stamp `new_term` on the target, promoting
    /// it to primary, and reconfigure the old primary to stream as a
    /// replica of the new primary under `new_term`.
    fn commit_handover(&mut self, new_term: u64);
}
244
/// State machine for the coordinated zero-RPO failover. Carries no
/// state of its own; everything it needs arrives through the request and
/// the injected transport.
pub struct FailoverCoordinator;
247
248impl FailoverCoordinator {
249    /// Execute the handover described by `req`, driving the cluster
250    /// through `tx`.
251    ///
252    /// Returns `Ok(FailoverOutcome)` once the term has been handed over
253    /// (cleanly, or forced past a lagging target). Returns
254    /// `Err(FailoverError::CatchUpTimedOut)` only for a *coordinated*
255    /// handover whose target never caught up — in which case writes have
256    /// already been resumed on the old primary and nothing was committed
257    /// on the target.
258    pub fn run(
259        req: &FailoverRequest,
260        tx: &mut dyn FailoverTransport,
261    ) -> Result<FailoverOutcome, FailoverError> {
262        let new_term = req.new_term();
263        let frontier = tx.freeze_primary();
264
265        // Fast path: the target was already at/past the frontier when we
266        // froze. Hand over immediately, no wait.
267        if req.target_frontier_hint >= frontier {
268            tx.commit_handover(new_term);
269            return Ok(Self::clean_outcome(req, frontier, frontier, Duration::ZERO));
270        }
271
272        // Bounded wait: poll the target's live frontier until it covers
273        // the frozen LSN or the deadline elapses.
274        let deadline = req.mode.deadline();
275        let mut reached = req.target_frontier_hint;
276        while tx.elapsed() < deadline {
277            reached = tx.poll_target_frontier();
278            if reached >= frontier {
279                let waited = tx.elapsed();
280                tx.commit_handover(new_term);
281                return Ok(Self::clean_outcome(req, frontier, reached, waited));
282            }
283        }
284
285        // Deadline blown. A coordinated handover aborts (resume writes,
286        // lose nothing); a forced one hands over anyway and surfaces the
287        // skipped catch-up.
288        let waited = tx.elapsed();
289        if req.mode.is_force() {
290            tx.commit_handover(new_term);
291            Ok(FailoverOutcome {
292                new_term,
293                frontier_lsn: frontier,
294                reached_lsn: reached,
295                skipped_lsn: frontier.saturating_sub(reached),
296                forced: true,
297                waited,
298                roles: RoleAssignment::swap(req),
299            })
300        } else {
301            tx.resume_primary();
302            Err(FailoverError::CatchUpTimedOut {
303                frontier_lsn: frontier,
304                reached_lsn: reached,
305                waited,
306            })
307        }
308    }
309
310    fn clean_outcome(
311        req: &FailoverRequest,
312        frontier: u64,
313        reached: u64,
314        waited: Duration,
315    ) -> FailoverOutcome {
316        FailoverOutcome {
317            new_term: req.new_term(),
318            frontier_lsn: frontier,
319            reached_lsn: reached,
320            skipped_lsn: 0,
321            forced: false,
322            waited,
323            roles: RoleAssignment::swap(req),
324        }
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;

    /// Clock step every scripted poll advances by.
    const TICK: Duration = Duration::from_millis(10);

    /// Scripted transport: a fixed frozen frontier plus a queue of target
    /// frontier readings, one consumed per poll. Every poll also moves
    /// the fake clock forward by `tick`, so deadlines are exercised
    /// deterministically.
    struct FakeTransport {
        frontier: u64,
        readings: VecDeque<u64>,
        /// Reading repeated once the script runs out (replica stuck
        /// behind); it is the last scripted value, or 0 for an empty
        /// script.
        stuck_at: u64,
        elapsed: Duration,
        tick: Duration,
        froze: bool,
        resumed: bool,
        committed_term: Option<u64>,
    }

    impl FakeTransport {
        fn new(frontier: u64, readings: Vec<u64>, tick: Duration) -> Self {
            let readings = VecDeque::from(readings);
            let stuck_at = readings.back().copied().unwrap_or(0);
            Self {
                frontier,
                readings,
                stuck_at,
                elapsed: Duration::ZERO,
                tick,
                froze: false,
                resumed: false,
                committed_term: None,
            }
        }
    }

    impl FailoverTransport for FakeTransport {
        fn freeze_primary(&mut self) -> u64 {
            self.froze = true;
            self.frontier
        }

        fn resume_primary(&mut self) {
            self.resumed = true;
        }

        fn elapsed(&self) -> Duration {
            self.elapsed
        }

        fn poll_target_frontier(&mut self) -> u64 {
            self.elapsed += self.tick;
            match self.readings.pop_front() {
                Some(lsn) => lsn,
                None => self.stuck_at,
            }
        }

        fn commit_handover(&mut self, new_term: u64) {
            self.committed_term = Some(new_term);
        }
    }

    /// Coordinated mode with the given catch-up deadline.
    fn coordinated(catch_up_deadline: Duration) -> FailoverMode {
        FailoverMode::Coordinated { catch_up_deadline }
    }

    /// Force mode with the given timeout.
    fn force(timeout: Duration) -> FailoverMode {
        FailoverMode::Force { timeout }
    }

    /// Request handing term 4 over from n1 to n2 with the given mode and hint.
    fn request(mode: FailoverMode, hint: u64) -> FailoverRequest {
        let old_primary = FailoverNode::new("n1", "http://n1:50051", "us-east");
        let target = FailoverNode::new("n2", "http://n2:50051", "us-west");
        FailoverRequest {
            old_primary,
            target,
            current_term: 4,
            target_frontier_hint: hint,
            mode,
        }
    }

    #[test]
    fn fast_path_hands_over_without_waiting_when_target_already_caught_up() {
        let req = request(coordinated(Duration::from_secs(5)), 100);
        let mut tx = FakeTransport::new(100, Vec::new(), TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");

        assert!(tx.froze, "writes must be paused");
        assert_eq!(tx.committed_term, Some(5), "new term handed over");
        assert!(!tx.resumed, "no abort on a clean handover");
        assert_eq!(outcome.waited, Duration::ZERO, "fast path does not wait");
        assert!(outcome.is_zero_rpo());
        assert_eq!(outcome.skipped_lsn, 0);
    }

    #[test]
    fn coordinated_waits_then_hands_over_when_target_catches_up() {
        // The target climbs 60 -> 80 -> 100 and covers the frontier on
        // the third poll.
        let req = request(coordinated(Duration::from_secs(5)), 50);
        let mut tx = FakeTransport::new(100, vec![60, 80, 100], TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");

        assert_eq!(tx.committed_term, Some(5));
        assert!(!tx.resumed);
        assert_eq!(outcome.new_term, 5);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 100);
        assert!(outcome.is_zero_rpo(), "no write lost in a clean handover");
        assert_eq!(outcome.waited, Duration::from_millis(30));

        let expected_primary = NodeRole::Primary { term: 5 };
        assert_eq!(
            outcome.roles.new_primary, expected_primary,
            "new primary advertises the new term",
        );
        let expected_replica = NodeRole::Replica {
            primary_addr: "http://n2:50051".to_string(),
            term: 5,
        };
        assert_eq!(
            outcome.roles.old_primary, expected_replica,
            "old primary streams as a replica of the new primary",
        );
    }

    #[test]
    fn coordinated_aborts_and_resumes_when_target_never_catches_up() {
        // The target stalls at 70 and never covers the frontier of 100
        // before the 50ms deadline (5 polls at 10ms each).
        let req = request(coordinated(Duration::from_millis(50)), 50);
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], TICK);

        let err = FailoverCoordinator::run(&req, &mut tx).expect_err("must abort");

        assert!(tx.resumed, "writes must resume on the old primary");
        assert_eq!(tx.committed_term, None, "no term handed over on abort");
        let FailoverError::CatchUpTimedOut {
            frontier_lsn,
            reached_lsn,
            ..
        } = err;
        assert_eq!(frontier_lsn, 100);
        assert_eq!(reached_lsn, 70);
    }

    #[test]
    fn force_completes_within_timeout_surfacing_skipped_catch_up() {
        // The target stalls at 70; FORCE hands over anyway once the
        // timeout has passed.
        let req = request(force(Duration::from_millis(50)), 50);
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover");

        assert!(!tx.resumed, "forced handover does not abort");
        assert_eq!(tx.committed_term, Some(5), "term handed over under force");
        assert!(outcome.forced);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 70);
        assert_eq!(outcome.skipped_lsn, 30, "skipped catch-up surfaced");
        assert!(!outcome.is_zero_rpo());
        assert!(
            outcome.waited <= Duration::from_millis(60),
            "completes within the timeout window",
        );
        assert_eq!(outcome.roles.new_primary, NodeRole::Primary { term: 5 });
    }

    #[test]
    fn force_still_takes_fast_path_when_target_already_caught_up() {
        let req = request(force(Duration::from_millis(50)), 120);
        let mut tx = FakeTransport::new(100, Vec::new(), TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean forced handover");

        assert!(!outcome.forced, "no force needed when already caught up");
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }

    #[test]
    fn force_that_catches_up_in_time_skips_nothing() {
        let req = request(force(Duration::from_secs(5)), 50);
        let mut tx = FakeTransport::new(100, vec![90, 100], TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover catches up");

        assert!(!outcome.forced);
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }
}