Skip to main content

reddb_server/replication/
failover.rs

1//! Coordinated zero-RPO failover (issue #833, PRD #819).
2//!
3//! Drives a *planned* primary handover so that no acknowledged write is
4//! lost. The flow is the classic coordinated switchover:
5//!
6//! 1. **Freeze writes** on the current primary and capture its frontier
7//!    LSN at the instant writes stopped. No new LSN is minted after the
8//!    freeze, so the frontier is a *fixed* catch-up target.
9//! 2. **Wait for the target replica to reach the frontier** — poll the
10//!    target's acknowledged (durable) frontier until it covers the frozen LSN.
11//! 3. **Hand over the term** — mint `current_term + 1` and stamp it on
12//!    the target, promoting it to primary.
13//! 4. **Demote the old primary** to a replica that streams from the new
14//!    primary under the new term.
15//!
16//! ## Two modes
17//!
18//! * [`FailoverMode::Coordinated`] is the zero-RPO path. If the target
19//!   cannot reach the frontier before `catch_up_deadline`, the handover
20//!   **aborts**: writes resume on the old primary and nothing is
21//!   committed on the target, so the cluster keeps serving and no
22//!   acknowledged write is lost (issue #833 criterion 1).
23//! * [`FailoverMode::Force`] is the emergency path. It still tries to
24//!   reach the frontier, but on `timeout` it completes the handover
25//!   anyway, surfacing the *skipped catch-up* — the un-replicated LSN
26//!   gap between the frozen frontier and the target's reached frontier
27//!   (issue #833 criterion 2).
28//!
29//! ## Module shape
30//!
31//! [`FailoverCoordinator::run`] is a pure state machine. The clock and
32//! the cluster mutations (freeze, resume, poll, commit) are injected
33//! behind [`FailoverTransport`], so the whole flow is exercised
34//! deterministically with a scripted fake — no clock, no network, no
35//! engine dependency. The post-handover roles are returned in the
36//! outcome ([`RoleAssignment`]) so a caller can assert that the new
37//! primary advertises the new term and the old primary streams as a
38//! replica (issue #833 criterion 3). Wiring the transport to the real
39//! WAL frontier and the gRPC role-swap is left to the transport layer.
40
41use std::time::Duration;
42
/// Role a node holds in the replication topology once a failover step
/// has been applied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeRole {
    /// Write-accepting node serving `term`; streams WAL out to replicas.
    Primary { term: u64 },
    /// Read-only node; streams WAL in from `primary_addr` under `term`.
    Replica { primary_addr: String, term: u64 },
}
51
/// Identity and location of a node taking part in a failover.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailoverNode {
    /// Stable identifier; the same id the replica registry uses.
    pub id: String,
    /// Address other nodes dial to reach this node.
    pub addr: String,
    /// Region / fault domain the node belongs to.
    pub region: String,
}

impl FailoverNode {
    /// Build a node from any values convertible into owned strings.
    pub fn new(id: impl Into<String>, addr: impl Into<String>, region: impl Into<String>) -> Self {
        let (id, addr, region) = (id.into(), addr.into(), region.into());
        Self { id, addr, region }
    }
}
72
/// Execution policy for a failover.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailoverMode {
    /// Zero-RPO coordinated handover. The target has to reach the frozen
    /// frontier within `catch_up_deadline`; if it does not, the handover
    /// aborts and writes resume on the old primary, so no acknowledged
    /// write is ever lost.
    Coordinated { catch_up_deadline: Duration },
    /// Emergency handover. Still attempts to reach the frontier, but
    /// completes once `timeout` has passed regardless, surfacing the
    /// skipped catch-up.
    Force { timeout: Duration },
}

impl FailoverMode {
    /// Longest the catch-up wait may run before the mode's terminal
    /// behaviour (abort for coordinated, force-through for forced) applies.
    fn deadline(self) -> Duration {
        match self {
            Self::Coordinated {
                catch_up_deadline: limit,
            }
            | Self::Force { timeout: limit } => limit,
        }
    }

