1use std::time::Duration;
42
/// Role a node takes once a failover commits, tagged with the term it belongs to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum NodeRole {
    /// The node acts as primary under `term`.
    Primary { term: u64 },
    /// The node is a replica of the primary reachable at `primary_addr`, under `term`.
    Replica { primary_addr: String, term: u64 },
}
51
/// A node taking part in a failover: identifier, network address and region label.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FailoverNode {
    /// Node identifier; the promoted target's id becomes the promotion candidate's replica id.
    pub id: String,
    /// Network address; after a handover the old primary follows the target at this address.
    pub addr: String,
    /// Region label of the node.
    pub region: String,
}

impl FailoverNode {
    /// Creates a node description.
    ///
    /// Each argument accepts `&str`, `String`, or anything else convertible into an
    /// owned `String`.
    pub fn new(
        id: impl Into<String>,
        addr: impl Into<String>,
        region: impl Into<String>,
    ) -> Self {
        let id: String = id.into();
        let addr: String = addr.into();
        let region: String = region.into();
        Self { id, addr, region }
    }
}
72
/// How a failover reacts when the target has not caught up with the frozen primary.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FailoverMode {
    /// Wait up to `catch_up_deadline` for the target; if it does not catch up, the
    /// coordinator resumes the old primary and reports a timeout.
    Coordinated { catch_up_deadline: Duration },
    /// Wait up to `timeout`, then hand over anyway and report the skipped LSN range.
    Force { timeout: Duration },
}

impl FailoverMode {
    /// Maximum time the coordinator waits for the target, whatever the mode.
    fn deadline(self) -> Duration {
        let (Self::Coordinated {
            catch_up_deadline: limit,
        }
        | Self::Force { timeout: limit }) = self;
        limit
    }

    /// Returns `true` only for [`FailoverMode::Force`].
    fn is_force(self) -> bool {
        match self {
            Self::Force { .. } => true,
            Self::Coordinated { .. } => false,
        }
    }
}
100
101#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct FailoverRequest {
104 pub old_primary: FailoverNode,
106 pub target: FailoverNode,
108 pub current_term: u64,
111 pub target_frontier_hint: u64,
115 pub timeline_history: reddb_file::TimelineHistory,
119 pub mode: FailoverMode,
121}
122
123impl FailoverRequest {
124 pub fn new_term(&self) -> u64 {
126 self.current_term + 1
127 }
128}
129
130#[derive(Debug, Clone, PartialEq, Eq)]
134pub struct RoleAssignment {
135 pub new_primary: NodeRole,
137 pub old_primary: NodeRole,
139}
140
141impl RoleAssignment {
142 fn swap(req: &FailoverRequest) -> Self {
143 let new_term = req.new_term();
144 Self {
145 new_primary: NodeRole::Primary { term: new_term },
146 old_primary: NodeRole::Replica {
147 primary_addr: req.target.addr.clone(),
148 term: new_term,
149 },
150 }
151 }
152}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
156pub struct FailoverOutcome {
157 pub new_term: u64,
159 pub frontier_lsn: u64,
162 pub reached_lsn: u64,
166 pub skipped_lsn: u64,
169 pub forced: bool,
171 pub waited: Duration,
173 pub roles: RoleAssignment,
175 pub timeline_history: reddb_file::TimelineHistory,
177}
178
179impl FailoverOutcome {
180 pub fn is_zero_rpo(&self) -> bool {
183 self.skipped_lsn == 0
184 }
185}
186
/// Why a failover did not complete.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FailoverError {
    /// A coordinated failover hit its deadline before the target reached the
    /// frontier. The old primary was resumed.
    CatchUpTimedOut {
        frontier_lsn: u64,
        reached_lsn: u64,
        waited: Duration,
    },
    /// Building the promoted timeline history failed; carries the underlying error text.
    TimelineHistory(String),
}

impl std::fmt::Display for FailoverError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TimelineHistory(detail) => {
                write!(f, "failover timeline history error: {}", detail)
            }
            Self::CatchUpTimedOut {
                frontier_lsn: frontier,
                reached_lsn: reached,
                waited: elapsed,
            } => write!(
                f,
                "coordinated failover aborted: target reached LSN {} of {} after {:?}; \
                 writes resumed on the old primary, no write lost",
                reached, frontier, elapsed,
            ),
        }
    }
}

