use std::sync::atomic::{AtomicU8, Ordering};
use parking_lot::RwLock;
use tokio::sync::broadcast;
use uuid::Uuid;
use crate::{Result, Error};
/// Replication role of a node.
///
/// The explicit `u8` discriminants are the stable encoding used when the role
/// is stored in an atomic byte; [`NodeRole::from_u8`] decodes them again.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum NodeRole {
    /// The only role for which `can_write` is true; also serves reads.
    Primary = 0,
    /// Serves reads but not writes.
    Standby = 1,
    /// Intermediate role on the way to `Primary`.
    TransitioningToPrimary = 2,
    /// Intermediate role on the way to `Standby`.
    TransitioningToStandby = 3,
    /// Intermediate role; serves neither reads nor writes.
    Draining = 4,
    /// Intermediate role; serves neither reads nor writes.
    CatchingUp = 5,
    /// Serves neither reads nor writes.
    Offline = 6,
}

impl NodeRole {
    /// Decodes a discriminant produced by `role as u8`.
    ///
    /// Returns `None` for any value outside `0..=6`.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0 => Some(NodeRole::Primary),
            1 => Some(NodeRole::Standby),
            2 => Some(NodeRole::TransitioningToPrimary),
            3 => Some(NodeRole::TransitioningToStandby),
            4 => Some(NodeRole::Draining),
            5 => Some(NodeRole::CatchingUp),
            6 => Some(NodeRole::Offline),
            _ => None,
        }
    }

    /// Returns `true` only for [`NodeRole::Primary`].
    pub fn can_write(&self) -> bool {
        matches!(self, NodeRole::Primary)
    }

    /// Returns `true` for `Primary` and `Standby`; every other role refuses reads.
    pub fn can_read(&self) -> bool {
        matches!(self, NodeRole::Primary | NodeRole::Standby)
    }

    /// Returns `true` for the intermediate roles: `TransitioningToPrimary`,
    /// `TransitioningToStandby`, `Draining` and `CatchingUp`.
    pub fn is_transitioning(&self) -> bool {
        matches!(
            self,
            NodeRole::TransitioningToPrimary
                | NodeRole::TransitioningToStandby
                | NodeRole::Draining
                | NodeRole::CatchingUp
        )
    }

    /// Stable snake_case name of the role; this is also its `Display` form.
    pub fn as_str(&self) -> &'static str {
        match self {
            NodeRole::Primary => "primary",
            NodeRole::Standby => "standby",
            NodeRole::TransitioningToPrimary => "transitioning_to_primary",
            NodeRole::TransitioningToStandby => "transitioning_to_standby",
            NodeRole::Draining => "draining",
            NodeRole::CatchingUp => "catching_up",
            NodeRole::Offline => "offline",
        }
    }
}

