use super::mode_detection::{detect_execution_mode, ExecutionMode};
use super::resource_allocation::{calculate_resources, ResourceRequirements};
use crate::cook::orchestrator::CookConfig;
/// Immutable description of how a cook run will execute: the detected mode,
/// the resources it requires, the ordered phases, and the parallelism budget.
///
/// Produced by [`plan_execution`]; planning is deterministic for a given
/// config (asserted by the tests in this module).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionPlan {
    /// Execution mode detected from the config (standard, iterative,
    /// mapreduce, or dry run).
    pub mode: ExecutionMode,
    /// Resources the run needs (worktrees, concurrent command slots, ...).
    pub resource_needs: ResourceRequirements,
    /// Ordered phases the run will pass through.
    pub phases: Vec<Phase>,
    /// Maximum number of commands allowed to run concurrently.
    pub parallel_budget: usize,
}
impl ExecutionPlan {
pub fn is_dry_run(&self) -> bool {
self.mode == ExecutionMode::DryRun
}
pub fn requires_worktrees(&self) -> bool {
self.resource_needs.worktrees > 0
}
pub fn phase_count(&self) -> usize {
self.phases.len()
}
pub fn has_phase(&self, phase_type: PhaseType) -> bool {
self.phases.iter().any(|p| p.phase_type() == phase_type)
}
}
/// Discriminant-only view of [`Phase`], used for membership queries
/// (e.g. [`ExecutionPlan::has_phase`]) without matching on payloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PhaseType {
    /// Setup commands that run before the map phase.
    Setup,
    /// Parallel per-item work.
    Map,
    /// Aggregation commands that run after the map phase.
    Reduce,
    /// Plain sequential workflow commands.
    Commands,
    /// Analysis-only phase emitted for dry runs.
    DryRunAnalysis,
}
/// A single stage of a planned run. Variants carry only the summary data
/// needed to describe the phase (counts and flags), not the commands
/// themselves.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Phase {
    /// Setup commands that run once, before the map phase.
    Setup {
        /// Number of setup commands.
        command_count: usize,
        /// Whether the setup phase declares a timeout.
        has_timeout: bool,
    },
    /// Parallel map over work items.
    Map {
        /// Upper bound on concurrently running map agents.
        max_parallel: usize,
        /// Whether an item filter expression is configured.
        has_filter: bool,
        /// Whether an item sort key is configured.
        has_sort: bool,
    },
    /// Aggregation commands that run after the map phase.
    Reduce {
        /// Number of reduce commands.
        command_count: usize,
    },
    /// Sequential workflow commands (standard and iterative modes).
    Commands {
        /// Number of workflow commands.
        command_count: usize,
    },
    /// Analysis-only phase for dry runs; nothing is executed.
    DryRunAnalysis,
}
impl Phase {
    /// The [`PhaseType`] discriminant corresponding to this phase.
    pub fn phase_type(&self) -> PhaseType {
        match self {
            Self::Setup { .. } => PhaseType::Setup,
            Self::Map { .. } => PhaseType::Map,
            Self::Reduce { .. } => PhaseType::Reduce,
            Self::Commands { .. } => PhaseType::Commands,
            Self::DryRunAnalysis => PhaseType::DryRunAnalysis,
        }
    }

