use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use super::core::{FailureInfo, FailureType, RecoveryStrategy, RetryPolicy};
use crate::distributed::execution::{ExecutionPlan, ExecutionResult};
use crate::error::{Error, Result};
/// A recovery step selected by [`FaultToleranceHandler::handle_failure`] for a failed plan.
///
/// Every variant carries its own clone of the plan. Carrying the step out is left to
/// implementors of [`FaultTolerantContext::recover_from_failure`].
#[derive(Debug, Clone)]
pub enum RecoveryAction {
/// Requests that the whole plan be retried after waiting `delay`.
RetryQuery {
/// Plan to re-run.
plan: ExecutionPlan,
/// Back-off to wait before the retry.
delay: Duration,
},
/// Requests that only the listed partitions of `plan` be retried after waiting `delay`.
RetryFailedPartitions {
/// Plan the partitions belong to.
plan: ExecutionPlan,
/// Partition indices to re-run. NOTE(review): may be empty — confirm how consumers
/// interpret an empty list.
partition_ids: Vec<usize>,
/// Back-off to wait before the retry.
delay: Duration,
},
/// Requests that `plan` be re-executed while avoiding the listed nodes.
Reroute {
/// Plan to re-run.
plan: ExecutionPlan,
/// Node ids that should not receive work.
excluded_nodes: Vec<String>,
},
/// Requests that `plan` be executed locally instead of on the cluster.
LocalFallback {
/// Plan to run locally.
plan: ExecutionPlan,
},
}
/// Retry and recovery coordinator for distributed query execution.
///
/// Mutable state lives behind `RwLock`s, so methods that take `&self` can update it.
pub struct FaultToleranceHandler {
/// Bounds the number of retries and supplies the delay before each one.
retry_policy: RetryPolicy,
/// Selects which `RecoveryAction` variant `handle_failure` produces.
recovery_strategy: RecoveryStrategy,
/// Log of failures recorded by `execute_with_retry` and `handle_failure`;
/// emptied by `clear_failures`.
recent_failures: Arc<RwLock<Vec<FailureInfo>>>,
/// Node id -> healthy flag, written by `update_node_health`.
node_health: Arc<RwLock<HashMap<String, bool>>>,
/// Per-partition failure history, keyed by partition index.
/// NOTE(review): nothing in this file reads or writes this map — confirm it is still needed.
partition_recovery: Arc<RwLock<HashMap<usize, Vec<FailureInfo>>>>,
}
impl FaultToleranceHandler {
pub fn new(retry_policy: RetryPolicy, recovery_strategy: RecoveryStrategy) -> Self {
Self {
retry_policy,
recovery_strategy,
recent_failures: Arc::new(RwLock::new(Vec::new())),
node_health: Arc::new(RwLock::new(HashMap::new())),
partition_recovery: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn default() -> Self {
Self::new(
RetryPolicy::default_exponential(),
RecoveryStrategy::RetryFailedPartitions,
)
}
pub fn execute_with_retry<F, T>(&self, operation: F) -> Result<T>
where
F: Fn() -> Result<T>,
{
let mut attempt = 0;
loop {
match operation() {
Ok(result) => return Ok(result),
Err(error) => {
let failure_type = FailureType::from_error(&error);
let mut failure = FailureInfo::new(failure_type.clone(), error.to_string());
if let Ok(mut failures) = self.recent_failures.write() {
failures.push(failure.clone());
}
if !failure_type.is_retriable() || attempt >= self.retry_policy.max_retries() {
return Err(error);
}
attempt += 1;
failure.increment_retry();
std::thread::sleep(self.retry_policy.delay_for_attempt(attempt));
}
}
}
}
pub fn handle_failure(
&self,
plan: &ExecutionPlan,
error: &Error,
) -> Result<Option<RecoveryAction>> {
let failure_type = FailureType::from_error(error);
if !failure_type.is_retriable() {
return Err(Error::DistributedProcessing(format!(
"Non-retriable error: {}",
error
)));
}
let mut failure = FailureInfo::new(failure_type, error.to_string());
if let Ok(mut failures) = self.recent_failures.write() {
failures.push(failure.clone());
}
let action = match self.recovery_strategy {
RecoveryStrategy::RetryQuery => Some(RecoveryAction::RetryQuery {
plan: plan.clone(),
delay: self.retry_policy.delay_for_attempt(0),
}),
RecoveryStrategy::RetryFailedPartitions => {
Some(RecoveryAction::RetryFailedPartitions {
plan: plan.clone(),
partition_ids: vec![], delay: self.retry_policy.delay_for_attempt(0),
})
}
RecoveryStrategy::Reroute => {
Some(RecoveryAction::Reroute {
plan: plan.clone(),
excluded_nodes: vec![], })
}
RecoveryStrategy::LocalFallback => {
Some(RecoveryAction::LocalFallback { plan: plan.clone() })
}
};
Ok(action)
}
pub fn recent_failures(&self) -> Result<Vec<FailureInfo>> {
match self.recent_failures.read() {
Ok(failures) => Ok(failures.clone()),
Err(_) => Err(Error::DistributedProcessing(
"Failed to read recent failures".to_string(),
)),
}
}
pub fn node_health(&self) -> Result<HashMap<String, bool>> {
match self.node_health.read() {
Ok(health) => Ok(health.clone()),
Err(_) => Err(Error::DistributedProcessing(
"Failed to read node health".to_string(),
)),
}
}
pub fn update_node_health(&self, node_id: impl Into<String>, healthy: bool) -> Result<()> {
match self.node_health.write() {
Ok(mut health) => {
health.insert(node_id.into(), healthy);
Ok(())
}
Err(_) => Err(Error::DistributedProcessing(
"Failed to update node health".to_string(),
)),
}
}
pub fn clear_failures(&self) -> Result<()> {
match self.recent_failures.write() {
Ok(mut failures) => {
failures.clear();
Ok(())
}
Err(_) => Err(Error::DistributedProcessing(
"Failed to clear failures".to_string(),
)),
}
}
}
/// Pairs an inner executor `E` with a shared [`FaultToleranceHandler`].
pub struct FaultTolerantExecutor<E> {
/// The wrapped executor supplied to `new`.
inner: E,
/// Handler held behind an `Arc`; `fault_handler()` hands out additional references.
fault_handler: Arc<FaultToleranceHandler>,
}
impl<E> FaultTolerantExecutor<E> {
    /// Wraps `inner` and takes ownership of `fault_handler`, storing it behind an
    /// [`Arc`] so it can be shared via [`fault_handler`](Self::fault_handler).
    pub fn new(inner: E, fault_handler: FaultToleranceHandler) -> Self {
        Self {
            inner,
            fault_handler: Arc::new(fault_handler),
        }
    }

    /// Borrows the wrapped executor.
    ///
    /// Without this accessor the `inner` field could never be read, so the wrapped
    /// executor would be unreachable.
    pub fn inner(&self) -> &E {
        &self.inner
    }

    /// Returns another shared reference to the fault handler.
    ///
    /// This is a reference-count bump; the handler itself is not copied.
    pub fn fault_handler(&self) -> Arc<FaultToleranceHandler> {
        Arc::clone(&self.fault_handler)
    }
}
/// An execution context that can run a plan under fault tolerance and carry out a
/// [`RecoveryAction`].
pub trait FaultTolerantContext {
/// Executes `plan`, consulting `fault_handler` when failures occur.
/// NOTE(review): the exact retry/recovery behaviour is implementor-defined — confirm per impl.
fn execute_with_fault_tolerance(
&self,
plan: &ExecutionPlan,
fault_handler: &FaultToleranceHandler,
) -> Result<ExecutionResult>;
/// Carries out `action` (presumably one produced by
/// `FaultToleranceHandler::handle_failure`) and returns the resulting execution.
fn recover_from_failure(&self, action: RecoveryAction) -> Result<ExecutionResult>;
}