use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::error::{Result, Error};
use crate::distributed::execution::{ExecutionPlan, ExecutionResult, ExecutionMetrics};
use crate::distributed::fault_tolerance::{FaultToleranceHandler, FaultTolerantContext, RecoveryAction, FailureType, FailureInfo};
use crate::distributed::partition::PartitionSet;
use super::context::DataFusionContext;
impl FaultTolerantContext for DataFusionContext {
fn execute_with_fault_tolerance(
&self,
plan: &ExecutionPlan,
fault_handler: &FaultToleranceHandler,
) -> Result<ExecutionResult> {
let start_time = Instant::now();
let execution_id = format!("exec-{}", uuid::Uuid::new_v4());
let result = fault_handler.execute_with_retry(|| {
self.execute(plan)
});
match result {
Ok(execution_result) => {
log::info!(
"Successfully executed plan with ID {} in {:?}",
execution_id,
start_time.elapsed()
);
Ok(execution_result)
},
Err(error) => {
log::error!(
"Execution failed for plan with ID {}: {}",
execution_id,
error
);
match fault_handler.handle_failure(plan, &error)? {
Some(recovery_action) => self.recover_from_failure(recovery_action),
None => Err(error),
}
}
}
}
fn recover_from_failure(
&self,
action: RecoveryAction,
) -> Result<ExecutionResult> {
match action {
RecoveryAction::RetryQuery { plan, delay } => {
if !delay.is_zero() {
std::thread::sleep(delay);
}
log::info!("Retrying entire query after failure");
self.execute(&plan)
},
RecoveryAction::RetryFailedPartitions { plan, partition_ids, delay } => {
if !delay.is_zero() {
std::thread::sleep(delay);
}
log::info!(
"Retrying {} failed partitions: {:?}",
partition_ids.len(),
partition_ids
);
self.execute(&plan)
},
RecoveryAction::Reroute { plan, excluded_nodes } => {
log::info!(
"Rerouting execution to avoid {} nodes: {:?}",
excluded_nodes.len(),
excluded_nodes
);
self.execute(&plan)
},
RecoveryAction::LocalFallback { plan } => {
log::info!("Falling back to local execution after distributed failure");
self.execute(&plan)
},
}
}
}
pub fn create_fault_tolerant_context(
context: DataFusionContext,
fault_handler: FaultToleranceHandler,
) -> FaultTolerantDataFusionContext {
FaultTolerantDataFusionContext {
inner: context,
fault_handler: Arc::new(fault_handler),
}
}
/// A `DataFusionContext` paired with a fault-tolerance handler, so that
/// `execute` calls are automatically retried/recovered.
pub struct FaultTolerantDataFusionContext {
    // The underlying execution context all calls are delegated to.
    inner: DataFusionContext,
    // Shared handler that drives retry and recovery policy; `Arc` so callers
    // can hold their own handle via `fault_handler()`.
    fault_handler: Arc<FaultToleranceHandler>,
}
impl FaultTolerantDataFusionContext {
    /// Wraps `context` together with `fault_handler` (taking ownership of both).
    pub fn new(context: DataFusionContext, fault_handler: FaultToleranceHandler) -> Self {
        let fault_handler = Arc::new(fault_handler);
        Self { inner: context, fault_handler }
    }

    /// Executes `plan` with automatic retry/recovery via the stored handler.
    pub fn execute(&self, plan: &ExecutionPlan) -> Result<ExecutionResult> {
        self.inner.execute_with_fault_tolerance(plan, &self.fault_handler)
    }

    /// Borrows the wrapped context for operations not covered by this wrapper.
    pub fn inner(&self) -> &DataFusionContext {
        &self.inner
    }

    /// Returns a shared handle to the fault-tolerance handler.
    pub fn fault_handler(&self) -> Arc<FaultToleranceHandler> {
        // Explicit Arc::clone: this is a refcount bump, not a deep copy.
        Arc::clone(&self.fault_handler)
    }

    /// Registers an in-memory dataset on the inner context.
    pub fn register_dataset(&mut self, name: &str, partitions: PartitionSet) -> Result<()> {
        self.inner.register_dataset(name, partitions)
    }

    /// Registers a CSV file on the inner context.
    pub fn register_csv(&mut self, name: &str, path: &str) -> Result<()> {
        self.inner.register_csv(name, path)
    }

    /// Registers a Parquet file on the inner context.
    pub fn register_parquet(&mut self, name: &str, path: &str) -> Result<()> {
        self.inner.register_parquet(name, path)
    }

    /// Renders an explanation of `plan`, optionally with statistics.
    pub fn explain_plan(&self, plan: &ExecutionPlan, with_statistics: bool) -> Result<String> {
        self.inner.explain_plan(plan, with_statistics)
    }
}