use super::passive::PassiveHandle;
use super::primary::ReplicatorConfig;
use super::protocol::FencingToken;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use tokio::sync::watch;
use tracing::{error, info};
/// Role a node currently holds in the failover protocol.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeRole {
    /// Target role of `FailoverCoordinator::promote`; the only role `demote` accepts.
    Primary,
    /// The only role from which `FailoverCoordinator::promote` may start.
    Passive,
    /// Intermediate role; `promote` and `demote` in this module never assign it.
    Promoting,
    /// Default initial role (see `FailoverConfig::default`).
    Standalone,
}
/// Lower-case role label; this is the string `set_role` passes to the role metric.
impl std::fmt::Display for NodeRole {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            NodeRole::Primary => "primary",
            NodeRole::Passive => "passive",
            NodeRole::Promoting => "promoting",
            NodeRole::Standalone => "standalone",
        };
        f.write_str(label)
    }
}
/// Progress of the coordinator's promotion attempt.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FailoverState {
    /// Initial state of a new coordinator.
    Normal,
    /// Set when `FailoverCoordinator::promote` begins.
    InProgress,
    /// Set once `FailoverCoordinator::promote` has switched the node to primary.
    Completed,
    /// NOTE(review): intended to mark a promotion attempt that did not complete —
    /// confirm which code paths assign it.
    Failed,
}
/// Settings for a [`FailoverCoordinator`].
#[derive(Clone, Debug)]
pub struct FailoverConfig {
    /// Identifier of this node (defaults to `"node"`).
    pub node_id: String,
    /// Role the coordinator starts in; also the initial value of its role-change channel.
    pub initial_role: NodeRole,
    /// Presumably gates automatic promotion on failover (defaults to `true`).
    pub auto_promote: bool,
    /// Milliseconds `promote` sleeps before switching the role (defaults to 5000).
    pub promotion_delay_ms: u64,
    /// Optional replicator settings; not read by the coordinator in this module.
    pub replicator_config: Option<ReplicatorConfig>,
}
impl Default for FailoverConfig {
fn default() -> Self {
Self {
node_id: "node".to_string(),
initial_role: NodeRole::Standalone,
auto_promote: true,
promotion_delay_ms: 5000,
replicator_config: None,
}
}
}
/// Tracks this node's role, promotion progress and fencing token, and publishes
/// role changes over a watch channel.
pub struct FailoverCoordinator {
    /// Settings supplied to `new`.
    config: FailoverConfig,
    /// Current role; the value returned by `role()`.
    role: Arc<RwLock<NodeRole>>,
    /// Progress of the latest promotion attempt.
    failover_state: Arc<RwLock<FailoverState>>,
    /// Fencing counter; starts at 0 and is incremented by `promote`.
    fencing_token: AtomicU64,
    /// Publishing side of the role-change channel.
    role_tx: watch::Sender<NodeRole>,
    /// Retained so the channel always has a live receiver and so
    /// `subscribe_role_changes` can hand out clones of it.
    role_rx: watch::Receiver<NodeRole>,
}
impl FailoverCoordinator {
    /// Creates a coordinator in `config.initial_role`, with failover state `Normal`
    /// and a fencing token of 0.
    pub fn new(config: FailoverConfig) -> Self {
        let (role_tx, role_rx) = watch::channel(config.initial_role);
        Self {
            role: Arc::new(RwLock::new(config.initial_role)),
            failover_state: Arc::new(RwLock::new(FailoverState::Normal)),
            fencing_token: AtomicU64::new(0),
            role_tx,
            role_rx,
            config,
        }
    }

    /// Returns the current role (`Standalone` if the role lock is poisoned).
    pub fn role(&self) -> NodeRole {
        self.role.read().map(|r| *r).unwrap_or(NodeRole::Standalone)
    }

    /// Returns the current failover state (`Normal` if the state lock is poisoned).
    pub fn failover_state(&self) -> FailoverState {
        self.failover_state
            .read()
            .map(|s| *s)
            .unwrap_or(FailoverState::Normal)
    }

    /// Returns the most recent fencing token issued by this coordinator.
    pub fn fencing_token(&self) -> u64 {
        self.fencing_token.load(Ordering::Acquire)
    }

    /// Returns a receiver that observes role changes made through this coordinator.
    pub fn subscribe_role_changes(&self) -> watch::Receiver<NodeRole> {
        self.role_rx.clone()
    }

