reddb_server/replication/failover.rs
1//! Coordinated zero-RPO failover (issue #833, PRD #819).
2//!
3//! Drives a *planned* primary handover so that no acknowledged write is
4//! lost. The flow is the classic coordinated switchover:
5//!
6//! 1. **Freeze writes** on the current primary and capture its frontier
7//! LSN at the instant writes stopped. No new LSN is minted after the
8//! freeze, so the frontier is a *fixed* catch-up target.
//! 2. **Wait for the target replica to reach the frontier** — poll the
//!    target's acknowledged (durable) frontier until it covers the frozen LSN.
11//! 3. **Hand over the term** — mint `current_term + 1` and stamp it on
12//! the target, promoting it to primary.
13//! 4. **Demote the old primary** to a replica that streams from the new
14//! primary under the new term.
15//!
16//! ## Two modes
17//!
18//! * [`FailoverMode::Coordinated`] is the zero-RPO path. If the target
19//! cannot reach the frontier before `catch_up_deadline`, the handover
20//! **aborts**: writes resume on the old primary and nothing is
21//! committed on the target, so the cluster keeps serving and no
22//! acknowledged write is lost (issue #833 criterion 1).
23//! * [`FailoverMode::Force`] is the emergency path. It still tries to
24//! reach the frontier, but on `timeout` it completes the handover
25//! anyway, surfacing the *skipped catch-up* — the un-replicated LSN
26//! gap between the frozen frontier and the target's reached frontier
27//! (issue #833 criterion 2).
28//!
29//! ## Module shape
30//!
31//! [`FailoverCoordinator::run`] is a pure state machine. The clock and
32//! the cluster mutations (freeze, resume, poll, commit) are injected
33//! behind [`FailoverTransport`], so the whole flow is exercised
34//! deterministically with a scripted fake — no clock, no network, no
35//! engine dependency. The post-handover roles are returned in the
36//! outcome ([`RoleAssignment`]) so a caller can assert that the new
37//! primary advertises the new term and the old primary streams as a
38//! replica (issue #833 criterion 3). Wiring the transport to the real
39//! WAL frontier and the gRPC role-swap is left to the transport layer.
40
41use std::time::Duration;
42
/// Replication role a node holds once a failover step has been applied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeRole {
    /// The node accepts writes under `term` and streams WAL to replicas.
    Primary { term: u64 },
    /// The node is read-only and streams WAL from `primary_addr` under
    /// `term`.
    Replica { primary_addr: String, term: u64 },
}
51
/// One of the nodes taking part in a failover.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailoverNode {
    /// Stable node identifier (matches the replica registry id).
    pub id: String,
    /// Address other nodes dial to reach this node.
    pub addr: String,
    /// Region / fault-domain identifier.
    pub region: String,
}

impl FailoverNode {
    /// Builds a node description, converting each argument into an owned
    /// `String`.
    pub fn new(id: impl Into<String>, addr: impl Into<String>, region: impl Into<String>) -> Self {
        let id = id.into();
        let addr = addr.into();
        let region = region.into();
        Self { id, addr, region }
    }
}
72
/// Execution strategy for a failover.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailoverMode {
    /// Zero-RPO coordinated handover. The target MUST reach the frozen
    /// frontier within `catch_up_deadline`; if it does not, the handover
    /// aborts and writes resume on the old primary, so no acknowledged
    /// write is ever lost.
    Coordinated { catch_up_deadline: Duration },
    /// Emergency handover. Still tries to reach the frontier, but
    /// completes within `timeout` regardless and surfaces the skipped
    /// catch-up.
    Force { timeout: Duration },
}

impl FailoverMode {
    /// Longest the catch-up wait may run before the mode's terminal
    /// behaviour (abort for `Coordinated`, force for `Force`) applies.
    fn deadline(self) -> Duration {
        match self {
            FailoverMode::Coordinated {
                catch_up_deadline: limit,
            }
            | FailoverMode::Force { timeout: limit } => limit,
        }
    }

