use super::{
DefaultTransitionHandler, PhaseContext, PhaseError, PhaseExecutor, PhaseResult,
PhaseTransitionHandler, PhaseType,
};
use crate::cook::execution::errors::{MapReduceError, MapReduceResult};
use crate::cook::execution::mapreduce::{MapPhase, ReducePhase};
use crate::cook::execution::SetupPhase;
use crate::cook::orchestrator::ExecutionEnvironment;
use crate::subprocess::SubprocessManager;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Decision returned by a [`PhaseTransitionHandler`] after a phase fails.
///
/// Within the coordinator only the `Error` variant is acted upon: it aborts the
/// workflow. Every other variant lets a failed setup/reduce phase be tolerated.
/// A failed map phase is always fatal regardless of the variant.
#[derive(Clone, Debug)]
pub enum PhaseTransition {
    /// Move on after the failure. Presumably names the phase to continue with;
    /// the coordinator does not inspect the payload (TODO confirm with handlers).
    Continue(PhaseType),
    /// Skip past the failure. Presumably names the phase being skipped; the
    /// coordinator does not inspect the payload (TODO confirm with handlers).
    Skip(PhaseType),
    /// Treat the workflow as done; handled like any non-`Error` transition here.
    Complete,
    /// Abort the workflow. For map-phase failures this message becomes the
    /// `MapReduceError::General` message returned to the caller.
    Error(String),
}
/// Runs the phases of a MapReduce workflow in order: optional setup, mandatory
/// map, optional reduce. A pluggable [`PhaseTransitionHandler`] is consulted to
/// decide whether phases run and how phase failures are handled.
pub struct PhaseCoordinator {
/// Executor for the optional setup phase; run first when present.
setup_executor: Option<Box<dyn PhaseExecutor>>,
/// Executor for the map phase, which every workflow runs.
map_executor: Box<dyn PhaseExecutor>,
/// Executor for the optional reduce phase; run only when the map phase left
/// results in the context.
reduce_executor: Option<Box<dyn PhaseExecutor>>,
/// Hook consulted before each phase (`should_execute`), after success
/// (`on_phase_complete`) and after failure (`on_phase_error`).
transition_handler: Box<dyn PhaseTransitionHandler>,
}
impl PhaseCoordinator {
    /// Builds a coordinator from the configured phases.
    ///
    /// Each present phase is wrapped in its concrete executor, and the
    /// coordinator starts with [`DefaultTransitionHandler`]; use
    /// [`Self::with_transition_handler`] to replace it.
    ///
    /// `_subprocess_manager` is currently unused and kept only so existing call
    /// sites compile. The manager actually used at run time is the one passed to
    /// [`Self::execute_workflow`] / [`Self::resume_from_checkpoint`].
    pub fn new(
        setup_phase: Option<SetupPhase>,
        map_phase: MapPhase,
        reduce_phase: Option<ReducePhase>,
        _subprocess_manager: Arc<SubprocessManager>,
    ) -> Self {
        let setup_executor = setup_phase.map(|phase| {
            Box::new(super::setup::SetupPhaseExecutor::new(phase)) as Box<dyn PhaseExecutor>
        });
        let map_executor =
            Box::new(super::map::MapPhaseExecutor::new(map_phase)) as Box<dyn PhaseExecutor>;
        let reduce_executor = reduce_phase.map(|phase| {
            Box::new(super::reduce::ReducePhaseExecutor::new(phase)) as Box<dyn PhaseExecutor>
        });
        Self {
            setup_executor,
            map_executor,
            reduce_executor,
            transition_handler: Box::new(DefaultTransitionHandler),
        }
    }

    /// Replaces the transition handler (builder style).
    #[must_use]
    pub fn with_transition_handler(mut self, handler: Box<dyn PhaseTransitionHandler>) -> Self {
        self.transition_handler = handler;
        self
    }

