1use std::path::PathBuf;
7use std::str::FromStr;
8use std::sync::Arc;
9
10use async_process::Child;
11use std::pin::pin;
12
13use crate::{Client, Conductor, Role};
14
/// Identifies which of the child process's standard streams a line passed to
/// a debug callback belongs to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LineDirection {
    /// A line being written to the child's stdin.
    Stdin,
    /// A line read from the child's stdout.
    Stdout,
    /// A line read from the child's stderr.
    Stderr,
}
25
/// An agent described by an MCP-style server configuration, optionally
/// carrying a callback that observes every line exchanged with it.
pub struct AcpAgent {
    /// Transport configuration for the agent. Only the `Stdio` variant can be
    /// spawned by `spawn_process`; the other variants produce an error there.
    server: crate::schema::McpServer,
    /// Optional observer invoked with each line and the stream it travelled
    /// on. Held in an `Arc` so it can be cloned into several futures.
    debug_callback: Option<Arc<dyn Fn(&str, LineDirection) + Send + Sync + 'static>>,
}
62
63impl std::fmt::Debug for AcpAgent {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("AcpAgent")
66 .field("server", &self.server)
67 .field(
68 "debug_callback",
69 &self.debug_callback.as_ref().map(|_| "..."),
70 )
71 .finish()
72 }
73}
74
75impl AcpAgent {
76 #[must_use]
78 pub fn new(server: crate::schema::McpServer) -> Self {
79 Self {
80 server,
81 debug_callback: None,
82 }
83 }
84
85 #[must_use]
88 pub fn zed_claude_code() -> Self {
89 Self::from_str("npx -y @zed-industries/claude-code-acp@latest").expect("valid bash command")
90 }
91
92 #[must_use]
95 pub fn zed_codex() -> Self {
96 Self::from_str("npx -y @zed-industries/codex-acp@latest").expect("valid bash command")
97 }
98
99 #[must_use]
102 pub fn google_gemini() -> Self {
103 Self::from_str("npx -y -- @google/gemini-cli@latest --experimental-acp")
104 .expect("valid bash command")
105 }
106
107 #[must_use]
109 pub fn server(&self) -> &crate::schema::McpServer {
110 &self.server
111 }
112
113 #[must_use]
115 pub fn into_server(self) -> crate::schema::McpServer {
116 self.server
117 }
118
119 #[must_use]
136 pub fn with_debug<F>(mut self, callback: F) -> Self
137 where
138 F: Fn(&str, LineDirection) + Send + Sync + 'static,
139 {
140 self.debug_callback = Some(Arc::new(callback));
141 self
142 }
143
144 pub fn spawn_process(
147 &self,
148 ) -> Result<
149 (
150 async_process::ChildStdin,
151 async_process::ChildStdout,
152 async_process::ChildStderr,
153 Child,
154 ),
155 crate::Error,
156 > {
157 match &self.server {
158 crate::schema::McpServer::Stdio(stdio) => {
159 let mut cmd = async_process::Command::new(&stdio.command);
160 cmd.args(&stdio.args);
161 for env_var in &stdio.env {
162 cmd.env(&env_var.name, &env_var.value);
163 }
164 cmd.stdin(std::process::Stdio::piped())
165 .stdout(std::process::Stdio::piped())
166 .stderr(std::process::Stdio::piped());
167
168 let mut child = cmd.spawn().map_err(crate::Error::into_internal_error)?;
169
170 let child_stdin = child
171 .stdin
172 .take()
173 .ok_or_else(|| crate::util::internal_error("Failed to open stdin"))?;
174 let child_stdout = child
175 .stdout
176 .take()
177 .ok_or_else(|| crate::util::internal_error("Failed to open stdout"))?;
178 let child_stderr = child
179 .stderr
180 .take()
181 .ok_or_else(|| crate::util::internal_error("Failed to open stderr"))?;
182
183 Ok((child_stdin, child_stdout, child_stderr, child))
184 }
185 crate::schema::McpServer::Http(_) => Err(crate::util::internal_error(
186 "HTTP transport not yet supported by AcpAgent",
187 )),
188 crate::schema::McpServer::Sse(_) => Err(crate::util::internal_error(
189 "SSE transport not yet supported by AcpAgent",
190 )),
191 _ => Err(crate::util::internal_error(
192 "Unknown MCP server transport type",
193 )),
194 }
195 }
196}
197
/// Owns a spawned [`Child`]; its `Drop` implementation calls `kill()` on the
/// child when the guard goes out of scope.
struct ChildGuard(Child);
200
201impl ChildGuard {
202 async fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
203 self.0.status().await
204 }
205}
206
207impl Drop for ChildGuard {
208 fn drop(&mut self) {
209 drop(self.0.kill());
210 }
211}
212
213async fn monitor_child(
218 child: Child,
219 stderr_rx: futures::channel::oneshot::Receiver<String>,
220) -> Result<(), crate::Error> {
221 let mut guard = ChildGuard(child);
222
223 let status = guard
224 .wait()
225 .await
226 .map_err(|e| crate::util::internal_error(format!("Failed to wait for process: {e}")))?;
227
228 if status.success() {
229 Ok(())
230 } else {
231 let stderr = stderr_rx.await.unwrap_or_default();
232
233 let message = if stderr.is_empty() {
234 format!("Process exited with {status}")
235 } else {
236 format!("Process exited with {status}: {stderr}")
237 };
238
239 Err(crate::util::internal_error(message))
240 }
241}
242
/// Marker trait for the roles that an [`AcpAgent`] can be connected to; it is
/// the bound on the `Counterpart` parameter of `AcpAgent`'s `ConnectTo` impl.
pub trait AcpAgentCounterpartRole: Role {}

