use std::process::Stdio;
use tokio::process::Command;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::client::{AcpClientError, SubagentConfig};
/// Names of parent-process environment variables that `spawn_child` copies into
/// every child after clearing the child's environment.
///
/// A variable is only forwarded when it is actually set in the parent.
const DEFAULT_INHERIT: &[&str] = &[
    "HOME",
    "PATH",
    "TMPDIR",
    "TERM",
    "LANG",
    "USER",
    "LOGNAME",
];
/// A freshly spawned child process bundled with its three stdio pipe ends.
///
/// Built by `spawn_child`, which takes each pipe out of the `child` handle so the
/// pipes can be handed to different consumers independently of the process handle.
pub(crate) struct SpawnedChild {
/// Handle to the running process.
pub child: tokio::process::Child,
/// Write end of the child's standard input.
pub stdin: tokio::process::ChildStdin,
/// Read end of the child's standard output.
pub stdout: tokio::process::ChildStdout,
/// Read end of the child's standard error.
pub stderr: tokio::process::ChildStderr,
}
/// Bundles a child's stdin and stdout into an `agent_client_protocol::ByteStreams`.
///
/// Both tokio pipe halves are first wrapped in `tokio_util::compat` adapters
/// (`compat_write` for stdin, `compat` for stdout) and then passed, in that order,
/// to `ByteStreams::new`.
pub(crate) fn make_byte_streams(
    stdin: tokio::process::ChildStdin,
    stdout: tokio::process::ChildStdout,
) -> agent_client_protocol::ByteStreams<
    tokio_util::compat::Compat<tokio::process::ChildStdin>,
    tokio_util::compat::Compat<tokio::process::ChildStdout>,
> {
    let outgoing = stdin.compat_write();
    let incoming = stdout.compat();
    agent_client_protocol::ByteStreams::new(outgoing, incoming)
}
/// Spawns the process described by `cfg` with piped stdio and a rebuilt environment.
///
/// `cfg.command` is split into a program and its arguments with `shell_words::split`
/// and executed directly through [`Command`]. The child's environment is cleared and
/// then populated, in this order (a later `env` call for the same key replaces an
/// earlier one):
///
/// 1. the parent's values for every name in [`DEFAULT_INHERIT`],
/// 2. the parent's values for every name in `cfg.inherit_env`,
/// 3. the explicit key/value pairs in `cfg.env`.
///
/// Parent values are read with `std::env::var_os`, so a value that is not valid
/// Unicode is forwarded unchanged rather than being silently omitted.
///
/// The working directory is `cfg.effective_process_cwd()`, all three stdio streams
/// are piped, and `kill_on_drop(true)` is set on the command.
///
/// # Errors
///
/// * [`AcpClientError::InvalidConfig`] when the command string cannot be parsed, is
///   empty, when `cfg.env` contains a key starting with `ZEPH_`, or when one of the
///   stdio pipes is missing after the process has been spawned.
/// * [`AcpClientError::Spawn`] when spawning the process fails.
pub(crate) fn spawn_child(cfg: &SubagentConfig) -> Result<SpawnedChild, AcpClientError> {
    let parts = shell_words::split(&cfg.command)
        .map_err(|e| AcpClientError::InvalidConfig(format!("shell_words parse error: {e}")))?;
    let (program, args) = parts.split_first().ok_or_else(|| {
        AcpClientError::InvalidConfig("command string is empty".to_owned())
    })?;

    let mut cmd = Command::new(program);
    cmd.args(args);

    // Start from an empty environment so nothing from the parent reaches the child
    // unless it is explicitly forwarded below.
    cmd.env_clear();
    for key in DEFAULT_INHERIT {
        // `var_os` (not `var`) so non-Unicode values are forwarded instead of dropped.
        if let Some(val) = std::env::var_os(key) {
            cmd.env(key, val);
        }
    }
    // NOTE(review): keys listed in `inherit_env` are not subject to the `ZEPH_` check
    // applied to `cfg.env` below — confirm whether that is intended or validated
    // elsewhere.
    for key in &cfg.inherit_env {
        if let Some(val) = std::env::var_os(key) {
            cmd.env(key, val);
        }
    }
    for (k, v) in &cfg.env {
        if k.starts_with("ZEPH_") {
            return Err(AcpClientError::InvalidConfig(format!(
                "env key {k:?} starts with ZEPH_ and must not be forwarded to sub-agents"
            )));
        }
        cmd.env(k, v);
    }

    cmd.current_dir(cfg.effective_process_cwd());
    cmd.stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true);

    let mut child = cmd.spawn().map_err(AcpClientError::Spawn)?;

    // If any pipe is unexpectedly absent we return early; `child` is dropped on that
    // path, and `kill_on_drop(true)` above applies to it.
    let missing_pipe = |name: &str| {
        AcpClientError::InvalidConfig(format!("failed to open subprocess {name}"))
    };
    let stdin = child.stdin.take().ok_or_else(|| missing_pipe("stdin"))?;
    let stdout = child.stdout.take().ok_or_else(|| missing_pipe("stdout"))?;
    let stderr = child.stderr.take().ok_or_else(|| missing_pipe("stderr"))?;

    Ok(SpawnedChild {
        child,
        stdin,
        stdout,
        stderr,
    })
}
/// Spawns a background task that forwards a child's stderr to `tracing` line by line.
///
/// Each line is emitted at `debug` level under the target `acp.client.stderr` with
/// `session_hint` attached as the `session` field. The trailing `\n` (and a preceding
/// `\r`, if any) is stripped from every line.
///
/// Lines are read as raw bytes and decoded with `String::from_utf8_lossy`, so output
/// that is not valid UTF-8 is logged with replacement characters instead of ending
/// the task. Ending the task drops `stderr`, which closes the read end of the pipe
/// while the child may still be writing to it.
///
/// The task finishes on end-of-file or on a read error; a read error is logged once
/// before the task exits. The returned handle can be awaited or aborted by the caller.
pub(crate) fn spawn_stderr_drain(
    stderr: tokio::process::ChildStderr,
    session_hint: String,
) -> tokio::task::JoinHandle<()> {
    use tokio::io::{AsyncBufReadExt, BufReader};
    tokio::spawn(async move {
        let mut reader = BufReader::new(stderr);
        // One buffer reused across iterations; cleared before every read.
        let mut buf: Vec<u8> = Vec::new();
        loop {
            buf.clear();
            match reader.read_until(b'\n', &mut buf).await {
                // Zero bytes read means the child closed its stderr.
                Ok(0) => break,
                Ok(_) => {
                    // Strip the line terminator: "\n" or "\r\n".
                    if buf.last() == Some(&b'\n') {
                        buf.pop();
                        if buf.last() == Some(&b'\r') {
                            buf.pop();
                        }
                    }
                    let line = String::from_utf8_lossy(&buf);
                    tracing::debug!(
                        target: "acp.client.stderr",
                        session = %session_hint,
                        "{line}"
                    );
                }
                Err(err) => {
                    tracing::debug!(
                        target: "acp.client.stderr",
                        session = %session_hint,
                        error = %err,
                        "stderr drain stopped on read error"
                    );
                    break;
                }
            }
        }
    })
}