use std::path::Path;
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio_util::sync::CancellationToken;
use crate::config::TaskConfig;
use crate::error::{Error, Result};
use crate::event::Event;
use crate::runner::{AgentRunner, EventStream};
const MAX_STDERR_BYTES: usize = 64 * 1024;
pub(crate) struct ChildGuard {
pid: u32,
killed: AtomicBool,
}
impl ChildGuard {
fn new(pid: u32) -> Self {
Self {
pid,
killed: AtomicBool::new(false),
}
}
#[cfg(unix)]
pub(crate) fn kill(&self) {
if self.killed.swap(true, Ordering::SeqCst) {
return;
}
use nix::sys::signal::{killpg, Signal};
use nix::unistd::Pid;
let pgid = Pid::from_raw(self.pid as i32);
if let Err(e) = killpg(pgid, Signal::SIGTERM) {
tracing::debug!("SIGTERM to pgid {} failed: {e}", self.pid);
return; }
let pid = self.pid;
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(2));
let pgid = Pid::from_raw(pid as i32);
if let Err(e) = killpg(pgid, Signal::SIGKILL) {
tracing::debug!("SIGKILL to pgid {} failed: {e}", pid);
}
});
}
#[cfg(windows)]
pub(crate) fn kill(&self) {
if self.killed.swap(true, Ordering::SeqCst) {
return;
}
if let Err(e) = std::process::Command::new("taskkill")
.args(["/PID", &self.pid.to_string(), "/T", "/F"])
.output()
{
tracing::debug!("taskkill for pid {} failed: {e}", self.pid);
}
}
#[cfg(not(any(unix, windows)))]
pub(crate) fn kill(&self) {
if self.killed.swap(true, Ordering::SeqCst) {
return;
}
tracing::warn!("process cleanup not supported on this platform (pid={})", self.pid);
}
}
impl Drop for ChildGuard {
fn drop(&mut self) {
self.kill();
}
}
pub struct StreamHandle {
pub stream: EventStream,
pub cancel_token: CancellationToken,
}
pub async fn spawn_and_stream<F>(
runner: &dyn AgentRunner,
config: &TaskConfig,
parse_line: F,
cancel_token: Option<CancellationToken>,
) -> Result<StreamHandle>
where
F: Fn(&str) -> Vec<Result<Event>> + Send + Sync + 'static,
{
let binary = runner.binary_path(config)?;
let args = runner.build_args(config);
let env_vars = runner.build_env(config);
let cwd = config
.cwd
.clone()
.unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
validate_cwd(&cwd)?;
tracing::debug!(
agent = runner.name(),
binary = %binary.display(),
args = ?args,
cwd = %cwd.display(),
"spawning agent process"
);
let mut cmd = Command::new(&binary);
cmd.args(&args)
.current_dir(&cwd)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
#[cfg(unix)]
cmd.process_group(0);
for (k, v) in &env_vars {
cmd.env(k, v);
}
for (k, v) in &config.env {
cmd.env(k, v);
}
let mut child = cmd.spawn().map_err(Error::SpawnFailed)?;
let child_pid = child
.id()
.ok_or_else(|| Error::Other("failed to get child process ID".into()))?;
let guard = Arc::new(ChildGuard::new(child_pid));
let stdout = child
.stdout
.take()
.ok_or_else(|| Error::Other("failed to capture stdout".into()))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| Error::Other("failed to capture stderr".into()))?;
let stderr_handle = tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
let mut buf = String::new();
while let Ok(Some(line)) = lines.next_line().await {
if buf.len() >= MAX_STDERR_BYTES {
break;
}
if !buf.is_empty() {
buf.push('\n');
}
let remaining = MAX_STDERR_BYTES - buf.len();
if line.len() > remaining {
buf.push_str(&line[..remaining]);
break;
}
buf.push_str(&line);
}
buf
});
let wait_handle = tokio::spawn(async move { child.wait().await });
let mut reader = BufReader::new(stdout).lines();
let token = cancel_token.unwrap_or_default();
let token_for_task = token.clone();
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event>>(256);
tokio::spawn(async move {
let _guard = guard;
loop {
tokio::select! {
_ = token_for_task.cancelled() => {
_guard.kill();
break;
}
line_result = reader.next_line() => {
match line_result {
Ok(Some(line)) => {
if line.trim().is_empty() {
continue;
}
let events = parse_line(&line);
for result in events {
let stamped = result.map(|e| e.stamp());
if tx.send(stamped).await.is_err() {
return; }
}
}
Ok(None) => break, Err(e) => {
let _ = tx.send(Err(Error::Io(e))).await;
break;
}
}
}
}
}
if token_for_task.is_cancelled() {
return;
}
match wait_handle.await {
Ok(Ok(status)) if !status.success() => {
let stderr_text = stderr_handle.await.unwrap_or_default();
let code = status.code().unwrap_or(-1);
let _ = tx
.send(Err(Error::ProcessFailed {
code,
stderr: stderr_text,
}))
.await;
}
Ok(Err(e)) => {
let _ = tx.send(Err(Error::Io(e))).await;
}
Err(e) => {
let _ = tx
.send(Err(Error::Other(format!("join error: {e}"))))
.await;
}
_ => {} }
});
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(StreamHandle {
stream: Box::pin(stream),
cancel_token: token,
})
}
fn validate_cwd(cwd: &Path) -> Result<()> {
if !cwd.exists() {
return Err(Error::InvalidWorkDir(cwd.to_path_buf()));
}
if !cwd.is_dir() {
return Err(Error::Other(format!(
"working directory is not a directory: {}",
cwd.display()
)));
}
Ok(())
}