use crate::cluster::{ClusterError, ClusterResult};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, Mutex};
use tokio::time::{interval, timeout};
use tracing::{error, info, warn};
use uuid::Uuid;
/// Tunable parameters for the failover subsystem.
///
/// Duration fields use the unit named in their suffix (`_secs` / `_ms`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailoverConfig {
    /// How often the background loop probes peer health, in seconds.
    pub health_check_interval_secs: u64,
    /// How long a health-check send may take before the node is declared
    /// failed, in seconds.
    pub node_timeout_secs: u64,
    /// Number of failed nodes tolerated before emergency measures trigger.
    pub max_failed_nodes: usize,
    /// Whether a leader failure automatically starts a leader election.
    pub enable_auto_leader_election: bool,
    /// Upper bound for a leader election round, in milliseconds.
    /// NOTE(review): not read in this module; presumably consumed elsewhere.
    pub leader_election_timeout_ms: u64,
    /// Whether a detected node failure automatically initiates recovery.
    pub enable_auto_recovery: bool,
    /// Upper bound for a recovery attempt, in milliseconds.
    /// NOTE(review): not read in this module; presumably consumed elsewhere.
    pub recovery_timeout_ms: u64,
    /// Maximum recovery attempts per failed node.
    /// NOTE(review): not read in this module; presumably consumed elsewhere.
    pub max_recovery_attempts: u32,
    /// Whether healthy nodes may be promoted as backups during emergencies.
    pub enable_backup_promotion: bool,
    /// Delay before a backup promotion takes effect, in milliseconds.
    /// NOTE(review): not read in this module; presumably consumed elsewhere.
    pub backup_promotion_delay_ms: u64,
}
impl Default for FailoverConfig {
fn default() -> Self {
Self {
health_check_interval_secs: 10,
node_timeout_secs: 30,
max_failed_nodes: 2,
enable_auto_leader_election: true,
leader_election_timeout_ms: 5000,
enable_auto_recovery: true,
recovery_timeout_ms: 30000,
max_recovery_attempts: 3,
enable_backup_promotion: true,
backup_promotion_delay_ms: 5000,
}
}
}
/// Health classification tracked per node by the failover manager.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeHealthStatus {
    /// Node is responding normally.
    Healthy,
    /// Node responds with reduced capability.
    /// NOTE(review): never set in this module — presumably set by an external
    /// health reporter; confirm.
    Degraded,
    /// Node failed a health check or timed out.
    Failed,
    /// Recovery has been initiated and is in progress.
    Recovering,
    /// Node deliberately taken out of rotation (never set in this module).
    Maintenance,
}
/// A single failover-related occurrence, queued locally and broadcast to
/// peers inside `FailoverMessage::FailoverNotification`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailoverEvent {
    /// Unique id of this event instance.
    pub event_id: Uuid,
    /// What kind of occurrence this is.
    pub event_type: FailoverEventType,
    /// Node the event is about (for election/emergency events, the reporting node).
    pub node_id: Uuid,
    /// UTC creation time of the event.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Human-readable summary.
    pub description: String,
    /// How serious the event is.
    pub severity: FailoverSeverity,
    /// Free-form extra data; `LeaderElectionCompleted` events carry the
    /// winner's id under the `"new_leader"` key.
    pub metadata: HashMap<String, String>,
}
/// Classification of failover events.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FailoverEventType {
    /// A node stopped responding to health checks.
    NodeFailure,
    /// The current leader failed.
    /// NOTE(review): never constructed in this module — presumably emitted
    /// elsewhere; confirm.
    LeaderFailure,
    /// Cluster health dropped enough to trigger emergency measures.
    NetworkPartition,
    /// Replicas disagree on data (never constructed in this module).
    DataInconsistency,
    /// Recovery of a failed node has been started.
    RecoveryInitiated,
    /// Recovery of a failed node finished successfully.
    RecoveryCompleted,
    /// A leader election round has been started.
    LeaderElectionInitiated,
    /// A leader election round finished; event metadata carries `"new_leader"`.
    LeaderElectionCompleted,
    /// A healthy node was promoted as a backup.
    BackupPromoted,
}
/// Severity ladder for failover events, from routine to cluster-threatening.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FailoverSeverity {
    Low,
    Medium,
    High,
    Critical,
}
/// Wire messages exchanged between failover managers on different nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FailoverMessage {
    /// Probe sent by `requester_id` to test whether a peer is alive.
    HealthCheckRequest {
        requester_id: Uuid,
        timestamp: chrono::DateTime<chrono::Utc>,
    },
    /// Reply to a `HealthCheckRequest`, carrying the responder's self-reported
    /// status plus free-form metrics.
    HealthCheckResponse {
        responder_id: Uuid,
        status: NodeHealthStatus,
        timestamp: chrono::DateTime<chrono::Utc>,
        metrics: HashMap<String, String>,
    },
    /// Broadcast of a failover event so peers can update their local state
    /// (e.g. adopt a new leader from the event metadata).
    FailoverNotification {
        event: FailoverEvent,
    },
    /// Vote request from `candidate_id` for the given election `term`.
    LeaderElectionRequest {
        candidate_id: Uuid,
        term: u64,
        timestamp: chrono::DateTime<chrono::Utc>,
    },
    /// A voter's answer to a `LeaderElectionRequest`.
    LeaderElectionResponse {
        voter_id: Uuid,
        candidate_id: Uuid,
        vote_granted: bool,
        term: u64,
        timestamp: chrono::DateTime<chrono::Utc>,
    },
    /// Ask a peer to run a recovery procedure for `failed_node_id`.
    RecoveryRequest {
        failed_node_id: Uuid,
        recovery_type: RecoveryType,
        timestamp: chrono::DateTime<chrono::Utc>,
    },
    /// Outcome of a `RecoveryRequest`.
    /// NOTE(review): `request_id` is freshly generated by the responder rather
    /// than echoed from a request (requests carry no id), so responses cannot
    /// be correlated — confirm this is intended.
    RecoveryResponse {
        request_id: Uuid,
        success: bool,
        message: String,
        timestamp: chrono::DateTime<chrono::Utc>,
    },
}
/// Strategy a `RecoveryManager` should apply when recovering a node.
/// Exact semantics of each variant are defined by the implementing
/// `RecoveryManager`; auto-recovery in this module always requests
/// `FullRecovery`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecoveryType {
    FullRecovery,
    DataRecovery,
    ConfigRecovery,
    ServiceRestart,
}
/// Errors produced by the failover subsystem; wrapped into
/// `ClusterError::Failover` at call sites in this module.
#[derive(Debug, thiserror::Error)]
pub enum FailoverError {
    /// A specific node stopped responding (send error during health check).
    #[error("Node failure detected: {0}")]
    NodeFailure(Uuid),
    #[error("Leader election failed: {0}")]
    LeaderElectionFailed(String),
    #[error("Recovery failed: {0}")]
    RecoveryFailed(String),
    #[error("Network partition detected")]
    NetworkPartition,
    /// healthy count / total count.
    #[error("Insufficient healthy nodes: {0}/{1}")]
    InsufficientHealthyNodes(usize, usize),
    #[error("Backup promotion failed: {0}")]
    BackupPromotionFailed(String),
    /// A health-check send did not complete within `node_timeout_secs`.
    #[error("Failover timeout")]
    FailoverTimeout,
    #[error("Invalid failover configuration: {0}")]
    InvalidConfiguration(String),
}
/// Coordinates failure detection, leader election, recovery and backup
/// promotion for a single cluster node.
///
/// All shared state sits behind `Arc`ed async locks, so the manager can be
/// cheaply cloned into the background tasks it spawns.
pub struct FailoverManager {
    // Identity of the local node.
    node_id: Uuid,
    config: FailoverConfig,
    // Set of known peer node ids.
    cluster_nodes: Arc<RwLock<HashSet<Uuid>>>,
    // Last observed health status per node.
    node_health: Arc<RwLock<HashMap<Uuid, NodeHealthStatus>>>,
    // Pending events; drained periodically by the event-processing loop.
    failover_events: Arc<RwLock<Vec<FailoverEvent>>>,
    // Currently known leader, if any.
    current_leader: Arc<RwLock<Option<Uuid>>>,
    // Observers notified of failover lifecycle events.
    callbacks: Arc<Mutex<Vec<Box<dyn FailoverCallback + Send + Sync>>>>,
    // Transport used to reach peers.
    network_sender: Arc<Mutex<dyn FailoverNetworkSender + Send + Sync>>,
    // Delegate performing the actual recovery work.
    recovery_manager: Arc<Mutex<dyn RecoveryManager + Send + Sync>>,
}
/// Observer hooks invoked by the failover manager at lifecycle points.
///
/// Callbacks are awaited sequentially while the manager holds its callback
/// lock, so implementations should return quickly.
#[async_trait::async_trait]
pub trait FailoverCallback {
    /// Called after a node failure has been recorded.
    async fn on_node_failure(&self, node_id: Uuid, event: &FailoverEvent);
    /// Called when a new leader has been adopted.
    async fn on_leader_election(&self, new_leader: Uuid, event: &FailoverEvent);
    /// Called when recovery for a node has been started.
    async fn on_recovery_initiated(&self, node_id: Uuid, event: &FailoverEvent);
    /// Called when recovery for a node has completed successfully.
    async fn on_recovery_completed(&self, node_id: Uuid, event: &FailoverEvent);
    /// Called when a healthy node is promoted as a backup.
    async fn on_backup_promotion(&self, backup_node: Uuid, event: &FailoverEvent);
}
/// Transport abstraction used to deliver failover messages to peers.
#[async_trait::async_trait]
pub trait FailoverNetworkSender {
    /// Sends a message to a single node.
    async fn send_failover_message(&self, target: Uuid, message: FailoverMessage) -> ClusterResult<()>;
    /// Sends a message to every reachable node in the cluster.
    async fn broadcast_failover_message(&self, message: FailoverMessage) -> ClusterResult<()>;
}
/// Delegate that performs the actual recovery work for failed nodes.
#[async_trait::async_trait]
pub trait RecoveryManager {
    /// Starts a recovery procedure of the given type for `node_id`.
    async fn initiate_recovery(&self, node_id: Uuid, recovery_type: RecoveryType) -> ClusterResult<()>;
    /// Reports progress of an ongoing recovery; `None` when no recovery is in
    /// flight for `node_id`.
    async fn check_recovery_status(&self, node_id: Uuid) -> ClusterResult<Option<RecoveryStatus>>;
    /// Promotes `backup_node` to take over for a failed node.
    async fn promote_backup(&self, backup_node: Uuid) -> ClusterResult<()>;
}
/// Progress snapshot for an in-flight recovery, produced by a `RecoveryManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryStatus {
    /// Node being recovered.
    pub node_id: Uuid,
    /// Strategy being applied.
    pub recovery_type: RecoveryType,
    /// Completion indicator; presumably a 0-100 percentage — TODO confirm
    /// with the producing `RecoveryManager` implementation.
    pub progress: u8,
    /// Health the node has reached so far; the failover manager treats
    /// `Healthy` as "recovery finished".
    pub status: NodeHealthStatus,
    /// Producer's estimate of remaining time, if known.
    pub estimated_time_remaining: Option<Duration>,
    /// Human-readable status message.
    pub message: String,
}
impl FailoverManager {
pub fn new(
node_id: Uuid,
config: FailoverConfig,
network_sender: Arc<Mutex<dyn FailoverNetworkSender + Send + Sync>>,
recovery_manager: Arc<Mutex<dyn RecoveryManager + Send + Sync>>,
) -> Self {
Self {
node_id,
config,
cluster_nodes: Arc::new(RwLock::new(HashSet::new())),
node_health: Arc::new(RwLock::new(HashMap::new())),
failover_events: Arc::new(RwLock::new(Vec::new())),
current_leader: Arc::new(RwLock::new(None)),
callbacks: Arc::new(Mutex::new(Vec::new())),
network_sender,
recovery_manager,
}
}
pub async fn start(&self) -> ClusterResult<()> {
info!("Starting failover manager for node {}", self.node_id);
let failover = self.clone();
tokio::spawn(async move {
failover.health_check_loop().await;
});
let failover = self.clone();
tokio::spawn(async move {
failover.event_processing_loop().await;
});
Ok(())
}
async fn health_check_loop(&self) {
let mut interval = interval(Duration::from_secs(self.config.health_check_interval_secs));
loop {
interval.tick().await;
if let Err(e) = self.perform_health_checks().await {
error!("Health check failed: {}", e);
}
}
}
async fn event_processing_loop(&self) {
let mut interval = interval(Duration::from_secs(5));
loop {
interval.tick().await;
if let Err(e) = self.process_failover_events().await {
error!("Event processing failed: {}", e);
}
}
}
async fn perform_health_checks(&self) -> ClusterResult<()> {
let cluster_nodes = self.cluster_nodes.read().await;
let nodes_to_check: Vec<Uuid> = cluster_nodes.iter().cloned().collect();
drop(cluster_nodes);
let mut failed_nodes = Vec::new();
for node_id in nodes_to_check {
if let Err(e) = self.check_node_health(node_id).await {
warn!("Health check failed for node {}: {}", node_id, e);
failed_nodes.push(node_id);
}
}
for node_id in failed_nodes {
self.handle_node_failure(node_id).await?;
}
Ok(())
}
async fn check_node_health(&self, node_id: Uuid) -> ClusterResult<()> {
let health_request = FailoverMessage::HealthCheckRequest {
requester_id: self.node_id,
timestamp: chrono::Utc::now(),
};
let sender = self.network_sender.lock().await;
let timeout_duration = Duration::from_secs(self.config.node_timeout_secs);
match timeout(timeout_duration, sender.send_failover_message(node_id, health_request)).await {
Ok(Ok(())) => {
let mut health = self.node_health.write().await;
health.insert(node_id, NodeHealthStatus::Healthy);
Ok(())
}
Ok(Err(_e)) => {
let mut health = self.node_health.write().await;
health.insert(node_id, NodeHealthStatus::Failed);
Err(ClusterError::Failover(FailoverError::NodeFailure(node_id)))
}
Err(_) => {
let mut health = self.node_health.write().await;
health.insert(node_id, NodeHealthStatus::Failed);
Err(ClusterError::Failover(FailoverError::FailoverTimeout))
}
}
}
async fn handle_node_failure(&self, node_id: Uuid) -> ClusterResult<()> {
warn!("Handling node failure for {}", node_id);
let event = FailoverEvent {
event_id: Uuid::new_v4(),
event_type: FailoverEventType::NodeFailure,
node_id,
timestamp: chrono::Utc::now(),
description: format!("Node {} has failed", node_id),
severity: FailoverSeverity::High,
metadata: HashMap::new(),
};
{
let mut events = self.failover_events.write().await;
events.push(event.clone());
}
let is_leader = {
let leader = self.current_leader.read().await;
*leader == Some(node_id)
};
if is_leader {
self.handle_leader_failure(node_id, &event).await?;
}
if self.config.enable_auto_recovery {
self.initiate_recovery(node_id, RecoveryType::FullRecovery, &event).await?;
}
let callbacks = self.callbacks.lock().await;
for callback in callbacks.iter() {
callback.on_node_failure(node_id, &event).await;
}
Ok(())
}
async fn handle_leader_failure(&self, failed_leader: Uuid, event: &FailoverEvent) -> ClusterResult<()> {
error!("Leader {} has failed, initiating leader election", failed_leader);
{
let mut leader = self.current_leader.write().await;
*leader = None;
}
if self.config.enable_auto_leader_election {
self.initiate_leader_election(event).await?;
}
Ok(())
}
async fn initiate_leader_election(&self, _failure_event: &FailoverEvent) -> ClusterResult<()> {
info!("Initiating leader election");
let election_event = FailoverEvent {
event_id: Uuid::new_v4(),
event_type: FailoverEventType::LeaderElectionInitiated,
node_id: self.node_id,
timestamp: chrono::Utc::now(),
description: "Leader election initiated due to leader failure".to_string(),
severity: FailoverSeverity::Critical,
metadata: HashMap::new(),
};
{
let mut events = self.failover_events.write().await;
events.push(election_event.clone());
}
let election_request = FailoverMessage::LeaderElectionRequest {
candidate_id: self.node_id,
term: self.get_current_term().await,
timestamp: chrono::Utc::now(),
};
let sender = self.network_sender.lock().await;
sender.broadcast_failover_message(election_request).await?;
Ok(())
}
async fn get_current_term(&self) -> u64 {
1
}
async fn initiate_recovery(&self, node_id: Uuid, recovery_type: RecoveryType, _failure_event: &FailoverEvent) -> ClusterResult<()> {
info!("Initiating recovery for node {}", node_id);
let recovery_event = FailoverEvent {
event_id: Uuid::new_v4(),
event_type: FailoverEventType::RecoveryInitiated,
node_id,
timestamp: chrono::Utc::now(),
description: format!("Recovery initiated for node {}", node_id),
severity: FailoverSeverity::Medium,
metadata: HashMap::new(),
};
{
let mut events = self.failover_events.write().await;
events.push(recovery_event.clone());
}
{
let mut health = self.node_health.write().await;
health.insert(node_id, NodeHealthStatus::Recovering);
}
let recovery_manager = self.recovery_manager.lock().await;
recovery_manager.initiate_recovery(node_id, recovery_type).await?;
let callbacks = self.callbacks.lock().await;
for callback in callbacks.iter() {
callback.on_recovery_initiated(node_id, &recovery_event).await;
}
Ok(())
}
async fn process_failover_events(&self) -> ClusterResult<()> {
let events_to_process: Vec<FailoverEvent> = {
let mut events = self.failover_events.write().await;
events.drain(..).collect()
};
for event in events_to_process {
match event.event_type {
FailoverEventType::NodeFailure => {
self.check_cluster_health().await?;
}
FailoverEventType::LeaderFailure => {
self.check_cluster_health().await?;
}
FailoverEventType::RecoveryInitiated => {
self.monitor_recovery_progress(event.node_id).await?;
}
_ => {
}
}
}
Ok(())
}
async fn check_cluster_health(&self) -> ClusterResult<()> {
let health = self.node_health.read().await;
let cluster_nodes = self.cluster_nodes.read().await;
let healthy_nodes = health.values()
.filter(|&status| *status == NodeHealthStatus::Healthy)
.count();
let total_nodes = cluster_nodes.len();
drop(health);
drop(cluster_nodes);
if healthy_nodes < total_nodes - self.config.max_failed_nodes {
error!("Cluster health critical: {}/{} nodes healthy", healthy_nodes, total_nodes);
self.trigger_emergency_measures(healthy_nodes, total_nodes).await?;
}
Ok(())
}
async fn trigger_emergency_measures(&self, healthy_nodes: usize, total_nodes: usize) -> ClusterResult<()> {
warn!("Triggering emergency measures: {}/{} nodes healthy", healthy_nodes, total_nodes);
if self.config.enable_backup_promotion {
self.promote_backup_nodes().await?;
}
let emergency_event = FailoverEvent {
event_id: Uuid::new_v4(),
event_type: FailoverEventType::NetworkPartition,
node_id: self.node_id,
timestamp: chrono::Utc::now(),
description: format!("Emergency measures triggered: {}/{} nodes healthy", healthy_nodes, total_nodes),
severity: FailoverSeverity::Critical,
metadata: HashMap::new(),
};
{
let mut events = self.failover_events.write().await;
events.push(emergency_event);
}
Ok(())
}
async fn promote_backup_nodes(&self) -> ClusterResult<()> {
info!("Promoting backup nodes");
let health = self.node_health.read().await;
let cluster_nodes = self.cluster_nodes.read().await;
for node_id in cluster_nodes.iter() {
if let Some(status) = health.get(node_id) {
if *status == NodeHealthStatus::Healthy {
let promotion_event = FailoverEvent {
event_id: Uuid::new_v4(),
event_type: FailoverEventType::BackupPromoted,
node_id: *node_id,
timestamp: chrono::Utc::now(),
description: format!("Backup node {} promoted", node_id),
severity: FailoverSeverity::Medium,
metadata: HashMap::new(),
};
{
let mut events = self.failover_events.write().await;
events.push(promotion_event.clone());
}
let callbacks = self.callbacks.lock().await;
for callback in callbacks.iter() {
callback.on_backup_promotion(*node_id, &promotion_event).await;
}
}
}
}
Ok(())
}
async fn monitor_recovery_progress(&self, node_id: Uuid) -> ClusterResult<()> {
let recovery_manager = self.recovery_manager.lock().await;
match recovery_manager.check_recovery_status(node_id).await {
Ok(Some(status)) => {
if status.status == NodeHealthStatus::Healthy {
let completion_event = FailoverEvent {
event_id: Uuid::new_v4(),
event_type: FailoverEventType::RecoveryCompleted,
node_id,
timestamp: chrono::Utc::now(),
description: format!("Recovery completed for node {}", node_id),
severity: FailoverSeverity::Low,
metadata: HashMap::new(),
};
{
let mut health = self.node_health.write().await;
health.insert(node_id, NodeHealthStatus::Healthy);
}
{
let mut events = self.failover_events.write().await;
events.push(completion_event.clone());
}
let callbacks = self.callbacks.lock().await;
for callback in callbacks.iter() {
callback.on_recovery_completed(node_id, &completion_event).await;
}
}
}
Ok(None) => {
}
Err(e) => {
error!("Failed to check recovery status for node {}: {}", node_id, e);
}
}
Ok(())
}
pub async fn handle_message(&self, _source: Uuid, message: FailoverMessage) -> ClusterResult<Option<FailoverMessage>> {
match message {
FailoverMessage::HealthCheckRequest { requester_id, timestamp } => {
self.handle_health_check_request(requester_id, timestamp).await
}
FailoverMessage::HealthCheckResponse { responder_id, status, timestamp, metrics } => {
self.handle_health_check_response(responder_id, status, timestamp, metrics).await?;
Ok(None)
}
FailoverMessage::FailoverNotification { event } => {
self.handle_failover_notification(event).await?;
Ok(None)
}
FailoverMessage::LeaderElectionRequest { candidate_id, term, timestamp } => {
self.handle_leader_election_request(candidate_id, term, timestamp).await
}
FailoverMessage::LeaderElectionResponse { voter_id, candidate_id, vote_granted, term, timestamp } => {
self.handle_leader_election_response(voter_id, candidate_id, vote_granted, term, timestamp).await?;
Ok(None)
}
FailoverMessage::RecoveryRequest { failed_node_id, recovery_type, timestamp } => {
self.handle_recovery_request(failed_node_id, recovery_type, timestamp).await
}
FailoverMessage::RecoveryResponse { request_id, success, message, timestamp } => {
self.handle_recovery_response(request_id, success, message, timestamp).await?;
Ok(None)
}
}
}
async fn handle_health_check_request(&self, _requester_id: Uuid, _timestamp: chrono::DateTime<chrono::Utc>) -> ClusterResult<Option<FailoverMessage>> {
let response = FailoverMessage::HealthCheckResponse {
responder_id: self.node_id,
status: NodeHealthStatus::Healthy,
timestamp: chrono::Utc::now(),
metrics: HashMap::new(),
};
Ok(Some(response))
}
async fn handle_health_check_response(&self, responder_id: Uuid, status: NodeHealthStatus, _timestamp: chrono::DateTime<chrono::Utc>, _metrics: HashMap<String, String>) -> ClusterResult<()> {
let mut health = self.node_health.write().await;
health.insert(responder_id, status);
Ok(())
}
async fn handle_failover_notification(&self, event: FailoverEvent) -> ClusterResult<()> {
{
let mut events = self.failover_events.write().await;
events.push(event.clone());
}
match event.event_type {
FailoverEventType::LeaderElectionCompleted => {
if let Some(leader_id) = event.metadata.get("new_leader") {
if let Ok(uuid) = Uuid::parse_str(leader_id) {
let mut leader = self.current_leader.write().await;
*leader = Some(uuid);
let callbacks = self.callbacks.lock().await;
for callback in callbacks.iter() {
callback.on_leader_election(uuid, &event).await;
}
}
}
}
_ => {
}
}
Ok(())
}
async fn handle_leader_election_request(&self, candidate_id: Uuid, term: u64, _timestamp: chrono::DateTime<chrono::Utc>) -> ClusterResult<Option<FailoverMessage>> {
let vote_granted = true;
let response = FailoverMessage::LeaderElectionResponse {
voter_id: self.node_id,
candidate_id,
vote_granted,
term,
timestamp: chrono::Utc::now(),
};
Ok(Some(response))
}
async fn handle_leader_election_response(&self, voter_id: Uuid, candidate_id: Uuid, vote_granted: bool, term: u64, _timestamp: chrono::DateTime<chrono::Utc>) -> ClusterResult<()> {
if vote_granted && candidate_id == self.node_id {
info!("Received vote from {} for term {}", voter_id, term);
}
Ok(())
}
async fn handle_recovery_request(&self, failed_node_id: Uuid, recovery_type: RecoveryType, _timestamp: chrono::DateTime<chrono::Utc>) -> ClusterResult<Option<FailoverMessage>> {
let recovery_manager = self.recovery_manager.lock().await;
match recovery_manager.initiate_recovery(failed_node_id, recovery_type).await {
Ok(()) => {
let response = FailoverMessage::RecoveryResponse {
request_id: Uuid::new_v4(),
success: true,
message: "Recovery initiated successfully".to_string(),
timestamp: chrono::Utc::now(),
};
Ok(Some(response))
}
Err(e) => {
let response = FailoverMessage::RecoveryResponse {
request_id: Uuid::new_v4(),
success: false,
message: format!("Recovery failed: {}", e),
timestamp: chrono::Utc::now(),
};
Ok(Some(response))
}
}
}
async fn handle_recovery_response(&self, request_id: Uuid, success: bool, message: String, _timestamp: chrono::DateTime<chrono::Utc>) -> ClusterResult<()> {
if success {
info!("Recovery successful for request {}: {}", request_id, message);
} else {
error!("Recovery failed for request {}: {}", request_id, message);
}
Ok(())
}
pub async fn add_node(&self, node_id: Uuid) {
let mut nodes = self.cluster_nodes.write().await;
nodes.insert(node_id);
let mut health = self.node_health.write().await;
health.insert(node_id, NodeHealthStatus::Healthy);
}
pub async fn remove_node(&self, node_id: Uuid) {
let mut nodes = self.cluster_nodes.write().await;
nodes.remove(&node_id);
let mut health = self.node_health.write().await;
health.remove(&node_id);
}
pub async fn add_callback(&self, callback: Box<dyn FailoverCallback + Send + Sync>) {
let mut callbacks = self.callbacks.lock().await;
callbacks.push(callback);
}
pub async fn get_current_leader(&self) -> Option<Uuid> {
*self.current_leader.read().await
}
pub async fn get_node_health(&self, node_id: Uuid) -> Option<NodeHealthStatus> {
self.node_health.read().await.get(&node_id).cloned()
}
pub async fn get_failover_events(&self) -> Vec<FailoverEvent> {
self.failover_events.read().await.clone()
}
}
impl Clone for FailoverManager {
    /// Cheap handle clone: every shared field is behind an `Arc`, so the new
    /// value observes and mutates the exact same cluster state. A manual impl
    /// is needed because the struct holds `dyn` trait objects.
    fn clone(&self) -> Self {
        let Self {
            node_id,
            config,
            cluster_nodes,
            node_health,
            failover_events,
            current_leader,
            callbacks,
            network_sender,
            recovery_manager,
        } = self;
        Self {
            node_id: *node_id,
            config: config.clone(),
            cluster_nodes: Arc::clone(cluster_nodes),
            node_health: Arc::clone(node_health),
            failover_events: Arc::clone(failover_events),
            current_leader: Arc::clone(current_leader),
            callbacks: Arc::clone(callbacks),
            network_sender: Arc::clone(network_sender),
            recovery_manager: Arc::clone(recovery_manager),
        }
    }
}