    /// `true` only for [`FailoverMode::Force`].
    fn is_force(self) -> bool {
        match self {
            FailoverMode::Force { .. } => true,
            FailoverMode::Coordinated { .. } => false,
        }
    }
}
100
101/// A request to hand the primary role from `old_primary` to `target`.
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct FailoverRequest {
104 /// The node currently serving as primary.
105 pub old_primary: FailoverNode,
106 /// The replica being promoted.
107 pub target: FailoverNode,
108 /// The replication term the cluster is serving *now*. The handover
109 /// mints `current_term + 1`.
110 pub current_term: u64,
111 /// Last known acknowledged frontier of the target (from the replica
112 /// registry). Lets the coordinator take a no-wait fast path when the
113 /// target is already caught up at freeze time.
114 pub target_frontier_hint: u64,
115 /// Coordinated vs. forced execution.
116 pub mode: FailoverMode,
117}
118
119impl FailoverRequest {
120 /// The term the cluster serves *after* a successful handover.
121 pub fn new_term(&self) -> u64 {
122 self.current_term + 1
123 }
124}
125
126/// Post-handover roles of the two nodes, used to assert that the new
127/// primary advertises the new term and the old primary streams as a
128/// replica (issue #833 criterion 3).
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct RoleAssignment {
131 /// The promoted target — now primary under the new term.
132 pub new_primary: NodeRole,
133 /// The demoted old primary — now a replica of the new primary.
134 pub old_primary: NodeRole,
135}
136
137impl RoleAssignment {
138 fn swap(req: &FailoverRequest) -> Self {
139 let new_term = req.new_term();
140 Self {
141 new_primary: NodeRole::Primary { term: new_term },
142 old_primary: NodeRole::Replica {
143 primary_addr: req.target.addr.clone(),
144 term: new_term,
145 },
146 }
147 }
148}
149
150/// The result of a completed handover.
151#[derive(Debug, Clone, PartialEq, Eq)]
152pub struct FailoverOutcome {
153 /// The term the new primary now serves.
154 pub new_term: u64,
155 /// The primary frontier frozen at the moment writes were paused —
156 /// the catch-up target.
157 pub frontier_lsn: u64,
158 /// The target's acknowledged frontier at the moment the term was
159 /// handed over. Equals `frontier_lsn` for a clean handover; may be
160 /// below it for a forced one.
161 pub reached_lsn: u64,
162 /// `frontier_lsn - reached_lsn` — acknowledged-but-not-yet-replicated
163 /// LSNs skipped by a forced handover. Always `0` for a clean one.
164 pub skipped_lsn: u64,
165 /// Whether the handover had to be forced past an un-caught-up target.
166 pub forced: bool,
167 /// How long the catch-up wait ran before the term was handed over.
168 pub waited: Duration,
169 /// Post-handover roles of the two nodes.
170 pub roles: RoleAssignment,
171}
172
173impl FailoverOutcome {
174 /// True when the target reached the full frontier — a true zero-RPO
175 /// handover with nothing skipped.
176 pub fn is_zero_rpo(&self) -> bool {
177 self.skipped_lsn == 0
178 }
179}
180
/// Reasons a coordinated failover could not complete without losing writes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailoverError {
    /// The coordinated handover aborted: the target did not reach the
    /// frozen frontier before the deadline. Writes have been resumed on the
    /// old primary and no acknowledged write was lost. Use
    /// [`FailoverMode::Force`] to hand over anyway.
    CatchUpTimedOut {
        frontier_lsn: u64,
        reached_lsn: u64,
        waited: Duration,
    },
}

impl std::fmt::Display for FailoverError {
    /// Renders a one-line, operator-facing description of the abort.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: the pattern is irrefutable today, and adding
        // a variant turns this into a compile error instead of a silent gap.
        let FailoverError::CatchUpTimedOut {
            frontier_lsn,
            reached_lsn,
            waited,
        } = self;
        write!(
            f,
            "coordinated failover aborted: target reached LSN {reached_lsn} of {frontier_lsn} \
             after {waited:?}; writes resumed on the old primary, no write lost",
        )
    }
}

impl std::error::Error for FailoverError {}
212
/// The cluster mutations and the clock that the coordinator drives.
///
/// Every side effect is injected through this trait so the state machine
/// itself stays pure and deterministically testable. Production
/// implementors back these calls onto the real WAL frontier, the replica
/// registry, and the gRPC role-swap; tests back them onto a scripted fake.
pub trait FailoverTransport {
    /// Pauses writes on the current primary and returns the frontier LSN
    /// (`current_lsn`) frozen at the instant writes stopped.
    ///
    /// No new LSN is minted after this returns, so the returned value is a
    /// fixed catch-up target.
    fn freeze_primary(&mut self) -> u64;

    /// Resumes writes on the old primary.
    ///
    /// Called only when a coordinated handover aborts, so the cluster keeps
    /// serving with no lost write.
    fn resume_primary(&mut self);

    /// Returns the time elapsed since the failover began, letting the
    /// coordinator enforce the deadline without owning a clock.
    fn elapsed(&self) -> Duration;

    /// Blocks for one poll interval (clamped, in spirit, by the caller's
    /// remaining deadline), then returns the target replica's current
    /// acknowledged (durable) frontier LSN.
    fn poll_target_frontier(&mut self) -> u64;

