use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use super::{CookConfig, ExecutionEnvironment};
use crate::config::WorkflowCommand;
use crate::cook::execution::ClaudeExecutor;
use crate::cook::interaction::UserInteraction;
use crate::cook::session::{SessionManager, SessionUpdate};
use crate::cook::workflow::{ExtendedWorkflowConfig, WorkflowContext, WorkflowStep};
use crate::testing::config::TestConfiguration;
use crate::unified_session::{format_duration, TimingTracker};
/// Runs a workflow once per input, where inputs come from `--map` glob
/// patterns and direct `--args` values on the cook command.
///
/// All collaborators are held behind `Arc` so they can be handed on to the
/// workflow executors this type constructs.
pub struct ArgumentProcessor {
/// Executor handed to every `WorkflowExecutorImpl` built by this processor.
claude_executor: Arc<dyn ClaudeExecutor>,
/// Session handle; receives an `IncrementIteration` update for each input.
session_manager: Arc<dyn SessionManager>,
/// Sink for all status, progress, warning and error messages.
user_interaction: Arc<dyn UserInteraction>,
/// When present, executors are built via `WorkflowExecutorImpl::with_test_config`.
test_config: Option<Arc<TestConfiguration>>,
}
impl ArgumentProcessor {
pub fn new(
claude_executor: Arc<dyn ClaudeExecutor>,
session_manager: Arc<dyn SessionManager>,
user_interaction: Arc<dyn UserInteraction>,
test_config: Option<Arc<TestConfiguration>>,
) -> Self {
Self {
claude_executor,
session_manager,
user_interaction,
test_config,
}
}
/// Collects every input from `--map` patterns and `--args`, then runs the
/// configured workflow once per input, in order.
///
/// Per-input and total timings are reported through the user interaction
/// handle.
///
/// # Errors
///
/// Fails when no inputs were collected, when input collection itself fails,
/// or as soon as processing of any single input fails (remaining inputs are
/// not attempted).
pub async fn execute_workflow_with_args(
    &self,
    env: &ExecutionEnvironment,
    config: &CookConfig,
) -> Result<()> {
    log::debug!("execute_workflow_with_args started");
    let started_at = Instant::now();
    let mut tracker = TimingTracker::new();

    log::debug!("Collecting workflow inputs");
    let inputs = self.collect_workflow_inputs(config)?;
    let total = inputs.len();
    log::debug!("Collected {} inputs", total);

    if total == 0 {
        return Err(anyhow!("No inputs found from --map patterns or --args"));
    }

    let ui = &self.user_interaction;
    ui.display_status(&format!("Total inputs to process: {}", total));

    for (index, input) in inputs.iter().enumerate() {
        tracker.start_iteration();
        self.process_workflow_input(env, config, input, index, total, &mut tracker)
            .await?;

        // The tracker may not yield a duration; only report when it does.
        if let Some(elapsed) = tracker.complete_iteration() {
            ui.display_success(&format!(
                "Input {} completed in {}",
                index + 1,
                format_duration(elapsed)
            ));
        }
    }

    ui.display_success(&format!("Processed all {} inputs successfully!", total));

    let overall = started_at.elapsed();
    ui.display_metric(
        "Total workflow time",
        &format!("{} for {} inputs", format_duration(overall), total),
    );

    Ok(())
}
/// Runs the configured workflow for a single input value.
///
/// The input is exposed to the workflow as the sole positional argument and
/// as the `PRODIGY_ARG` environment variable. `index` is zero-based and is
/// shown to the user as `index + 1` of `total`.
///
/// # Errors
///
/// Propagates failures from the session update, from applying the
/// environment configuration, and from the workflow execution itself.
async fn process_workflow_input(
    &self,
    env: &ExecutionEnvironment,
    config: &CookConfig,
    input: &str,
    index: usize,
    total: usize,
    _timing_tracker: &mut TimingTracker,
) -> Result<()> {
    use crate::cook::environment::{EnvValue, EnvironmentConfig};
    use crate::cook::workflow::{
        CheckpointManager, CheckpointStorage, VariableStore, WorkflowExecutorImpl, WorkflowMode,
    };

    self.user_interaction.display_progress(&format!(
        "Processing input {}/{}: {}",
        index + 1,
        total,
        input
    ));
    self.session_manager
        .update_session(SessionUpdate::IncrementIteration)
        .await?;

    let mut variables = HashMap::new();
    variables.insert("ARG".to_string(), input.to_string());
    variables.insert("INDEX".to_string(), (index + 1).to_string());
    variables.insert("TOTAL".to_string(), total.to_string());

    let steps: Vec<WorkflowStep> = config
        .workflow
        .commands
        .iter()
        .map(Self::convert_command_to_step)
        .collect();
    let extended_workflow = ExtendedWorkflowConfig {
        name: "args-workflow".to_string(),
        mode: WorkflowMode::Sequential,
        steps,
        setup_phase: None,
        map_phase: None,
        reduce_phase: None,
        max_iterations: 1,
        iterate: false,
        retry_defaults: None,
        environment: None,
    };

    // NOTE(review): this context is built but never handed to the executor,
    // so ARG/INDEX/TOTAL do not reach the workflow through it. Kept so the
    // construction stays in place; confirm whether it should be wired in.
    let _workflow_context = WorkflowContext {
        variables,
        captured_outputs: HashMap::new(),
        iteration_vars: HashMap::new(),
        validation_results: HashMap::new(),
        variable_store: Arc::new(VariableStore::new()),
        git_tracker: None,
    };

    let checkpoint_manager = Arc::new(CheckpointManager::with_storage(
        CheckpointStorage::Session {
            session_id: env.session_id.to_string(),
        },
    ));
    let workflow_id = format!("workflow-{}", chrono::Utc::now().timestamp_millis());

    // Pick the base executor exactly once. Previously a regular executor was
    // fully configured and then thrown away whenever a test config was set.
    let base_executor = match &self.test_config {
        Some(test_config) => WorkflowExecutorImpl::with_test_config(
            self.claude_executor.clone(),
            self.session_manager.clone(),
            self.user_interaction.clone(),
            test_config.clone(),
        ),
        None => self.create_workflow_executor_internal(config),
    };
    // Shared configuration; the positional args are replaced by this input.
    let mut executor = base_executor
        .with_checkpoint_manager(checkpoint_manager, workflow_id)
        .with_dry_run(config.command.dry_run)
        .with_positional_args(vec![input.to_string()]);

    let mut global_env: HashMap<String, EnvValue> = config
        .workflow
        .env
        .as_ref()
        .map(|vars| {
            vars.iter()
                .map(|(key, value)| (key.clone(), EnvValue::Static(value.clone())))
                .collect()
        })
        .unwrap_or_default();
    // Inserted after the workflow-level env so the current input always wins
    // over a user-supplied PRODIGY_ARG.
    global_env.insert(
        "PRODIGY_ARG".to_string(),
        EnvValue::Static(input.to_string()),
    );

    let global_env_config = EnvironmentConfig {
        global_env,
        secrets: config.workflow.secrets.clone().unwrap_or_default(),
        env_files: config.workflow.env_files.clone().unwrap_or_default(),
        inherit: true,
        profiles: config.workflow.profiles.clone().unwrap_or_default(),
        active_profile: None,
    };
    executor = executor.with_environment_config(global_env_config)?;

    executor.execute(&extended_workflow, env).await?;
    Ok(())
}
/// Gathers the full, ordered list of inputs for a run.
///
/// Inputs derived from each `--map` pattern come first (in pattern order),
/// followed by the direct `--args` values. The result may be empty.
///
/// # Errors
///
/// Propagates any error returned while expanding a glob pattern.
fn collect_workflow_inputs(&self, config: &CookConfig) -> Result<Vec<String>> {
    let command = &config.command;
    let mut collected = Vec::new();

    for pattern in &command.map {
        self.user_interaction
            .display_info(&format!("🔍 Processing file pattern: {pattern}"));
        collected.extend(self.process_glob_pattern(pattern)?);
    }

    let direct_args = &command.args;
    if !direct_args.is_empty() {
        self.user_interaction.display_action(&format!(
            "Adding {} direct arguments from --args",
            direct_args.len()
        ));
        collected.extend(direct_args.iter().cloned());
    }

    Ok(collected)
}
/// Expands one glob pattern and converts each matching path to an input.
///
/// This is best-effort: an invalid pattern is reported as an error message
/// and yields an empty list, and a path that cannot be read during
/// expansion is reported as a warning and skipped. In both cases the
/// function still returns `Ok`.
///
/// # Errors
///
/// Currently never returns `Err`; the `Result` is kept for the caller's `?`.
fn process_glob_pattern(&self, pattern: &str) -> Result<Vec<String>> {
    let entries = match glob::glob(pattern) {
        Ok(entries) => entries,
        Err(e) => {
            self.user_interaction
                .display_error(&format!("Error processing pattern '{pattern}': {e}"));
            return Ok(Vec::new());
        }
    };

    let mut inputs = Vec::new();
    for entry in entries {
        match entry {
            Ok(path) => {
                self.user_interaction
                    .display_success(&format!("Found file: {}", path.display()));
                inputs.push(self.extract_input_from_path(&path));
            }
            // Previously these were dropped silently, which could leave the
            // user with an unexplained "No files matched" message.
            Err(e) => {
                self.user_interaction.display_warning(&format!(
                    "Skipping unreadable path while expanding pattern '{pattern}': {e}"
                ));
            }
        }
    }

    let pattern_matches = inputs.len();
    if pattern_matches == 0 {
        self.user_interaction
            .display_warning(&format!("No files matched pattern: {pattern}"));
    } else {
        self.user_interaction.display_success(&format!(
            "📁 Found {pattern_matches} files matching pattern: {pattern}"
        ));
    }

    Ok(inputs)
}
/// Derives an input value from a matched file path.
///
/// Uses the file stem (file name without its final extension) and keeps
/// only the text before the first `-`; a stem without `-` is used whole.
/// A path with no file stem falls back to the full path text. Non-UTF-8
/// components are converted lossily.
fn extract_input_from_path(&self, path: &Path) -> String {
    match path.file_stem() {
        None => path.to_string_lossy().into_owned(),
        Some(stem) => {
            let name = stem.to_string_lossy();
            // `split_once` cuts at the first '-', matching a find-and-slice.
            let prefix = name.split_once('-').map_or(&*name, |(head, _)| head);
            prefix.to_owned()
        }
    }
}
/// Converts a configured workflow command into an executable workflow step.
///
/// Thin associated-function wrapper around
/// `super::normalization::convert_command_to_step`, so it can be passed
/// directly as `Self::convert_command_to_step` to iterator adaptors.
fn convert_command_to_step(cmd: &WorkflowCommand) -> WorkflowStep {
super::normalization::convert_command_to_step(cmd)
}
fn create_workflow_executor_internal(
&self,
config: &CookConfig,
) -> crate::cook::workflow::WorkflowExecutorImpl {
crate::cook::workflow::WorkflowExecutorImpl::new(
self.claude_executor.clone(),
self.session_manager.clone(),
self.user_interaction.clone(),
)
.with_workflow_path(config.command.playbook.clone())
.with_dry_run(config.command.dry_run)
.with_positional_args(config.command.args.clone())
}
}