use crate::batch;
use crate::cmd::run;
use crate::store::Store;
use anyhow::Result;
use std::sync::Arc;
use super::batch_args::task_to_run_args;
use super::batch_dispatch_support::{
auto_fallback_agent,
dispatch_level_with_ids,
should_auto_fallback,
wait_for_any_completion,
};
use super::batch_helpers::{resolve_hook_targets, trigger_conditional};
use super::batch_types::{BatchDispatchResult, BatchTaskOutcome};
use super::batch_validate::{find_ready_tasks, resolve_dependencies, task_label};
pub(crate) async fn dispatch_parallel(
store: Arc<Store>,
tasks: &[batch::BatchTask],
max_concurrent: Option<usize>,
auto_fallback: bool,
shared_dir_path: Option<&str>,
) -> Result<BatchDispatchResult> {
let dependencies = vec![Vec::new(); tasks.len()];
let default_max = crate::system_resources::recommended_max_concurrent().min(tasks.len());
let max_active = max_concurrent.unwrap_or(default_max).max(1);
dispatch_with_dependencies(
store,
tasks,
&dependencies,
max_active,
auto_fallback,
shared_dir_path,
)
.await
}
pub(crate) async fn dispatch_sequential(
store: Arc<Store>,
tasks: &[batch::BatchTask],
auto_fallback: bool,
shared_dir_path: Option<&str>,
) -> Result<BatchDispatchResult> {
let dependencies = vec![Vec::new(); tasks.len()];
dispatch_with_dependencies(store, tasks, &dependencies, 1, auto_fallback, shared_dir_path).await
}
pub(crate) async fn dispatch_parallel_with_dependencies(
store: Arc<Store>,
tasks: &[batch::BatchTask],
max_concurrent: Option<usize>,
auto_fallback: bool,
shared_dir_path: Option<&str>,
) -> Result<BatchDispatchResult> {
let dependencies = resolve_dependencies(tasks)?;
let default_max = crate::system_resources::recommended_max_concurrent().min(tasks.len());
let max_active = max_concurrent.unwrap_or(default_max).max(1);
dispatch_with_dependencies(
store,
tasks,
&dependencies,
max_active,
auto_fallback,
shared_dir_path,
)
.await
}
pub(crate) async fn dispatch_sequential_with_dependencies(
store: Arc<Store>,
tasks: &[batch::BatchTask],
auto_fallback: bool,
shared_dir_path: Option<&str>,
) -> Result<BatchDispatchResult> {
let dependencies = resolve_dependencies(tasks)?;
dispatch_with_dependencies(store, tasks, &dependencies, 1, auto_fallback, shared_dir_path).await
}
async fn dispatch_with_dependencies(
store: Arc<Store>,
tasks: &[batch::BatchTask],
dependencies: &[Vec<usize>],
max_active: usize,
auto_fallback: bool,
shared_dir_path: Option<&str>,
) -> Result<BatchDispatchResult> {
if tasks.is_empty() {
return Ok(BatchDispatchResult {
task_ids: Vec::new(),
outcomes: Vec::new(),
});
}
let mut rate_warned = std::collections::HashSet::new();
for task in tasks {
let agent_name = if task.agent.is_empty() {
"codex"
} else {
task.agent.as_str()
};
let Some(kind) = crate::types::AgentKind::parse_str(agent_name) else {
continue;
};
if !crate::rate_limit::is_rate_limited(&kind) || !rate_warned.insert(agent_name.to_string()) {
continue;
}
if task.fallback.is_some() {
aid_warn!("[batch] {} is rate-limited — tasks with fallback will auto-cascade", agent_name);
} else {
aid_warn!(
"[batch] {} is rate-limited — tasks without fallback may fail. Consider adding fallback.",
agent_name
);
}
}
let waiting_ids: Vec<String> = tasks
.iter()
.enumerate()
.map(|(i, task)| {
let id = task
.id
.as_ref()
.filter(|s| crate::sanitize::is_valid_task_id(s))
.map(|s| crate::types::TaskId(s.clone()))
.unwrap_or_else(crate::types::TaskId::generate);
let agent = if task.agent.is_empty() { "auto" } else { &task.agent };
let prompt_preview = if task.prompt.len() > 120 {
let mut end = 120;
while !task.prompt.is_char_boundary(end) { end -= 1; }
&task.prompt[..end]
} else {
&task.prompt
};
if let Err(e) = store.insert_waiting_task(id.as_str(), agent, prompt_preview, task.group.as_deref()) {
aid_warn!("[aid] Warning: failed to pre-create task {i}: {e}");
}
id.to_string()
})
.collect();
let name_map = batch::task_name_map(tasks)?;
let success_targets = resolve_hook_targets(tasks, &name_map, |task| task.on_success.as_deref())?;
let failure_targets = resolve_hook_targets(tasks, &name_map, |task| task.on_fail.as_deref())?;
let mut started = vec![false; tasks.len()];
let mut outcomes: Vec<Option<BatchTaskOutcome>> = vec![None; tasks.len()];
let mut retried = vec![false; tasks.len()];
let mut triggered: Vec<bool> = tasks.iter().map(|task| !task.conditional).collect();
let mut active: Vec<(usize, String)> = Vec::new();
let mut task_ids: Vec<String> = Vec::new();
let max_active = max_active.max(1);
while outcomes.iter().any(Option::is_none) {
let ready = find_ready_tasks(
&store,
tasks,
dependencies,
&started,
&mut outcomes,
&triggered,
)?;
let available = max_active.saturating_sub(active.len());
if available > 0 && !ready.is_empty() {
let dispatch_group: Vec<_> = ready.into_iter().take(available).collect();
for dispatch in dispatch_level_with_ids(
store.clone(),
tasks,
&dispatch_group,
&waiting_ids,
shared_dir_path,
)
.await? {
started[dispatch.index] = true;
match dispatch.task_id {
Some(task_id) => {
task_ids.push(task_id.clone());
active.push((dispatch.index, task_id));
}
None => {
outcomes[dispatch.index] = Some(BatchTaskOutcome::Failed);
let _ = store.update_task_status(
&waiting_ids[dispatch.index],
crate::types::TaskStatus::Skipped,
);
trigger_conditional(
BatchTaskOutcome::Failed,
dispatch.index,
&mut triggered,
&success_targets,
&failure_targets,
);
}
}
}
}
if active.is_empty() {
break;
}
for completed in wait_for_any_completion(&store, &mut active)? {
if let Some(retry_task_id) = maybe_dispatch_auto_fallback(
store.clone(),
tasks,
completed.index,
&completed.task_id,
completed.outcome,
auto_fallback,
&mut retried,
shared_dir_path,
)
.await?
{
task_ids.push(retry_task_id.clone());
active.push((completed.index, retry_task_id));
continue;
}
outcomes[completed.index] = Some(completed.outcome);
trigger_conditional(
completed.outcome,
completed.index,
&mut triggered,
&success_targets,
&failure_targets,
);
}
}
for (i, outcome) in outcomes.iter().enumerate() {
if outcome.is_none() {
let _ = store.update_task_status(&waiting_ids[i], crate::types::TaskStatus::Skipped);
}
}
let mut all_ids = waiting_ids;
all_ids.extend(task_ids);
Ok(BatchDispatchResult {
task_ids: all_ids,
outcomes: outcomes
.into_iter()
.map(|outcome| outcome.unwrap_or(BatchTaskOutcome::Skipped))
.collect(),
})
}
async fn maybe_dispatch_auto_fallback(
store: Arc<Store>,
tasks: &[batch::BatchTask],
task_idx: usize,
task_id: &str,
outcome: BatchTaskOutcome,
auto_fallback: bool,
retried: &mut [bool],
shared_dir_path: Option<&str>,
) -> Result<Option<String>> {
if !should_auto_fallback(auto_fallback, retried[task_idx], outcome) {
return Ok(None);
}
let Some((original_agent, fallback_agent)) = auto_fallback_agent(&store, task_id, tasks, task_idx)? else {
return Ok(None);
};
let siblings: Vec<_> = tasks
.iter()
.enumerate()
.filter(|(idx, _)| *idx != task_idx)
.map(|(_, task)| task)
.collect();
let mut run_args = task_to_run_args(
&tasks[task_idx],
&siblings,
true,
&store,
shared_dir_path,
);
run_args.agent_name = fallback_agent.as_str().to_string();
run_args.parent_task_id = Some(task_id.to_string());
retried[task_idx] = true;
aid_info!(
"[batch] Auto-fallback: {} -> {} for task {}",
original_agent,
fallback_agent.as_str(),
task_label(&tasks[task_idx], task_idx),
);
let retry_id = run::run(store, run_args).await?;
Ok(Some(retry_id.to_string()))
}