/// `Client` is an accepted counterpart role.
impl AcpAgentCounterpartRole for Client {}

/// `Conductor` is an accepted counterpart role.
impl AcpAgentCounterpartRole for Conductor {}
249
/// Connects a spawned agent process to `client` by exchanging lines over the
/// child's stdin and stdout.
impl<Counterpart: AcpAgentCounterpartRole> crate::ConnectTo<Counterpart> for AcpAgent {
    /// Spawns the agent process and drives three futures until the connection
    /// is finished:
    ///
    /// * the protocol future, which exchanges lines with `client`;
    /// * the child monitor, which waits for the process to exit;
    /// * the stderr collector, which gathers the child's stderr output.
    ///
    /// The result is whichever of the protocol future or the child monitor
    /// completes first. If the debug callback is set, it is invoked for every
    /// line on stdin, stdout and stderr.
    ///
    /// # Errors
    ///
    /// Returns an error if the process cannot be spawned, or the error
    /// produced by the protocol future or the child monitor.
    async fn connect_to(
        self,
        client: impl crate::ConnectTo<Counterpart::Counterpart>,
    ) -> Result<(), crate::Error> {
        use futures::io::BufReader;
        use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt};

        let (child_stdin, child_stdout, child_stderr, child) = self.spawn_process()?;

        // One-shot channel that hands the collected stderr text to the monitor.
        let (stderr_tx, stderr_rx) = futures::channel::oneshot::channel::<String>();

        let debug_callback = self.debug_callback.clone();
        // Reads stderr line by line, reporting each line to the debug callback
        // and joining the lines with '\n' into a single string.
        let stderr_future = async move {
            let stderr_reader = BufReader::new(child_stderr);
            let mut stderr_lines = stderr_reader.lines();
            let mut collected = String::new();
            while let Some(line_result) = stderr_lines.next().await {
                // Lines that fail to read are skipped rather than aborting.
                if let Ok(line) = line_result {
                    if let Some(ref callback) = debug_callback {
                        callback(&line, LineDirection::Stderr);
                    }
                    if !collected.is_empty() {
                        collected.push('\n');
                    }
                    collected.push_str(&line);
                }
            }
            // The receiver may already be gone; a failed send is ignored.
            drop(stderr_tx.send(collected));
        };

        let child_monitor = monitor_child(child, stderr_rx);