/// Plain error with no underlying `source`; the message comes from `Display`.
impl std::error::Error for FailoverError {}
224
/// Cluster side effects the failover coordinator relies on.
///
/// Abstracted behind a trait so the coordinator's decision logic can be driven by a
/// fake implementation.
pub trait FailoverTransport {
    /// Freezes the old primary (pausing its writes) and returns its frontier LSN at
    /// that moment.
    fn freeze_primary(&mut self) -> u64;

    /// Lets the old primary accept writes again after a failover is abandoned.
    fn resume_primary(&mut self);

    /// Time spent so far, compared against the mode's deadline.
    ///
    /// The coordinator's wait loop ends only when this reaches the deadline or the
    /// target catches up, so it must keep advancing.
    fn elapsed(&self) -> Duration;

    /// Returns the target's current LSN.
    ///
    /// The coordinator calls this repeatedly with no sleep in between. Presumably
    /// implementations block or pace themselves (NOTE(review): confirm).
    fn poll_target_frontier(&mut self) -> u64;

    /// Commits the role change, making the target primary under `new_term`.
    fn commit_handover(&mut self, new_term: u64);
}
256
257pub struct FailoverCoordinator;
259
260impl FailoverCoordinator {
261 pub fn run(
271 req: &FailoverRequest,
272 tx: &mut dyn FailoverTransport,
273 ) -> Result<FailoverOutcome, FailoverError> {
274 let new_term = req.new_term();
275 let frontier = tx.freeze_primary();
276
277 if req.target_frontier_hint >= frontier {
280 let timeline_history = Self::promoted_timeline_history(req, frontier)?;
281 tx.commit_handover(new_term);
282 return Ok(Self::clean_outcome(
283 req,
284 frontier,
285 frontier,
286 Duration::ZERO,
287 timeline_history,
288 ));
289 }
290
291 let deadline = req.mode.deadline();
294 let mut reached = req.target_frontier_hint;
295 while tx.elapsed() < deadline {
296 reached = tx.poll_target_frontier();
297 if reached >= frontier {
298 let waited = tx.elapsed();
299 let timeline_history = Self::promoted_timeline_history(req, reached)?;
300 tx.commit_handover(new_term);
301 return Ok(Self::clean_outcome(
302 req,
303 frontier,
304 reached,
305 waited,
306 timeline_history,
307 ));
308 }
309 }
310
311 let waited = tx.elapsed();
315 if req.mode.is_force() {
316 let timeline_history = Self::promoted_timeline_history(req, reached)?;
317 tx.commit_handover(new_term);
318 Ok(FailoverOutcome {
319 new_term,
320 frontier_lsn: frontier,
321 reached_lsn: reached,
322 skipped_lsn: frontier.saturating_sub(reached),
323 forced: true,
324 waited,
325 roles: RoleAssignment::swap(req),
326 timeline_history,
327 })
328 } else {
329 tx.resume_primary();
330 Err(FailoverError::CatchUpTimedOut {
331 frontier_lsn: frontier,
332 reached_lsn: reached,
333 waited,
334 })
335 }
336 }
337
338 fn clean_outcome(
339 req: &FailoverRequest,
340 frontier: u64,
341 reached: u64,
342 waited: Duration,
343 timeline_history: reddb_file::TimelineHistory,
344 ) -> FailoverOutcome {
345 FailoverOutcome {
346 new_term: req.new_term(),
347 frontier_lsn: frontier,
348 reached_lsn: reached,
349 skipped_lsn: 0,
350 forced: false,
351 waited,
352 roles: RoleAssignment::swap(req),
353 timeline_history,
354 }
355 }
356
357 fn promoted_timeline_history(
358 req: &FailoverRequest,
359 reached_lsn: u64,
360 ) -> Result<reddb_file::TimelineHistory, FailoverError> {
361 let parent = req
362 .timeline_history
363 .current()
364 .unwrap_or_else(reddb_file::TimelineId::initial);
365 let candidate = reddb_file::PromotionCandidate {
366 replica_id: req.target.id.clone(),
367 timeline: parent,
368 received_lsn: reached_lsn,
369 flushed_lsn: reached_lsn,
370 applied_lsn: reached_lsn,
371 };
372 req.timeline_history
373 .promotion_history(&candidate, parent.next(), crate::utils::now_unix_millis())
374 .map_err(|err| FailoverError::TimelineHistory(err.to_string()))
375 }
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Scripted transport: `freeze_primary` returns `frontier`, and each poll advances
    /// the clock by `tick` and yields the next scripted reading. Once the script runs
    /// out, polls keep returning the last reading (`stuck_at`).
    struct FakeTransport {
        frontier: u64,
        readings: std::collections::VecDeque<u64>,
        stuck_at: u64,
        elapsed: Duration,
        tick: Duration,
        froze: bool,
        resumed: bool,
        committed_term: Option<u64>,
    }

    impl FakeTransport {
        fn new(frontier: u64, readings: Vec<u64>, tick: Duration) -> Self {
            let stuck_at = readings.last().copied().unwrap_or(0);
            Self {
                frontier,
                readings: readings.into(),
                stuck_at,
                elapsed: Duration::ZERO,
                tick,
                froze: false,
                resumed: false,
                committed_term: None,
            }
        }
    }

    impl FailoverTransport for FakeTransport {
        fn freeze_primary(&mut self) -> u64 {
            self.froze = true;
            self.frontier
        }
        fn resume_primary(&mut self) {
            self.resumed = true;
        }
        fn elapsed(&self) -> Duration {
            self.elapsed
        }
        fn poll_target_frontier(&mut self) -> u64 {
            self.elapsed += self.tick;
            self.readings.pop_front().unwrap_or(self.stuck_at)
        }
        fn commit_handover(&mut self, new_term: u64) {
            self.committed_term = Some(new_term);
        }
    }

    /// Request promoting `n2` over `n1` at term 4 (so the new term is 5), starting
    /// from an empty timeline history.
    fn request(mode: FailoverMode, hint: u64) -> FailoverRequest {
        FailoverRequest {
            old_primary: FailoverNode::new("n1", "http://n1:50051", "us-east"),
            target: FailoverNode::new("n2", "http://n2:50051", "us-west"),
            current_term: 4,
            target_frontier_hint: hint,
            timeline_history: reddb_file::TimelineHistory::new(10),
            mode,
        }
    }

    #[test]
    fn fast_path_hands_over_without_waiting_when_target_already_caught_up() {
        let mut tx = FakeTransport::new(100, vec![], Duration::from_millis(10));
        let req = request(
            FailoverMode::Coordinated {
                catch_up_deadline: Duration::from_secs(5),
            },
            100,
        );

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");

        assert!(tx.froze, "writes must be paused");
        assert_eq!(tx.committed_term, Some(5), "new term handed over");
        assert!(!tx.resumed, "no abort on a clean handover");
        assert_eq!(outcome.waited, Duration::ZERO, "fast path does not wait");
        assert!(outcome.is_zero_rpo());
        assert_eq!(outcome.skipped_lsn, 0);
        assert_eq!(
            outcome.timeline_history.current(),
            Some(reddb_file::TimelineId(2))
        );
        assert_eq!(
            outcome
                .timeline_history
                .ancestor_lsn(reddb_file::TimelineId(2)),
            Some(100)
        );
    }

    #[test]
    fn coordinated_waits_then_hands_over_when_target_catches_up() {
        let mut tx = FakeTransport::new(100, vec![60, 80, 100], Duration::from_millis(10));
        let req = request(
            FailoverMode::Coordinated {
                catch_up_deadline: Duration::from_secs(5),
            },
            50,
        );

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");

        assert_eq!(tx.committed_term, Some(5));
        assert!(!tx.resumed);
        assert_eq!(outcome.new_term, 5);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 100);
        assert!(outcome.is_zero_rpo(), "no write lost in a clean handover");
        assert_eq!(outcome.waited, Duration::from_millis(30));
        assert_eq!(
            outcome.roles.new_primary,
            NodeRole::Primary { term: 5 },
            "new primary advertises the new term",
        );
        assert_eq!(
            outcome.roles.old_primary,
            NodeRole::Replica {
                primary_addr: "http://n2:50051".to_string(),
                term: 5,
            },
            "old primary streams as a replica of the new primary",
        );
    }

    #[test]
    fn coordinated_handover_appends_second_timeline_promotion() {
        let mut tx = FakeTransport::new(200, vec![], Duration::from_millis(10));
        let mut req = request(
            FailoverMode::Coordinated {
                catch_up_deadline: Duration::from_secs(5),
            },
            200,
        );
        req.current_term = 5;
        req.timeline_history
            .fork(
                reddb_file::TimelineId(2),
                reddb_file::TimelineId(1),
                100,
                20,
                "promote replica-a",
            )
            .expect("seed previous promotion");

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("second handover");

        assert_eq!(outcome.new_term, 6);
        assert_eq!(
            outcome.timeline_history.current(),
            Some(reddb_file::TimelineId(3))
        );
        assert_eq!(
            outcome
                .timeline_history
                .ancestor_lsn(reddb_file::TimelineId(2)),
            Some(100)
        );
        assert_eq!(
            outcome
                .timeline_history
                .ancestor_lsn(reddb_file::TimelineId(3)),
            Some(200)
        );
        let chain = outcome
            .timeline_history
            .descendant_chain_from(reddb_file::TimelineId(1))
            .expect("chain from initial timeline");
        assert_eq!(chain.len(), 2);
        assert_eq!(chain[0].timeline, reddb_file::TimelineId(2));
        assert_eq!(chain[1].timeline, reddb_file::TimelineId(3));
    }

    #[test]
    fn coordinated_aborts_and_resumes_when_target_never_catches_up() {
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], Duration::from_millis(10));
        let req = request(
            FailoverMode::Coordinated {
                catch_up_deadline: Duration::from_millis(50),
            },
            50,
        );

        let err = FailoverCoordinator::run(&req, &mut tx).expect_err("must abort");

        assert!(tx.resumed, "writes must resume on the old primary");
        assert_eq!(tx.committed_term, None, "no term handed over on abort");
        match err {
            FailoverError::CatchUpTimedOut {
                frontier_lsn,
                reached_lsn,
                ..
            } => {
                assert_eq!(frontier_lsn, 100);
                assert_eq!(reached_lsn, 70);
            }
            FailoverError::TimelineHistory(err) => panic!("unexpected timeline error: {err}"),
        }
    }

    /// With no time budget the target is never polled, so the hint is the only
    /// known position and is what the abort reports.
    #[test]
    fn coordinated_with_zero_deadline_aborts_without_polling() {
        let mut tx = FakeTransport::new(100, vec![100], Duration::from_millis(10));
        let req = request(
            FailoverMode::Coordinated {
                catch_up_deadline: Duration::ZERO,
            },
            50,
        );

        let err = FailoverCoordinator::run(&req, &mut tx).expect_err("must abort");

        assert!(tx.froze, "writes must be paused before deciding");
        assert!(tx.resumed, "writes must resume on the old primary");
        assert_eq!(tx.committed_term, None, "no term handed over on abort");
        assert_eq!(tx.readings.len(), 1, "target is never polled");
        assert_eq!(
            err,
            FailoverError::CatchUpTimedOut {
                frontier_lsn: 100,
                reached_lsn: 50,
                waited: Duration::ZERO,
            },
            "abort reports the hint as the last known position",
        );
    }

    #[test]
    fn catch_up_timeout_message_reports_progress() {
        let err = FailoverError::CatchUpTimedOut {
            frontier_lsn: 100,
            reached_lsn: 70,
            waited: Duration::from_millis(50),
        };

        assert_eq!(
            err.to_string(),
            "coordinated failover aborted: target reached LSN 70 of 100 after 50ms; \
             writes resumed on the old primary, no write lost"
        );
    }

    #[test]
    fn force_completes_within_timeout_surfacing_skipped_catch_up() {
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], Duration::from_millis(10));
        let req = request(
            FailoverMode::Force {
                timeout: Duration::from_millis(50),
            },
            50,
        );

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover");

        assert!(!tx.resumed, "forced handover does not abort");
        assert_eq!(tx.committed_term, Some(5), "term handed over under force");
        assert!(outcome.forced);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 70);
        assert_eq!(outcome.skipped_lsn, 30, "skipped catch-up surfaced");
        assert!(!outcome.is_zero_rpo());
        assert_eq!(
            outcome
                .timeline_history
                .ancestor_lsn(reddb_file::TimelineId(2)),
            Some(70),
            "forced promotion forks where the target actually reached"
        );
        assert!(
            outcome.waited <= Duration::from_millis(60),
            "completes within the timeout window",
        );
        assert_eq!(outcome.roles.new_primary, NodeRole::Primary { term: 5 });
    }

    #[test]
    fn force_still_takes_fast_path_when_target_already_caught_up() {
        let mut tx = FakeTransport::new(100, vec![], Duration::from_millis(10));
        let req = request(
            FailoverMode::Force {
                timeout: Duration::from_millis(50),
            },
            120,
        );

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean forced handover");

        assert!(!outcome.forced, "no force needed when already caught up");
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }

    #[test]
    fn force_that_catches_up_in_time_skips_nothing() {
        let mut tx = FakeTransport::new(100, vec![90, 100], Duration::from_millis(10));
        let req = request(
            FailoverMode::Force {
                timeout: Duration::from_secs(5),
            },
            50,
        );

        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover catches up");

        assert!(!outcome.forced);
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }
}