use std::time::Duration;
/// Role a node holds after a failover, tagged with the term the role applies to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeRole {
    /// The node is the primary for `term`.
    Primary { term: u64 },
    /// The node is a replica for `term`; `primary_addr` is the address of the
    /// primary it follows.
    Replica { primary_addr: String, term: u64 },
}
/// Identity of one node taking part in a failover.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailoverNode {
    /// Identifier of the node.
    pub id: String,
    /// Address of the node; for the failover target this is what the demoted
    /// primary is pointed at.
    pub addr: String,
    /// Region label of the node.
    pub region: String,
}

impl FailoverNode {
    /// Builds a node from anything convertible into owned strings.
    ///
    /// The arguments are converted in declaration order: `id`, `addr`, `region`.
    pub fn new(id: impl Into<String>, addr: impl Into<String>, region: impl Into<String>) -> Self {
        let id: String = id.into();
        let addr: String = addr.into();
        let region: String = region.into();
        Self { id, addr, region }
    }
}
/// Policy for a failover whose target has not caught up by the deadline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailoverMode {
    /// Wait up to `catch_up_deadline` for the target; if it still lags, the
    /// coordinator resumes the old primary and reports an error.
    Coordinated { catch_up_deadline: Duration },
    /// Wait up to `timeout` for the target; if it still lags, the coordinator
    /// hands over anyway and reports how much was skipped.
    Force { timeout: Duration },
}

impl FailoverMode {
    /// Maximum time the coordinator keeps polling the target, whichever variant
    /// carries it.
    fn deadline(self) -> Duration {
        match self {
            Self::Coordinated {
                catch_up_deadline: limit,
            }
            | Self::Force { timeout: limit } => limit,
        }
    }

    /// Whether the handover goes ahead even if the target never catches up.
    fn is_force(self) -> bool {
        match self {
            Self::Force { .. } => true,
            Self::Coordinated { .. } => false,
        }
    }
}
/// Input to `FailoverCoordinator::run`: which node to promote, from which term,
/// and how to behave if the target lags.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailoverRequest {
    /// Node being demoted.
    /// NOTE(review): no code in this file reads this field; presumably it is
    /// consumed by callers — confirm.
    pub old_primary: FailoverNode,
    /// Node being promoted; its `addr` becomes the `primary_addr` of the demoted
    /// node's replica role.
    pub target: FailoverNode,
    /// Term in force before the failover; the handover is committed under
    /// `current_term + 1`.
    pub current_term: u64,
    /// Caller-supplied estimate of the target's frontier. When it is already at
    /// or past the frontier returned by freezing the primary, the coordinator
    /// hands over immediately without polling the target.
    pub target_frontier_hint: u64,
    /// Deadline and policy applied while waiting for the target to catch up.
    pub mode: FailoverMode,
}
impl FailoverRequest {
    /// Term the handover is committed under: exactly one past `current_term`.
    ///
    /// # Panics
    ///
    /// Panics if `current_term` is `u64::MAX`. A plain `+ 1` only traps when
    /// overflow checks are enabled and otherwise wraps to term 0, which would
    /// let the new primary advertise a term lower than the one it replaces.
    /// Failing loudly in every build profile keeps terms strictly increasing.
    pub fn new_term(&self) -> u64 {
        self.current_term
            .checked_add(1)
            .expect("failover term overflow: current_term is already u64::MAX")
    }
}
/// Roles the two nodes involved in a failover hold once the handover is done.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoleAssignment {
    /// Role taken by the promoted target.
    pub new_primary: NodeRole,
    /// Role taken by the demoted former primary.
    pub old_primary: NodeRole,
}

impl RoleAssignment {
    /// Derives the post-failover roles from `req`: the target becomes primary
    /// and the former primary becomes a replica pointed at the target's
    /// address, both under the request's new term.
    fn swap(req: &FailoverRequest) -> Self {
        let term = req.new_term();
        let promoted = NodeRole::Primary { term };
        let demoted = NodeRole::Replica {
            primary_addr: req.target.addr.clone(),
            term,
        };
        Self {
            new_primary: promoted,
            old_primary: demoted,
        }
    }
}
/// Report of a failover that committed a handover.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailoverOutcome {
    /// Term the handover was committed under.
    pub new_term: u64,
    /// Frontier returned when the primary was frozen.
    pub frontier_lsn: u64,
    /// Frontier attributed to the target at handover time.
    pub reached_lsn: u64,
    /// Gap between `frontier_lsn` and `reached_lsn` that a forced handover gave
    /// up on; zero for a clean handover.
    pub skipped_lsn: u64,
    /// Whether the handover was committed although the target had not caught up.
    pub forced: bool,
    /// Transport-reported elapsed time when the decision was taken.
    pub waited: Duration,
    /// Roles the two nodes hold after the handover.
    pub roles: RoleAssignment,
}