    /// Runs the whole workflow from scratch: setup (if configured), then map,
    /// then reduce (if configured and the map phase produced results).
    ///
    /// Returns the reduce result when reduce ran successfully, otherwise the
    /// map result.
    ///
    /// # Errors
    /// * A map failure is always fatal. If the handler answers
    ///   `PhaseTransition::Error(msg)`, the error is `MapReduceError::General`
    ///   with `msg`. Otherwise it is the converted phase error.
    /// * A setup or reduce failure is fatal only when the handler answers
    ///   `PhaseTransition::Error(_)`; the converted phase error is returned.
    pub async fn execute_workflow(
        &self,
        environment: ExecutionEnvironment,
        subprocess_manager: Arc<SubprocessManager>,
    ) -> MapReduceResult<PhaseResult> {
        let context = PhaseContext::new(environment, subprocess_manager);
        self.run_phases(context).await
    }

    /// Drives setup → map → reduce against an already-prepared `context`.
    ///
    /// Shared by [`Self::execute_workflow`] (fresh context) and
    /// [`Self::resume_from_checkpoint`] (context carrying a checkpoint), so a
    /// resumed run keeps the checkpoint it was given.
    async fn run_phases(&self, mut context: PhaseContext) -> MapReduceResult<PhaseResult> {
        if let Some(setup) = self.setup_executor.as_deref() {
            match self.execute_phase(setup, &mut context).await {
                Ok(result) => {
                    info!("Setup phase completed successfully");
                    self.transition_handler
                        .on_phase_complete(PhaseType::Setup, &result);
                }
                Err(error) => {
                    let transition = Self::handle_phase_error(
                        self.transition_handler.as_ref(),
                        PhaseType::Setup,
                        &error,
                    );
                    // Only an explicit `Error` transition aborts; otherwise the
                    // workflow carries on to the map phase.
                    if matches!(transition, PhaseTransition::Error(_)) {
                        return Err(error.into());
                    }
                }
            }
        }

        // The map phase is mandatory and any failure in it is fatal. Its result
        // is the workflow result unless a reduce phase later succeeds.
        let mut workflow_result = match self
            .execute_phase(self.map_executor.as_ref(), &mut context)
            .await
        {
            Ok(result) => {
                info!(
                    "Map phase completed with {} items processed",
                    result.metrics.items_processed
                );
                self.transition_handler
                    .on_phase_complete(PhaseType::Map, &result);
                result
            }
            Err(error) => {
                let transition = Self::handle_phase_error(
                    self.transition_handler.as_ref(),
                    PhaseType::Map,
                    &error,
                );
                return Err(Self::convert_transition_to_error(transition, error));
            }
        };

        if let Some(reduce) = self.reduce_executor.as_deref() {
            if Self::should_execute_reduce(Some(reduce), context.map_results.as_ref()) {
                match self.execute_phase(reduce, &mut context).await {
                    Ok(result) => {
                        info!("Reduce phase completed successfully");
                        self.transition_handler
                            .on_phase_complete(PhaseType::Reduce, &result);
                        workflow_result = result;
                    }
                    Err(error) => {
                        let transition = Self::handle_phase_error(
                            self.transition_handler.as_ref(),
                            PhaseType::Reduce,
                            &error,
                        );
                        // A tolerated reduce failure falls back to the map result.
                        if matches!(transition, PhaseTransition::Error(_)) {
                            return Err(error.into());
                        }
                    }
                }
            } else {
                debug!("Skipping reduce phase - no map results available");
            }
        }

        Ok(workflow_result)
    }

    /// Executes a single phase, timing it and logging start/finish.
    ///
    /// If the transition handler vetoes the phase or the executor reports it can
    /// be skipped, a successful "skipped" [`PhaseResult`] is returned without
    /// validating or running the executor.
    ///
    /// # Errors
    /// Propagates errors from `validate_context` and `execute`.
    async fn execute_phase(
        &self,
        executor: &dyn PhaseExecutor,
        context: &mut PhaseContext,
    ) -> Result<PhaseResult, PhaseError> {
        let phase_type = executor.phase_type();
        if Self::should_skip_phase(self.transition_handler.as_ref(), executor, context) {
            debug!("Skipping phase {}", phase_type);
            return Ok(Self::create_skipped_result(phase_type));
        }
        executor.validate_context(context)?;
        info!("Starting execution of {} phase", phase_type);
        let start_time = std::time::Instant::now();
        let result = executor.execute(context).await?;
        let duration = start_time.elapsed();
        info!(
            "{} phase completed in {:.2}s",
            phase_type,
            duration.as_secs_f64()
        );
        Ok(result)
    }