impl std::fmt::Display for NodeRole {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` honours width/fill/alignment/precision flags, unlike
        // `write!(f, "{}", ..)`, so e.g. `{:<24}` lines roles up in tables.
        f.pad(self.as_str())
    }
}
/// A single role transition observed on one node.
#[derive(Debug, Clone)]
pub struct RoleChangeEvent {
    /// Node whose role changed.
    pub node_id: Uuid,
    /// Role held before the transition.
    pub old_role: NodeRole,
    /// Role held after the transition.
    pub new_role: NodeRole,
    /// Monotonic clock reading taken when the event was created.
    pub timestamp: std::time::Instant,
    /// Why the transition happened.
    pub reason: RoleChangeReason,
}
/// Cause attached to a role change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoleChangeReason {
    /// Reported as `"switchover"`.
    Switchover,
    /// Reported as `"switchback"`.
    Switchback,
    /// Reported as `"failover"`.
    Failover,
    /// Reported as `"manual_promotion"`.
    ManualPromotion,
    /// Reported as `"manual_demotion"`.
    ManualDemotion,
    /// Reported as `"cluster_formation"`.
    ClusterFormation,
    /// Reported as `"rejoin"`.
    Rejoin,
}

impl RoleChangeReason {
    /// Stable snake_case identifier for this reason, suitable for logs.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Switchover => "switchover",
            Self::Switchback => "switchback",
            Self::Failover => "failover",
            Self::ManualPromotion => "manual_promotion",
            Self::ManualDemotion => "manual_demotion",
            Self::ClusterFormation => "cluster_formation",
            Self::Rejoin => "rejoin",
        }
    }
}
/// Bookkeeping for the switchover currently in progress on this node.
#[derive(Debug, Clone)]
pub struct SwitchoverState {
    /// Identifier generated when the switchover began.
    pub switchover_id: Uuid,
    /// Node that initiated the switchover.
    pub source_node: Uuid,
    /// Node named as the switchover target.
    pub target_node: Uuid,
    /// Phase the switchover has reached.
    pub phase: SwitchoverPhase,
    /// Monotonic clock reading taken when the switchover began.
    pub started_at: std::time::Instant,
    /// Target position, if one has been set.
    /// NOTE(review): presumably a WAL log sequence number — confirm.
    pub target_lsn: Option<u64>,
}
/// Stages a switchover moves through; the last three are terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SwitchoverPhase {
    Preparation,
    Synchronization,
    RoleChange,
    Reconfiguration,
    Resumption,
    Completed,
    Failed,
    Cancelled,
}

impl SwitchoverPhase {
    /// Stable snake_case name of the phase, suitable for logs.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Preparation => "preparation",
            Self::Synchronization => "synchronization",
            Self::RoleChange => "role_change",
            Self::Reconfiguration => "reconfiguration",
            Self::Resumption => "resumption",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Cancelled => "cancelled",
        }
    }

    /// Returns `true` for `Completed`, `Failed` and `Cancelled`.
    ///
    /// Written as an exhaustive match so adding a phase forces a decision here.
    pub fn is_terminal(&self) -> bool {
        match *self {
            Self::Completed | Self::Failed | Self::Cancelled => true,
            Self::Preparation
            | Self::Synchronization
            | Self::RoleChange
            | Self::Reconfiguration
            | Self::Resumption => false,
        }
    }
}
/// Tracks this node's role, the known primary, any in-flight switchover,
/// and a bounded history of role changes.
pub struct RoleManager {
    /// Identifier of the node this manager belongs to.
    node_id: Uuid,
    /// Current role, stored as its `NodeRole` discriminant.
    role: AtomicU8,
    /// Node currently recorded as primary, if known.
    current_primary: RwLock<Option<Uuid>>,
    /// The switchover in progress, if any.
    switchover_state: RwLock<Option<SwitchoverState>>,
    /// Broadcast sender used to publish role-change events.
    role_change_tx: broadcast::Sender<RoleChangeEvent>,
    /// Past role changes, oldest first.
    role_history: RwLock<Vec<RoleChangeEvent>>,
    /// Maximum number of entries retained in `role_history`.
    max_history: usize,
}
impl RoleManager {
pub fn new(node_id: Uuid, initial_role: NodeRole) -> Self {
let (role_change_tx, _) = broadcast::channel(64);
Self {
node_id,
role: AtomicU8::new(initial_role as u8),
current_primary: RwLock::new(if initial_role == NodeRole::Primary {
Some(node_id)
} else {
None
}),
switchover_state: RwLock::new(None),
role_change_tx,
role_history: RwLock::new(Vec::new()),
max_history: 1000,
}
}
pub fn node_id(&self) -> Uuid {
self.node_id
}
pub fn role(&self) -> NodeRole {
NodeRole::from_u8(self.role.load(Ordering::SeqCst))
.unwrap_or(NodeRole::Offline)
}
pub fn is_primary(&self) -> bool {
self.role() == NodeRole::Primary
}
pub fn is_standby(&self) -> bool {
self.role() == NodeRole::Standby
}
pub fn is_switchover_in_progress(&self) -> bool {
self.switchover_state.read().is_some()
}
pub fn current_primary(&self) -> Option<Uuid> {
*self.current_primary.read()
}
pub fn set_current_primary(&self, primary_id: Option<Uuid>) {
*self.current_primary.write() = primary_id;
}
pub fn subscribe(&self) -> broadcast::Receiver<RoleChangeEvent> {
self.role_change_tx.subscribe()
}
pub fn switchover_state(&self) -> Option<SwitchoverState> {
self.switchover_state.read().clone()
}
pub fn change_role(&self, new_role: NodeRole, reason: RoleChangeReason) -> Result<()> {
let old_role = self.role();
self.validate_transition(old_role, new_role)?;
self.role.store(new_role as u8, Ordering::SeqCst);
if new_role == NodeRole::Primary {
*self.current_primary.write() = Some(self.node_id);
}
let event = RoleChangeEvent {
node_id: self.node_id,
old_role,
new_role,
timestamp: std::time::Instant::now(),
reason,
};
{
let mut history = self.role_history.write();
history.push(event.clone());
if history.len() > self.max_history {
history.remove(0);
}
}
let _ = self.role_change_tx.send(event);
tracing::info!(
"Role changed: {} -> {} (reason: {})",
old_role,
new_role,
reason.as_str()
);
Ok(())
}
fn validate_transition(&self, from: NodeRole, to: NodeRole) -> Result<()> {
let valid = match (from, to) {
(NodeRole::Primary, NodeRole::Draining) => true,
(NodeRole::Primary, NodeRole::TransitioningToStandby) => true,
(NodeRole::Primary, NodeRole::Offline) => true,
(NodeRole::Standby, NodeRole::CatchingUp) => true,
(NodeRole::Standby, NodeRole::TransitioningToPrimary) => true,
(NodeRole::Standby, NodeRole::Offline) => true,
(NodeRole::Draining, NodeRole::TransitioningToStandby) => true,
(NodeRole::Draining, NodeRole::Primary) => true,
(NodeRole::CatchingUp, NodeRole::TransitioningToPrimary) => true,
(NodeRole::CatchingUp, NodeRole::Standby) => true,
(NodeRole::TransitioningToPrimary, NodeRole::Primary) => true,
(NodeRole::TransitioningToPrimary, NodeRole::Standby) => true,
(NodeRole::TransitioningToStandby, NodeRole::Standby) => true,
(NodeRole::TransitioningToStandby, NodeRole::Primary) => true,
(NodeRole::Offline, NodeRole::Primary) => true,
(NodeRole::Offline, NodeRole::Standby) => true,
(a, b) if a == b => true,
_ => false,
};
if valid {
Ok(())
} else {
Err(Error::ha(format!(
"Invalid role transition: {} -> {}",
from, to
)))
}
}
pub fn begin_switchover(&self, target_node: Uuid) -> Result<Uuid> {
let mut state = self.switchover_state.write();
if state.is_some() {
return Err(Error::ha("Switchover already in progress"));
}
if self.role() != NodeRole::Primary {
return Err(Error::ha("Only primary can initiate switchover"));
}
let switchover_id = Uuid::new_v4();
*state = Some(SwitchoverState {
switchover_id,
source_node: self.node_id,
target_node,
phase: SwitchoverPhase::Preparation,
started_at: std::time::Instant::now(),
target_lsn: None,
});
tracing::info!(
"Switchover {} started: {} -> {}",
switchover_id,
self.node_id,
target_node
);
Ok(switchover_id)
}
pub fn advance_switchover_phase(&self, new_phase: SwitchoverPhase) -> Result<()> {
let mut state = self.switchover_state.write();
if let Some(ref mut s) = *state {
tracing::info!(
"Switchover {} advancing: {} -> {}",
s.switchover_id,
s.phase.as_str(),
new_phase.as_str()
);
s.phase = new_phase;
if new_phase.is_terminal() {
drop(state);
*self.switchover_state.write() = None;
}
Ok(())
} else {
Err(Error::ha("No switchover in progress"))
}
}
pub fn set_switchover_target_lsn(&self, lsn: u64) -> Result<()> {
let mut state = self.switchover_state.write();
if let Some(ref mut s) = *state {
s.target_lsn = Some(lsn);
Ok(())
} else {
Err(Error::ha("No switchover in progress"))
}
}
pub fn cancel_switchover(&self) -> Result<()> {
let mut state = self.switchover_state.write();
if let Some(ref s) = *state {
tracing::warn!("Switchover {} cancelled", s.switchover_id);
*state = None;
Ok(())
} else {
Err(Error::ha("No switchover in progress"))
}
}
pub fn role_history(&self) -> Vec<RoleChangeEvent> {
self.role_history.read().clone()
}
pub fn promote_to_primary(&self, reason: RoleChangeReason) -> Result<()> {
let current = self.role();
if !matches!(current, NodeRole::CatchingUp | NodeRole::TransitioningToPrimary | NodeRole::Standby) {
return Err(Error::ha(format!(
"Cannot promote from role: {}",
current
)));
}
self.change_role(NodeRole::Primary, reason)
}
pub fn demote_to_standby(&self, reason: RoleChangeReason) -> Result<()> {
let current = self.role();
if !matches!(current, NodeRole::Draining | NodeRole::TransitioningToStandby | NodeRole::Primary) {
return Err(Error::ha(format!(
"Cannot demote from role: {}",
current
)));
}
self.change_role(NodeRole::Standby, reason)
}
}
/// Diagnostic view of a `RoleManager`: identity, role, known primary and
/// whether a switchover is running.
impl std::fmt::Debug for RoleManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let role = self.role();
        let primary = self.current_primary();
        let switching = self.is_switchover_in_progress();

        let mut out = f.debug_struct("RoleManager");
        out.field("node_id", &self.node_id);
        out.field("role", &role);
        out.field("current_primary", &primary);
        out.field("switchover_in_progress", &switching);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks Standby -> CatchingUp -> TransitioningToPrimary -> Primary.
    #[test]
    fn test_role_transitions() {
        let node_id = Uuid::new_v4();
        let manager = RoleManager::new(node_id, NodeRole::Standby);
        assert_eq!(manager.role(), NodeRole::Standby);
        assert!(!manager.is_primary());
        assert!(manager.is_standby());
        manager.change_role(NodeRole::CatchingUp, RoleChangeReason::Switchover).unwrap();
        assert_eq!(manager.role(), NodeRole::CatchingUp);
        manager.change_role(NodeRole::TransitioningToPrimary, RoleChangeReason::Switchover).unwrap();
        manager.change_role(NodeRole::Primary, RoleChangeReason::Switchover).unwrap();
        assert!(manager.is_primary());
        assert_eq!(manager.current_primary(), Some(node_id));
        assert_eq!(manager.role_history().len(), 3);
    }

    /// A standby may not jump straight to primary, and a rejected
    /// transition must leave all state untouched.
    #[test]
    fn test_invalid_transition() {
        let node_id = Uuid::new_v4();
        let manager = RoleManager::new(node_id, NodeRole::Standby);
        let result = manager.change_role(NodeRole::Primary, RoleChangeReason::ManualPromotion);
        assert!(result.is_err(), "standby -> primary must be rejected");
        assert_eq!(manager.role(), NodeRole::Standby);
        assert_eq!(manager.current_primary(), None);
        assert!(manager.role_history().is_empty());
    }

    /// Runs a switchover through every phase and checks the recorded state.
    #[test]
    fn test_switchover_lifecycle() {
        let node_id = Uuid::new_v4();
        let target_id = Uuid::new_v4();
        let manager = RoleManager::new(node_id, NodeRole::Primary);
        let switchover_id = manager.begin_switchover(target_id).unwrap();
        assert!(manager.is_switchover_in_progress());

        let state = manager
            .switchover_state()
            .expect("switchover state should be recorded");
        assert_eq!(state.switchover_id, switchover_id);
        assert_eq!(state.source_node, node_id);
        assert_eq!(state.target_node, target_id);
        assert_eq!(state.phase, SwitchoverPhase::Preparation);

        manager.advance_switchover_phase(SwitchoverPhase::Synchronization).unwrap();
        manager.advance_switchover_phase(SwitchoverPhase::RoleChange).unwrap();
        manager.advance_switchover_phase(SwitchoverPhase::Reconfiguration).unwrap();
        manager.advance_switchover_phase(SwitchoverPhase::Resumption).unwrap();
        manager.advance_switchover_phase(SwitchoverPhase::Completed).unwrap();
        assert!(!manager.is_switchover_in_progress());
        assert!(manager.switchover_state().is_none());
    }

    /// Cancelling clears the state; cancelling again is an error.
    #[test]
    fn test_cancel_switchover() {
        let manager = RoleManager::new(Uuid::new_v4(), NodeRole::Primary);
        manager.begin_switchover(Uuid::new_v4()).unwrap();
        manager.cancel_switchover().unwrap();
        assert!(!manager.is_switchover_in_progress());
        assert!(manager.cancel_switchover().is_err());
    }

    /// Only a primary may initiate a switchover.
    #[test]
    fn test_only_primary_can_begin_switchover() {
        let manager = RoleManager::new(Uuid::new_v4(), NodeRole::Standby);
        assert!(manager.begin_switchover(Uuid::new_v4()).is_err());
        assert!(!manager.is_switchover_in_progress());
    }
}