    /// Whether this mode hands over even when the target is still behind.
    fn is_force(self) -> bool {
        match self {
            Self::Force { .. } => true,
            Self::Coordinated { .. } => false,
        }
    }
}
100
101/// A request to hand the primary role from `old_primary` to `target`.
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct FailoverRequest {
104    /// The node currently serving as primary.
105    pub old_primary: FailoverNode,
106    /// The replica being promoted.
107    pub target: FailoverNode,
108    /// The replication term the cluster is serving *now*. The handover
109    /// mints `current_term + 1`.
110    pub current_term: u64,
111    /// Last known acknowledged frontier of the target (from the replica
112    /// registry). Lets the coordinator take a no-wait fast path when the
113    /// target is already caught up at freeze time.
114    pub target_frontier_hint: u64,
115    /// Current primary-replica timeline ancestry. A successful handover
116    /// forks this history at the target's reached LSN and returns the new
117    /// history in [`FailoverOutcome::timeline_history`].
118    pub timeline_history: reddb_file::TimelineHistory,
119    /// Coordinated vs. forced execution.
120    pub mode: FailoverMode,
121}
122
123impl FailoverRequest {
124    /// The term the cluster serves *after* a successful handover.
125    pub fn new_term(&self) -> u64 {
126        self.current_term + 1
127    }
128}
129
130/// Post-handover roles of the two nodes, used to assert that the new
131/// primary advertises the new term and the old primary streams as a
132/// replica (issue #833 criterion 3).
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct RoleAssignment {
135    /// The promoted target — now primary under the new term.
136    pub new_primary: NodeRole,
137    /// The demoted old primary — now a replica of the new primary.
138    pub old_primary: NodeRole,
139}
140
141impl RoleAssignment {
142    fn swap(req: &FailoverRequest) -> Self {
143        let new_term = req.new_term();
144        Self {
145            new_primary: NodeRole::Primary { term: new_term },
146            old_primary: NodeRole::Replica {
147                primary_addr: req.target.addr.clone(),
148                term: new_term,
149            },
150        }
151    }
152}
153
154/// The result of a completed handover.
155#[derive(Debug, Clone, PartialEq, Eq)]
156pub struct FailoverOutcome {
157    /// The term the new primary now serves.
158    pub new_term: u64,
159    /// The primary frontier frozen at the moment writes were paused —
160    /// the catch-up target.
161    pub frontier_lsn: u64,
162    /// The target's acknowledged frontier at the moment the term was
163    /// handed over. Equals `frontier_lsn` for a clean handover; may be
164    /// below it for a forced one.
165    pub reached_lsn: u64,
166    /// `frontier_lsn - reached_lsn` — acknowledged-but-not-yet-replicated
167    /// LSNs skipped by a forced handover. Always `0` for a clean one.
168    pub skipped_lsn: u64,
169    /// Whether the handover had to be forced past an un-caught-up target.
170    pub forced: bool,
171    /// How long the catch-up wait ran before the term was handed over.
172    pub waited: Duration,
173    /// Post-handover roles of the two nodes.
174    pub roles: RoleAssignment,
175    /// Timeline ancestry after the promoted target became primary.
176    pub timeline_history: reddb_file::TimelineHistory,
177}
178
179impl FailoverOutcome {
180    /// True when the target reached the full frontier — a true zero-RPO
181    /// handover with nothing skipped.
182    pub fn is_zero_rpo(&self) -> bool {
183        self.skipped_lsn == 0
184    }
185}
186
/// Reasons a failover ended without handing the term over.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailoverError {
    /// The target failed to reach the frozen frontier before the deadline,
    /// so the coordinated handover aborted. Writes were resumed on the old
    /// primary and no acknowledged write was lost. Retry with
    /// [`FailoverMode::Force`] to hand over regardless.
    CatchUpTimedOut {
        frontier_lsn: u64,
        reached_lsn: u64,
        waited: Duration,
    },
    /// Everything else was ready for the term/role swap, but the timeline
    /// fork could not be represented safely. No handover is committed in
    /// this case.
    TimelineHistory(String),
}

impl std::fmt::Display for FailoverError {
    /// Human-readable one-line description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TimelineHistory(detail) => {
                write!(f, "failover timeline history error: {detail}")
            }
            Self::CatchUpTimedOut {
                frontier_lsn: target,
                reached_lsn: reached,
                waited: spent,
            } => write!(
                f,
                "coordinated failover aborted: target reached LSN {reached} of {target} after {spent:?}; \
                 writes resumed on the old primary, no write lost",
            ),
        }
    }
}

impl std::error::Error for FailoverError {}
224
/// The clock and the cluster mutations the coordinator drives. They are
/// injected so the state machine stays pure and can be tested
/// deterministically.
///
/// Production implementations back these calls onto the real WAL
/// frontier, the replica registry, and the gRPC role-swap; tests back them
/// onto a scripted fake.
pub trait FailoverTransport {
    /// Pause writes on the current primary and report the frontier LSN
    /// (`current_lsn`) as of the instant writes stopped. No new LSN is
    /// minted once this has returned, which makes the reported value a
    /// fixed catch-up target.
    fn freeze_primary(&mut self) -> u64;

