use std::process::Stdio;
use std::sync::Arc;
use chrono::Utc;
use dashmap::DashMap;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use super::config::AgentTransport;
use super::registry::CodingAgentRegistry;
use super::status::AgentConnectionStatus;
/// Handles kept for one spawned agent process.
struct ManagedAgent {
/// Child process handle; `stop_agent` waits on it and kills it if it does not exit in time.
child: Child,
/// Write end of the child's stdin pipe; `send_message` writes newline-terminated messages to it.
stdin: tokio::process::ChildStdin,
}
/// Spawns, tracks and stops coding-agent child processes and pushes status changes to the registry.
pub struct AgentProcessManager {
/// Tracked processes keyed by agent id; each entry sits behind an async mutex so that
/// writes to the child's stdin and shutdown are serialized.
processes: DashMap<String, Arc<Mutex<ManagedAgent>>>,
/// Registry whose `update_status` is called when an agent is spawned, stopped, or fails to spawn.
registry: Arc<CodingAgentRegistry>,
}
impl AgentProcessManager {
/// Builds a manager that tracks no processes yet and keeps `registry` for later status updates.
pub fn new(registry: Arc<CodingAgentRegistry>) -> Self {
    let processes = DashMap::new();
    Self { processes, registry }
}
pub async fn spawn_agent(
&self,
agent_id: &str,
transport: &AgentTransport,
) -> anyhow::Result<()> {
let AgentTransport::Stdio { command, args, env } = transport else {
anyhow::bail!("spawn_agent only supports Stdio transport");
};
info!(
agent_id = %agent_id,
command = %command,
args = ?args,
"spawning coding agent process"
);
let mut cmd = Command::new(command);
cmd.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for (key, val) in env {
cmd.env(key, val);
}
let mut child = cmd.spawn().map_err(|e| {
anyhow::anyhow!("failed to spawn agent '{}': {} (command: {})", agent_id, e, command)
})?;
let stdin = child.stdin.take()
.ok_or_else(|| anyhow::anyhow!("failed to capture stdin for agent '{}'", agent_id))?;
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let agent_id_clone = agent_id.to_string();
if let Some(stdout) = stdout {
tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
debug!(agent_id = %agent_id_clone, stdout = %line, "agent stdout");
}
});
}
let agent_id_clone2 = agent_id.to_string();
if let Some(stderr) = stderr {
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
warn!(agent_id = %agent_id_clone2, stderr = %line, "agent stderr");
}
});
}
let managed = ManagedAgent { child, stdin };
self.processes.insert(agent_id.to_string(), Arc::new(Mutex::new(managed)));
let _ = self.registry.update_status(agent_id, AgentConnectionStatus::Connected);
info!(agent_id = %agent_id, "coding agent process spawned successfully");
Ok(())
}
pub async fn stop_agent(&self, agent_id: &str) -> anyhow::Result<()> {
let Some((_, process)) = self.processes.remove(agent_id) else {
anyhow::bail!("agent '{}' is not running", agent_id);
};
let mut managed = process.lock().await;
let _ = managed.stdin.shutdown().await;
tokio::select! {
result = managed.child.wait() => {
match result {
Ok(status) => info!(agent_id = %agent_id, exit_code = ?status.code(), "agent process exited"),
Err(e) => warn!(agent_id = %agent_id, error = %e, "error waiting for agent exit"),
}
}
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
warn!(agent_id = %agent_id, "agent did not exit gracefully, killing");
let _ = managed.child.kill().await;
}
}
let _ = self.registry.update_status(
agent_id,
AgentConnectionStatus::Disconnected { since: Utc::now() },
);
Ok(())
}
/// Reports whether the process table currently holds an entry for `agent_id`.
///
/// This reflects bookkeeping only: entries are added by `spawn_agent` and removed
/// by `stop_agent`; the child process itself is not probed.
pub fn is_running(&self, agent_id: &str) -> bool {
    self.processes.get(agent_id).is_some()
}
/// Returns the ids of all tracked agents, in the map's iteration order.
pub fn running_agents(&self) -> Vec<String> {
    let mut ids = Vec::new();
    for entry in self.processes.iter() {
        ids.push(entry.key().clone());
    }
    ids
}
pub async fn send_message(&self, agent_id: &str, message: &str) -> anyhow::Result<()> {
let process = self.processes.get(agent_id)
.ok_or_else(|| anyhow::anyhow!("agent '{}' is not running", agent_id))?;
let mut managed = process.lock().await;
managed.stdin.write_all(message.as_bytes()).await?;
managed.stdin.write_all(b"\n").await?;
managed.stdin.flush().await?;
Ok(())
}
pub async fn spawn_all(&self, agents: &[(String, AgentTransport)]) {
for (agent_id, transport) in agents {
if let Err(e) = self.spawn_agent(agent_id, transport).await {
error!(agent_id = %agent_id, error = %e, "failed to spawn agent on startup");
let _ = self.registry.update_status(
agent_id,
AgentConnectionStatus::Error {
message: format!("Failed to spawn: {}", e),
since: Utc::now(),
},
);
}
}
}
}
/// Logs a warning for every agent still tracked when the manager is dropped.
///
/// Nothing is stopped here; the warning only records that the process may outlive
/// the manager.
impl Drop for AgentProcessManager {
    fn drop(&mut self) {
        self.processes.iter().for_each(|entry| {
            let agent_id = entry.key();
            warn!(agent_id = %agent_id, "dropping agent process manager, process may be orphaned");
        });
    }
}