use anyhow::Result;
use chrono::Local;
use std::io::Read;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
use crate::agent::Agent;
use crate::cost;
use crate::pty_bridge::PtyBridge;
use crate::pty_watch::{finalize_output, monitor_bridge, MonitorState};
use crate::store::Store;
use crate::store::TaskCompletionUpdate;
use crate::types::{CompletionInfo, TaskId};
#[allow(clippy::too_many_arguments)]
pub fn run_agent_process(
agent: &dyn Agent,
cmd: &std::process::Command,
task_id: &TaskId,
store: &Arc<Store>,
log_path: &Path,
output_path: Option<&str>,
model: Option<&str>,
streaming: bool,
) -> Result<()> {
let start = std::time::Instant::now();
let mut bridge = spawn_bridge(cmd, log_path)?;
let rx = spawn_reader_thread(bridge.take_reader()?);
let mut log_file = std::fs::File::create(log_path)?;
let mut state = MonitorState::new(streaming);
monitor_bridge(
agent,
task_id,
store,
&mut bridge,
&rx,
&mut log_file,
&mut state,
streaming,
Some(crate::idle_timeout::idle_timeout_from_command(cmd)),
None,
)?;
if bridge.is_alive() {
let _ = bridge.kill_group();
} else {
if let Some(pid) = bridge.child_pid() {
#[cfg(unix)]
unsafe {
libc::kill(-(pid as i32), libc::SIGTERM);
}
}
}
let exit_status = bridge.wait()?;
finalize_output(
agent,
task_id,
store,
output_path,
streaming,
&exit_status,
&mut state,
)?;
record_completion(
agent,
task_id,
store,
model,
start.elapsed().as_millis() as i64,
&state.info,
)
}
fn spawn_bridge(cmd: &std::process::Command, log_path: &Path) -> Result<PtyBridge> {
let (argv, dir, env) = command_parts(cmd);
match PtyBridge::spawn(&argv, dir.as_deref(), env) {
Ok(bridge) => Ok(bridge),
Err(err) => {
let error_msg = format!("Failed to spawn agent process: {err}");
aid_error!("[aid] {error_msg}");
write_spawn_error_log(log_path, &error_msg);
Err(err)
}
}
}
fn spawn_reader_thread(mut reader: Box<dyn Read + Send>) -> mpsc::Receiver<Vec<u8>> {
let (tx, rx) = mpsc::channel();
std::thread::spawn(move || {
let mut buf = [0u8; 1024];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
if tx.send(buf[..n].to_vec()).is_err() {
break;
}
}
Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
Err(_) => break,
}
}
});
rx
}
fn record_completion(
agent: &dyn Agent,
task_id: &TaskId,
store: &Arc<Store>,
model: Option<&str>,
duration_ms: i64,
info: &CompletionInfo,
) -> Result<()> {
let final_model = info.model.as_deref().or(model);
let cost_usd = info.cost_usd.or_else(|| {
info.tokens
.and_then(|tokens| cost::estimate_cost(tokens, final_model, agent.kind()))
});
let event = crate::types::TaskEvent {
task_id: task_id.clone(),
timestamp: chrono::Local::now(),
event_kind: if info.status == crate::types::TaskStatus::Done {
crate::types::EventKind::Completion
} else {
crate::types::EventKind::Error
},
detail: format!(
"{} ({}{}{})",
info.status.label(),
format_duration(duration_ms),
info.tokens
.map(|t| format!(", {} tokens", t))
.unwrap_or_default(),
cost_usd
.map(|c| format!(", {}", cost::format_cost(Some(c))))
.unwrap_or_default(),
),
metadata: None,
};
store.complete_task_atomic(
TaskCompletionUpdate {
id: task_id.as_str(),
status: info.status,
tokens: info.tokens,
duration_ms,
model: final_model,
cost_usd,
exit_code: info.exit_code,
},
&event,
)?;
crate::state::refresh_project_state(store.as_ref(), task_id);
println!(
"Task {} {} ({}{}{})",
task_id,
info.status.label(),
format_duration(duration_ms),
info.tokens
.map(|tokens| format!(", {} tokens", tokens))
.unwrap_or_default(),
cost_usd
.map(|cost| format!(", {}", cost::format_cost(Some(cost))))
.unwrap_or_default(),
);
Ok(())
}
fn format_duration(ms: i64) -> String {
let secs = ms / 1000;
if secs < 60 {
format!("{secs}s")
} else {
format!("{}m {:02}s", secs / 60, secs % 60)
}
}
fn command_parts(
cmd: &std::process::Command,
) -> (Vec<String>, Option<String>, Vec<(String, String)>) {
let argv = std::iter::once(cmd.get_program())
.chain(cmd.get_args())
.map(|value| value.to_string_lossy().into_owned())
.collect();
let dir = cmd
.get_current_dir()
.map(|path| path.to_string_lossy().into_owned());
let env = cmd
.get_envs()
.filter_map(|(key, value)| {
Some((
key.to_string_lossy().into_owned(),
value?.to_string_lossy().into_owned(),
))
})
.collect();
(argv, dir, env)
}
fn write_spawn_error_log(log_path: &Path, message: &str) {
let event = serde_json::json!({
"type": "error",
"source": "spawn",
"message": message,
"timestamp": Local::now().to_rfc3339(),
});
let _ = std::fs::write(log_path, format!("{event}\n"));
}