    /// Returns `true` when the handler declines the phase or the executor says
    /// it can be skipped for this context.
    pub(crate) fn should_skip_phase(
        transition_handler: &dyn PhaseTransitionHandler,
        executor: &dyn PhaseExecutor,
        context: &PhaseContext,
    ) -> bool {
        !transition_handler.should_execute(executor.phase_type(), context)
            || executor.can_skip(context)
    }

    /// Reduce runs only when a reduce executor is configured and the map phase
    /// left results in the context.
    pub(crate) fn should_execute_reduce<T>(
        reduce_executor: Option<&dyn PhaseExecutor>,
        map_results: Option<&Vec<T>>,
    ) -> bool {
        reduce_executor.is_some() && map_results.is_some()
    }

    /// Builds the successful placeholder result reported for a skipped phase.
    pub(crate) fn create_skipped_result(phase_type: PhaseType) -> PhaseResult {
        PhaseResult {
            phase_type,
            success: true,
            data: None,
            error_message: Some(format!("Phase {} was skipped", phase_type)),
            metrics: Default::default(),
        }
    }

    /// Logs a phase failure and asks the handler how to proceed.
    pub(crate) fn handle_phase_error(
        transition_handler: &dyn PhaseTransitionHandler,
        phase_type: PhaseType,
        error: &PhaseError,
    ) -> PhaseTransition {
        warn!("{} phase failed: {}", phase_type, error);
        transition_handler.on_phase_error(phase_type, error)
    }

    /// Turns a failure transition into the error returned to the caller: the
    /// handler's message for `Error(msg)`, otherwise the original phase error.
    pub(crate) fn convert_transition_to_error(
        transition: PhaseTransition,
        fallback_error: PhaseError,
    ) -> MapReduceError {
        match transition {
            PhaseTransition::Error(msg) => MapReduceError::General {
                message: msg,
                source: None,
            },
            _ => fallback_error.into(),
        }
    }

    /// Resumes a workflow from `checkpoint`.
    ///
    /// Every path below runs with a context that carries the checkpoint:
    /// * `Setup` with a setup executor re-runs only the setup phase and returns
    ///   its result. NOTE(review): it does not continue into map/reduce —
    ///   confirm this is intended.
    /// * `Setup` without a setup executor, and `Map`, run the full workflow.
    /// * `Reduce` re-runs only the reduce phase. If no reduce executor is
    ///   configured, it reports a successful "already completed" result.
    ///
    /// # Errors
    /// Same as the phase(s) executed; see [`Self::execute_workflow`].
    pub async fn resume_from_checkpoint(
        &self,
        checkpoint: super::PhaseCheckpoint,
        environment: ExecutionEnvironment,
        subprocess_manager: Arc<SubprocessManager>,
    ) -> MapReduceResult<PhaseResult> {
        // `PhaseType` is `Copy`, so read it before moving the checkpoint into
        // the context (no clone needed).
        let phase_type = checkpoint.phase_type;
        let mut context = PhaseContext::new(environment, subprocess_manager);
        context.checkpoint = Some(checkpoint);

        match phase_type {
            PhaseType::Setup => {
                if let Some(setup) = self.setup_executor.as_deref() {
                    return self
                        .execute_phase(setup, &mut context)
                        .await
                        .map_err(Into::into);
                }
                self.run_phases(context).await
            }
            // Previously this rebuilt a fresh context and silently dropped the
            // checkpoint; reuse the checkpoint-bearing context instead.
            PhaseType::Map => self.run_phases(context).await,
            PhaseType::Reduce => match self.reduce_executor.as_deref() {
                Some(reduce) => self
                    .execute_phase(reduce, &mut context)
                    .await
                    .map_err(Into::into),
                None => Ok(PhaseResult {
                    phase_type: PhaseType::Reduce,
                    success: true,
                    data: None,
                    error_message: Some("Reduce phase already completed".to_string()),
                    metrics: Default::default(),
                }),
            },
        }
    }
}