use anyhow::Result;
use chrono::Local;
use std::sync::Arc;
use tokio::process::Command;
use crate::{store::Store, types::*, watcher};
use crate::cmd::run::RunArgs;
use crate::store::TaskCompletionUpdate;
use super::{clean_output_if_jsonl, fill_empty_output_from_log};
const FAST_FAIL_SNAPSHOT_MS: i64 = 5_000;
const STDERR_EXCERPT_LINES: usize = 8;
const STDERR_EXCERPT_CHARS: usize = 400;
pub(crate) struct RunProcessArgs<'a> {
pub agent: &'a dyn crate::agent::Agent,
pub cmd: Command,
pub task_id: &'a TaskId,
pub store: &'a Arc<Store>,
pub log_path: &'a std::path::Path,
pub output_path: Option<&'a str>,
pub model: Option<&'a str>,
pub streaming: bool,
pub workgroup_id: Option<&'a str>,
}
#[cfg(unix)]
fn cleanup_process_group(child: &tokio::process::Child) {
if let Some(pid) = child.id() {
unsafe {
libc::kill(-(pid as i32), libc::SIGTERM);
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct FailureContext {
pub working_dir: Option<String>,
pub binary_path: String,
pub worktree_path: Option<String>,
pub worktree_created: bool,
}
pub(crate) fn capture_failure_context(
store: &Store,
task_id: &TaskId,
cmd: &Command,
) -> FailureContext {
let std_cmd = cmd.as_std();
let task = store.get_task(task_id.as_str()).ok().flatten();
FailureContext {
working_dir: std_cmd
.get_current_dir()
.map(|path| path.display().to_string()),
binary_path: std_cmd.get_program().to_string_lossy().into_owned(),
worktree_path: task.as_ref().and_then(|task| task.worktree_path.clone()),
worktree_created: task
.as_ref()
.and_then(|task| task.worktree_path.as_ref())
.is_some(),
}
}
pub(crate) fn insert_phase_error_event(
store: &Store,
task_id: &TaskId,
phase: &str,
error: &str,
stderr: Option<&str>,
) {
let mut detail = format!("Failed during {phase}: {error}");
if let Some(stderr) = stderr.filter(|stderr| !stderr.is_empty()) {
detail.push_str("\nStderr: ");
detail.push_str(stderr);
}
let _ = store.insert_event(&TaskEvent {
task_id: task_id.clone(),
timestamp: Local::now(),
event_kind: EventKind::Error,
detail,
metadata: None,
});
}
pub(crate) fn stderr_excerpt(task_id: &TaskId) -> Option<String> {
let stderr_path = crate::paths::stderr_path(task_id.as_str());
let content = std::fs::read_to_string(stderr_path).ok()?;
let lines: Vec<&str> = content
.lines()
.map(str::trim)
.filter(|line| !line.is_empty())
.collect();
if lines.is_empty() {
return None;
}
let start = lines.len().saturating_sub(STDERR_EXCERPT_LINES);
Some(compact_excerpt(&lines[start..].join(" | "), STDERR_EXCERPT_CHARS))
}
pub(crate) fn resolve_failure_exit_code(
store: &Store,
task_id: &TaskId,
exit_code: Option<i32>,
) -> Option<i32> {
exit_code.or_else(|| {
store
.get_events(task_id.as_str())
.ok()?
.iter()
.rev()
.find_map(|event| {
event
.detail
.rsplit_once("exit code ")
.and_then(|(_, tail)| tail.split_whitespace().next())
.and_then(|code| code.parse::<i32>().ok())
})
})
}
pub(crate) fn record_execution_failure(
store: &Store,
task_id: &TaskId,
duration_ms: i64,
exit_code: Option<i32>,
context: &FailureContext,
) {
let reason = match exit_code {
Some(code) => format!("agent exited with code {code}"),
None => "agent process failed".to_string(),
};
let stderr = stderr_excerpt(task_id);
insert_phase_error_event(store, task_id, "execution", &reason, stderr.as_deref());
insert_fast_fail_snapshot_event(store, task_id, duration_ms, exit_code, context);
}
fn insert_fast_fail_snapshot_event(
store: &Store,
task_id: &TaskId,
duration_ms: i64,
exit_code: Option<i32>,
context: &FailureContext,
) {
if duration_ms >= FAST_FAIL_SNAPSHOT_MS || !matches!(exit_code, Some(code) if code != 0) {
return;
}
let detail = format!(
"Failure context: working directory: {}; agent binary: {}; worktree path: {}; worktree created: {}",
context
.working_dir
.as_deref()
.unwrap_or("(inherit current process directory)"),
if context.binary_path.is_empty() {
"(unknown)"
} else {
context.binary_path.as_str()
},
context.worktree_path.as_deref().unwrap_or("(none)"),
context.worktree_created,
);
let _ = store.insert_event(&TaskEvent {
task_id: task_id.clone(),
timestamp: Local::now(),
event_kind: EventKind::Error,
detail,
metadata: None,
});
}
fn compact_excerpt(text: &str, max_chars: usize) -> String {
let text = text.trim();
if text.chars().count() <= max_chars {
return text.to_string();
}
let mut excerpt: String = text.chars().take(max_chars).collect();
excerpt.push_str("...");
excerpt
}
pub(crate) async fn run_agent_process_impl(args: RunProcessArgs<'_>) -> Result<()> {
let RunProcessArgs {
agent,
mut cmd,
task_id,
store,
log_path,
output_path,
model,
streaming,
workgroup_id,
} = args;
let start = std::time::Instant::now();
let idle_timeout = crate::idle_timeout::idle_timeout_from_tokio_command(&cmd);
let failure_context = capture_failure_context(store.as_ref(), task_id, &cmd);
#[cfg(unix)]
cmd.process_group(0);
crate::cmd::noninteractive_stdio::configure(&mut cmd);
let mut child = match cmd.spawn() {
Ok(child) => child,
Err(err) => {
let err = anyhow::Error::new(err).context("Failed to spawn agent process");
let stderr = stderr_excerpt(task_id)
.or_else(|| Some("unavailable (process did not start)".to_string()));
insert_phase_error_event(
store.as_ref(),
task_id,
"agent spawn",
&err.to_string(),
stderr.as_deref(),
);
return Err(err);
}
};
let info = if streaming {
watcher::watch_streaming(agent, &mut child, task_id, store, log_path, workgroup_id, Some(idle_timeout), None).await
} else {
let out = output_path.map(std::path::Path::new);
watcher::watch_buffered(agent, &mut child, task_id, store, log_path, out, workgroup_id).await
};
let info = match info {
Ok(info) => info,
Err(err) => {
let stderr = stderr_excerpt(task_id);
insert_phase_error_event(
store.as_ref(),
task_id,
"execution",
&err.to_string(),
stderr.as_deref(),
);
return Err(err);
}
};
#[cfg(unix)]
cleanup_process_group(&child);
let _ = child.kill().await;
let _ = child.wait().await;
let output_path = output_path.map(std::path::Path::new);
fill_empty_output_from_log(log_path, output_path)?;
if let Some(out_path) = output_path {
clean_output_if_jsonl(out_path)?;
}
let duration_ms = start.elapsed().as_millis() as i64;
let exit_code = resolve_failure_exit_code(store.as_ref(), task_id, info.exit_code);
if info.status == TaskStatus::Failed {
record_execution_failure(
store.as_ref(),
task_id,
duration_ms,
exit_code,
&failure_context,
);
}
let final_model = info.model.as_deref().or(model);
let cost_usd = info.cost_usd.or_else(|| info.tokens.and_then(|tokens| crate::cost::estimate_cost(tokens, final_model, agent.kind())));
store.update_task_completion(TaskCompletionUpdate {
id: task_id.as_str(),
status: info.status,
tokens: info.tokens,
duration_ms,
model: final_model,
cost_usd,
exit_code,
})?;
let duration_str = format_duration(duration_ms);
let tokens_str = info.tokens.map(|t| format!(", {} tokens", t)).unwrap_or_default();
let cost_str = if cost_usd.is_some() { format!(", {}", crate::cost::format_cost(cost_usd)) } else { String::new() };
let fail_reason = if info.status == TaskStatus::Failed {
store.latest_error(task_id.as_str())
.map(|r| format!("\n[aid] Reason: {r}"))
.unwrap_or_default()
} else {
String::new()
};
println!("Task {} {} ({}{}{}){}", task_id, info.status.label(), duration_str, tokens_str, cost_str, fail_reason);
Ok(())
}
pub(crate) fn notify_task_completion(store: &Store, task_id: &TaskId) -> Result<()> {
if let Some(task) = store.get_task(task_id.as_str())? {
crate::notify::notify_completion(&task);
}
Ok(())
}
pub(crate) fn current_branch(repo_dir: &std::path::Path) -> Option<String> {
let out = std::process::Command::new("git")
.args(["-C", &repo_dir.to_string_lossy(), "rev-parse", "--abbrev-ref", "HEAD"])
.output()
.ok()?;
if !out.status.success() { return None; }
let branch = String::from_utf8(out.stdout).ok()?.trim().to_string();
if branch == "HEAD" { return None; } Some(branch)
}
pub(crate) fn inherit_retry_base_branch_impl(repo_dir: Option<&str>, task: &Task, retry_args: &mut RunArgs) {
if retry_args.base_branch.is_some() || retry_args.worktree.is_none() { return; }
let Some(branch) = task.worktree_branch.as_deref() else { return; };
if retry_args.worktree.as_deref() == Some(branch) { return; }
let repo_dir = std::path::Path::new(task.repo_path.as_deref().or(retry_args.repo.as_deref()).or(repo_dir).unwrap_or("."));
if let Ok(true) = crate::worktree::branch_has_commits_ahead_of_main(repo_dir, branch) { retry_args.base_branch = Some(branch.to_string()); }
}
pub(crate) fn retry_target(task: &Task) -> (Option<String>, Option<String>) {
match task.worktree_path.as_ref() {
Some(path) if std::path::Path::new(path).exists() => (Some(path.clone()), None),
Some(_) => (None, task.worktree_branch.clone()),
None => (None, None),
}
}
fn format_duration(ms: i64) -> String {
let secs = ms / 1000;
if secs < 60 { format!("{secs}s") } else { format!("{}m {:02}s", secs / 60, secs % 60) }
}