Skip to main content

acp_utils/client/
tokio_agent.rs

1//! Tokio-native parent-side ACP transport.
2//!
3//! `agent_client_protocol::AcpAgent` spawns the child via smol's
4//! `async_process::Command`, which wraps stdio in `blocking::Unblock`. Inside a
5//! tokio runtime that causes a busy loop. This avoids the issue by spawning stdio agents with `tokio::process::Command`
6//!
7use agent_client_protocol::schema::{McpServer, McpServerStdio};
8use agent_client_protocol::util::internal_error;
9use agent_client_protocol::{AcpAgent, ByteStreams, ConnectTo, Error, Role, util};
10use std::process::Stdio;
11use std::str::FromStr;
12use tokio::io::{AsyncBufReadExt, BufReader};
13use tokio::process::Command;
14use tokio::sync::oneshot;
15use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
16
17pub struct TokioAcpAgent {
18    stdio: McpServerStdio,
19}
20
21impl TokioAcpAgent {
22    pub fn stdio(&self) -> &McpServerStdio {
23        &self.stdio
24    }
25}
26
27impl<T: Role> ConnectTo<T> for TokioAcpAgent {
28    async fn connect_to(self, client: impl ConnectTo<T::Counterpart>) -> Result<(), Error> {
29        connect_stdio::<T>(self.stdio, client).await
30    }
31}
32
33impl FromStr for TokioAcpAgent {
34    type Err = Error;
35
36    fn from_str(s: &str) -> Result<Self, Self::Err> {
37        match AcpAgent::from_str(s)?.into_server() {
38            McpServer::Stdio(stdio) => Ok(Self { stdio }),
39            _ => Err(util::internal_error("unsupported ACP agent transport")),
40        }
41    }
42}
43
44async fn connect_stdio<T: Role>(server: McpServerStdio, client: impl ConnectTo<T::Counterpart>) -> Result<(), Error> {
45    let (stdin, stdout, stderr, mut child) = {
46        let mut cmd = Command::new(&server.command);
47        cmd.args(&server.args);
48        for env_var in &server.env {
49            cmd.env(&env_var.name, &env_var.value);
50        }
51
52        let mut child = cmd
53            .stdin(Stdio::piped())
54            .stdout(Stdio::piped())
55            .stderr(Stdio::piped())
56            .kill_on_drop(true)
57            .spawn()
58            .map_err(Error::into_internal_error)?;
59
60        let stdin = child.stdin.take().ok_or_else(|| internal_error("missing child stdin"))?;
61        let stdout = child.stdout.take().ok_or_else(|| internal_error("missing child stdout"))?;
62        let stderr = child.stderr.take().ok_or_else(|| internal_error("missing child stderr"))?;
63        (stdin, stdout, stderr, child)
64    };
65
66    let (stderr_tx, stderr_rx) = oneshot::channel::<String>();
67    tokio::spawn(async move {
68        let mut lines = BufReader::new(stderr).lines();
69        let mut buf = String::new();
70        while let Ok(Some(line)) = lines.next_line().await {
71            if !buf.is_empty() {
72                buf.push('\n');
73            }
74            buf.push_str(&line);
75        }
76        let _ = stderr_tx.send(buf);
77    });
78
79    let child_fut = async move {
80        match child.wait().await {
81            Ok(s) if s.success() => Ok(()),
82            Ok(s) => {
83                let stderr = stderr_rx.await.unwrap_or_default();
84                Err(util::internal_error(format!("agent process exited ({s}): {stderr}")))
85            }
86            Err(e) => Err(Error::into_internal_error(e)),
87        }
88    };
89
90    let bytes = ByteStreams::new(stdin.compat_write(), stdout.compat());
91    tokio::select! {
92        result = ConnectTo::<T>::connect_to(bytes, client) => result,
93        result = child_fut => result,
94    }
95}