    /// Number of commands this phase runs, when the notion applies.
    /// `Map` and `DryRunAnalysis` have no fixed command count and
    /// yield `None`.
    pub fn command_count(&self) -> Option<usize> {
        match *self {
            Self::Setup { command_count, .. }
            | Self::Reduce { command_count }
            | Self::Commands { command_count } => Some(command_count),
            Self::Map { .. } | Self::DryRunAnalysis => None,
        }
    }
}
impl std::fmt::Display for Phase {
    /// Renders a short human-readable summary, e.g. `Setup (2 commands)`
    /// or `Map (max 10 parallel)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Phase::DryRunAnalysis => f.write_str("DryRunAnalysis"),
            Phase::Setup { command_count, .. } => {
                write!(f, "Setup ({command_count} commands)")
            }
            Phase::Map { max_parallel, .. } => {
                write!(f, "Map (max {max_parallel} parallel)")
            }
            Phase::Reduce { command_count } => {
                write!(f, "Reduce ({command_count} commands)")
            }
            Phase::Commands { command_count } => {
                write!(f, "Commands ({command_count} commands)")
            }
        }
    }
}
/// Builds the [`ExecutionPlan`] for a cook run: detects the execution mode,
/// sizes the required resources, determines the phases, and derives the
/// parallel budget. Pure function of `config` — deterministic by design.
pub fn plan_execution(config: &CookConfig) -> ExecutionPlan {
    let mode = detect_execution_mode(config);
    let resource_needs = calculate_resources(config, &mode);
    ExecutionPlan {
        phases: determine_phases(config, &mode),
        parallel_budget: compute_parallel_budget(&resource_needs),
        mode,
        resource_needs,
    }
}
/// Maps the detected execution mode to the concrete phase list.
fn determine_phases(config: &CookConfig, mode: &ExecutionMode) -> Vec<Phase> {
    match mode {
        // Dry runs never execute anything: a single analysis phase.
        ExecutionMode::DryRun => vec![Phase::DryRunAnalysis],
        ExecutionMode::MapReduce => determine_mapreduce_phases(config),
        // Standard and iterative runs share the same sequential shape.
        ExecutionMode::Standard | ExecutionMode::Iterative => determine_command_phases(config),
    }
}
/// Builds the phase list for a mapreduce run: optional setup, the map
/// phase (always present when a mapreduce config exists), and optional
/// reduce. Returns an empty list when no mapreduce config is attached.
fn determine_mapreduce_phases(config: &CookConfig) -> Vec<Phase> {
    let Some(mr_config) = &config.mapreduce_config else {
        return Vec::new();
    };

    // At most three phases: setup, map, reduce.
    let mut phases = Vec::with_capacity(3);

    if let Some(setup) = &mr_config.setup {
        phases.push(Phase::Setup {
            command_count: setup.commands.len(),
            has_timeout: setup.timeout.is_some(),
        });
    }

    phases.push(Phase::Map {
        // `max_parallel` is stored as a string; fall back to 10 when it
        // does not parse as an integer (matches existing behavior).
        max_parallel: mr_config.map.max_parallel.parse().unwrap_or(10),
        has_filter: mr_config.map.filter.is_some(),
        has_sort: mr_config.map.sort_by.is_some(),
    });

    if let Some(reduce) = &mr_config.reduce {
        phases.push(Phase::Reduce {
            command_count: reduce.commands.len(),
        });
    }

    phases
}
/// Builds the single-phase list for standard/iterative runs: one
/// `Commands` phase covering every workflow command.
fn determine_command_phases(config: &CookConfig) -> Vec<Phase> {
    vec![Phase::Commands {
        command_count: config.workflow.commands.len(),
    }]
}
/// Derives the plan's parallel budget from the resource calculation.
/// Currently this is exactly the maximum number of concurrent commands.
fn compute_parallel_budget(resources: &ResourceRequirements) -> usize {
    resources.max_concurrent_commands
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::mapreduce::{AgentTemplate, MapPhaseYaml, MapReduceWorkflowConfig};
    use crate::config::WorkflowConfig;
    use std::path::PathBuf;
    use std::sync::Arc;

    /// Minimal empty workflow config used as the baseline for every test.
    fn create_default_workflow_config() -> WorkflowConfig {
        WorkflowConfig {
            name: None,
            commands: vec![],
            env: None,
            secrets: None,
            env_files: None,
            profiles: None,
            merge: None,
        }
    }

    /// Baseline `CookConfig`: single iteration, no args, no mapreduce,
    /// dry-run off — planning detects this as `Standard` mode.
    fn create_default_config() -> CookConfig {
        CookConfig {
            command: crate::cook::command::CookCommand {
                playbook: PathBuf::from("workflow.yml"),
                path: None,
                max_iterations: 1,
                map: vec![],
                args: vec![],
                fail_fast: false,
                auto_accept: false,
                resume: None,
                verbosity: 0,
                quiet: false,
                dry_run: false,
                params: Default::default(),
            },
            project_path: Arc::new(PathBuf::from(".")),
            workflow: Arc::new(create_default_workflow_config()),
            mapreduce_config: None,
        }
    }

    /// MapReduce config with only a map phase and the given `max_parallel`.
    fn create_mapreduce_config_with_parallel(max_parallel: usize) -> MapReduceWorkflowConfig {
        MapReduceWorkflowConfig {
            name: "test".to_string(),
            mode: "mapreduce".to_string(),
            env: None,
            secrets: None,
            env_files: None,
            profiles: None,
            setup: None,
            map: MapPhaseYaml {
                input: "items.json".to_string(),
                json_path: "$.items[*]".to_string(),
                agent_template: AgentTemplate { commands: vec![] },
                max_parallel: max_parallel.to_string(),
                filter: None,
                sort_by: None,
                max_items: None,
                offset: None,
                distinct: None,
                agent_timeout_secs: None,
                timeout_config: None,
            },
            reduce: None,
            error_policy: Default::default(),
            on_item_failure: None,
            continue_on_failure: None,
            max_failures: None,
            failure_threshold: None,
            error_collection: None,
            merge: None,
        }
    }

    /// MapReduce config exercising all three phases: setup (2 commands,
    /// with a timeout), map (with filter and sort), and reduce (1 command).
    fn create_mapreduce_config_with_setup_and_reduce() -> MapReduceWorkflowConfig {
        use crate::config::mapreduce::{ReducePhaseYaml, SetupPhaseConfig};
        use crate::cook::workflow::WorkflowStep;

        // Small helper: a workflow step that runs a shell command.
        fn shell_step(cmd: &str) -> WorkflowStep {
            WorkflowStep {
                shell: Some(cmd.to_string()),
                ..Default::default()
            }
        }

        MapReduceWorkflowConfig {
            name: "test".to_string(),
            mode: "mapreduce".to_string(),
            env: None,
            secrets: None,
            env_files: None,
            profiles: None,
            setup: Some(SetupPhaseConfig {
                commands: vec![shell_step("echo setup"), shell_step("echo setup2")],
                timeout: Some("300".to_string()),
                capture_outputs: Default::default(),
            }),
            map: MapPhaseYaml {
                input: "items.json".to_string(),
                json_path: "$.items[*]".to_string(),
                agent_template: AgentTemplate { commands: vec![] },
                max_parallel: "10".to_string(),
                filter: Some("status == 'active'".to_string()),
                sort_by: Some("priority DESC".to_string()),
                max_items: None,
                offset: None,
                distinct: None,
                agent_timeout_secs: None,
                timeout_config: None,
            },
            reduce: Some(ReducePhaseYaml {
                commands: vec![shell_step("echo reduce")],
            }),
            error_policy: Default::default(),
            on_item_failure: None,
            continue_on_failure: None,
            max_failures: None,
            failure_threshold: None,
            error_collection: None,
            merge: None,
        }
    }

    #[test]
    fn test_plan_execution_mapreduce() {
        let mut config = create_default_config();
        config.mapreduce_config = Some(Arc::new(create_mapreduce_config_with_parallel(10)));
        let plan = plan_execution(&config);
        assert_eq!(plan.mode, ExecutionMode::MapReduce);
        assert_eq!(plan.parallel_budget, 10);
        // 11 worktrees for max_parallel = 10 — presumably agents plus one
        // extra; defined by `calculate_resources`.
        assert_eq!(plan.resource_needs.worktrees, 11);
        assert_eq!(plan.phases.len(), 1);
        assert!(plan.has_phase(PhaseType::Map));
    }

    #[test]
    fn test_plan_execution_mapreduce_with_all_phases() {
        let mut config = create_default_config();
        config.mapreduce_config = Some(Arc::new(create_mapreduce_config_with_setup_and_reduce()));
        let plan = plan_execution(&config);
        assert_eq!(plan.mode, ExecutionMode::MapReduce);
        assert_eq!(plan.phases.len(), 3);
        assert!(plan.has_phase(PhaseType::Setup));
        assert!(plan.has_phase(PhaseType::Map));
        assert!(plan.has_phase(PhaseType::Reduce));
        // Phase order must be setup -> map -> reduce, with payloads taken
        // from the config fixture.
        match &plan.phases[0] {
            Phase::Setup {
                command_count,
                has_timeout,
            } => {
                assert_eq!(*command_count, 2);
                assert!(*has_timeout);
            }
            _ => panic!("Expected Setup phase"),
        }
        match &plan.phases[1] {
            Phase::Map {
                max_parallel,
                has_filter,
                has_sort,
            } => {
                assert_eq!(*max_parallel, 10);
                assert!(*has_filter);
                assert!(*has_sort);
            }
            _ => panic!("Expected Map phase"),
        }
        match &plan.phases[2] {
            Phase::Reduce { command_count } => {
                assert_eq!(*command_count, 1);
            }
            _ => panic!("Expected Reduce phase"),
        }
    }

    #[test]
    fn test_plan_execution_standard() {
        let config = create_default_config();
        let plan = plan_execution(&config);
        assert_eq!(plan.mode, ExecutionMode::Standard);
        assert_eq!(plan.parallel_budget, 1);
        assert_eq!(plan.phases.len(), 1);
        assert!(plan.has_phase(PhaseType::Commands));
    }

    #[test]
    fn test_plan_execution_dryrun() {
        let mut config = create_default_config();
        config.command.dry_run = true;
        let plan = plan_execution(&config);
        assert_eq!(plan.mode, ExecutionMode::DryRun);
        assert!(plan.is_dry_run());
        assert_eq!(plan.phases.len(), 1);
        assert!(plan.has_phase(PhaseType::DryRunAnalysis));
    }

    #[test]
    fn test_plan_execution_iterative() {
        let mut config = create_default_config();
        // Providing args flips mode detection to Iterative.
        config.command.args = vec!["arg1".to_string(), "arg2".to_string()];
        let plan = plan_execution(&config);
        assert_eq!(plan.mode, ExecutionMode::Iterative);
        assert_eq!(plan.parallel_budget, 1);
        assert!(plan.has_phase(PhaseType::Commands));
    }

    #[test]
    fn test_execution_plan_requires_worktrees() {
        let mut config = create_default_config();
        config.mapreduce_config = Some(Arc::new(create_mapreduce_config_with_parallel(5)));
        let plan = plan_execution(&config);
        assert!(plan.requires_worktrees());
        // A plain standard run should not need worktrees.
        let standard_config = create_default_config();
        let standard_plan = plan_execution(&standard_config);
        assert!(!standard_plan.requires_worktrees());
    }

    #[test]
    fn test_execution_plan_phase_count() {
        let mut config = create_default_config();
        config.mapreduce_config = Some(Arc::new(create_mapreduce_config_with_setup_and_reduce()));
        let plan = plan_execution(&config);
        assert_eq!(plan.phase_count(), 3);
    }

    #[test]
    fn test_phase_display() {
        assert_eq!(
            format!(
                "{}",
                Phase::Setup {
                    command_count: 2,
                    has_timeout: true
                }
            ),
            "Setup (2 commands)"
        );
        assert_eq!(
            format!(
                "{}",
                Phase::Map {
                    max_parallel: 10,
                    has_filter: false,
                    has_sort: false
                }
            ),
            "Map (max 10 parallel)"
        );
        assert_eq!(
            format!("{}", Phase::Reduce { command_count: 1 }),
            "Reduce (1 commands)"
        );
        assert_eq!(
            format!("{}", Phase::Commands { command_count: 3 }),
            "Commands (3 commands)"
        );
        assert_eq!(format!("{}", Phase::DryRunAnalysis), "DryRunAnalysis");
    }

    #[test]
    fn test_phase_type() {
        assert_eq!(
            Phase::Setup {
                command_count: 0,
                has_timeout: false
            }
            .phase_type(),
            PhaseType::Setup
        );
        assert_eq!(
            Phase::Map {
                max_parallel: 1,
                has_filter: false,
                has_sort: false
            }
            .phase_type(),
            PhaseType::Map
        );
        assert_eq!(
            Phase::Reduce { command_count: 0 }.phase_type(),
            PhaseType::Reduce
        );
        assert_eq!(
            Phase::Commands { command_count: 0 }.phase_type(),
            PhaseType::Commands
        );
        assert_eq!(
            Phase::DryRunAnalysis.phase_type(),
            PhaseType::DryRunAnalysis
        );
    }

    #[test]
    fn test_phase_command_count() {
        assert_eq!(
            Phase::Setup {
                command_count: 2,
                has_timeout: true
            }
            .command_count(),
            Some(2)
        );
        assert_eq!(Phase::Reduce { command_count: 3 }.command_count(), Some(3));
        assert_eq!(
            Phase::Commands { command_count: 5 }.command_count(),
            Some(5)
        );
        assert_eq!(
            Phase::Map {
                max_parallel: 10,
                has_filter: false,
                has_sort: false
            }
            .command_count(),
            None
        );
        assert_eq!(Phase::DryRunAnalysis.command_count(), None);
    }

    #[test]
    fn test_planning_is_deterministic() {
        let config = create_default_config();
        let plan1 = plan_execution(&config);
        let plan2 = plan_execution(&config);
        assert_eq!(plan1, plan2, "Planning must be deterministic");
    }

    #[test]
    fn test_planning_deterministic_mapreduce() {
        let mut config = create_default_config();
        config.mapreduce_config = Some(Arc::new(create_mapreduce_config_with_setup_and_reduce()));
        let plan1 = plan_execution(&config);
        let plan2 = plan_execution(&config);
        assert_eq!(plan1, plan2, "Planning must be deterministic");
    }

    #[test]
    fn test_planning_deterministic_dryrun() {
        let mut config = create_default_config();
        config.command.dry_run = true;
        let plan1 = plan_execution(&config);
        let plan2 = plan_execution(&config);
        assert_eq!(plan1, plan2, "Planning must be deterministic");
    }
}