acp_utils/client/
tokio_agent.rs1use agent_client_protocol::schema::{McpServer, McpServerStdio};
8use agent_client_protocol::util::internal_error;
9use agent_client_protocol::{AcpAgent, ByteStreams, ConnectTo, Error, Role, util};
10use std::process::Stdio;
11use std::str::FromStr;
12use tokio::io::{AsyncBufReadExt, BufReader};
13use tokio::process::Command;
14use tokio::sync::oneshot;
15use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
16
17pub struct TokioAcpAgent {
18 stdio: McpServerStdio,
19}
20
21impl TokioAcpAgent {
22 pub fn stdio(&self) -> &McpServerStdio {
23 &self.stdio
24 }
25}
26
27impl<T: Role> ConnectTo<T> for TokioAcpAgent {
28 async fn connect_to(self, client: impl ConnectTo<T::Counterpart>) -> Result<(), Error> {
29 connect_stdio::<T>(self.stdio, client).await
30 }
31}
32
33impl FromStr for TokioAcpAgent {
34 type Err = Error;
35
36 fn from_str(s: &str) -> Result<Self, Self::Err> {
37 match AcpAgent::from_str(s)?.into_server() {
38 McpServer::Stdio(stdio) => Ok(Self { stdio }),
39 _ => Err(util::internal_error("unsupported ACP agent transport")),
40 }
41 }
42}
43
44async fn connect_stdio<T: Role>(server: McpServerStdio, client: impl ConnectTo<T::Counterpart>) -> Result<(), Error> {
45 let (stdin, stdout, stderr, mut child) = {
46 let mut cmd = Command::new(&server.command);
47 cmd.args(&server.args);
48 for env_var in &server.env {
49 cmd.env(&env_var.name, &env_var.value);
50 }
51
52 let mut child = cmd
53 .stdin(Stdio::piped())
54 .stdout(Stdio::piped())
55 .stderr(Stdio::piped())
56 .kill_on_drop(true)
57 .spawn()
58 .map_err(Error::into_internal_error)?;
59
60 let stdin = child.stdin.take().ok_or_else(|| internal_error("missing child stdin"))?;
61 let stdout = child.stdout.take().ok_or_else(|| internal_error("missing child stdout"))?;
62 let stderr = child.stderr.take().ok_or_else(|| internal_error("missing child stderr"))?;
63 (stdin, stdout, stderr, child)
64 };
65
66 let (stderr_tx, stderr_rx) = oneshot::channel::<String>();
67 tokio::spawn(async move {
68 let mut lines = BufReader::new(stderr).lines();
69 let mut buf = String::new();
70 while let Ok(Some(line)) = lines.next_line().await {
71 if !buf.is_empty() {
72 buf.push('\n');
73 }
74 buf.push_str(&line);
75 }
76 let _ = stderr_tx.send(buf);
77 });
78
79 let child_fut = async move {
80 match child.wait().await {
81 Ok(s) if s.success() => Ok(()),
82 Ok(s) => {
83 let stderr = stderr_rx.await.unwrap_or_default();
84 Err(util::internal_error(format!("agent process exited ({s}): {stderr}")))
85 }
86 Err(e) => Err(Error::into_internal_error(e)),
87 }
88 };
89
90 let bytes = ByteStreams::new(stdin.compat_write(), stdout.compat());
91 tokio::select! {
92 result = ConnectTo::<T>::connect_to(bytes, client) => result,
93 result = child_fut => result,
94 }
95}