impl FailoverOutcome {
    /// Whether the handover skipped nothing, i.e. `skipped_lsn` is zero.
    pub fn is_zero_rpo(&self) -> bool {
        matches!(self.skipped_lsn, 0)
    }
}
/// Reasons a failover ends without a handover.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailoverError {
    /// The target did not reach the frozen frontier before the deadline of a
    /// coordinated failover.
    CatchUpTimedOut {
        /// Frontier the target had to reach.
        frontier_lsn: u64,
        /// Last frontier observed on the target.
        reached_lsn: u64,
        /// Transport-reported elapsed time when the attempt was abandoned.
        waited: Duration,
    },
}

impl std::fmt::Display for FailoverError {
    /// Renders an operator-facing, single-line description of the abort.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable while the enum has a single variant; adding a variant
        // turns this into a compile error instead of a silently stale message.
        let FailoverError::CatchUpTimedOut {
            frontier_lsn,
            reached_lsn,
            waited,
        } = self;
        write!(
            f,
            "coordinated failover aborted: target reached LSN {reached_lsn} of {frontier_lsn} \
             after {waited:?}; writes resumed on the old primary, no write lost",
        )
    }
}

impl std::error::Error for FailoverError {}
/// Side-effecting operations the failover coordinator drives; implemented by
/// whatever talks to the real nodes (or by a fake in tests).
pub trait FailoverTransport {
    /// Freezes the primary and returns the frontier the target has to reach.
    /// NOTE(review): presumably this pauses writes on the old primary (the
    /// tests describe it that way) — confirm against real implementations.
    fn freeze_primary(&mut self) -> u64;
    /// Undoes the freeze. The coordinator calls this only when a coordinated
    /// failover times out.
    fn resume_primary(&mut self);
    /// Time spent so far; the coordinator compares it against the mode's
    /// deadline. NOTE(review): the reference point is chosen by the
    /// implementation — confirm it starts at the freeze.
    fn elapsed(&self) -> Duration;
    /// Returns the target's current frontier.
    fn poll_target_frontier(&mut self) -> u64;
    /// Commits the handover under `new_term`.
    fn commit_handover(&mut self, new_term: u64);
}
/// Stateless driver for a primary handover; all state lives in the request and
/// behind the transport.
pub struct FailoverCoordinator;

impl FailoverCoordinator {
    /// Runs one failover attempt described by `req` against `tx`.
    ///
    /// Steps, in order:
    /// 1. Freeze the primary and record the frontier it returns.
    /// 2. If `req.target_frontier_hint` already covers that frontier, commit
    ///    immediately. The hint is taken as-is; the target is not polled.
    /// 3. Otherwise poll the target until it reaches the frontier or the
    ///    transport's elapsed time reaches the mode's deadline.
    /// 4. On deadline: a force-mode request commits anyway and reports the gap
    ///    in `skipped_lsn`; a coordinated request resumes the primary instead.
    ///
    /// # Errors
    ///
    /// Returns [`FailoverError::CatchUpTimedOut`] when a coordinated request hits
    /// its deadline before the target catches up. No handover is committed in
    /// that case.
    pub fn run(
        req: &FailoverRequest,
        tx: &mut dyn FailoverTransport,
    ) -> Result<FailoverOutcome, FailoverError> {
        let next_term = req.new_term();
        let frozen_at = tx.freeze_primary();

        // Fast path: nothing to wait for.
        if frozen_at <= req.target_frontier_hint {
            tx.commit_handover(next_term);
            return Ok(Self::clean_outcome(req, frozen_at, frozen_at, Duration::ZERO));
        }

        let budget = req.mode.deadline();
        // Until the first poll, the hint is the best known position of the target.
        let mut last_seen = req.target_frontier_hint;
        loop {
            if tx.elapsed() >= budget {
                break;
            }
            last_seen = tx.poll_target_frontier();
            if last_seen < frozen_at {
                continue;
            }
            // Caught up: read the clock before committing so `waited` excludes
            // the commit itself.
            let waited = tx.elapsed();
            tx.commit_handover(next_term);
            return Ok(Self::clean_outcome(req, frozen_at, last_seen, waited));
        }

        // Deadline reached with the target still behind.
        let waited = tx.elapsed();
        if !req.mode.is_force() {
            tx.resume_primary();
            return Err(FailoverError::CatchUpTimedOut {
                frontier_lsn: frozen_at,
                reached_lsn: last_seen,
                waited,
            });
        }

        tx.commit_handover(next_term);
        let roles = RoleAssignment::swap(req);
        Ok(FailoverOutcome {
            new_term: next_term,
            frontier_lsn: frozen_at,
            reached_lsn: last_seen,
            skipped_lsn: frozen_at.saturating_sub(last_seen),
            forced: true,
            waited,
            roles,
        })
    }

