use super::{PhaseContext, PhaseError, PhaseExecutor, PhaseMetrics, PhaseResult, PhaseType};
use crate::cook::execution::mapreduce::{AgentResult, MapPhase};
use async_trait::async_trait;
use serde_json::{json, Value};
use std::time::Instant;
use tracing::{debug, info, warn};
pub struct MapPhaseExecutor {
map_phase: MapPhase,
}
impl MapPhaseExecutor {
pub fn new(map_phase: MapPhase) -> Self {
Self { map_phase }
}
async fn parse_work_items(&self, context: &PhaseContext) -> Result<Vec<Value>, PhaseError> {
let input = &self.map_phase.config.input;
let work_items_path = context.environment.working_dir.join(input);
if work_items_path.exists() {
let content = std::fs::read_to_string(&work_items_path).map_err(|e| {
PhaseError::ExecutionFailed {
message: format!("Failed to read work items file: {}", e),
}
})?;
let items: Vec<Value> =
serde_json::from_str(&content).map_err(|e| PhaseError::ExecutionFailed {
message: format!("Failed to parse work items JSON: {}", e),
})?;
return Ok(items);
}
Err(PhaseError::ExecutionFailed {
message: "Dynamic work item generation not implemented in this simplified version"
.to_string(),
})
}
async fn distribute_work(
&self,
work_items: Vec<Value>,
context: &mut PhaseContext,
) -> Result<Vec<AgentResult>, PhaseError> {
info!("Distributing {} work items to agents", work_items.len());
let filtered_items = self.apply_filters(work_items);
let limited_items = self.apply_limits(filtered_items);
let mut results = Vec::new();
for (index, _item) in limited_items.iter().enumerate() {
debug!("Processing work item {}", index);
results.push(AgentResult {
item_id: format!("item-{}", index),
status: crate::cook::execution::mapreduce::AgentStatus::Success,
output: Some(format!("Processed item {}", index)),
commits: vec![format!("commit-{}", index)],
files_modified: Vec::new(),
duration: std::time::Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: Some(format!("agent-branch-{}", index)),
worktree_session_id: None,
json_log_location: None,
cleanup_status: None,
});
}
context.map_results = Some(results.clone());
Ok(results)
}
fn apply_filters(&self, items: Vec<Value>) -> Vec<Value> {
if let Some(_filter) = &self.map_phase.filter {
items
.into_iter()
.filter(|_item| {
true
})
.collect()
} else {
items
}
}
fn apply_limits(&self, items: Vec<Value>) -> Vec<Value> {
let mut limited = items;
if let Some(offset) = self.map_phase.config.offset {
limited = limited.into_iter().skip(offset).collect();
}
if let Some(max_items) = self.map_phase.config.max_items {
limited = limited.into_iter().take(max_items).collect();
}
limited
}
}
#[async_trait]
impl PhaseExecutor for MapPhaseExecutor {
async fn execute(&self, context: &mut PhaseContext) -> Result<PhaseResult, PhaseError> {
info!("Starting map phase execution");
let start_time = Instant::now();
let work_items = self.parse_work_items(context).await?;
info!("Found {} work items to process", work_items.len());
if work_items.is_empty() {
warn!("No work items to process in map phase");
return Ok(PhaseResult {
phase_type: PhaseType::Map,
success: true,
data: Some(json!({
"message": "No work items to process",
"results": []
})),
error_message: Some("No work items found".to_string()),
metrics: PhaseMetrics::default(),
});
}
let results = self.distribute_work(work_items.clone(), context).await?;
let successful = results.iter().filter(|r| r.is_success()).count();
let failed = results.iter().filter(|r| !r.is_success()).count();
let duration = start_time.elapsed();
let metrics = PhaseMetrics {
duration_secs: duration.as_secs_f64(),
items_processed: results.len(),
items_successful: successful,
items_failed: failed,
};
info!(
"Map phase completed: {} successful, {} failed out of {} total",
successful,
failed,
results.len()
);
Ok(PhaseResult {
phase_type: PhaseType::Map,
success: failed == 0,
data: Some(json!({
"total": results.len(),
"successful": successful,
"failed": failed,
"results": results,
})),
error_message: if failed > 0 {
Some(format!("{} items failed processing", failed))
} else {
None
},
metrics,
})
}
fn phase_type(&self) -> PhaseType {
PhaseType::Map
}
fn validate_context(&self, _context: &PhaseContext) -> Result<(), PhaseError> {
if self.map_phase.config.input.is_empty() {
return Err(PhaseError::ValidationError {
message: "Map phase input source is not specified".to_string(),
});
}
if self.map_phase.config.max_parallel == 0 {
return Err(PhaseError::ValidationError {
message: "max_parallel must be greater than 0".to_string(),
});
}
Ok(())
}
}