        // Stream of stdout lines; with a debug callback, each successfully
        // read line is reported before being passed on.
        let incoming_lines: std::pin::Pin<
            Box<dyn futures::Stream<Item = std::io::Result<String>> + Send>,
        > = if let Some(callback) = self.debug_callback.clone() {
            Box::pin(BufReader::new(child_stdout).lines().inspect(move |result| {
                if let Ok(line) = result {
                    callback(line, LineDirection::Stdout);
                }
            }))
        } else {
            Box::pin(BufReader::new(child_stdout).lines())
        };

        // Sink that writes each outgoing line plus a trailing '\n' to the
        // child's stdin; with a debug callback, the line is reported first.
        let outgoing_sink: std::pin::Pin<
            Box<dyn futures::Sink<String, Error = std::io::Error> + Send>,
        > = if let Some(callback) = self.debug_callback.clone() {
            Box::pin(futures::sink::unfold(
                (child_stdin, callback),
                async move |(mut writer, callback), line: String| {
                    callback(&line, LineDirection::Stdin);
                    let mut bytes = line.into_bytes();
                    bytes.push(b'\n');
                    writer.write_all(&bytes).await?;
                    Ok::<_, std::io::Error>((writer, callback))
                },
            ))
        } else {
            Box::pin(futures::sink::unfold(
                child_stdin,
                async move |mut writer, line: String| {
                    let mut bytes = line.into_bytes();
                    bytes.push(b'\n');
                    writer.write_all(&bytes).await?;
                    Ok::<_, std::io::Error>(writer)
                },
            ))
        };

        // NOTE(review): presumably `crate::Lines` is itself a `ConnectTo`
        // implementation over a line sink/stream pair — confirm at its definition.
        let protocol_future = crate::ConnectTo::<Counterpart>::connect_to(
            crate::Lines::new(outgoing_sink, incoming_lines),
            client,
        );

        let stderr_future = pin!(stderr_future);
        let protocol_future = pin!(protocol_future);
        let child_monitor = pin!(child_monitor);

        // Whichever of the protocol future or the child monitor finishes first
        // decides the outcome.
        let main_race = async {
            match futures::future::select(protocol_future, child_monitor).await {
                futures::future::Either::Left((result, _))
                | futures::future::Either::Right((result, _)) => result,
            }
        };