    /// Commits the role swap: stamps `new_term` on the target (promoting it
    /// to primary) and reconfigures the old primary to stream as a replica
    /// of the new primary under `new_term`.
    fn commit_handover(&mut self, new_term: u64);
}
244
245/// The coordinated zero-RPO failover state machine.
246pub struct FailoverCoordinator;
247
248impl FailoverCoordinator {
249 /// Execute the handover described by `req`, driving the cluster
250 /// through `tx`.
251 ///
252 /// Returns `Ok(FailoverOutcome)` once the term has been handed over
253 /// (cleanly, or forced past a lagging target). Returns
254 /// `Err(FailoverError::CatchUpTimedOut)` only for a *coordinated*
255 /// handover whose target never caught up — in which case writes have
256 /// already been resumed on the old primary and nothing was committed
257 /// on the target.
258 pub fn run(
259 req: &FailoverRequest,
260 tx: &mut dyn FailoverTransport,
261 ) -> Result<FailoverOutcome, FailoverError> {
262 let new_term = req.new_term();
263 let frontier = tx.freeze_primary();
264
265 // Fast path: the target was already at/past the frontier when we
266 // froze. Hand over immediately, no wait.
267 if req.target_frontier_hint >= frontier {
268 tx.commit_handover(new_term);
269 return Ok(Self::clean_outcome(req, frontier, frontier, Duration::ZERO));
270 }
271
272 // Bounded wait: poll the target's live frontier until it covers
273 // the frozen LSN or the deadline elapses.
274 let deadline = req.mode.deadline();
275 let mut reached = req.target_frontier_hint;
276 while tx.elapsed() < deadline {
277 reached = tx.poll_target_frontier();
278 if reached >= frontier {
279 let waited = tx.elapsed();
280 tx.commit_handover(new_term);
281 return Ok(Self::clean_outcome(req, frontier, reached, waited));
282 }
283 }
284
285 // Deadline blown. A coordinated handover aborts (resume writes,
286 // lose nothing); a forced one hands over anyway and surfaces the
287 // skipped catch-up.
288 let waited = tx.elapsed();
289 if req.mode.is_force() {
290 tx.commit_handover(new_term);
291 Ok(FailoverOutcome {
292 new_term,
293 frontier_lsn: frontier,
294 reached_lsn: reached,
295 skipped_lsn: frontier.saturating_sub(reached),
296 forced: true,
297 waited,
298 roles: RoleAssignment::swap(req),
299 })
300 } else {
301 tx.resume_primary();
302 Err(FailoverError::CatchUpTimedOut {
303 frontier_lsn: frontier,
304 reached_lsn: reached,
305 waited,
306 })
307 }
308 }
309
310 fn clean_outcome(
311 req: &FailoverRequest,
312 frontier: u64,
313 reached: u64,
314 waited: Duration,
315 ) -> FailoverOutcome {
316 FailoverOutcome {
317 new_term: req.new_term(),
318 frontier_lsn: frontier,
319 reached_lsn: reached,
320 skipped_lsn: 0,
321 forced: false,
322 waited,
323 roles: RoleAssignment::swap(req),
324 }
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;

    /// Amount the fake clock advances on every poll in these tests.
    const TICK: Duration = Duration::from_millis(10);

    /// Scripted transport: a fixed frozen frontier plus a queue of target
    /// frontier readings, one consumed per poll. Every poll also advances
    /// the fake clock by `tick`, so deadlines are exercised
    /// deterministically.
    struct FakeTransport {
        frontier: u64,
        readings: VecDeque<u64>,
        /// Reading repeated once the script is exhausted (replica stuck
        /// behind): the last scripted value, or `0` for an empty script.
        stuck_at: u64,
        elapsed: Duration,
        tick: Duration,
        froze: bool,
        resumed: bool,
        committed_term: Option<u64>,
    }

    impl FakeTransport {
        fn new(frontier: u64, readings: Vec<u64>, tick: Duration) -> Self {
            Self {
                frontier,
                // Read the tail before the vector is moved into the queue.
                stuck_at: readings.last().copied().unwrap_or(0),
                readings: VecDeque::from(readings),
                elapsed: Duration::ZERO,
                tick,
                froze: false,
                resumed: false,
                committed_term: None,
            }
        }
    }

    impl FailoverTransport for FakeTransport {
        fn freeze_primary(&mut self) -> u64 {
            self.froze = true;
            self.frontier
        }

        fn resume_primary(&mut self) {
            self.resumed = true;
        }

        fn elapsed(&self) -> Duration {
            self.elapsed
        }

        fn poll_target_frontier(&mut self) -> u64 {
            self.elapsed += self.tick;
            match self.readings.pop_front() {
                Some(lsn) => lsn,
                None => self.stuck_at,
            }
        }

        fn commit_handover(&mut self, new_term: u64) {
            self.committed_term = Some(new_term);
        }
    }

    /// Request handing over from `n1` to `n2` at term 4 (so the new term is
    /// 5).
    fn request(mode: FailoverMode, hint: u64) -> FailoverRequest {
        let old_primary = FailoverNode::new("n1", "http://n1:50051", "us-east");
        let target = FailoverNode::new("n2", "http://n2:50051", "us-west");
        FailoverRequest {
            old_primary,
            target,
            current_term: 4,
            target_frontier_hint: hint,
            mode,
        }
    }

    fn coordinated(catch_up_deadline: Duration) -> FailoverMode {
        FailoverMode::Coordinated { catch_up_deadline }
    }

    fn force(timeout: Duration) -> FailoverMode {
        FailoverMode::Force { timeout }
    }

    #[test]
    fn fast_path_hands_over_without_waiting_when_target_already_caught_up() {
        let req = request(coordinated(Duration::from_secs(5)), 100);
        let mut tx = FakeTransport::new(100, Vec::new(), TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");

        assert!(tx.froze, "writes must be paused");
        assert_eq!(tx.committed_term, Some(5), "new term handed over");
        assert!(!tx.resumed, "no abort on a clean handover");
        assert_eq!(outcome.waited, Duration::ZERO, "fast path does not wait");
        assert!(outcome.is_zero_rpo());
        assert_eq!(outcome.skipped_lsn, 0);
    }

    #[test]
    fn coordinated_waits_then_hands_over_when_target_catches_up() {
        // Target climbs 60 -> 80 -> 100, reaching the frontier on poll 3.
        let req = request(coordinated(Duration::from_secs(5)), 50);
        let mut tx = FakeTransport::new(100, vec![60, 80, 100], TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");

        assert_eq!(tx.committed_term, Some(5));
        assert!(!tx.resumed);
        assert_eq!(outcome.new_term, 5);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 100);
        assert!(outcome.is_zero_rpo(), "no write lost in a clean handover");
        assert_eq!(outcome.waited, Duration::from_millis(30));

        let expected_primary = NodeRole::Primary { term: 5 };
        assert_eq!(
            outcome.roles.new_primary, expected_primary,
            "new primary advertises the new term",
        );
        let expected_replica = NodeRole::Replica {
            primary_addr: "http://n2:50051".to_string(),
            term: 5,
        };
        assert_eq!(
            outcome.roles.old_primary, expected_replica,
            "old primary streams as a replica of the new primary",
        );
    }

    #[test]
    fn coordinated_aborts_and_resumes_when_target_never_catches_up() {
        // Target stalls at 70, never reaching the frontier of 100 before
        // the 50ms deadline (5 polls at 10ms).
        let req = request(coordinated(Duration::from_millis(50)), 50);
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], TICK);

        let err = FailoverCoordinator::run(&req, &mut tx).expect_err("must abort");

        assert!(tx.resumed, "writes must resume on the old primary");
        assert_eq!(tx.committed_term, None, "no term handed over on abort");
        let FailoverError::CatchUpTimedOut {
            frontier_lsn,
            reached_lsn,
            ..
        } = err;
        assert_eq!(frontier_lsn, 100);
        assert_eq!(reached_lsn, 70);
    }

    #[test]
    fn force_completes_within_timeout_surfacing_skipped_catch_up() {
        // Target stalls at 70; FORCE hands over anyway after the timeout.
        let req = request(force(Duration::from_millis(50)), 50);
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover");

        assert!(!tx.resumed, "forced handover does not abort");
        assert_eq!(tx.committed_term, Some(5), "term handed over under force");
        assert!(outcome.forced);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 70);
        assert_eq!(outcome.skipped_lsn, 30, "skipped catch-up surfaced");
        assert!(!outcome.is_zero_rpo());
        assert!(
            outcome.waited <= Duration::from_millis(60),
            "completes within the timeout window",
        );
        assert_eq!(outcome.roles.new_primary, NodeRole::Primary { term: 5 });
    }

    #[test]
    fn force_still_takes_fast_path_when_target_already_caught_up() {
        let req = request(force(Duration::from_millis(50)), 120);
        let mut tx = FakeTransport::new(100, Vec::new(), TICK);

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean forced handover");

        assert!(!outcome.forced, "no force needed when already caught up");
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }

    #[test]
    fn force_that_catches_up_in_time_skips_nothing() {
        let req = request(force(Duration::from_secs(5)), 50);
        let mut tx = FakeTransport::new(100, vec![90, 100], TICK);

        let outcome =
            FailoverCoordinator::run(&req, &mut tx).expect("forced handover catches up");

        assert!(!outcome.forced);
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }
}
538}