    /// Promotes this node from passive to primary and returns the new fencing token.
    ///
    /// Claims the `InProgress` failover state and waits `promotion_delay_ms`. Then,
    /// only if the node is still passive, it switches the role to `Primary` and bumps
    /// the fencing token inside one critical section. The failover state ends as
    /// `Completed` on success. It ends as `Failed` if the attempt errors after
    /// starting or the future is dropped during the delay.
    ///
    /// # Errors
    ///
    /// - [`FailoverError::InvalidRoleTransition`] if the node is not passive when the
    ///   call starts, or its role was changed by another caller during the delay.
    /// - [`FailoverError::PromotionFailed`] if another promotion is already in progress.
    /// - [`FailoverError::LockPoisoned`] if an internal lock is poisoned.
    pub async fn promote(&self, passive_handle: &PassiveHandle) -> Result<u64, FailoverError> {
        // Records the final failover state when the attempt ends. This covers early
        // `?` returns and cancellation while awaiting the delay; without it a dropped
        // future would leave the state stuck at `InProgress` and block later promotions.
        struct PromotionOutcome<'a> {
            state: &'a RwLock<FailoverState>,
            final_state: FailoverState,
        }

        impl Drop for PromotionOutcome<'_> {
            fn drop(&mut self) {
                // The guarded value is a plain `Copy` enum, so a poisoned lock still
                // holds consistent data and is safe to overwrite.
                let mut guard = self
                    .state
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                *guard = self.final_state;
            }
        }

        self.begin_promotion()?;
        let mut outcome = PromotionOutcome {
            state: &*self.failover_state,
            final_state: FailoverState::Failed,
        };

        info!("Starting promotion to primary");
        tokio::time::sleep(std::time::Duration::from_millis(self.config.promotion_delay_ms)).await;

        let new_token = self.commit_promotion(passive_handle)?;
        outcome.final_state = FailoverState::Completed;
        drop(outcome);

        info!(
            fencing_token = new_token,
            "Successfully promoted to primary"
        );
        crate::metrics::set_node_role("primary");
        Ok(new_token)
    }

    /// Verifies the node is passive and atomically claims the `InProgress` state.
    fn begin_promotion(&self) -> Result<(), FailoverError> {
        let current_role = self.role();
        if current_role != NodeRole::Passive {
            return Err(FailoverError::InvalidRoleTransition {
                from: current_role,
                to: NodeRole::Primary,
            });
        }
        let mut state = self
            .failover_state
            .write()
            .map_err(|_| FailoverError::LockPoisoned)?;
        // Checked and set under one write lock so two concurrent callers cannot both
        // start a promotion and each bump the fencing token.
        if *state == FailoverState::InProgress {
            return Err(FailoverError::PromotionFailed(
                "promotion already in progress".to_string(),
            ));
        }
        *state = FailoverState::InProgress;
        Ok(())
    }

    /// Re-validates the role, switches it to `Primary` and issues a new fencing token.
    fn commit_promotion(&self, passive_handle: &PassiveHandle) -> Result<u64, FailoverError> {
        // Snapshot replicated state before taking the role lock, so no external code
        // runs while the lock is held.
        let integrity = passive_handle.integrity();
        let validators = passive_handle.validator_states();

        let new_token = {
            let mut role = self
                .role
                .write()
                .map_err(|_| FailoverError::LockPoisoned)?;
            // The role may have been changed (e.g. by `demote`/`set_role`) during the
            // promotion delay; never overwrite such a change.
            if *role != NodeRole::Passive {
                return Err(FailoverError::InvalidRoleTransition {
                    from: *role,
                    to: NodeRole::Primary,
                });
            }
            let token = self.fencing_token.fetch_add(1, Ordering::AcqRel) + 1;
            *role = NodeRole::Primary;
            // Publish while still holding the lock so subscribers see role changes in
            // the same order as `role()`.
            let _ = self.role_tx.send(NodeRole::Primary);
            token
        };

        info!(
            sequence = integrity.sequence_number,
            validators = validators.len(),
            fencing_token = new_token,
            "Promotion state captured"
        );
        Ok(new_token)
    }

    /// Demotes this node from primary to passive.
    ///
    /// The role check and the update happen under one write lock, so a concurrent
    /// role change cannot slip in between them.
    ///
    /// # Errors
    ///
    /// - [`FailoverError::InvalidRoleTransition`] if the node is not currently primary.
    /// - [`FailoverError::LockPoisoned`] if the role lock is poisoned.
    pub fn demote(&self) -> Result<(), FailoverError> {
        {
            let mut role = self
                .role
                .write()
                .map_err(|_| FailoverError::LockPoisoned)?;
            if *role != NodeRole::Primary {
                return Err(FailoverError::InvalidRoleTransition {
                    from: *role,
                    to: NodeRole::Passive,
                });
            }
            info!("Demoting to passive");
            *role = NodeRole::Passive;
            let _ = self.role_tx.send(NodeRole::Passive);
        }
        crate::metrics::set_node_role("passive");
        Ok(())
    }

    /// Sets the role unconditionally, publishes it to subscribers and updates the
    /// role metric.
    ///
    /// # Errors
    ///
    /// [`FailoverError::LockPoisoned`] if the role lock is poisoned.
    pub fn set_role(&self, new_role: NodeRole) -> Result<(), FailoverError> {
        {
            let mut role = self
                .role
                .write()
                .map_err(|_| FailoverError::LockPoisoned)?;
            *role = new_role;
            // Published under the lock for the same ordering reason as in `promote`.
            let _ = self.role_tx.send(new_role);
        }
        crate::metrics::set_node_role(&new_role.to_string());
        Ok(())
    }

    /// Returns `true` if the current role is `Primary`.
    pub fn is_primary(&self) -> bool {
        self.role() == NodeRole::Primary
    }

    /// Returns `true` if the current role is `Passive`.
    pub fn is_passive(&self) -> bool {
        self.role() == NodeRole::Passive
    }

    /// Returns `true` if `token` is at least our current fencing token; lower
    /// (stale) tokens are rejected.
    pub fn validate_fencing_token(&self, token: &FencingToken) -> bool {
        let our_token = self.fencing_token.load(Ordering::Acquire);
        token.token >= our_token
    }
}
#[derive(Debug, thiserror::Error)]
pub enum FailoverError {
#[error("Invalid role transition from {from} to {to}")]
InvalidRoleTransition { from: NodeRole, to: NodeRole },
#[error("Lock poisoned")]
LockPoisoned,
#[error("Fencing token rejected")]
FencingTokenRejected,
#[error("Promotion failed: {0}")]
PromotionFailed(String),
}
pub fn start_auto_failover(
coordinator: Arc<FailoverCoordinator>,
passive_handle: PassiveHandle,
) -> watch::Receiver<bool> {
let (done_tx, done_rx) = watch::channel(false);
let mut failover_rx = passive_handle.subscribe_failover();
tokio::spawn(async move {
loop {
if failover_rx.changed().await.is_err() {
break;
}
if *failover_rx.borrow() {
info!("Failover detected, attempting promotion");
match coordinator.promote(&passive_handle).await {
Ok(token) => {
info!(fencing_token = token, "Promotion successful");
let _ = done_tx.send(true);
break;
}
Err(e) => {
error!(error = %e, "Promotion failed");
}
}
}
}
});
done_rx
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_coordinator_creation() {
        let config = FailoverConfig {
            node_id: "test-node".to_string(),
            initial_role: NodeRole::Passive,
            ..Default::default()
        };
        let coordinator = FailoverCoordinator::new(config);
        assert_eq!(coordinator.role(), NodeRole::Passive);
        assert_eq!(coordinator.failover_state(), FailoverState::Normal);
    }

    #[test]
    fn test_role_change() {
        let config = FailoverConfig {
            initial_role: NodeRole::Standalone,
            ..Default::default()
        };
        let coordinator = FailoverCoordinator::new(config);
        assert_eq!(coordinator.role(), NodeRole::Standalone);
        coordinator.set_role(NodeRole::Primary).unwrap();
        assert_eq!(coordinator.role(), NodeRole::Primary);
        assert!(coordinator.is_primary());
    }

    #[test]
    fn test_fencing_token_validation() {
        let config = FailoverConfig::default();
        let coordinator = FailoverCoordinator::new(config);
        let token = FencingToken::new(1, "node-a");
        assert!(coordinator.validate_fencing_token(&token));
        coordinator.fencing_token.store(5, Ordering::Release);
        assert!(!coordinator.validate_fencing_token(&token));
        let newer_token = FencingToken::new(5, "node-b");
        assert!(coordinator.validate_fencing_token(&newer_token));
    }

    /// Display labels are what `set_role` reports to the role metric.
    #[test]
    fn test_role_display() {
        assert_eq!(NodeRole::Primary.to_string(), "primary");
        assert_eq!(NodeRole::Passive.to_string(), "passive");
        assert_eq!(NodeRole::Promoting.to_string(), "promoting");
        assert_eq!(NodeRole::Standalone.to_string(), "standalone");
    }

    #[test]
    fn test_demote_requires_primary() {
        let coordinator = FailoverCoordinator::new(FailoverConfig {
            initial_role: NodeRole::Passive,
            ..Default::default()
        });
        let err = coordinator.demote().unwrap_err();
        assert!(matches!(
            err,
            FailoverError::InvalidRoleTransition {
                from: NodeRole::Passive,
                to: NodeRole::Passive
            }
        ));
        assert_eq!(coordinator.role(), NodeRole::Passive);
    }

    #[test]
    fn test_demote_from_primary() {
        let coordinator = FailoverCoordinator::new(FailoverConfig {
            initial_role: NodeRole::Primary,
            ..Default::default()
        });
        coordinator.demote().unwrap();
        assert!(coordinator.is_passive());
        assert!(!coordinator.is_primary());
    }

    #[test]
    fn test_role_changes_are_broadcast() {
        let coordinator = FailoverCoordinator::new(FailoverConfig::default());
        let rx = coordinator.subscribe_role_changes();
        assert_eq!(*rx.borrow(), NodeRole::Standalone);
        coordinator.set_role(NodeRole::Primary).unwrap();
        assert_eq!(*rx.borrow(), NodeRole::Primary);
    }
}