use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
/// Event emitted by an agent run; the first field of every variant is the
/// run id that was passed to `AgentRunner::spawn`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AgentEvent {
    /// `(run_id, line)`: one line of text produced for the run, either read
    /// from the process's stdout/stderr or a diagnostic message generated by
    /// the runner itself (e.g. a spawn failure).
    OutputLine(usize, String),
    /// `(run_id, exit_code)`: the run has ended with the given exit code.
    Done(usize, i32),
}
/// Handle to a shell command running on a background task.
///
/// Events produced by the run are pulled through the handle, and the handle
/// can request that the underlying process be killed.
pub struct AgentRunner {
    /// Receiving side of the unbounded channel the background task writes
    /// [`AgentEvent`]s to.
    event_rx: mpsc::UnboundedReceiver<AgentEvent>,
    /// One-shot kill trigger; becomes `None` once it has been used.
    kill_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl AgentRunner {
    /// Exit code reported in [`AgentEvent::Done`] when the process is killed
    /// through the kill channel.
    const KILLED_EXIT_CODE: i32 = 137;

    /// Runs `command` through `sh -c` on a background Tokio task and returns a
    /// handle for receiving its events and killing it.
    ///
    /// Every line written to the process's stdout or stderr is forwarded as
    /// [`AgentEvent::OutputLine`] tagged with `run_id`. The run always ends
    /// with exactly one [`AgentEvent::Done`]:
    ///
    /// * if the shell cannot be spawned, a `"Failed to spawn: …"` line is
    ///   emitted followed by `Done(run_id, 1)`;
    /// * if the process exits on its own, both output streams are drained
    ///   first and `Done` carries the exit code, or `1` when no code is
    ///   available (waiting failed, or the status has no code);
    /// * if the kill channel fires, the process is killed, the output
    ///   forwarders are stopped, and `"[Process killed]"` followed by
    ///   `Done(run_id, 137)` is emitted. Dropping the kill sender (for example
    ///   by dropping the returned handle) completes the channel as well and
    ///   therefore takes the same path.
    ///
    /// # Panics
    ///
    /// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
    pub fn spawn(run_id: usize, command: &str) -> Self {
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<()>();
        let cmd = command.to_string();

        tokio::spawn(async move {
            let mut child = match Command::new("sh")
                .arg("-c")
                .arg(&cmd)
                .stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped())
                .spawn()
            {
                Ok(c) => c,
                Err(e) => {
                    let _ = event_tx.send(AgentEvent::OutputLine(
                        run_id,
                        format!("Failed to spawn: {e}"),
                    ));
                    let _ = event_tx.send(AgentEvent::Done(run_id, 1));
                    return;
                }
            };

            // Each stream gets its own task so a chatty stderr cannot stall
            // stdout (or vice versa).
            let stdout_handle = tokio::spawn(Self::forward_lines(
                child.stdout.take(),
                run_id,
                event_tx.clone(),
            ));
            let stderr_handle = tokio::spawn(Self::forward_lines(
                child.stderr.take(),
                run_id,
                event_tx.clone(),
            ));

            tokio::select! {
                status = child.wait() => {
                    // Drain both streams before reporting completion so that
                    // `Done` is the last event of the run.
                    let _ = stdout_handle.await;
                    let _ = stderr_handle.await;
                    let code = status.map(|s| s.code().unwrap_or(1)).unwrap_or(1);
                    let _ = event_tx.send(AgentEvent::Done(run_id, code));
                }
                _ = kill_rx => {
                    let _ = child.kill().await;
                    // Stop the forwarders and wait for them to finish so no
                    // `OutputLine` can be emitted after `Done`. They are
                    // aborted rather than drained because a surviving
                    // descendant of the shell could keep the pipes open
                    // indefinitely.
                    stdout_handle.abort();
                    stderr_handle.abort();
                    let _ = stdout_handle.await;
                    let _ = stderr_handle.await;
                    let _ = event_tx.send(AgentEvent::OutputLine(
                        run_id,
                        "[Process killed]".to_string(),
                    ));
                    let _ = event_tx.send(AgentEvent::Done(run_id, Self::KILLED_EXIT_CODE));
                }
            }
        });

        Self {
            event_rx,
            kill_tx: Some(kill_tx),
        }
    }

    /// Returns the next queued event without waiting, or `None` when no event
    /// is currently available (including when the channel is closed and
    /// empty).
    pub fn try_recv(&mut self) -> Option<AgentEvent> {
        self.event_rx.try_recv().ok()
    }

    /// Requests that the process be killed.
    ///
    /// The request is sent at most once; subsequent calls do nothing. A send
    /// failure (the background task has already finished) is ignored.
    pub fn kill(&mut self) {
        if let Some(tx) = self.kill_tx.take() {
            let _ = tx.send(());
        }
    }

    /// Forwards every line read from `source` as an `OutputLine` event until
    /// end of stream, a read error, or the event receiver going away.
    ///
    /// Lines are read as raw bytes and decoded lossily, so output that is not
    /// valid UTF-8 is forwarded with replacement characters instead of ending
    /// the stream early. A trailing `\n` or `\r\n` is stripped from each line.
    async fn forward_lines<R>(
        source: Option<R>,
        run_id: usize,
        tx: mpsc::UnboundedSender<AgentEvent>,
    ) where
        R: tokio::io::AsyncRead + Unpin,
    {
        let mut reader = match source {
            Some(stream) => BufReader::new(stream),
            None => return,
        };
        let mut buf = Vec::new();
        loop {
            buf.clear();
            match reader.read_until(b'\n', &mut buf).await {
                Ok(0) | Err(_) => break,
                Ok(_) => {}
            }
            if buf.last() == Some(&b'\n') {
                buf.pop();
                if buf.last() == Some(&b'\r') {
                    buf.pop();
                }
            }
            let line = String::from_utf8_lossy(&buf).into_owned();
            if tx.send(AgentEvent::OutputLine(run_id, line)).is_err() {
                break;
            }
        }
    }
}