Skip to main content

sacp_tokio/
acp_agent.rs

1//! Utilities for connecting to ACP agents and proxies.
2//!
3//! This module provides [`AcpAgent`], a convenient wrapper around [`sacp::schema::McpServer`]
4//! that can be parsed from either a command string or JSON configuration.
5
6use std::path::PathBuf;
7use std::str::FromStr;
8use std::sync::Arc;
9
10use sacp::{Client, Conductor, Role};
11use tokio::process::Child;
12use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
13
/// Identifies which stdio stream of the agent process a line travelled over.
///
/// Passed to the debug callback alongside the line text so the callback can
/// tell outgoing traffic from the two kinds of incoming traffic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineDirection {
    /// A line written to the agent's standard input.
    Stdin,
    /// A line read from the agent's standard output.
    Stdout,
    /// A line read from the agent's standard error.
    Stderr,
}
24
25/// A component representing an external ACP agent running in a separate process.
26///
27/// `AcpAgent` implements the [`sacp::ConnectTo`] trait for spawning and communicating with
28/// external agents or proxies via stdio. It handles process spawning, stream setup, and
29/// byte stream serialization automatically. This is the primary way to connect to agents
30/// that run as separate executables.
31///
32/// This is a wrapper around [`sacp::schema::McpServer`] that provides convenient parsing
33/// from command-line strings or JSON configurations.
34///
35/// # Use Cases
36///
37/// - **External agents**: Connect to agents written in any language (Python, Node.js, Rust, etc.)
38/// - **Proxy chains**: Spawn intermediate proxies that transform or intercept messages
39/// - **Conductor components**: Use with [`sacp_conductor::Conductor`] to build proxy chains
40/// - **Subprocess isolation**: Run potentially untrusted code in a separate process
41///
42/// # Examples
43///
44/// Parse from a command string:
45/// ```
46/// # use sacp_tokio::AcpAgent;
47/// # use std::str::FromStr;
48/// let agent = AcpAgent::from_str("python my_agent.py --verbose").unwrap();
49/// ```
50///
51/// Parse from JSON:
52/// ```
53/// # use sacp_tokio::AcpAgent;
54/// # use std::str::FromStr;
55/// let agent = AcpAgent::from_str(r#"{"type": "stdio", "name": "my-agent", "command": "python", "args": ["my_agent.py"], "env": []}"#).unwrap();
56/// ```
57///
58/// Use as a component to connect to an external agent:
59/// ```ignore
60/// use sacp::{Client, Builder};
61/// use sacp_tokio::AcpAgent;
62/// use std::str::FromStr;
63///
64/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
65/// let agent = AcpAgent::from_str("python my_agent.py")?;
66///
67/// // The agent process will be spawned automatically when connected
68/// Client.builder()
69///     .connect_to(agent)
70///     .await?
71///     .connect_with(|cx| async move {
72///         // Use the connection to communicate with the agent process
73///         Ok(())
74///     })
75///     .await?;
76/// # Ok(())
77/// # }
78/// ```
79///
80/// [`sacp_conductor::Conductor`]: https://docs.rs/sacp-conductor/latest/sacp_conductor/struct.Conductor.html
81pub struct AcpAgent {
82    server: sacp::schema::McpServer,
83    debug_callback: Option<Arc<dyn Fn(&str, LineDirection) + Send + Sync + 'static>>,
84}
85
86impl std::fmt::Debug for AcpAgent {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("AcpAgent")
89            .field("server", &self.server)
90            .field(
91                "debug_callback",
92                &self.debug_callback.as_ref().map(|_| "..."),
93            )
94            .finish()
95    }
96}
97
98impl AcpAgent {
99    /// Create a new `AcpAgent` from an [`sacp::schema::McpServer`] configuration.
100    pub fn new(server: sacp::schema::McpServer) -> Self {
101        Self {
102            server,
103            debug_callback: None,
104        }
105    }
106
107    /// Create an ACP agent for Zed Industries' Claude Code tool.
108    /// Just runs `npx -y @zed-industries/claude-code-acp@latest`.
109    pub fn zed_claude_code() -> Self {
110        Self::from_str("npx -y @zed-industries/claude-code-acp@latest").expect("valid bash command")
111    }
112
113    /// Create an ACP agent for Zed Industries' Codex tool.
114    /// Just runs `npx -y @zed-industries/codex-acp@latest`.
115    pub fn zed_codex() -> Self {
116        Self::from_str("npx -y @zed-industries/codex-acp@latest").expect("valid bash command")
117    }
118
119    /// Create an ACP agent for Google's Gemini CLI.
120    /// Just runs `npx -y -- @google/gemini-cli@latest --experimental-acp`.
121    pub fn google_gemini() -> Self {
122        Self::from_str("npx -y -- @google/gemini-cli@latest --experimental-acp")
123            .expect("valid bash command")
124    }
125
126    /// Get the underlying [`sacp::schema::McpServer`] configuration.
127    pub fn server(&self) -> &sacp::schema::McpServer {
128        &self.server
129    }
130
131    /// Convert into the underlying [`sacp::schema::McpServer`] configuration.
132    pub fn into_server(self) -> sacp::schema::McpServer {
133        self.server
134    }
135
136    /// Add a debug callback that will be invoked for each line sent/received.
137    ///
138    /// The callback receives the line content and the direction (stdin/stdout/stderr).
139    /// This is useful for logging, debugging, or monitoring agent communication.
140    ///
141    /// # Example
142    ///
143    /// ```no_run
144    /// # use sacp_tokio::{AcpAgent, LineDirection};
145    /// # use std::str::FromStr;
146    /// let agent = AcpAgent::from_str("python my_agent.py")
147    ///     .unwrap()
148    ///     .with_debug(|line, direction| {
149    ///         eprintln!("{:?}: {}", direction, line);
150    ///     });
151    /// ```
152    pub fn with_debug<F>(mut self, callback: F) -> Self
153    where
154        F: Fn(&str, LineDirection) + Send + Sync + 'static,
155    {
156        self.debug_callback = Some(Arc::new(callback));
157        self
158    }
159
160    /// Spawn the process and get stdio streams.
161    /// Used internally by the Component trait implementation.
162    pub fn spawn_process(
163        &self,
164    ) -> Result<
165        (
166            tokio::process::ChildStdin,
167            tokio::process::ChildStdout,
168            tokio::process::ChildStderr,
169            Child,
170        ),
171        sacp::Error,
172    > {
173        match &self.server {
174            sacp::schema::McpServer::Stdio(stdio) => {
175                let mut cmd = tokio::process::Command::new(&stdio.command);
176                cmd.args(&stdio.args);
177                for env_var in &stdio.env {
178                    cmd.env(&env_var.name, &env_var.value);
179                }
180                cmd.stdin(std::process::Stdio::piped())
181                    .stdout(std::process::Stdio::piped())
182                    .stderr(std::process::Stdio::piped());
183
184                let mut child = cmd.spawn().map_err(sacp::Error::into_internal_error)?;
185
186                let child_stdin = child
187                    .stdin
188                    .take()
189                    .ok_or_else(|| sacp::util::internal_error("Failed to open stdin"))?;
190                let child_stdout = child
191                    .stdout
192                    .take()
193                    .ok_or_else(|| sacp::util::internal_error("Failed to open stdout"))?;
194                let child_stderr = child
195                    .stderr
196                    .take()
197                    .ok_or_else(|| sacp::util::internal_error("Failed to open stderr"))?;
198
199                Ok((child_stdin, child_stdout, child_stderr, child))
200            }
201            sacp::schema::McpServer::Http(_) => Err(sacp::util::internal_error(
202                "HTTP transport not yet supported by AcpAgent",
203            )),
204            sacp::schema::McpServer::Sse(_) => Err(sacp::util::internal_error(
205                "SSE transport not yet supported by AcpAgent",
206            )),
207            _ => Err(sacp::util::internal_error(
208                "Unknown MCP server transport type",
209            )),
210        }
211    }
212}
213
214/// A wrapper around Child that kills the process when dropped.
215struct ChildGuard(Child);
216
217impl ChildGuard {
218    async fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
219        self.0.wait().await
220    }
221}
222
223impl Drop for ChildGuard {
224    fn drop(&mut self) {
225        let _ = self.0.start_kill();
226    }
227}
228
229/// Waits for a child process and returns an error if it exits with non-zero status.
230///
231/// The error message includes any stderr output collected by the background task.
232/// When dropped, the child process is killed.
233async fn monitor_child(
234    child: Child,
235    stderr_rx: tokio::sync::oneshot::Receiver<String>,
236) -> Result<(), sacp::Error> {
237    let mut guard = ChildGuard(child);
238
239    // Wait for the child to exit
240    let status = guard
241        .wait()
242        .await
243        .map_err(|e| sacp::util::internal_error(format!("Failed to wait for process: {}", e)))?;
244
245    if status.success() {
246        Ok(())
247    } else {
248        // Get stderr content if available
249        let stderr = stderr_rx.await.unwrap_or_default();
250
251        let message = if stderr.is_empty() {
252            format!("Process exited with {}", status)
253        } else {
254            format!("Process exited with {}: {}", status, stderr)
255        };
256
257        Err(sacp::util::internal_error(message))
258    }
259}
260
261/// Roles that an ACP agent executable can potentially serve.
262pub trait AcpAgentCounterpartRole: Role {}
263
264impl AcpAgentCounterpartRole for Client {}
265
266impl AcpAgentCounterpartRole for Conductor {}
267
268impl<Counterpart: AcpAgentCounterpartRole> sacp::ConnectTo<Counterpart> for AcpAgent {
269    async fn connect_to(
270        self,
271        client: impl sacp::ConnectTo<Counterpart::Counterpart>,
272    ) -> Result<(), sacp::Error> {
273        use futures::AsyncBufReadExt;
274        use futures::AsyncWriteExt;
275        use futures::StreamExt;
276        use futures::io::BufReader;
277
278        let (child_stdin, child_stdout, child_stderr, child) = self.spawn_process()?;
279
280        // Create a channel to collect stderr for error reporting
281        let (stderr_tx, stderr_rx) = tokio::sync::oneshot::channel::<String>();
282
283        // Spawn a task to read stderr, optionally calling the debug callback
284        let debug_callback = self.debug_callback.clone();
285        tokio::spawn(async move {
286            let stderr_reader = BufReader::new(child_stderr.compat());
287            let mut stderr_lines = stderr_reader.lines();
288            let mut collected = String::new();
289            while let Some(line_result) = stderr_lines.next().await {
290                if let Ok(line) = line_result {
291                    // Call debug callback if present
292                    if let Some(ref callback) = debug_callback {
293                        callback(&line, LineDirection::Stderr);
294                    }
295                    // Always collect for error reporting
296                    if !collected.is_empty() {
297                        collected.push('\n');
298                    }
299                    collected.push_str(&line);
300                }
301            }
302            let _ = stderr_tx.send(collected);
303        });
304
305        // Create a future that monitors the child process for early exit
306        let child_monitor = monitor_child(child, stderr_rx);
307
308        // Convert stdio to line streams with optional debug inspection
309        let incoming_lines = if let Some(callback) = self.debug_callback.clone() {
310            Box::pin(
311                BufReader::new(child_stdout.compat())
312                    .lines()
313                    .inspect(move |result| {
314                        if let Ok(line) = result {
315                            callback(line, LineDirection::Stdout);
316                        }
317                    }),
318            )
319                as std::pin::Pin<Box<dyn futures::Stream<Item = std::io::Result<String>> + Send>>
320        } else {
321            Box::pin(BufReader::new(child_stdout.compat()).lines())
322        };
323
324        // Create a sink that writes lines (with newlines) to stdin with optional debug logging
325        let outgoing_sink = if let Some(callback) = self.debug_callback.clone() {
326            Box::pin(futures::sink::unfold(
327                (child_stdin.compat_write(), callback),
328                async move |(mut writer, callback), line: String| {
329                    callback(&line, LineDirection::Stdin);
330                    let mut bytes = line.into_bytes();
331                    bytes.push(b'\n');
332                    writer.write_all(&bytes).await?;
333                    Ok::<_, std::io::Error>((writer, callback))
334                },
335            ))
336                as std::pin::Pin<Box<dyn futures::Sink<String, Error = std::io::Error> + Send>>
337        } else {
338            Box::pin(futures::sink::unfold(
339                child_stdin.compat_write(),
340                async move |mut writer, line: String| {
341                    let mut bytes = line.into_bytes();
342                    bytes.push(b'\n');
343                    writer.write_all(&bytes).await?;
344                    Ok::<_, std::io::Error>(writer)
345                },
346            ))
347        };
348
349        // Race the protocol against child process exit
350        // If the child exits early (e.g., with an error), we return that error
351        let protocol_future = sacp::ConnectTo::<Counterpart>::connect_to(
352            sacp::Lines::new(outgoing_sink, incoming_lines),
353            client,
354        );
355
356        tokio::select! {
357            result = protocol_future => result,
358            result = child_monitor => result,
359        }
360    }
361}
362
363impl AcpAgent {
364    /// Create an `AcpAgent` from an iterator of command-line arguments.
365    ///
366    /// Leading arguments of the form `NAME=value` are parsed as environment variables.
367    /// The first non-env argument is the command, and the rest are arguments.
368    ///
369    /// # Example
370    ///
371    /// ```
372    /// # use sacp_tokio::AcpAgent;
373    /// let agent = AcpAgent::from_args([
374    ///     "RUST_LOG=debug",
375    ///     "cargo",
376    ///     "run",
377    ///     "-p",
378    ///     "my-crate",
379    /// ]).unwrap();
380    /// ```
381    pub fn from_args<I, T>(args: I) -> Result<Self, sacp::Error>
382    where
383        I: IntoIterator<Item = T>,
384        T: ToString,
385    {
386        let args: Vec<String> = args.into_iter().map(|s| s.to_string()).collect();
387
388        if args.is_empty() {
389            return Err(sacp::util::internal_error("Arguments cannot be empty"));
390        }
391
392        let mut env = vec![];
393        let mut command_idx = 0;
394
395        // Parse leading FOO=bar arguments as environment variables
396        for (i, arg) in args.iter().enumerate() {
397            if let Some((name, value)) = parse_env_var(arg) {
398                env.push(sacp::schema::EnvVariable::new(name, value));
399                command_idx = i + 1;
400            } else {
401                break;
402            }
403        }
404
405        if command_idx >= args.len() {
406            return Err(sacp::util::internal_error(
407                "No command found (only environment variables provided)",
408            ));
409        }
410
411        let command = PathBuf::from(&args[command_idx]);
412        let cmd_args = args[command_idx + 1..].to_vec();
413
414        // Generate a name from the command
415        let name = command
416            .file_name()
417            .and_then(|n| n.to_str())
418            .unwrap_or("agent")
419            .to_string();
420
421        Ok(AcpAgent {
422            server: sacp::schema::McpServer::Stdio(
423                sacp::schema::McpServerStdio::new(name, command)
424                    .args(cmd_args)
425                    .env(env),
426            ),
427            debug_callback: None,
428        })
429    }
430}
431
/// Interpret `s` as an environment-variable assignment of the form `NAME=value`.
///
/// The string is split at its first `=`. The name must be non-empty, begin with
/// an ASCII letter or underscore, and continue with ASCII letters, digits, or
/// underscores only. The value may be empty and may itself contain `=`.
///
/// Returns `Some((name, value))` on a match and `None` otherwise.
fn parse_env_var(s: &str) -> Option<(String, String)> {
    let (name, value) = s.split_once('=')?;

    // An empty name (string starting with '=') has no first character and is rejected here.
    let mut rest = name.chars();
    let head = rest.next()?;

    let head_ok = head == '_' || head.is_ascii_alphabetic();
    let valid = head_ok && rest.all(|c| c == '_' || c.is_ascii_alphanumeric());

    if valid {
        Some((name.to_owned(), value.to_owned()))
    } else {
        None
    }
}
457
458impl FromStr for AcpAgent {
459    type Err = sacp::Error;
460
461    fn from_str(s: &str) -> Result<Self, Self::Err> {
462        let trimmed = s.trim();
463
464        // If it starts with '{', try to parse as JSON
465        if trimmed.starts_with('{') {
466            let server: sacp::schema::McpServer = serde_json::from_str(trimmed)
467                .map_err(|e| sacp::util::internal_error(format!("Failed to parse JSON: {}", e)))?;
468            return Ok(Self {
469                server,
470                debug_callback: None,
471            });
472        }
473
474        // Otherwise, parse as a command string
475        let parts = shell_words::split(trimmed)
476            .map_err(|e| sacp::util::internal_error(format!("Failed to parse command: {}", e)))?;
477
478        Self::from_args(parts)
479    }
480}
481
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `input` and return its stdio configuration, panicking for any other variant.
    fn parse_stdio(input: &str) -> sacp::schema::McpServerStdio {
        let agent = AcpAgent::from_str(input).unwrap();
        match agent.server {
            sacp::schema::McpServer::Stdio(stdio) => stdio,
            _ => panic!("Expected Stdio variant"),
        }
    }

    #[test]
    fn test_parse_simple_command() {
        let stdio = parse_stdio("python agent.py");
        assert_eq!(stdio.name, "python");
        assert_eq!(stdio.command, PathBuf::from("python"));
        assert_eq!(stdio.args, vec!["agent.py"]);
        assert!(stdio.env.is_empty());
    }

    #[test]
    fn test_parse_command_with_args() {
        let stdio = parse_stdio("node server.js --port 8080 --verbose");
        assert_eq!(stdio.name, "node");
        assert_eq!(stdio.command, PathBuf::from("node"));
        assert_eq!(stdio.args, vec!["server.js", "--port", "8080", "--verbose"]);
        assert!(stdio.env.is_empty());
    }

    #[test]
    fn test_parse_command_with_quotes() {
        // Quoted words must survive as single arguments.
        let stdio = parse_stdio(r#"python "my agent.py" --name "Test Agent""#);
        assert_eq!(stdio.name, "python");
        assert_eq!(stdio.command, PathBuf::from("python"));
        assert_eq!(stdio.args, vec!["my agent.py", "--name", "Test Agent"]);
        assert!(stdio.env.is_empty());
    }

    #[test]
    fn test_parse_json_stdio() {
        let json = r#"{
            "type": "stdio",
            "name": "my-agent",
            "command": "/usr/bin/python",
            "args": ["agent.py", "--verbose"],
            "env": []
        }"#;
        let stdio = parse_stdio(json);
        assert_eq!(stdio.name, "my-agent");
        assert_eq!(stdio.command, PathBuf::from("/usr/bin/python"));
        assert_eq!(stdio.args, vec!["agent.py", "--verbose"]);
        assert!(stdio.env.is_empty());
    }

    #[test]
    fn test_parse_json_http() {
        let json = r#"{
            "type": "http",
            "name": "remote-agent",
            "url": "https://example.com/agent",
            "headers": []
        }"#;
        let agent = AcpAgent::from_str(json).unwrap();
        let sacp::schema::McpServer::Http(http) = agent.server else {
            panic!("Expected Http variant");
        };
        assert_eq!(http.name, "remote-agent");
        assert_eq!(http.url, "https://example.com/agent");
        assert!(http.headers.is_empty());
    }
}