    /// Builds the outcome of a handover that skipped nothing: `skipped_lsn` is
    /// zero and `forced` is false.
    fn clean_outcome(
        req: &FailoverRequest,
        frontier: u64,
        reached: u64,
        waited: Duration,
    ) -> FailoverOutcome {
        let roles = RoleAssignment::swap(req);
        let new_term = req.new_term();
        FailoverOutcome {
            new_term,
            frontier_lsn: frontier,
            reached_lsn: reached,
            skipped_lsn: 0,
            forced: false,
            waited,
            roles,
        }
    }
}
// Unit tests driving `FailoverCoordinator::run` through a scripted, in-memory
// transport with a simulated clock.
#[cfg(test)]
mod tests {
    use super::*;
    // Scripted transport: returns a fixed frontier on freeze, replays a list of
    // target readings, and records which coordinator calls were made.
    struct FakeTransport {
        // Value returned by `freeze_primary`.
        frontier: u64,
        // Target readings handed out one per poll, front first.
        readings: std::collections::VecDeque<u64>,
        // Reading repeated once `readings` is exhausted.
        stuck_at: u64,
        // Simulated clock; only `poll_target_frontier` advances it.
        elapsed: Duration,
        // Amount the clock advances per poll.
        tick: Duration,
        // Set when `freeze_primary` was called.
        froze: bool,
        // Set when `resume_primary` was called.
        resumed: bool,
        // Term passed to the most recent `commit_handover`, if any.
        committed_term: Option<u64>,
    }
    impl FakeTransport {
        // The target "gets stuck" at the last scripted reading (or 0 when the
        // script is empty) after the script runs out.
        fn new(frontier: u64, readings: Vec<u64>, tick: Duration) -> Self {
            let stuck_at = readings.last().copied().unwrap_or(0);
            Self {
                frontier,
                readings: readings.into(),
                stuck_at,
                elapsed: Duration::ZERO,
                tick,
                froze: false,
                resumed: false,
                committed_term: None,
            }
        }
    }
    impl FailoverTransport for FakeTransport {
        fn freeze_primary(&mut self) -> u64 {
            self.froze = true;
            self.frontier
        }
        fn resume_primary(&mut self) {
            self.resumed = true;
        }
        fn elapsed(&self) -> Duration {
            self.elapsed
        }
        fn poll_target_frontier(&mut self) -> u64 {
            // Each poll costs one tick, so elapsed time == polls * tick.
            self.elapsed += self.tick;
            self.readings.pop_front().unwrap_or(self.stuck_at)
        }
        fn commit_handover(&mut self, new_term: u64) {
            self.committed_term = Some(new_term);
        }
    }
    // Request from n1 to n2 at term 4, so every committed handover uses term 5.
    fn request(mode: FailoverMode, hint: u64) -> FailoverRequest {
        FailoverRequest {
            old_primary: FailoverNode::new("n1", "http://n1:50051", "us-east"),
            target: FailoverNode::new("n2", "http://n2:50051", "us-west"),
            current_term: 4,
            target_frontier_hint: hint,
            mode,
        }
    }
    // Hint (100) equals the frontier (100): the coordinator commits without
    // polling, so the fake clock never advances.
    #[test]
    fn fast_path_hands_over_without_waiting_when_target_already_caught_up() {
        let mut tx = FakeTransport::new(100, vec![], Duration::from_millis(10));
        let req = request(
            FailoverMode::Coordinated {
                catch_up_deadline: Duration::from_secs(5),
            },
            100,
        );
        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");
        assert!(tx.froze, "writes must be paused");
        assert_eq!(tx.committed_term, Some(5), "new term handed over");
        assert!(!tx.resumed, "no abort on a clean handover");
        assert_eq!(outcome.waited, Duration::ZERO, "fast path does not wait");
        assert!(outcome.is_zero_rpo());
        assert_eq!(outcome.skipped_lsn, 0);
    }
    // Three polls (60, 80, 100) at 10ms each reach the frontier after 30ms,
    // well inside the 5s deadline.
    #[test]
    fn coordinated_waits_then_hands_over_when_target_catches_up() {
        let mut tx = FakeTransport::new(100, vec![60, 80, 100], Duration::from_millis(10));
        let req = request(
            FailoverMode::Coordinated {
                catch_up_deadline: Duration::from_secs(5),
            },
            50,
        );
        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean handover");
        assert_eq!(tx.committed_term, Some(5));
        assert!(!tx.resumed);
        assert_eq!(outcome.new_term, 5);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 100);
        assert!(outcome.is_zero_rpo(), "no write lost in a clean handover");
        assert_eq!(outcome.waited, Duration::from_millis(30));
        assert_eq!(
            outcome.roles.new_primary,
            NodeRole::Primary { term: 5 },
            "new primary advertises the new term",
        );
        assert_eq!(
            outcome.roles.old_primary,
            NodeRole::Replica {
                primary_addr: "http://n2:50051".to_string(),
                term: 5,
            },
            "old primary streams as a replica of the new primary",
        );
    }
    // 50ms deadline at 10ms per poll allows five polls (60, 65, 70, 70, 70);
    // the target stays at 70, so the coordinated attempt is abandoned.
    #[test]
    fn coordinated_aborts_and_resumes_when_target_never_catches_up() {
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], Duration::from_millis(10));
        let req = request(
            FailoverMode::Coordinated {
                catch_up_deadline: Duration::from_millis(50),
            },
            50,
        );
        let err = FailoverCoordinator::run(&req, &mut tx).expect_err("must abort");
        assert!(tx.resumed, "writes must resume on the old primary");
        assert_eq!(tx.committed_term, None, "no term handed over on abort");
        match err {
            FailoverError::CatchUpTimedOut {
                frontier_lsn,
                reached_lsn,
                ..
            } => {
                assert_eq!(frontier_lsn, 100);
                assert_eq!(reached_lsn, 70);
            }
        }
    }
    // Same script as the abort case, but in force mode: the handover is
    // committed at the deadline and the 30-LSN gap (100 - 70) is reported.
    #[test]
    fn force_completes_within_timeout_surfacing_skipped_catch_up() {
        let mut tx = FakeTransport::new(100, vec![60, 65, 70], Duration::from_millis(10));
        let req = request(
            FailoverMode::Force {
                timeout: Duration::from_millis(50),
            },
            50,
        );
        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover");
        assert!(!tx.resumed, "forced handover does not abort");
        assert_eq!(tx.committed_term, Some(5), "term handed over under force");
        assert!(outcome.forced);
        assert_eq!(outcome.frontier_lsn, 100);
        assert_eq!(outcome.reached_lsn, 70);
        assert_eq!(outcome.skipped_lsn, 30, "skipped catch-up surfaced");
        assert!(!outcome.is_zero_rpo());
        // The last poll may start just before the deadline, hence one tick of slack.
        assert!(
            outcome.waited <= Duration::from_millis(60),
            "completes within the timeout window",
        );
        assert_eq!(outcome.roles.new_primary, NodeRole::Primary { term: 5 });
    }
    // Hint (120) is past the frontier (100): force mode still takes the fast
    // path and the outcome is not marked as forced.
    #[test]
    fn force_still_takes_fast_path_when_target_already_caught_up() {
        let mut tx = FakeTransport::new(100, vec![], Duration::from_millis(10));
        let req = request(
            FailoverMode::Force {
                timeout: Duration::from_millis(50),
            },
            120,
        );
        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("clean forced handover");
        assert!(!outcome.forced, "no force needed when already caught up");
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }
    // Force mode whose target reaches the frontier on the second poll: the
    // result is a clean handover, not a forced one.
    #[test]
    fn force_that_catches_up_in_time_skips_nothing() {
        let mut tx = FakeTransport::new(100, vec![90, 100], Duration::from_millis(10));
        let req = request(
            FailoverMode::Force {
                timeout: Duration::from_secs(5),
            },
            50,
        );
        let outcome = FailoverCoordinator::run(&req, &mut tx).expect("forced handover catches up");
        assert!(!outcome.forced);
        assert_eq!(outcome.skipped_lsn, 0);
        assert!(outcome.is_zero_rpo());
    }
}