use super::command_executor::CommandExecutor;
use crate::cook::execution::claude::ClaudeExecutorImpl;
use crate::cook::execution::data_pipeline::DataPipeline;
use crate::cook::execution::dlq::DeadLetterQueue;
use crate::cook::execution::errors::{MapReduceError, MapReduceResult};
use crate::cook::execution::input_source::InputSource;
use crate::cook::execution::mapreduce::{
agent::{AgentConfig, AgentLifecycleManager, AgentResult, AgentStatus},
aggregation::{AggregationSummary, CollectionStrategy, ResultCollector},
dlq_integration,
event::{EventLogger, MapReduceEvent},
merge_queue::MergeQueue,
resources::git::GitOperations,
retry_tracking,
state::StateManager,
timeout::{TimeoutConfig, TimeoutEnforcer},
types::{MapPhase, ReducePhase, SetupPhase},
};
use crate::cook::execution::runner::RealCommandRunner;
use crate::cook::execution::ClaudeExecutor;
use crate::cook::interaction::UserInteraction;
use crate::cook::orchestrator::ExecutionEnvironment;
use crate::cook::session::SessionManager;
use crate::cook::workflow::{OnFailureConfig, WorkflowStep};
use crate::subprocess::SubprocessManager;
use chrono::{DateTime, Utc};
use serde_json::Value;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Semaphore};
use tracing::{debug, info, warn};
/// Record of a worktree left behind by a failed agent.
///
/// Stored in the coordinator's registry so operators can locate and
/// manually remove worktrees whose automated cleanup did not run.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OrphanedWorktree {
/// Filesystem path of the abandoned worktree.
pub path: PathBuf,
/// Identifier of the agent that owned the worktree.
pub agent_id: String,
/// Identifier of the work item the agent was processing.
pub item_id: String,
/// When the failure that orphaned the worktree was recorded.
pub failed_at: DateTime<Utc>,
/// Error message describing why the agent failed.
pub error: String,
}
/// Coordinates a MapReduce-style workflow: an optional setup phase, a
/// parallel map phase run by per-item agents in isolated worktrees, and an
/// optional reduce phase executed in the parent worktree.
pub struct MapReduceCoordinator {
/// Creates, branches, and cleans up per-item agent worktrees.
agent_manager: Arc<dyn AgentLifecycleManager>,
/// Currently unused (kept for constructor wiring).
_state_manager: Arc<StateManager>,
/// Progress / success / error display surface.
user_interaction: Arc<dyn UserInteraction>,
/// Accumulates per-agent results during the map phase.
result_collector: Arc<ResultCollector>,
/// Used to run command-based work-item input sources.
subprocess: Arc<SubprocessManager>,
/// Base directory for resolving inputs, DLQ storage, and log hints.
project_root: PathBuf,
/// Structured MapReduce event log for this job.
event_logger: Arc<EventLogger>,
/// Timestamp-derived identifier for this job run.
job_id: String,
/// Shared Claude executor (also injected into the merge queue).
#[allow(dead_code)]
pub(crate) claude_executor: Arc<dyn ClaudeExecutor>,
/// Currently unused (kept for constructor wiring).
_session_manager: Arc<dyn SessionManager>,
/// Normal execution or dry-run validation.
execution_mode: crate::cook::execution::mapreduce::dry_run::ExecutionMode,
/// Set lazily in `execute_job` when a timeout is configured.
timeout_enforcer: Arc<Mutex<Option<Arc<TimeoutEnforcer>>>>,
/// Serializes merging of agent branches back into the parent worktree.
merge_queue: Arc<MergeQueue>,
/// Worktrees left behind by failed agents, for manual cleanup.
orphaned_worktrees: Arc<Mutex<Vec<OrphanedWorktree>>>,
/// Dead-letter queue for work items that failed processing.
dlq: Arc<DeadLetterQueue>,
/// Per-item retry attempt counts keyed by item id.
retry_counts: Arc<tokio::sync::RwLock<HashMap<String, u32>>>,
/// Executes workflow steps (shell / Claude) in worktrees.
command_executor: CommandExecutor,
}
impl MapReduceCoordinator {
/// Construct a coordinator with the default `Normal` execution mode and
/// zero verbosity. See [`Self::with_mode`] for the full constructor.
pub fn new(
agent_manager: Arc<dyn AgentLifecycleManager>,
state_manager: Arc<StateManager>,
user_interaction: Arc<dyn UserInteraction>,
subprocess: Arc<SubprocessManager>,
project_root: PathBuf,
) -> Self {
Self::with_mode(
agent_manager,
state_manager,
user_interaction,
subprocess,
project_root,
crate::cook::execution::mapreduce::dry_run::ExecutionMode::Normal,
0, )
}
/// Construct a coordinator with an explicit execution mode and verbosity.
///
/// Wires up all collaborators: in-memory result collection, a per-job event
/// logger, a Claude executor backed by a real command runner, a merge queue
/// that serializes agent-branch merges, and a dead-letter queue (global
/// storage, falling back to a temp-dir DLQ on failure).
///
/// # Panics
/// Panics if both the global DLQ and the temp-dir fallback fail to
/// initialize, or when called outside a multi-threaded tokio runtime
/// (a requirement of `block_in_place`).
pub fn with_mode(
    agent_manager: Arc<dyn AgentLifecycleManager>,
    state_manager: Arc<StateManager>,
    user_interaction: Arc<dyn UserInteraction>,
    subprocess: Arc<SubprocessManager>,
    project_root: PathBuf,
    execution_mode: crate::cook::execution::mapreduce::dry_run::ExecutionMode,
    verbosity: u8,
) -> Self {
    let result_collector = Arc::new(ResultCollector::new(CollectionStrategy::InMemory));
    // The job id doubles as the event-log / DLQ namespace for this run.
    let job_id = format!("mapreduce-{}", chrono::Utc::now().format("%Y%m%d_%H%M%S"));
    let event_logger = Arc::new(EventLogger::new(
        project_root.clone(),
        job_id.clone(),
        None,
        verbosity,
    ));
    let command_runner = RealCommandRunner::new();
    let claude_executor: Arc<dyn ClaudeExecutor> =
        Arc::new(ClaudeExecutorImpl::new(command_runner));
    let session_manager = Arc::new(DummySessionManager);
    let git_ops = Arc::new(GitOperations::new());
    let merge_queue = Arc::new(MergeQueue::new_with_claude(
        git_ops,
        Some(claude_executor.clone()),
        verbosity,
    ));
    // Bridge into async from this sync constructor. The fallback is awaited
    // inside the same `block_on` via `match`; the previous implementation
    // nested a second `block_in_place` + `Handle::block_on` inside the
    // already-running async context, which panics at runtime if the
    // fallback path is ever taken.
    let dlq = tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current().block_on(async {
            match crate::storage::create_global_dlq(&project_root, &job_id, None).await {
                Ok(dlq) => dlq,
                Err(e) => {
                    warn!("Failed to create global DLQ: {}, using fallback", e);
                    let temp_path = std::env::temp_dir().join("prodigy_dlq");
                    DeadLetterQueue::new(job_id.clone(), temp_path, 1000, 30, None)
                        .await
                        .expect("Failed to create fallback DLQ")
                }
            }
        })
    });
    let command_executor = CommandExecutor::new(claude_executor.clone(), subprocess.clone());
    Self {
        agent_manager,
        _state_manager: state_manager,
        user_interaction,
        result_collector,
        subprocess,
        project_root,
        event_logger,
        job_id,
        claude_executor,
        _session_manager: session_manager,
        execution_mode,
        timeout_enforcer: Arc::new(Mutex::new(None)),
        merge_queue,
        orphaned_worktrees: Arc::new(Mutex::new(Vec::new())),
        dlq: Arc::new(dlq),
        retry_counts: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
        command_executor,
    }
}
/// Snapshot of the worktrees registered as orphaned so far.
pub async fn get_orphaned_worktrees(&self) -> Vec<OrphanedWorktree> {
    let registry = self.orphaned_worktrees.lock().await;
    registry.clone()
}
/// Record a worktree that could not be cleaned up automatically, so it can
/// be located and removed manually later.
pub async fn register_orphaned_worktree(&self, orphaned: OrphanedWorktree) {
    warn!(
        "Registered orphaned worktree: {} (agent: {}, item: {})",
        orphaned.path.display(),
        orphaned.agent_id,
        orphaned.item_id
    );
    self.orphaned_worktrees.lock().await.push(orphaned);
}
/// Run a complete MapReduce job: optional setup phase, parallel map phase,
/// and optional reduce phase.
///
/// Timeout enforcement is configured first: an explicit
/// `agent_timeout_secs` takes precedence (it seeds a default
/// `TimeoutConfig` unless one was provided); otherwise a bare
/// `timeout_config` is used as-is. In dry-run mode the workflow is only
/// validated, never executed. Returns the per-item agent results from the
/// map phase.
pub async fn execute_job(
    &self,
    setup: Option<SetupPhase>,
    map_phase: MapPhase,
    reduce: Option<ReducePhase>,
    env: &ExecutionEnvironment,
) -> MapReduceResult<Vec<AgentResult>> {
    info!("Starting MapReduce job execution");
    if let Some(timeout_secs) = map_phase.config.agent_timeout_secs {
        // Explicit per-agent timeout wins; reuse the provided config when
        // present (its own agent_timeout_secs is kept as-is), otherwise
        // build a default config around the timeout value.
        let timeout_config =
            map_phase
                .timeout_config
                .clone()
                .unwrap_or_else(|| TimeoutConfig {
                    agent_timeout_secs: Some(timeout_secs),
                    ..TimeoutConfig::default()
                });
        let mut enforcer = self.timeout_enforcer.lock().await;
        *enforcer = Some(Arc::new(TimeoutEnforcer::new(timeout_config)));
        info!("Timeout enforcement enabled with {}s timeout", timeout_secs);
    } else if let Some(timeout_config) = map_phase.timeout_config.clone() {
        let mut enforcer = self.timeout_enforcer.lock().await;
        *enforcer = Some(Arc::new(TimeoutEnforcer::new(timeout_config)));
        info!("Timeout enforcement enabled with custom configuration");
    }
    // Dry-run short-circuits before any side effects.
    if let crate::cook::execution::mapreduce::dry_run::ExecutionMode::DryRun(ref config) =
        self.execution_mode
    {
        return self
            .execute_dry_run(setup.as_ref(), &map_phase, reduce.as_ref(), config)
            .await;
    }
    if let Some(setup_phase) = setup {
        self.execute_setup_phase(setup_phase, env, &map_phase.workflow_env)
            .await?;
    }
    let work_items = self.load_work_items(&map_phase).await?;
    if work_items.is_empty() {
        warn!("No work items to process");
        return Ok(Vec::new());
    }
    info!("Processing {} work items", work_items.len());
    let map_results = self
        .execute_map_phase_internal(map_phase, work_items, env)
        .await?;
    if let Some(reduce_phase) = reduce {
        self.execute_reduce_phase(reduce_phase, &map_results, env)
            .await?;
        // Only claim reduce completion when a reduce phase actually ran
        // (previously this was logged unconditionally, which was
        // misleading for map-only jobs).
        tracing::info!("MapReduce reduce phase completed. Changes are in parent worktree.");
    }
    Ok(map_results)
}
/// Validate the workflow phases without executing anything.
///
/// Displays the formatted validation report; returns an empty result set
/// when validation succeeds, or a `MapReduceError::General` when the
/// validator reports errors or fails outright.
async fn execute_dry_run(
    &self,
    setup: Option<&SetupPhase>,
    map_phase: &MapPhase,
    reduce: Option<&ReducePhase>,
    _config: &crate::cook::execution::mapreduce::dry_run::DryRunConfig,
) -> MapReduceResult<Vec<AgentResult>> {
    use crate::cook::execution::mapreduce::dry_run::{DryRunValidator, OutputFormatter};
    info!("Executing MapReduce job in dry-run mode");
    let validator = DryRunValidator::new();
    // Bail out early when validation itself cannot run.
    let report = match validator
        .validate_workflow_phases(setup.cloned(), map_phase.clone(), reduce.cloned())
        .await
    {
        Ok(report) => report,
        Err(e) => {
            self.user_interaction
                .display_error(&format!("Dry-run validation failed: {}", e));
            return Err(MapReduceError::General {
                message: format!("Dry-run validation failed: {}", e),
                source: None,
            });
        }
    };
    let output = OutputFormatter::new().format_human(&report);
    self.user_interaction.display_info(&output);
    if !report.errors.is_empty() {
        self.user_interaction.display_error(&format!(
            "Dry-run validation failed with {} error(s)",
            report.errors.len()
        ));
        return Err(MapReduceError::General {
            message: format!(
                "Dry-run validation failed with {} errors",
                report.errors.len()
            ),
            source: None,
        });
    }
    self.user_interaction
        .display_success("Dry-run validation successful! Workflow is ready to execute.");
    Ok(Vec::new())
}
/// Map an error's message onto a structured `FailureReason` for event logs.
///
/// Classification is keyword-based on the lowercased message, checked in
/// priority order: commit validation, timeout, merge conflict, worktree
/// problems, then a generic command failure when an exit code can be
/// extracted, and finally `Unknown`.
fn classify_failure_reason(
    error: &MapReduceError,
) -> crate::cook::execution::mapreduce::event::FailureReason {
    use crate::cook::execution::mapreduce::event::FailureReason;
    let error_msg = error.to_string();
    let msg_lower = error_msg.to_lowercase();
    let mentions = |needles: &[&str]| needles.iter().any(|n| msg_lower.contains(n));
    if mentions(&["commit required", "commit validation"]) {
        // Recover the offending command from a "Command: ..." line if present.
        let command = error_msg
            .lines()
            .find(|line| line.contains("Command:"))
            .and_then(|line| line.split("Command:").nth(1))
            .map(str::trim)
            .map(String::from)
            .unwrap_or_else(|| "unknown".to_string());
        return FailureReason::CommitValidationFailed { command };
    }
    if mentions(&["timeout", "timed out"]) {
        return FailureReason::Timeout;
    }
    if mentions(&["merge", "conflict"]) {
        return FailureReason::MergeConflict;
    }
    if mentions(&["worktree"]) {
        return FailureReason::WorktreeError;
    }
    match Self::extract_exit_code(&error_msg) {
        Some(exit_code) => FailureReason::CommandFailed { exit_code },
        None => FailureReason::Unknown,
    }
}
/// Pull a numeric exit code out of an error message, if one is present.
///
/// Recognizes the phrasings "exit code[:] N", "exited with code N", and
/// "status code[:] N". The regexes are compiled once and cached in a
/// process-wide `OnceLock` instead of being rebuilt on every call (regex
/// compilation is expensive and this runs on every classified failure).
fn extract_exit_code(error_msg: &str) -> Option<i32> {
    static PATTERNS: std::sync::OnceLock<Vec<regex::Regex>> = std::sync::OnceLock::new();
    let patterns = PATTERNS.get_or_init(|| {
        [
            r"exit code:?\s*(\d+)",
            r"exited with code\s*(\d+)",
            r"status code:?\s*(\d+)",
        ]
        .iter()
        .filter_map(|p| regex::Regex::new(p).ok())
        .collect()
    });
    // First pattern whose capture group parses as i32 wins; a capture that
    // overflows i32 falls through to the next pattern, as before.
    patterns.iter().find_map(|pattern| {
        pattern
            .captures(error_msg)
            .and_then(|captures| captures.get(1))
            .and_then(|code| code.as_str().parse::<i32>().ok())
    })
}
/// Human-readable label for a workflow step; delegates to [`CommandExecutor`].
#[cfg_attr(test, allow(dead_code))]
pub(crate) fn get_step_display_name(step: &WorkflowStep) -> String {
CommandExecutor::get_step_display_name(step)
}
/// Interpolate a step's `shell` and `claude` command strings against the
/// workflow-level environment, returning a new step.
///
/// Only the `shell` and `claude` fields are interpolated; all other fields
/// are cloned unchanged.
fn interpolate_step_with_env(
    &self,
    step: &WorkflowStep,
    workflow_env: &HashMap<String, String>,
) -> MapReduceResult<WorkflowStep> {
    use crate::cook::execution::interpolation::{InterpolationContext, InterpolationEngine};
    // Seed the interpolation context with the workflow environment.
    let mut context = InterpolationContext::new();
    for (key, value) in workflow_env {
        context.set(key.clone(), serde_json::Value::String(value.clone()));
    }
    let mut engine = InterpolationEngine::new(false);
    let mut interpolated = step.clone();
    interpolated.shell = match &step.shell {
        None => None,
        Some(shell) => Some(engine.interpolate(shell, &context).map_err(|e| {
            MapReduceError::ProcessingError(format!(
                "Failed to interpolate shell command: {}",
                e
            ))
        })?),
    };
    interpolated.claude = match &step.claude {
        None => None,
        Some(claude) => Some(engine.interpolate(claude, &context).map_err(|e| {
            MapReduceError::ProcessingError(format!(
                "Failed to interpolate claude command: {}",
                e
            ))
        })?),
    };
    Ok(interpolated)
}
/// Execute the setup phase's commands sequentially in the parent worktree.
///
/// Each step is interpolated against `workflow_env`, then run via the
/// command executor with `PRODIGY_AUTOMATION=true` plus the workflow env.
/// On step failure, a configured `on_failure` handler may recover the step
/// (execution continues with the next command); otherwise the phase aborts
/// with a detailed error including exit code, stderr/stdout, and a Claude
/// log hint where applicable.
#[cfg_attr(test, allow(dead_code))]
pub(crate) async fn execute_setup_phase(
&self,
setup_phase: SetupPhase,
env: &ExecutionEnvironment,
workflow_env: &HashMap<String, String>,
) -> MapReduceResult<()> {
info!("Executing setup phase");
info!(
"Setup phase executing in directory: {}",
env.working_dir.display()
);
for (index, step) in setup_phase.commands.iter().enumerate() {
let step_name = Self::get_step_display_name(step);
self.user_interaction.display_progress(&format!(
"Setup [{}/{}]: {}",
index + 1,
setup_phase.commands.len(),
step_name
));
info!(
"Executing setup step {}/{}: {}",
index + 1,
setup_phase.commands.len(),
step_name
);
debug!("=== Step Execution Context ===");
debug!("Step: {:?}", step);
debug!("Working Directory: {}", env.working_dir.display());
debug!("Project Directory: {}", self.project_root.display());
debug!("Worktree: {:?}", env.worktree_name);
debug!("Session ID: {}", env.session_id);
// Base environment for every setup step; workflow env layered on top.
let mut env_vars = HashMap::new();
env_vars.insert("PRODIGY_AUTOMATION".to_string(), "true".to_string());
for (key, value) in workflow_env {
env_vars.insert(key.clone(), value.clone());
}
debug!("Environment Variables:");
for (key, value) in &env_vars {
debug!(" {} = {}", key, value);
}
debug!("Actual execution directory: {}", env.working_dir.display());
debug!("==============================");
// NOTE(review): these 🔍 lines log at info level; consider demoting
// to debug! once interpolation behavior is trusted.
info!("🔍 DEBUG: Original step shell command: {:?}", step.shell);
info!("🔍 DEBUG: Workflow env vars: {:?}", workflow_env);
let interpolated_step = self.interpolate_step_with_env(step, workflow_env)?;
info!(
"🔍 DEBUG: Interpolated step shell command: {:?}",
interpolated_step.shell
);
let result = self
.command_executor
.execute_setup_step(&interpolated_step, env, env_vars)
.await
.map_err(|e| {
MapReduceError::ProcessingError(format!(
"Setup step {} ({}) failed: {}",
index + 1,
step_name,
e
))
})?;
if result.success {
self.user_interaction.display_success(&format!(
"✓ Setup [{}/{}]: {} completed",
index + 1,
setup_phase.commands.len(),
step_name
));
}
if !result.success {
// A successful on_failure handler recovers the step and the phase
// moves on to the next command.
if let Some(on_failure) = &step.on_failure {
self.user_interaction.display_warning(&format!(
"Setup step {} failed, executing on_failure handler",
index + 1
));
let variables = HashMap::new();
let handler_result = Self::handle_on_failure(
on_failure,
&env.working_dir,
&variables,
&self.command_executor,
&self.user_interaction,
)
.await?;
if !handler_result {
return Err(MapReduceError::ProcessingError(format!(
"Setup step {} failed and on_failure handler failed",
index + 1
)));
}
continue;
}
// No handler: build a detailed error and abort the setup phase.
let mut error_msg = format!("Setup step {} ({}) failed", index + 1, step_name);
if let Some(code) = result.exit_code {
error_msg.push_str(&format!(" (exit code: {})", code));
}
if !result.stderr.trim().is_empty() {
error_msg.push_str(&format!("\nstderr: {}", result.stderr.trim()));
}
if result.stderr.trim().is_empty() && !result.stdout.trim().is_empty() {
error_msg.push_str(&format!("\nstdout: {}", result.stdout.trim()));
}
// Point the user at the Claude JSON log when one exists, or at the
// conventional log directory for Claude steps without one.
if let Some(json_log) = &result.json_log_location {
error_msg.push_str(&format!("\n\n📝 Claude log: {}", json_log));
} else if step.claude.is_some() {
if let Ok(repo_name) = crate::storage::extract_repo_name(&self.project_root) {
let log_hint = format!(
"\n\n💡 Check Claude logs at: ~/.prodigy/events/{}/{}/*.jsonl",
repo_name, self.job_id
);
error_msg.push_str(&log_hint);
}
}
return Err(MapReduceError::ProcessingError(error_msg));
}
}
info!("Setup phase completed");
Ok(())
}
/// Load and shape the map phase's work items.
///
/// The input is either a JSON file or a command whose stdout yields items
/// (commands get a 300-second budget). The raw JSON is then run through the
/// data pipeline (json_path / filter / sort_by / max_items) to produce the
/// final item list.
async fn load_work_items(&self, map_phase: &MapPhase) -> MapReduceResult<Vec<Value>> {
info!("Loading work items from: {}", map_phase.config.input);
let input_source =
InputSource::detect_with_base(&map_phase.config.input, &self.project_root);
let json_data = match input_source {
InputSource::JsonFile(path) => {
InputSource::load_json_file(&path, &self.project_root).await?
}
InputSource::Command(cmd) => {
let items =
InputSource::execute_command(&cmd, Duration::from_secs(300), &self.subprocess)
.await?;
serde_json::Value::Array(items)
}
};
let pipeline = DataPipeline::from_config(
map_phase.json_path.clone(),
map_phase.filter.clone(),
map_phase.sort_by.clone(),
map_phase.max_items,
)
.map_err(|e| MapReduceError::InvalidConfiguration {
reason: format!("Failed to build data pipeline: {}", e),
field: "configuration".to_string(),
value: "configuration".to_string(),
})?;
let items =
pipeline
.process(&json_data)
.map_err(|e| MapReduceError::InvalidConfiguration {
reason: format!("Failed to process work items: {}", e),
field: "input".to_string(),
value: map_phase.config.input.clone(),
})?;
debug!("Loaded {} work items", items.len());
Ok(items)
}
/// Turn an agent's execution outcome into an `AgentResult`, logging the
/// matching lifecycle event.
///
/// Failures are absorbed: an `Err` is logged as an `agent_failed` event and
/// converted into an `AgentResult` with `AgentStatus::Failed`, so a single
/// agent's failure does not abort result collection. The only `Err`
/// returned from here is a failure to write the event log itself.
async fn convert_execution_result_to_agent_result(
result: MapReduceResult<AgentResult>,
agent_id: String,
item_id: String,
duration: Duration,
event_logger: Arc<EventLogger>,
) -> MapReduceResult<AgentResult> {
match result {
Ok(agent_result) => {
event_logger
.log_event(MapReduceEvent::agent_completed(
agent_id.clone(),
item_id.clone(),
chrono::Duration::from_std(duration)
.unwrap_or(chrono::Duration::seconds(0)),
None,
agent_result.commits.clone(),
agent_result.json_log_location.clone(),
))
.await
.map_err(|e| MapReduceError::ProcessingError(e.to_string()))?;
Ok(agent_result)
}
Err(e) => {
let failure_reason = Self::classify_failure_reason(&e);
event_logger
.log_event(MapReduceEvent::agent_failed(
agent_id.clone(),
item_id.clone(),
e.to_string(),
failure_reason,
None,
))
.await
.map_err(|e| MapReduceError::ProcessingError(e.to_string()))?;
// Synthesize a failed result. Note the duration is zeroed here
// rather than carrying the measured elapsed time.
Ok(AgentResult {
item_id: item_id.clone(),
status: AgentStatus::Failed(e.to_string()),
output: None,
commits: vec![],
duration: std::time::Duration::from_secs(0),
error: Some(e.to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: Some(agent_id),
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
})
}
}
}
/// Route a result through dead-letter-queue bookkeeping.
///
/// `dlq_integration::agent_result_to_dlq_item` decides whether the result
/// represents a failure worth queueing (presumably returning `None` for
/// successful results — verify against its implementation). The per-item
/// retry count is only incremented after the item was actually added to
/// the DLQ; a DLQ write failure is logged and otherwise ignored.
async fn handle_dlq_for_failed_item(
agent_result: &AgentResult,
item: &Value,
item_id: &str,
dlq: Arc<DeadLetterQueue>,
retry_counts: Arc<tokio::sync::RwLock<HashMap<String, u32>>>,
) {
let retry_counts_read = retry_counts.read().await;
let attempt_number = retry_tracking::get_item_attempt_number(item_id, &retry_counts_read);
// Release the read lock before awaiting the DLQ write.
drop(retry_counts_read);
if let Some(dlq_item) =
dlq_integration::agent_result_to_dlq_item(agent_result, item, attempt_number)
{
if let Err(e) = dlq.add(dlq_item).await {
warn!(
"Failed to add item {} to DLQ: {}. Item tracking may be incomplete.",
agent_result.item_id, e
);
} else {
info!(
"Added failed item {} to DLQ (attempt {})",
agent_result.item_id, attempt_number
);
// NOTE(review): increment_retry_count consumes a clone of the
// whole map; fine for small jobs, worth revisiting for very
// large ones.
let mut retry_counts_write = retry_counts.write().await;
*retry_counts_write =
retry_tracking::increment_retry_count(item_id, retry_counts_write.clone());
}
}
}
/// Process one work item end-to-end inside a spawned task.
///
/// Acquires a concurrency permit, emits start/finish events, runs the agent
/// (worktree creation, commands, merge, cleanup), records the result with
/// the collector, and routes failures into the DLQ. Agent failures are
/// folded into the returned `AgentResult` rather than propagated as `Err`.
#[allow(clippy::too_many_arguments)]
async fn process_single_work_item(
index: usize,
item: Value,
job_id: String,
map_phase: MapPhase,
env: ExecutionEnvironment,
semaphore: Arc<Semaphore>,
agent_manager: Arc<dyn AgentLifecycleManager>,
merge_queue: Arc<MergeQueue>,
event_logger: Arc<EventLogger>,
result_collector: Arc<ResultCollector>,
user_interaction: Arc<dyn UserInteraction>,
command_executor: CommandExecutor,
dlq: Arc<DeadLetterQueue>,
retry_counts: Arc<tokio::sync::RwLock<HashMap<String, u32>>>,
timeout_enforcer: Option<Arc<TimeoutEnforcer>>,
total_items: usize,
) -> MapReduceResult<AgentResult> {
// Bound parallelism: hold the permit for the item's whole lifecycle.
let _permit = semaphore.acquire().await.map_err(|e| {
MapReduceError::ProcessingError(format!("Failed to acquire semaphore: {}", e))
})?;
let item_id = format!("item_{}", index);
let agent_id = format!("{}_agent_{}", job_id, index);
event_logger
.log_event(MapReduceEvent::agent_started(
agent_id.clone(),
item_id.clone(),
))
.await
.map_err(|e| MapReduceError::ProcessingError(e.to_string()))?;
let start_time = Instant::now();
// Keep a copy of the item for DLQ entry construction after execution.
let item_for_dlq = item.clone();
let result = Self::execute_agent_for_item(
&agent_manager,
&merge_queue,
&agent_id,
&item_id,
item,
&map_phase,
&env,
&user_interaction,
&command_executor,
timeout_enforcer.as_ref(),
index,
total_items,
)
.await;
let duration = start_time.elapsed();
let agent_result = Self::convert_execution_result_to_agent_result(
result,
agent_id.clone(),
item_id.clone(),
duration,
event_logger.clone(),
)
.await?;
result_collector.add_result(agent_result.clone()).await;
Self::handle_dlq_for_failed_item(
&agent_result,
&item_for_dlq,
&item_id,
dlq.clone(),
retry_counts.clone(),
)
.await;
Ok(agent_result)
}
/// Await every spawned agent task and keep the successful results.
///
/// Task panics and agent-level errors are logged and dropped; they do not
/// abort collection of the remaining results.
async fn collect_agent_results(
    agent_futures: Vec<tokio::task::JoinHandle<MapReduceResult<AgentResult>>>,
) -> Vec<AgentResult> {
    let mut results = Vec::with_capacity(agent_futures.len());
    for handle in agent_futures {
        match handle.await {
            Err(e) => warn!("Agent task panicked: {}", e),
            Ok(Err(e)) => warn!("Agent execution failed: {}", e),
            Ok(Ok(result)) => results.push(result),
        }
    }
    results
}
/// Fan work items out to concurrent agents and gather their results.
///
/// Spawns one tokio task per item; actual parallelism is capped at
/// `max_parallel` (clamped to the item count) by the semaphore acquired
/// inside `process_single_work_item`. Emits map-phase start/complete
/// events and displays an aggregate summary when done.
async fn execute_map_phase_internal(
&self,
map_phase: MapPhase,
work_items: Vec<Value>,
env: &ExecutionEnvironment,
) -> MapReduceResult<Vec<AgentResult>> {
info!("Executing map phase with {} items", work_items.len());
let total_items = work_items.len();
let max_parallel = map_phase.config.max_parallel.min(total_items);
self.user_interaction.display_progress(&format!(
"Processing {} items with {} parallel agents",
total_items, max_parallel
));
self.event_logger
.log_event(MapReduceEvent::map_phase_started(total_items))
.await
.map_err(|e| MapReduceError::ProcessingError(e.to_string()))?;
let semaphore = Arc::new(Semaphore::new(max_parallel));
// Snapshot the enforcer once; each task receives its own clone.
let timeout_enforcer = self.timeout_enforcer.lock().await.clone();
let agent_futures: Vec<_> = work_items
.into_iter()
.enumerate()
.map(|(index, item)| {
// Clone shared handles so each spawned task is 'static.
let sem = Arc::clone(&semaphore);
let agent_manager = Arc::clone(&self.agent_manager);
let merge_queue = Arc::clone(&self.merge_queue);
let event_logger = Arc::clone(&self.event_logger);
let result_collector = Arc::clone(&self.result_collector);
let user_interaction = Arc::clone(&self.user_interaction);
let command_executor = self.command_executor.clone();
let dlq = Arc::clone(&self.dlq);
let retry_counts = Arc::clone(&self.retry_counts);
let map_phase = map_phase.clone();
let env = env.clone();
let job_id = self.job_id.clone();
let timeout_enforcer = timeout_enforcer.clone();
tokio::spawn(Self::process_single_work_item(
index,
item,
job_id,
map_phase,
env,
sem,
agent_manager,
merge_queue,
event_logger,
result_collector,
user_interaction,
command_executor,
dlq,
retry_counts,
timeout_enforcer,
total_items,
))
})
.collect();
let results = Self::collect_agent_results(agent_futures).await;
let summary = AggregationSummary::from_results(&results);
self.event_logger
.log_event(MapReduceEvent::map_phase_completed(
summary.successful,
summary.failed,
))
.await
.map_err(|e| MapReduceError::ProcessingError(e.to_string()))?;
self.display_map_summary(&summary);
Ok(results)
}
/// Register an agent with the timeout enforcer, if one is configured.
///
/// Registration failures are logged and swallowed: the agent still runs,
/// just without timeout enforcement.
async fn register_agent_timeout(
    enforcer: Option<&Arc<TimeoutEnforcer>>,
    agent_id: &str,
    item_id: &str,
    commands: &[WorkflowStep],
) -> Option<crate::cook::execution::mapreduce::timeout::TimeoutHandle> {
    let enforcer = enforcer?;
    let registration = enforcer
        .register_agent_timeout(agent_id.to_string(), item_id.to_string(), commands)
        .await;
    match registration {
        Ok(handle) => Some(handle),
        Err(e) => {
            warn!("Failed to register timeout for agent {}: {}", agent_id, e);
            None
        }
    }
}
/// Report a command's start (`elapsed == None`) or completion
/// (`elapsed == Some(duration)`) to the timeout enforcer, if configured.
///
/// Both registrations are best-effort: enforcer errors are discarded.
async fn register_command_lifecycle(
    enforcer: Option<&Arc<TimeoutEnforcer>>,
    agent_id: &str,
    index: usize,
    elapsed: Option<Duration>,
) -> MapReduceResult<()> {
    let Some(enforcer) = enforcer else {
        return Ok(());
    };
    match elapsed {
        Some(duration) => {
            let _ = enforcer
                .register_command_completion(&agent_id.to_string(), index, duration)
                .await;
        }
        None => {
            let _ = enforcer
                .register_command_start(&agent_id.to_string(), index)
                .await;
        }
    }
    Ok(())
}
/// Remove an agent's timeout registration, if an enforcer is configured.
async fn unregister_agent_timeout(
    enforcer: Option<&Arc<TimeoutEnforcer>>,
    agent_id: &str,
) -> MapReduceResult<()> {
    let Some(enforcer) = enforcer else {
        return Ok(());
    };
    // Best-effort: a failed unregistration is not fatal to the agent result.
    let _ = enforcer
        .unregister_agent_timeout(&agent_id.to_string())
        .await;
    Ok(())
}
/// Merge an agent's commits back to the parent worktree and clean it up.
///
/// Returns `Ok(true)` only when the agent produced commits and the merge
/// succeeded. No commits, or a failed merge, yields `Ok(false)`. Worktree
/// cleanup is best-effort in every path: a cleanup failure is logged but
/// never turned into an error.
#[allow(clippy::too_many_arguments)]
async fn merge_and_cleanup_agent(
agent_manager: &Arc<dyn AgentLifecycleManager>,
merge_queue: &Arc<MergeQueue>,
handle: crate::cook::execution::mapreduce::agent::AgentHandle,
config: &AgentConfig,
agent_result: &AgentResult,
env: &ExecutionEnvironment,
agent_id: &str,
item_id: &str,
) -> MapReduceResult<bool> {
if !agent_result.commits.is_empty() {
// Publish the agent's work on its own branch, then hand the merge
// to the serialized merge queue.
agent_manager
.create_agent_branch(handle.worktree_path(), &config.branch_name)
.await
.map_err(|e| {
MapReduceError::ProcessingError(format!("Failed to create agent branch: {}", e))
})?;
match merge_queue
.submit_merge(
agent_id.to_string(),
config.branch_name.clone(),
item_id.to_string(),
env.clone(),
)
.await
{
Ok(()) => {
info!("Successfully merged agent {} (item {})", agent_id, item_id);
if let Err(e) = agent_manager.cleanup_agent(handle).await {
warn!(
"Failed to cleanup agent {} after successful merge: {}. \
Work was successfully merged, worktree may need manual cleanup.",
agent_id, e
);
}
Ok(true)
}
Err(e) => {
warn!(
"Failed to merge agent {} (item {}): {}",
agent_id, item_id, e
);
let _ = agent_manager.cleanup_agent(handle).await;
Ok(false)
}
}
} else {
info!(
"Agent {} (item {}) completed with no commits, skipping merge",
agent_id, item_id
);
if let Err(e) = agent_manager.cleanup_agent(handle).await {
warn!(
"Failed to cleanup agent {} (no commits made): {}. \
Worktree may need manual cleanup.",
agent_id, e
);
}
Ok(false)
}
}
/// Run the agent template's steps sequentially inside the agent's worktree.
///
/// Returns the concatenated stdout of all steps plus the commits and
/// modified files observed after each `commit_required` step. A failing
/// step with a successful `on_failure` handler is treated as recovered.
/// NOTE(review): a failing step with NO `on_failure` handler is currently
/// ignored and execution continues — confirm this is intended (the setup
/// phase aborts in the equivalent situation).
#[allow(clippy::too_many_arguments)]
async fn execute_agent_commands(
handle: &crate::cook::execution::mapreduce::agent::AgentHandle,
commands: &[WorkflowStep],
item: &Value,
item_id: &str,
agent_id: &str,
_env: &ExecutionEnvironment,
command_executor: &CommandExecutor,
timeout_enforcer: Option<&Arc<TimeoutEnforcer>>,
user_interaction: &Arc<dyn UserInteraction>,
workflow_env: &HashMap<String, String>,
) -> MapReduceResult<(String, Vec<String>, Vec<String>)> {
let mut output = String::new();
let mut all_commits = Vec::new();
let mut all_files_modified = Vec::new();
for (index, step) in commands.iter().enumerate() {
user_interaction.display_progress(&format!(
"Agent {}: Executing step {}/{}",
agent_id,
index + 1,
commands.len()
));
// Tell the timeout enforcer this command started (elapsed = None)...
Self::register_command_lifecycle(timeout_enforcer, agent_id, index, None).await?;
let cmd_start = Instant::now();
let variables = Self::build_item_variables(item, item_id, workflow_env);
let step_result = command_executor
.execute_step_in_worktree(
handle.worktree_path(),
step,
&variables,
None, )
.await?;
// ...and that it completed, with its measured duration.
Self::register_command_lifecycle(
timeout_enforcer,
agent_id,
index,
Some(cmd_start.elapsed()),
)
.await?;
if !step_result.success {
if let Some(on_failure) = &step.on_failure {
user_interaction.display_warning(&format!(
"Agent {}: Step {} failed, executing on_failure handler",
agent_id,
index + 1
));
let failure_variables = Self::build_item_variables(item, item_id, workflow_env);
let handler_result = Self::handle_on_failure(
on_failure,
handle.worktree_path(),
&failure_variables,
command_executor,
user_interaction,
)
.await?;
if !handler_result {
return Err(MapReduceError::ProcessingError(format!(
"Agent {} step {} failed and on_failure handler failed",
agent_id,
index + 1
)));
}
}
}
if !step_result.stdout.is_empty() {
output.push_str(&step_result.stdout);
output.push('\n');
}
// Snapshot git state only after steps that are required to commit.
if step.commit_required {
let commits = Self::get_worktree_commits(handle.worktree_path()).await;
all_commits.extend(commits);
let files = Self::get_worktree_modified_files(handle.worktree_path()).await;
all_files_modified.extend(files);
}
}
Ok((output, all_commits, all_files_modified))
}
/// Build the variable map an agent step is interpolated against.
///
/// Layers, in order: the workflow environment, the work item's top-level
/// object fields as `item.<key>`, the full item serialized as `item_json`,
/// and the `item_id` itself. Later insertions win on key collisions.
fn build_item_variables(
    item: &Value,
    item_id: &str,
    workflow_env: &HashMap<String, String>,
) -> HashMap<String, String> {
    debug!(
        item_id = %item_id,
        workflow_env = ?workflow_env,
        "Building agent environment from workflow env"
    );
    // Workflow-level variables first; item fields are layered on top.
    let mut variables: HashMap<String, String> = workflow_env.clone();
    if let Value::Object(map) = item {
        for (key, value) in map {
            let rendered = match value {
                Value::String(s) => s.clone(),
                Value::Number(n) => n.to_string(),
                Value::Bool(b) => b.to_string(),
                Value::Null => "null".to_string(),
                other => other.to_string(),
            };
            variables.insert(format!("item.{}", key), rendered);
        }
    }
    variables.insert(
        "item_json".to_string(),
        serde_json::to_string(item).unwrap_or_default(),
    );
    variables.insert("item_id".to_string(), item_id.to_string());
    debug!(
        item_id = %item_id,
        variables = ?variables,
        "Agent environment variables built"
    );
    variables
}
/// Run a single agent for one work item: create its worktree, execute the
/// agent template, then merge and clean up.
///
/// The result carries `AgentStatus::Success` unless the merge of a
/// commit-bearing agent fails, in which case the status is downgraded to
/// `Failed`. Command failures inside the agent surface as `Err` and are
/// converted by the caller.
/// NOTE(review): an `Err` from `execute_agent_commands` propagates via `?`
/// without cleaning up the worktree or unregistering the timeout — likely
/// the source of orphaned worktrees.
#[allow(clippy::too_many_arguments)]
async fn execute_agent_for_item(
agent_manager: &Arc<dyn AgentLifecycleManager>,
merge_queue: &Arc<MergeQueue>,
agent_id: &str,
item_id: &str,
item: Value,
map_phase: &MapPhase,
env: &ExecutionEnvironment,
user_interaction: &Arc<dyn UserInteraction>,
command_executor: &CommandExecutor,
timeout_enforcer: Option<&Arc<TimeoutEnforcer>>,
agent_index: usize,
total_items: usize,
) -> MapReduceResult<AgentResult> {
info!("Starting agent {} for item {}", agent_id, item_id);
// Fixed per-agent policy: 3 retries, 10-minute budget.
let config = AgentConfig {
id: agent_id.to_string(),
item_id: item_id.to_string(),
branch_name: format!("agent-{}-{}", agent_id, item_id),
max_retries: 3,
timeout: Duration::from_secs(600),
agent_index,
total_items,
};
let commands = map_phase.agent_template.clone();
let handle = agent_manager
.create_agent(config.clone(), commands.clone())
.await
.map_err(|e| {
MapReduceError::ProcessingError(format!("Failed to create agent: {}", e))
})?;
let _timeout_handle =
Self::register_agent_timeout(timeout_enforcer, agent_id, item_id, &commands).await;
let agent_result = {
let start_time = Instant::now();
let (output, all_commits, all_files_modified) = Self::execute_agent_commands(
&handle,
&commands,
&item,
item_id,
agent_id,
env,
command_executor,
timeout_enforcer,
user_interaction,
&map_phase.workflow_env,
)
.await?;
let total_duration = start_time.elapsed();
AgentResult {
item_id: item_id.to_string(),
status: AgentStatus::Success,
output: Some(output),
commits: all_commits.clone(),
duration: total_duration,
error: None,
worktree_path: Some(handle.worktree_path().to_path_buf()),
branch_name: Some(handle.worktree_session.branch.clone()),
worktree_session_id: Some(agent_id.to_string()),
files_modified: all_files_modified.clone(),
json_log_location: None,
cleanup_status: None,
}
};
Self::unregister_agent_timeout(timeout_enforcer, agent_id).await?;
// The merge consumes the handle; cleanup happens inside.
let merge_successful = Self::merge_and_cleanup_agent(
agent_manager,
merge_queue,
handle,
&config,
&agent_result,
env,
agent_id,
item_id,
)
.await?;
if !merge_successful && !agent_result.commits.is_empty() {
// Work was done but never integrated: surface it as a failure.
warn!("Agent {} completed successfully but merge failed", agent_id);
let mut final_result = agent_result;
final_result.status = AgentStatus::Failed(
"Agent execution succeeded but merge to parent worktree failed".to_string(),
);
final_result.error =
Some("Merge to parent worktree failed - changes not integrated".to_string());
return Ok(final_result);
}
Ok(agent_result)
}
/// Execute a step's `on_failure` handler and report whether it succeeded.
///
/// A `SingleCommand` beginning with `/` is treated as a Claude slash
/// command, anything else as shell. When both a Claude and a shell command
/// are configured, Claude takes precedence and the shell command is not
/// run. No configured handler counts as success.
#[cfg_attr(test, allow(dead_code))]
pub(crate) async fn handle_on_failure(
    on_failure: &OnFailureConfig,
    worktree_path: &Path,
    variables: &HashMap<String, String>,
    command_executor: &CommandExecutor,
    user_interaction: &Arc<dyn UserInteraction>,
) -> MapReduceResult<bool> {
    let (claude_cmd, shell_cmd) = match on_failure {
        OnFailureConfig::Advanced { claude, shell, .. } => (claude.as_deref(), shell.as_deref()),
        OnFailureConfig::SingleCommand(cmd) if cmd.starts_with('/') => (Some(cmd.as_str()), None),
        OnFailureConfig::SingleCommand(cmd) => (None, Some(cmd.as_str())),
        _ => (None, None),
    };
    // Candidate handlers in precedence order, each paired with its label
    // and a ready-to-run workflow step.
    let handlers = [
        claude_cmd.map(|cmd| {
            (
                "Claude",
                cmd,
                WorkflowStep {
                    claude: Some(cmd.to_string()),
                    ..Default::default()
                },
            )
        }),
        shell_cmd.map(|cmd| {
            (
                "shell",
                cmd,
                WorkflowStep {
                    shell: Some(cmd.to_string()),
                    ..Default::default()
                },
            )
        }),
    ];
    for (label, cmd, step) in handlers.into_iter().flatten() {
        user_interaction
            .display_progress(&format!("on_failure: Executing {} command: {}", label, cmd));
        info!("Executing on_failure {} command: {}", label, cmd);
        let result = command_executor
            .execute_step_in_worktree(worktree_path, &step, variables, None)
            .await?;
        return Ok(result.success);
    }
    Ok(true)
}
/// List the commits present in the given worktree.
///
/// Lookup is best-effort: failures are logged and yield an empty list.
async fn get_worktree_commits(worktree_path: &Path) -> Vec<String> {
    use crate::cook::execution::mapreduce::resources::git_operations::{
        GitOperationsConfig, GitOperationsService, GitResultExt,
    };
    let mut service = GitOperationsService::new(GitOperationsConfig::default());
    service
        .get_worktree_commits(worktree_path, None, None)
        .await
        .map(|commits| commits.to_string_list())
        .unwrap_or_else(|e| {
            warn!("Failed to get worktree commits: {}", e);
            vec![]
        })
}
/// List the files modified in the given worktree.
///
/// Lookup is best-effort: failures are logged and yield an empty list.
async fn get_worktree_modified_files(worktree_path: &Path) -> Vec<String> {
    use crate::cook::execution::mapreduce::resources::git_operations::{
        GitOperationsConfig, GitOperationsService, GitResultExt,
    };
    let mut service = GitOperationsService::new(GitOperationsConfig::default());
    service
        .get_worktree_modified_files(worktree_path, None)
        .await
        .map(|files| files.to_string_list())
        .unwrap_or_else(|e| {
            warn!("Failed to get modified files: {}", e);
            vec![]
        })
}
/// Build the interpolation context handed to reduce-phase steps.
///
/// Exposes the aggregate counters (`map.successful`, `map.failed`,
/// `map.total`) and the full per-agent results under `map.results`.
#[cfg_attr(test, allow(dead_code))]
pub(crate) fn build_reduce_interpolation_context(
    map_results: &[AgentResult],
    summary: &AggregationSummary,
) -> MapReduceResult<crate::cook::execution::interpolation::InterpolationContext> {
    use crate::cook::execution::interpolation::InterpolationContext;
    let serialized_results = serde_json::to_value(map_results).map_err(|e| {
        MapReduceError::ProcessingError(format!("Failed to serialize map results: {}", e))
    })?;
    let mut context = InterpolationContext::new();
    context.set("map.successful", serde_json::json!(summary.successful));
    context.set("map.failed", serde_json::json!(summary.failed));
    context.set("map.total", serde_json::json!(summary.total));
    context.set("map.results", serialized_results);
    Ok(context)
}
/// Run the reduce phase over the collected map results.
///
/// Logs phase start/completion events, exposes the aggregate counters
/// (`map.successful`, `map.failed`, `map.total`) both as plain string
/// variables and inside a full interpolation context (which also carries
/// `map.results`), then executes each reduce command in order inside
/// the environment's working directory.
///
/// # Errors
/// Returns a `ProcessingError` if event logging fails, if the
/// interpolation context cannot be built, or if a failed step's
/// `on_failure` handler also fails. Executor errors propagate via `?`.
async fn execute_reduce_phase(
    &self,
    reduce: ReducePhase,
    map_results: &[AgentResult],
    env: &ExecutionEnvironment,
) -> MapReduceResult<()> {
    info!("Executing reduce phase");
    self.user_interaction
        .display_progress("Starting reduce phase...");
    let summary = AggregationSummary::from_results(map_results);
    self.display_reduce_summary(&summary);
    self.event_logger
        .log_event(MapReduceEvent::reduce_phase_started())
        .await
        .map_err(|e| MapReduceError::ProcessingError(e.to_string()))?;
    // Flat string variables for simple substitution...
    let mut variables = HashMap::new();
    variables.insert("map.successful".to_string(), summary.successful.to_string());
    variables.insert("map.failed".to_string(), summary.failed.to_string());
    variables.insert("map.total".to_string(), summary.total.to_string());
    // ...plus the richer structured context (includes `map.results`).
    let full_context = Self::build_reduce_interpolation_context(map_results, &summary)?;
    for (index, step) in reduce.commands.iter().enumerate() {
        self.user_interaction.display_progress(&format!(
            "Reduce phase: Executing step {}/{}",
            index + 1,
            reduce.commands.len()
        ));
        let step_result = self
            .command_executor
            .execute_step_in_worktree(
                &env.working_dir,
                step,
                &variables,
                Some(&full_context), )
            .await?;
        if !step_result.success {
            if let Some(on_failure) = &step.on_failure {
                self.user_interaction.display_warning(&format!(
                    "Reduce step {} failed, executing on_failure handler",
                    index + 1
                ));
                let handler_result = Self::handle_on_failure(
                    on_failure,
                    &env.working_dir,
                    &variables,
                    &self.command_executor,
                    &self.user_interaction,
                )
                .await?;
                // The handler "recovering" (returning true) lets the
                // phase continue despite the failed step.
                if !handler_result {
                    return Err(MapReduceError::ProcessingError(format!(
                        "Reduce step {} failed and on_failure handler failed",
                        index + 1
                    )));
                }
            }
            // NOTE(review): a failed step with NO `on_failure` handler is
            // silently tolerated and the loop continues — confirm this is
            // the intended semantics rather than an oversight.
        }
    }
    self.event_logger
        .log_event(MapReduceEvent::reduce_phase_completed())
        .await
        .map_err(|e| MapReduceError::ProcessingError(e.to_string()))?;
    self.user_interaction
        .display_success("Reduce phase completed");
    Ok(())
}
/// Report the map-phase outcome to the user: a warning when any agent
/// failed, a success message otherwise.
fn display_map_summary(&self, summary: &AggregationSummary) {
    let message = format!(
        "Map phase completed: {} successful, {} failed (total: {})",
        summary.successful, summary.failed, summary.total
    );
    match summary.failed {
        0 => self.user_interaction.display_success(&message),
        _ => self.user_interaction.display_warning(&message),
    }
}
/// Show the user what the reduce phase will operate on.
fn display_reduce_summary(&self, summary: &AggregationSummary) {
    let message = format!(
        "Reduce phase input: {} items ({} successful, {} failed)",
        summary.total, summary.successful, summary.failed
    );
    self.user_interaction.display_info(&message);
}
/// Return a snapshot of all agent results collected so far.
pub async fn get_results(&self) -> Vec<AgentResult> {
    self.result_collector.get_results().await
}
/// Discard all agent results accumulated by the collector.
pub async fn clear_results(&self) {
    self.result_collector.clear().await;
}
}
/// No-op `SessionManager` stand-in: every mutating call succeeds without
/// doing anything and every query returns an empty or placeholder value.
/// Useful where the coordinator needs a `SessionManager` but no real
/// session tracking is wanted.
struct DummySessionManager;
#[async_trait::async_trait]
impl SessionManager for DummySessionManager {
    /// Accepts any session id; no state is created.
    async fn start_session(&self, _session_id: &str) -> anyhow::Result<()> {
        Ok(())
    }
    /// Ignores the update.
    async fn update_session(
        &self,
        _update: crate::cook::session::SessionUpdate,
    ) -> anyhow::Result<()> {
        Ok(())
    }
    /// Reports a zeroed summary (no iterations, no files changed).
    async fn complete_session(&self) -> anyhow::Result<crate::cook::session::SessionSummary> {
        Ok(crate::cook::session::SessionSummary {
            iterations: 0,
            files_changed: 0,
        })
    }
    /// Fabricates a fresh state named "dummy" rooted at the current
    /// directory (falling back to "." if the cwd is unavailable).
    fn get_state(&self) -> anyhow::Result<crate::cook::session::SessionState> {
        Ok(crate::cook::session::SessionState::new(
            "dummy".to_string(),
            std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
        ))
    }
    /// Persists nothing.
    async fn save_state(&self, _path: &Path) -> anyhow::Result<()> {
        Ok(())
    }
    /// Loads nothing.
    async fn load_state(&self, _path: &Path) -> anyhow::Result<()> {
        Ok(())
    }
    /// Returns the same fabricated "dummy" state regardless of id.
    async fn load_session(
        &self,
        _session_id: &str,
    ) -> anyhow::Result<crate::cook::session::SessionState> {
        Ok(crate::cook::session::SessionState::new(
            "dummy".to_string(),
            std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
        ))
    }
    /// Checkpoints are dropped.
    async fn save_checkpoint(
        &self,
        _state: &crate::cook::session::SessionState,
    ) -> anyhow::Result<()> {
        Ok(())
    }
    /// There is never anything to resume.
    async fn list_resumable(&self) -> anyhow::Result<Vec<crate::cook::session::SessionInfo>> {
        Ok(vec![])
    }
    /// There is never an interrupted session.
    async fn get_last_interrupted(&self) -> anyhow::Result<Option<String>> {
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_build_item_variables_with_simple_types() {
        // Scalars of every JSON primitive kind should flatten to strings.
        let item = json!({
            "name": "test-item",
            "count": 42,
            "enabled": true,
            "optional": null
        });
        let vars = MapReduceCoordinator::build_item_variables(&item, "item-123", &HashMap::new());
        assert_eq!(vars.get("item.name"), Some(&"test-item".to_string()));
        assert_eq!(vars.get("item.count"), Some(&"42".to_string()));
        assert_eq!(vars.get("item.enabled"), Some(&"true".to_string()));
        assert_eq!(vars.get("item.optional"), Some(&"null".to_string()));
        assert_eq!(vars.get("item_id"), Some(&"item-123".to_string()));
        assert!(vars.contains_key("item_json"));
    }

    #[test]
    fn test_build_item_variables_with_nested_objects() {
        // Nested objects are serialized into the variable value.
        let item = json!({
            "name": "test",
            "metadata": {
                "key": "value"
            }
        });
        let vars = MapReduceCoordinator::build_item_variables(&item, "item-456", &HashMap::new());
        assert_eq!(vars.get("item.name"), Some(&"test".to_string()));
        assert!(vars.get("item.metadata").unwrap().contains("key"));
        assert_eq!(vars.get("item_id"), Some(&"item-456".to_string()));
    }

    #[test]
    fn test_build_item_variables_with_empty_object() {
        // An empty object yields only the id and raw-JSON variables.
        let vars = MapReduceCoordinator::build_item_variables(&json!({}), "item-789", &HashMap::new());
        assert_eq!(vars.get("item_id"), Some(&"item-789".to_string()));
        assert_eq!(vars.get("item_json"), Some(&"{}".to_string()));
        assert_eq!(vars.len(), 2);
    }

    #[test]
    fn test_build_item_variables_with_non_object() {
        // Non-object items get no per-field variables, just id + raw JSON.
        let vars = MapReduceCoordinator::build_item_variables(
            &json!("just a string"),
            "item-999",
            &HashMap::new(),
        );
        assert_eq!(vars.get("item_id"), Some(&"item-999".to_string()));
        assert_eq!(
            vars.get("item_json"),
            Some(&"\"just a string\"".to_string())
        );
        assert_eq!(vars.len(), 2);
    }
}