        let main_race = pin!(main_race);
        // The stderr collector runs alongside the main race. If it finishes
        // first (the stderr line stream ended), keep awaiting the main race.
        match futures::future::select(main_race, stderr_future).await {
            futures::future::Either::Left((result, _)) => result,
            futures::future::Either::Right(((), protocol)) => protocol.await,
        }
    }
}
355
356impl AcpAgent {
357 pub fn from_args<I, T>(args: I) -> Result<Self, crate::Error>
375 where
376 I: IntoIterator<Item = T>,
377 T: ToString,
378 {
379 let args: Vec<String> = args.into_iter().map(|s| s.to_string()).collect();
380
381 if args.is_empty() {
382 return Err(crate::util::internal_error("Arguments cannot be empty"));
383 }
384
385 let mut env = vec![];
386 let mut command_idx = 0;
387
388 for (i, arg) in args.iter().enumerate() {
389 if let Some((name, value)) = parse_env_var(arg) {
390 env.push(crate::schema::EnvVariable::new(name, value));
391 command_idx = i + 1;
392 } else {
393 break;
394 }
395 }
396
397 if command_idx >= args.len() {
398 return Err(crate::util::internal_error(
399 "No command found (only environment variables provided)",
400 ));
401 }
402
403 let command = PathBuf::from(&args[command_idx]);
404 let cmd_args = args[command_idx + 1..].to_vec();
405
406 let name = command
407 .file_name()
408 .and_then(|n| n.to_str())
409 .unwrap_or("agent")
410 .to_string();
411
412 Ok(AcpAgent {
413 server: crate::schema::McpServer::Stdio(
414 crate::schema::McpServerStdio::new(name, command)
415 .args(cmd_args)
416 .env(env),
417 ),
418 debug_callback: None,
419 })
420 }
421}
422
/// Splits `s` into `(name, value)` when it has the form `NAME=value`.
///
/// The name is everything before the first `=`. It must be non-empty, start
/// with an ASCII letter or `_`, and continue with ASCII letters, digits or
/// `_` only. The value is everything after the first `=` and may be empty or
/// contain further `=` characters. Returns `None` for anything else.
fn parse_env_var(s: &str) -> Option<(String, String)> {
    let (name, value) = s.split_once('=')?;

    // An empty name has no first character and is rejected here.
    let mut rest = name.chars();
    let leading = rest.next()?;

    let starts_ok = leading.is_ascii_alphabetic() || leading == '_';
    let valid = starts_ok && rest.all(|c| c.is_ascii_alphanumeric() || c == '_');

    valid.then(|| (name.to_owned(), value.to_owned()))
}
444
445impl FromStr for AcpAgent {
446 type Err = crate::Error;
447
448 fn from_str(s: &str) -> Result<Self, Self::Err> {
449 let trimmed = s.trim();
450
451 if trimmed.starts_with('{') {
452 let server: crate::schema::McpServer = serde_json::from_str(trimmed)
453 .map_err(|e| crate::util::internal_error(format!("Failed to parse JSON: {e}")))?;
454 return Ok(Self {
455 server,
456 debug_callback: None,
457 });
458 }
459
460 let parts = shell_words::split(trimmed)
461 .map_err(|e| crate::util::internal_error(format!("Failed to parse command: {e}")))?;
462
463 Self::from_args(parts)
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `input` as an agent and returns its stdio configuration,
    /// panicking if parsing fails or another transport was produced.
    fn parse_stdio(input: &str) -> crate::schema::McpServerStdio {
        let agent = AcpAgent::from_str(input).unwrap();
        match agent.server {
            crate::schema::McpServer::Stdio(stdio) => stdio,
            _ => panic!("Expected Stdio variant"),
        }
    }

    /// A bare command with one argument and no environment.
    #[test]
    fn test_parse_simple_command() {
        let stdio = parse_stdio("python agent.py");
        assert_eq!(stdio.name, "python");
        assert_eq!(stdio.command, PathBuf::from("python"));
        assert_eq!(stdio.args, vec!["agent.py"]);
        assert!(stdio.env.is_empty());
    }

    /// Flags and their values are kept as separate arguments, in order.
    #[test]
    fn test_parse_command_with_args() {
        let stdio = parse_stdio("node server.js --port 8080 --verbose");
        assert_eq!(stdio.name, "node");
        assert_eq!(stdio.command, PathBuf::from("node"));
        assert_eq!(stdio.args, vec!["server.js", "--port", "8080", "--verbose"]);
        assert!(stdio.env.is_empty());
    }

    /// Double-quoted segments are treated as single arguments.
    #[test]
    fn test_parse_command_with_quotes() {
        let stdio = parse_stdio(r#"python "my agent.py" --name "Test Agent""#);
        assert_eq!(stdio.name, "python");
        assert_eq!(stdio.command, PathBuf::from("python"));
        assert_eq!(stdio.args, vec!["my agent.py", "--name", "Test Agent"]);
        assert!(stdio.env.is_empty());
    }

    /// A JSON object with `"type": "stdio"` is deserialized directly.
    #[test]
    fn test_parse_json_stdio() {
        let json = r#"{
            "type": "stdio",
            "name": "my-agent",
            "command": "/usr/bin/python",
            "args": ["agent.py", "--verbose"],
            "env": []
        }"#;
        let stdio = parse_stdio(json);
        assert_eq!(stdio.name, "my-agent");
        assert_eq!(stdio.command, PathBuf::from("/usr/bin/python"));
        assert_eq!(stdio.args, vec!["agent.py", "--verbose"]);
        assert!(stdio.env.is_empty());
    }

    /// A JSON object with `"type": "http"` yields the Http variant.
    #[test]
    fn test_parse_json_http() {
        let json = r#"{
            "type": "http",
            "name": "remote-agent",
            "url": "https://example.com/agent",
            "headers": []
        }"#;
        let agent = AcpAgent::from_str(json).unwrap();
        let crate::schema::McpServer::Http(http) = agent.server else {
            panic!("Expected Http variant");
        };
        assert_eq!(http.name, "remote-agent");
        assert_eq!(http.url, "https://example.com/agent");
        assert!(http.headers.is_empty());
    }
}