Skip to main content

agent_client_protocol/
acp_agent.rs

1//! Utilities for connecting to ACP agents and proxies.
2//!
3//! This module provides [`AcpAgent`], a convenient wrapper around [`crate::schema::McpServer`]
4//! that can be parsed from either a command string or JSON configuration.
5
6use std::path::PathBuf;
7use std::str::FromStr;
8use std::sync::Arc;
9
10use async_process::Child;
11use std::pin::pin;
12
13use crate::{Client, Conductor, Role};
14
15/// Direction of a line being sent or received.
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum LineDirection {
18    /// Line being sent to the agent (stdin)
19    Stdin,
20    /// Line being received from the agent (stdout)
21    Stdout,
22    /// Line being received from the agent (stderr)
23    Stderr,
24}
25
26/// A component representing an external ACP agent running in a separate process.
27///
28/// `AcpAgent` implements the [`ConnectTo`](`crate::ConnectTo`) trait for spawning and communicating with
29/// external agents or proxies via stdio. It handles process spawning, stream setup, and
30/// byte stream serialization automatically. This is the primary way to connect to agents
31/// that run as separate executables.
32///
33/// This is a wrapper around [`crate::schema::McpServer`] that provides convenient parsing
34/// from command-line strings or JSON configurations.
35///
36/// # Use Cases
37///
38/// - **External agents**: Connect to agents written in any language (Python, Node.js, Rust, etc.)
39/// - **Proxy chains**: Spawn intermediate proxies that transform or intercept messages
40/// - **Conductor components**: Use with the conductor to build proxy chains
41/// - **Subprocess isolation**: Run potentially untrusted code in a separate process
42///
43/// # Examples
44///
45/// Parse from a command string:
46/// ```
47/// # use agent_client_protocol::AcpAgent;
48/// # use std::str::FromStr;
49/// let agent = AcpAgent::from_str("python my_agent.py --verbose").unwrap();
50/// ```
51///
52/// Parse from JSON:
53/// ```
54/// # use agent_client_protocol::AcpAgent;
55/// # use std::str::FromStr;
56/// let agent = AcpAgent::from_str(r#"{"type": "stdio", "name": "my-agent", "command": "python", "args": ["my_agent.py"], "env": []}"#).unwrap();
57/// ```
58pub struct AcpAgent {
59    server: crate::schema::McpServer,
60    debug_callback: Option<Arc<dyn Fn(&str, LineDirection) + Send + Sync + 'static>>,
61}
62
63impl std::fmt::Debug for AcpAgent {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("AcpAgent")
66            .field("server", &self.server)
67            .field(
68                "debug_callback",
69                &self.debug_callback.as_ref().map(|_| "..."),
70            )
71            .finish()
72    }
73}
74
75impl AcpAgent {
76    /// Create a new `AcpAgent` from an [`crate::schema::McpServer`] configuration.
77    #[must_use]
78    pub fn new(server: crate::schema::McpServer) -> Self {
79        Self {
80            server,
81            debug_callback: None,
82        }
83    }
84
85    /// Create an ACP agent for Zed Industries' Claude Code tool.
86    /// Just runs `npx -y @zed-industries/claude-code-acp@latest`.
87    #[must_use]
88    pub fn zed_claude_code() -> Self {
89        Self::from_str("npx -y @zed-industries/claude-code-acp@latest").expect("valid bash command")
90    }
91
92    /// Create an ACP agent for Zed Industries' Codex tool.
93    /// Just runs `npx -y @zed-industries/codex-acp@latest`.
94    #[must_use]
95    pub fn zed_codex() -> Self {
96        Self::from_str("npx -y @zed-industries/codex-acp@latest").expect("valid bash command")
97    }
98
99    /// Create an ACP agent for Google's Gemini CLI.
100    /// Just runs `npx -y -- @google/gemini-cli@latest --experimental-acp`.
101    #[must_use]
102    pub fn google_gemini() -> Self {
103        Self::from_str("npx -y -- @google/gemini-cli@latest --experimental-acp")
104            .expect("valid bash command")
105    }
106
107    /// Get the underlying [`crate::schema::McpServer`] configuration.
108    #[must_use]
109    pub fn server(&self) -> &crate::schema::McpServer {
110        &self.server
111    }
112
113    /// Convert into the underlying [`crate::schema::McpServer`] configuration.
114    #[must_use]
115    pub fn into_server(self) -> crate::schema::McpServer {
116        self.server
117    }
118
119    /// Add a debug callback that will be invoked for each line sent/received.
120    ///
121    /// The callback receives the line content and the direction (stdin/stdout/stderr).
122    /// This is useful for logging, debugging, or monitoring agent communication.
123    ///
124    /// # Example
125    ///
126    /// ```no_run
127    /// # use agent_client_protocol::{AcpAgent, LineDirection};
128    /// # use std::str::FromStr;
129    /// let agent = AcpAgent::from_str("python my_agent.py")
130    ///     .unwrap()
131    ///     .with_debug(|line, direction| {
132    ///         eprintln!("{:?}: {}", direction, line);
133    ///     });
134    /// ```
135    #[must_use]
136    pub fn with_debug<F>(mut self, callback: F) -> Self
137    where
138        F: Fn(&str, LineDirection) + Send + Sync + 'static,
139    {
140        self.debug_callback = Some(Arc::new(callback));
141        self
142    }
143
144    /// Spawn the process and get stdio streams.
145    /// Used internally by the `ConnectTo` trait implementation.
146    pub fn spawn_process(
147        &self,
148    ) -> Result<
149        (
150            async_process::ChildStdin,
151            async_process::ChildStdout,
152            async_process::ChildStderr,
153            Child,
154        ),
155        crate::Error,
156    > {
157        match &self.server {
158            crate::schema::McpServer::Stdio(stdio) => {
159                let mut cmd = async_process::Command::new(&stdio.command);
160                cmd.args(&stdio.args);
161                for env_var in &stdio.env {
162                    cmd.env(&env_var.name, &env_var.value);
163                }
164                cmd.stdin(std::process::Stdio::piped())
165                    .stdout(std::process::Stdio::piped())
166                    .stderr(std::process::Stdio::piped());
167
168                let mut child = cmd.spawn().map_err(crate::Error::into_internal_error)?;
169
170                let child_stdin = child
171                    .stdin
172                    .take()
173                    .ok_or_else(|| crate::util::internal_error("Failed to open stdin"))?;
174                let child_stdout = child
175                    .stdout
176                    .take()
177                    .ok_or_else(|| crate::util::internal_error("Failed to open stdout"))?;
178                let child_stderr = child
179                    .stderr
180                    .take()
181                    .ok_or_else(|| crate::util::internal_error("Failed to open stderr"))?;
182
183                Ok((child_stdin, child_stdout, child_stderr, child))
184            }
185            crate::schema::McpServer::Http(_) => Err(crate::util::internal_error(
186                "HTTP transport not yet supported by AcpAgent",
187            )),
188            crate::schema::McpServer::Sse(_) => Err(crate::util::internal_error(
189                "SSE transport not yet supported by AcpAgent",
190            )),
191            _ => Err(crate::util::internal_error(
192                "Unknown MCP server transport type",
193            )),
194        }
195    }
196}
197
198/// A wrapper around Child that kills the process when dropped.
199struct ChildGuard(Child);
200
201impl ChildGuard {
202    async fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
203        self.0.status().await
204    }
205}
206
207impl Drop for ChildGuard {
208    fn drop(&mut self) {
209        drop(self.0.kill());
210    }
211}
212
213/// Waits for a child process and returns an error if it exits with non-zero status.
214///
215/// The error message includes any stderr output collected concurrently.
216/// When dropped, the child process is killed.
217async fn monitor_child(
218    child: Child,
219    stderr_rx: futures::channel::oneshot::Receiver<String>,
220) -> Result<(), crate::Error> {
221    let mut guard = ChildGuard(child);
222
223    let status = guard
224        .wait()
225        .await
226        .map_err(|e| crate::util::internal_error(format!("Failed to wait for process: {e}")))?;
227
228    if status.success() {
229        Ok(())
230    } else {
231        let stderr = stderr_rx.await.unwrap_or_default();
232
233        let message = if stderr.is_empty() {
234            format!("Process exited with {status}")
235        } else {
236            format!("Process exited with {status}: {stderr}")
237        };
238
239        Err(crate::util::internal_error(message))
240    }
241}
242
243/// Roles that an ACP agent executable can potentially serve.
244pub trait AcpAgentCounterpartRole: Role {}
245
246impl AcpAgentCounterpartRole for Client {}
247
248impl AcpAgentCounterpartRole for Conductor {}
249
250impl<Counterpart: AcpAgentCounterpartRole> crate::ConnectTo<Counterpart> for AcpAgent {
251    async fn connect_to(
252        self,
253        client: impl crate::ConnectTo<Counterpart::Counterpart>,
254    ) -> Result<(), crate::Error> {
255        use futures::io::BufReader;
256        use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt};
257
258        let (child_stdin, child_stdout, child_stderr, child) = self.spawn_process()?;
259
260        // Create a channel to collect stderr for error reporting
261        let (stderr_tx, stderr_rx) = futures::channel::oneshot::channel::<String>();
262
263        // Read stderr concurrently, optionally calling the debug callback.
264        // We use futures::future::select below to race this against the protocol,
265        // so this runs as part of the same task — no tokio::spawn needed.
266        let debug_callback = self.debug_callback.clone();
267        let stderr_future = async move {
268            let stderr_reader = BufReader::new(child_stderr);
269            let mut stderr_lines = stderr_reader.lines();
270            let mut collected = String::new();
271            while let Some(line_result) = stderr_lines.next().await {
272                if let Ok(line) = line_result {
273                    if let Some(ref callback) = debug_callback {
274                        callback(&line, LineDirection::Stderr);
275                    }
276                    if !collected.is_empty() {
277                        collected.push('\n');
278                    }
279                    collected.push_str(&line);
280                }
281            }
282            drop(stderr_tx.send(collected));
283        };
284
285        // Create a future that monitors the child process for early exit
286        let child_monitor = monitor_child(child, stderr_rx);
287
288        // Convert stdio to line streams with optional debug inspection
289        let incoming_lines: std::pin::Pin<
290            Box<dyn futures::Stream<Item = std::io::Result<String>> + Send>,
291        > = if let Some(callback) = self.debug_callback.clone() {
292            Box::pin(BufReader::new(child_stdout).lines().inspect(move |result| {
293                if let Ok(line) = result {
294                    callback(line, LineDirection::Stdout);
295                }
296            }))
297        } else {
298            Box::pin(BufReader::new(child_stdout).lines())
299        };
300
301        // Create a sink that writes lines (with newlines) to stdin with optional debug logging
302        let outgoing_sink: std::pin::Pin<
303            Box<dyn futures::Sink<String, Error = std::io::Error> + Send>,
304        > = if let Some(callback) = self.debug_callback.clone() {
305            Box::pin(futures::sink::unfold(
306                (child_stdin, callback),
307                async move |(mut writer, callback), line: String| {
308                    callback(&line, LineDirection::Stdin);
309                    let mut bytes = line.into_bytes();
310                    bytes.push(b'\n');
311                    writer.write_all(&bytes).await?;
312                    Ok::<_, std::io::Error>((writer, callback))
313                },
314            ))
315        } else {
316            Box::pin(futures::sink::unfold(
317                child_stdin,
318                async move |mut writer, line: String| {
319                    let mut bytes = line.into_bytes();
320                    bytes.push(b'\n');
321                    writer.write_all(&bytes).await?;
322                    Ok::<_, std::io::Error>(writer)
323                },
324            ))
325        };
326
327        // Race the protocol against child process exit.
328        // Also run stderr collection concurrently.
329        let protocol_future = crate::ConnectTo::<Counterpart>::connect_to(
330            crate::Lines::new(outgoing_sink, incoming_lines),
331            client,
332        );
333
334        let stderr_future = pin!(stderr_future);
335        let protocol_future = pin!(protocol_future);
336        let child_monitor = pin!(child_monitor);
337
338        // Run stderr reader alongside the main race
339        let main_race = async {
340            match futures::future::select(protocol_future, child_monitor).await {
341                futures::future::Either::Left((result, _))
342                | futures::future::Either::Right((result, _)) => result,
343            }
344        };
345
346        // Run stderr collection concurrently with the main logic.
347        // When main_race completes, we don't need stderr anymore.
348        let main_race = pin!(main_race);
349        match futures::future::select(main_race, stderr_future).await {
350            futures::future::Either::Left((result, _)) => result,
351            futures::future::Either::Right(((), protocol)) => protocol.await,
352        }
353    }
354}
355
356impl AcpAgent {
357    /// Create an `AcpAgent` from an iterator of command-line arguments.
358    ///
359    /// Leading arguments of the form `NAME=value` are parsed as environment variables.
360    /// The first non-env argument is the command, and the rest are arguments.
361    ///
362    /// # Example
363    ///
364    /// ```
365    /// # use agent_client_protocol::AcpAgent;
366    /// let agent = AcpAgent::from_args([
367    ///     "RUST_LOG=debug",
368    ///     "cargo",
369    ///     "run",
370    ///     "-p",
371    ///     "my-crate",
372    /// ]).unwrap();
373    /// ```
374    pub fn from_args<I, T>(args: I) -> Result<Self, crate::Error>
375    where
376        I: IntoIterator<Item = T>,
377        T: ToString,
378    {
379        let args: Vec<String> = args.into_iter().map(|s| s.to_string()).collect();
380
381        if args.is_empty() {
382            return Err(crate::util::internal_error("Arguments cannot be empty"));
383        }
384
385        let mut env = vec![];
386        let mut command_idx = 0;
387
388        for (i, arg) in args.iter().enumerate() {
389            if let Some((name, value)) = parse_env_var(arg) {
390                env.push(crate::schema::EnvVariable::new(name, value));
391                command_idx = i + 1;
392            } else {
393                break;
394            }
395        }
396
397        if command_idx >= args.len() {
398            return Err(crate::util::internal_error(
399                "No command found (only environment variables provided)",
400            ));
401        }
402
403        let command = PathBuf::from(&args[command_idx]);
404        let cmd_args = args[command_idx + 1..].to_vec();
405
406        let name = command
407            .file_name()
408            .and_then(|n| n.to_str())
409            .unwrap_or("agent")
410            .to_string();
411
412        Ok(AcpAgent {
413            server: crate::schema::McpServer::Stdio(
414                crate::schema::McpServerStdio::new(name, command)
415                    .args(cmd_args)
416                    .env(env),
417            ),
418            debug_callback: None,
419        })
420    }
421}
422
423/// Parse a string as an environment variable assignment (NAME=value).
424fn parse_env_var(s: &str) -> Option<(String, String)> {
425    let eq_pos = s.find('=')?;
426    if eq_pos == 0 {
427        return None;
428    }
429
430    let name = &s[..eq_pos];
431    let value = &s[eq_pos + 1..];
432
433    let mut chars = name.chars();
434    let first = chars.next()?;
435    if !first.is_ascii_alphabetic() && first != '_' {
436        return None;
437    }
438    if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
439        return None;
440    }
441
442    Some((name.to_string(), value.to_string()))
443}
444
445impl FromStr for AcpAgent {
446    type Err = crate::Error;
447
448    fn from_str(s: &str) -> Result<Self, Self::Err> {
449        let trimmed = s.trim();
450
451        if trimmed.starts_with('{') {
452            let server: crate::schema::McpServer = serde_json::from_str(trimmed)
453                .map_err(|e| crate::util::internal_error(format!("Failed to parse JSON: {e}")))?;
454            return Ok(Self {
455                server,
456                debug_callback: None,
457            });
458        }
459
460        let parts = shell_words::split(trimmed)
461            .map_err(|e| crate::util::internal_error(format!("Failed to parse command: {e}")))?;
462
463        Self::from_args(parts)
464    }
465}
466
467#[cfg(test)]
468mod tests {
469    use super::*;
470
471    #[test]
472    fn test_parse_simple_command() {
473        let agent = AcpAgent::from_str("python agent.py").unwrap();
474        match agent.server {
475            crate::schema::McpServer::Stdio(stdio) => {
476                assert_eq!(stdio.name, "python");
477                assert_eq!(stdio.command, PathBuf::from("python"));
478                assert_eq!(stdio.args, vec!["agent.py"]);
479                assert!(stdio.env.is_empty());
480            }
481            _ => panic!("Expected Stdio variant"),
482        }
483    }
484
485    #[test]
486    fn test_parse_command_with_args() {
487        let agent = AcpAgent::from_str("node server.js --port 8080 --verbose").unwrap();
488        match agent.server {
489            crate::schema::McpServer::Stdio(stdio) => {
490                assert_eq!(stdio.name, "node");
491                assert_eq!(stdio.command, PathBuf::from("node"));
492                assert_eq!(stdio.args, vec!["server.js", "--port", "8080", "--verbose"]);
493                assert!(stdio.env.is_empty());
494            }
495            _ => panic!("Expected Stdio variant"),
496        }
497    }
498
499    #[test]
500    fn test_parse_command_with_quotes() {
501        let agent = AcpAgent::from_str(r#"python "my agent.py" --name "Test Agent""#).unwrap();
502        match agent.server {
503            crate::schema::McpServer::Stdio(stdio) => {
504                assert_eq!(stdio.name, "python");
505                assert_eq!(stdio.command, PathBuf::from("python"));
506                assert_eq!(stdio.args, vec!["my agent.py", "--name", "Test Agent"]);
507                assert!(stdio.env.is_empty());
508            }
509            _ => panic!("Expected Stdio variant"),
510        }
511    }
512
513    #[test]
514    fn test_parse_json_stdio() {
515        let json = r#"{
516            "type": "stdio",
517            "name": "my-agent",
518            "command": "/usr/bin/python",
519            "args": ["agent.py", "--verbose"],
520            "env": []
521        }"#;
522        let agent = AcpAgent::from_str(json).unwrap();
523        match agent.server {
524            crate::schema::McpServer::Stdio(stdio) => {
525                assert_eq!(stdio.name, "my-agent");
526                assert_eq!(stdio.command, PathBuf::from("/usr/bin/python"));
527                assert_eq!(stdio.args, vec!["agent.py", "--verbose"]);
528                assert!(stdio.env.is_empty());
529            }
530            _ => panic!("Expected Stdio variant"),
531        }
532    }
533
534    #[test]
535    fn test_parse_json_http() {
536        let json = r#"{
537            "type": "http",
538            "name": "remote-agent",
539            "url": "https://example.com/agent",
540            "headers": []
541        }"#;
542        let agent = AcpAgent::from_str(json).unwrap();
543        match agent.server {
544            crate::schema::McpServer::Http(http) => {
545                assert_eq!(http.name, "remote-agent");
546                assert_eq!(http.url, "https://example.com/agent");
547                assert!(http.headers.is_empty());
548            }
549            _ => panic!("Expected Http variant"),
550        }
551    }
552}