    /// Resume writes on the old primary so the cluster keeps serving with
    /// no lost write. The coordinator calls this when a coordinated
    /// handover aborts.
    fn resume_primary(&mut self);

    /// Time that has passed since the failover began. Lets the coordinator
    /// enforce its deadline without owning a clock.
    fn elapsed(&self) -> Duration;

    /// Block for one poll interval, then report the target replica's
    /// current acknowledged (durable) frontier LSN. Implementations are
    /// expected to keep the interval within the caller's remaining
    /// deadline where they can.
    fn poll_target_frontier(&mut self) -> u64;

    /// Commit the role swap: stamp `new_term` on the target, promoting it
    /// to primary, and reconfigure the old primary to stream as a replica
    /// of the new primary under `new_term`.
    fn commit_handover(&mut self, new_term: u64);
}
256
257/// The coordinated zero-RPO failover state machine.
258pub struct FailoverCoordinator;
259
260impl FailoverCoordinator {
261    /// Execute the handover described by `req`, driving the cluster
262    /// through `tx`.
263    ///
264    /// Returns `Ok(FailoverOutcome)` once the term has been handed over
265    /// (cleanly, or forced past a lagging target). Returns
266    /// `Err(FailoverError::CatchUpTimedOut)` only for a *coordinated*
267    /// handover whose target never caught up — in which case writes have
268    /// already been resumed on the old primary and nothing was committed
269    /// on the target.
270    pub fn run(
271        req: &FailoverRequest,
272        tx: &mut dyn FailoverTransport,
273    ) -> Result<FailoverOutcome, FailoverError> {
274        let new_term = req.new_term();
275        let frontier = tx.freeze_primary();
276
277        // Fast path: the target was already at/past the frontier when we
278        // froze. Hand over immediately, no wait.
279        if req.target_frontier_hint >= frontier {
280            let timeline_history = Self::promoted_timeline_history(req, frontier)?;
281            tx.commit_handover(new_term);
282            return Ok(Self::clean_outcome(
283                req,
284                frontier,
285                frontier,
286                Duration::ZERO,
287                timeline_history,
288            ));
289        }
290
291        // Bounded wait: poll the target's live frontier until it covers
292        // the frozen LSN or the deadline elapses.
293        let deadline = req.mode.deadline();
294        let mut reached = req.target_frontier_hint;
295        while tx.elapsed() < deadline {
296            reached = tx.poll_target_frontier();
297            if reached >= frontier {
298                let waited = tx.elapsed();
299                let timeline_history = Self::promoted_timeline_history(req, reached)?;
300                tx.commit_handover(new_term);
301                return Ok(Self::clean_outcome(
302                    req,
303                    frontier,
304                    reached,
305                    waited,
306                    timeline_history,
307                ));
308            }
309        }
310
311        // Deadline blown. A coordinated handover aborts (resume writes,
312        // lose nothing); a forced one hands over anyway and surfaces the
313        // skipped catch-up.
314        let waited = tx.elapsed();
315        if req.mode.is_force() {
316            let timeline_history = Self::promoted_timeline_history(req, reached)?;
317            tx.commit_handover(new_term);
318            Ok(FailoverOutcome {
319                new_term,
320                frontier_lsn: frontier,
321                reached_lsn: reached,
322                skipped_lsn: frontier.saturating_sub(reached),
323                forced: true,
324                waited,
325                roles: RoleAssignment::swap(req),
326                timeline_history,
327            })
328        } else {
329            tx.resume_primary();
330            Err(FailoverError::CatchUpTimedOut {
331                frontier_lsn: frontier,
332                reached_lsn: reached,
333                waited,
334            })
335        }
336    }
337
338    fn clean_outcome(
339        req: &FailoverRequest,
340        frontier: u64,
341        reached: u64,
342        waited: Duration,
343        timeline_history: reddb_file::TimelineHistory,
344    ) -> FailoverOutcome {
345        FailoverOutcome {
346            new_term: req.new_term(),
347            frontier_lsn: frontier,
348            reached_lsn: reached,
349            skipped_lsn: 0,
350            forced: false,
351            waited,
352            roles: RoleAssignment::swap(req),
353            timeline_history,
354        }
355    }
356
357    fn promoted_timeline_history(
358        req: &FailoverRequest,
359        reached_lsn: u64,
360    ) -> Result<reddb_file::TimelineHistory, FailoverError> {
361        let parent = req
362            .timeline_history
363            .current()
364            .unwrap_or_else(reddb_file::TimelineId::initial);
365        let candidate = reddb_file::PromotionCandidate {
366            replica_id: req.target.id.clone(),
367            timeline: parent,
368            received_lsn: reached_lsn,
369            flushed_lsn: reached_lsn,
370            applied_lsn: reached_lsn,
371        };
372        req.timeline_history
373            .promotion_history(&candidate, parent.next(), crate::utils::now_unix_millis())
374            .map_err(|err| FailoverError::TimelineHistory(err.to_string()))
375    }
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Amount the fake clock advances on every poll, shared by all scenarios.
    const TICK: Duration = Duration::from_millis(10);

    /// Scripted transport. It reports a fixed frozen frontier and hands
    /// out target frontier readings from a queue, one per poll. Every poll
    /// also moves the fake clock forward by `tick`, so deadlines are
    /// exercised deterministically.
    struct FakeTransport {
        frontier: u64,
        readings: std::collections::VecDeque<u64>,
        /// Value returned for every poll after the script has run dry
        /// (the replica is stuck behind).
        stuck_at: u64,
        elapsed: Duration,
        tick: Duration,
        froze: bool,
        resumed: bool,
        committed_term: Option<u64>,
    }

    impl FakeTransport {
        fn new(frontier: u64, readings: Vec<u64>, tick: Duration) -> Self {
            // Once the script is exhausted, keep repeating its final
            // reading (or 0 when the script was empty).
            let stuck_at = readings.last().copied().unwrap_or(0);
            Self {
                frontier,
                readings: readings.into(),
                stuck_at,
                elapsed: Duration::ZERO,
                tick,
                froze: false,
                resumed: false,
                committed_term: None,
            }
        }
    }

    impl FailoverTransport for FakeTransport {
        fn freeze_primary(&mut self) -> u64 {
            self.froze = true;
            self.frontier
        }

        fn resume_primary(&mut self) {
            self.resumed = true;
        }

        fn elapsed(&self) -> Duration {
            self.elapsed
        }

        fn poll_target_frontier(&mut self) -> u64 {
            self.elapsed += self.tick;
            match self.readings.pop_front() {
                Some(reading) => reading,
                None => self.stuck_at,
            }
        }

        fn commit_handover(&mut self, new_term: u64) {
            self.committed_term = Some(new_term);
        }
    }

    fn coordinated(catch_up_deadline: Duration) -> FailoverMode {
        FailoverMode::Coordinated { catch_up_deadline }
    }

    fn force(timeout: Duration) -> FailoverMode {
        FailoverMode::Force { timeout }
    }

    /// Request handing over from n1 to n2 at term 4 with the given mode
    /// and target frontier hint.
    fn request(mode: FailoverMode, hint: u64) -> FailoverRequest {
        let old_primary = FailoverNode::new("n1", "http://n1:50051", "us-east");
        let target = FailoverNode::new("n2", "http://n2:50051", "us-west");
        FailoverRequest {
            old_primary,
            target,
            current_term: 4,
            target_frontier_hint: hint,
            timeline_history: reddb_file::TimelineHistory::new(10),
            mode,
        }
    }

    #[test]
    fn fast_path_hands_over_without_waiting_when_target_already_caught_up() {
        let mut tx = FakeTransport::new(100, Vec::new(), TICK);
        let req = request(coordinated(Duration::from_secs(5)), 100);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");

        assert!(tx.froze, "writes must be paused");
        assert_eq!(tx.committed_term, Some(5), "new term handed over");
        assert!(!tx.resumed, "no abort on a clean handover");
        assert_eq!(outcome.waited, Duration::ZERO, "fast path does not wait");
        assert!(outcome.is_zero_rpo());
        assert_eq!(outcome.skipped_lsn, 0);

        let history = &outcome.timeline_history;
        assert_eq!(history.current(), Some(reddb_file::TimelineId(2)));
        assert_eq!(history.ancestor_lsn(reddb_file::TimelineId(2)), Some(100));
    }

    #[test]
    fn coordinated_waits_then_hands_over_when_target_catches_up() {
        // The target climbs 60 -> 80 -> 100 and hits the frontier on the
        // third poll.
        let mut tx = FakeTransport::new(100, vec![60, 80, 100], TICK);
        let req = request(coordinated(Duration::from_secs(5)), 50);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");

        assert_eq!(tx.committed_term, Some(5));
        assert!(!tx.resumed);
        assert_eq!(outcome.new_term, 5);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 100);
        assert!(outcome.is_zero_rpo(), "no write lost in a clean handover");
        assert_eq!(outcome.waited, Duration::from_millis(30));

        let expected_primary = NodeRole::Primary { term: 5 };
        assert_eq!(
            outcome.roles.new_primary, expected_primary,
            "new primary advertises the new term",
        );
        let expected_replica = NodeRole::Replica {
            primary_addr: "http://n2:50051".to_string(),
            term: 5,
        };
        assert_eq!(
            outcome.roles.old_primary, expected_replica,
            "old primary streams as a replica of the new primary",
        );
    }

    #[test]
    fn coordinated_handover_appends_second_timeline_promotion() {
        let mut tx = FakeTransport::new(200, Vec::new(), TICK);
        let mut req = request(coordinated(Duration::from_secs(5)), 200);
        req.current_term = 5;
        // Seed an earlier promotion (timeline 1 -> 2 at LSN 100) so this
        // handover has to append a second fork.
        req.timeline_history
            .fork(
                reddb_file::TimelineId(2),
                reddb_file::TimelineId(1),
                100,
                20,
                "promote replica-a",
            )
            .expect("seed previous promotion");

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("second handover");

        assert_eq!(outcome.new_term, 6);
        let history = &outcome.timeline_history;
        assert_eq!(history.current(), Some(reddb_file::TimelineId(3)));
        assert_eq!(history.ancestor_lsn(reddb_file::TimelineId(2)), Some(100));
        assert_eq!(history.ancestor_lsn(reddb_file::TimelineId(3)), Some(200));

        let chain = history
            .descendant_chain_from(reddb_file::TimelineId(1))
            .expect("chain from initial timeline");
        assert_eq!(chain.len(), 2);
        assert_eq!(chain[0].timeline, reddb_file::TimelineId(2));
        assert_eq!(chain[1].timeline, reddb_file::TimelineId(3));
    }

    #[test]
    fn coordinated_aborts_and_resumes_when_target_never_catches_up() {
        // The target stalls at 70 and never reaches the frontier of 100
        // before the 50ms deadline (5 polls at 10ms).
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], TICK);
        let req = request(coordinated(Duration::from_millis(50)), 50);

        let err = FailoverCoordinator::run(&req, &mut tx).expect_err("must abort");

        assert!(tx.resumed, "writes must resume on the old primary");
        assert_eq!(tx.committed_term, None, "no term handed over on abort");
        match err {
            FailoverError::TimelineHistory(err) => panic!("unexpected timeline error: {err}"),
            FailoverError::CatchUpTimedOut {
                frontier_lsn,
                reached_lsn,
                ..
            } => {
                assert_eq!(frontier_lsn, 100);
                assert_eq!(reached_lsn, 70);
            }
        }
    }

    #[test]
    fn force_completes_within_timeout_surfacing_skipped_catch_up() {
        // The target stalls at 70; FORCE hands over anyway once the
        // timeout has passed.
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], TICK);
        let req = request(force(Duration::from_millis(50)), 50);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover");

        assert!(!tx.resumed, "forced handover does not abort");
        assert_eq!(tx.committed_term, Some(5), "term handed over under force");
        assert!(outcome.forced);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 70);
        assert_eq!(outcome.skipped_lsn, 30, "skipped catch-up surfaced");
        assert!(!outcome.is_zero_rpo());
        assert_eq!(
            outcome
                .timeline_history
                .ancestor_lsn(reddb_file::TimelineId(2)),
            Some(70),
            "forced promotion forks where the target actually reached"
        );
        assert!(
            outcome.waited <= Duration::from_millis(60),
            "completes within the timeout window",
        );
        assert_eq!(outcome.roles.new_primary, NodeRole::Primary { term: 5 });
    }

    #[test]
    fn force_still_takes_fast_path_when_target_already_caught_up() {
        let mut tx = FakeTransport::new(100, Vec::new(), TICK);
        let req = request(force(Duration::from_millis(50)), 120);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean forced handover");

        assert!(!outcome.forced, "no force needed when already caught up");
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }

    #[test]
    fn force_that_catches_up_in_time_skips_nothing() {
        let mut tx = FakeTransport::new(100, vec![90, 100], TICK);
        let req = request(force(Duration::from_secs(5)), 50);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover catches up");

        assert!(!outcome.forced);
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }
}
651        assert!(!outcome.forced);
652        assert_eq!(outcome.skipped_lsn, 0);
653        assert!(outcome.is_zero_rpo());
654    }
655}