1use std::path::PathBuf;
7use std::str::FromStr;
8use std::sync::Arc;
9
10use sacp::{Client, Conductor, Role};
11use tokio::process::Child;
12use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum LineDirection {
17 Stdin,
19 Stdout,
21 Stderr,
23}
24
25pub struct AcpAgent {
82 server: sacp::schema::McpServer,
83 debug_callback: Option<Arc<dyn Fn(&str, LineDirection) + Send + Sync + 'static>>,
84}
85
86impl std::fmt::Debug for AcpAgent {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 f.debug_struct("AcpAgent")
89 .field("server", &self.server)
90 .field(
91 "debug_callback",
92 &self.debug_callback.as_ref().map(|_| "..."),
93 )
94 .finish()
95 }
96}
97
98impl AcpAgent {
99 pub fn new(server: sacp::schema::McpServer) -> Self {
101 Self {
102 server,
103 debug_callback: None,
104 }
105 }
106
107 pub fn zed_claude_code() -> Self {
110 Self::from_str("npx -y @zed-industries/claude-code-acp@latest").expect("valid bash command")
111 }
112
113 pub fn zed_codex() -> Self {
116 Self::from_str("npx -y @zed-industries/codex-acp@latest").expect("valid bash command")
117 }
118
119 pub fn google_gemini() -> Self {
122 Self::from_str("npx -y -- @google/gemini-cli@latest --experimental-acp")
123 .expect("valid bash command")
124 }
125
126 pub fn server(&self) -> &sacp::schema::McpServer {
128 &self.server
129 }
130
131 pub fn into_server(self) -> sacp::schema::McpServer {
133 self.server
134 }
135
136 pub fn with_debug<F>(mut self, callback: F) -> Self
153 where
154 F: Fn(&str, LineDirection) + Send + Sync + 'static,
155 {
156 self.debug_callback = Some(Arc::new(callback));
157 self
158 }
159
160 pub fn spawn_process(
163 &self,
164 ) -> Result<
165 (
166 tokio::process::ChildStdin,
167 tokio::process::ChildStdout,
168 tokio::process::ChildStderr,
169 Child,
170 ),
171 sacp::Error,
172 > {
173 match &self.server {
174 sacp::schema::McpServer::Stdio(stdio) => {
175 let mut cmd = tokio::process::Command::new(&stdio.command);
176 cmd.args(&stdio.args);
177 for env_var in &stdio.env {
178 cmd.env(&env_var.name, &env_var.value);
179 }
180 cmd.stdin(std::process::Stdio::piped())
181 .stdout(std::process::Stdio::piped())
182 .stderr(std::process::Stdio::piped());
183
184 let mut child = cmd.spawn().map_err(sacp::Error::into_internal_error)?;
185
186 let child_stdin = child
187 .stdin
188 .take()
189 .ok_or_else(|| sacp::util::internal_error("Failed to open stdin"))?;
190 let child_stdout = child
191 .stdout
192 .take()
193 .ok_or_else(|| sacp::util::internal_error("Failed to open stdout"))?;
194 let child_stderr = child
195 .stderr
196 .take()
197 .ok_or_else(|| sacp::util::internal_error("Failed to open stderr"))?;
198
199 Ok((child_stdin, child_stdout, child_stderr, child))
200 }
201 sacp::schema::McpServer::Http(_) => Err(sacp::util::internal_error(
202 "HTTP transport not yet supported by AcpAgent",
203 )),
204 sacp::schema::McpServer::Sse(_) => Err(sacp::util::internal_error(
205 "SSE transport not yet supported by AcpAgent",
206 )),
207 _ => Err(sacp::util::internal_error(
208 "Unknown MCP server transport type",
209 )),
210 }
211 }
212}
213
214struct ChildGuard(Child);
216
217impl ChildGuard {
218 async fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
219 self.0.wait().await
220 }
221}
222
223impl Drop for ChildGuard {
224 fn drop(&mut self) {
225 let _ = self.0.start_kill();
226 }
227}
228
229async fn monitor_child(
234 child: Child,
235 stderr_rx: tokio::sync::oneshot::Receiver<String>,
236) -> Result<(), sacp::Error> {
237 let mut guard = ChildGuard(child);
238
239 let status = guard
241 .wait()
242 .await
243 .map_err(|e| sacp::util::internal_error(format!("Failed to wait for process: {}", e)))?;
244
245 if status.success() {
246 Ok(())
247 } else {
248 let stderr = stderr_rx.await.unwrap_or_default();
250
251 let message = if stderr.is_empty() {
252 format!("Process exited with {}", status)
253 } else {
254 format!("Process exited with {}: {}", status, stderr)
255 };
256
257 Err(sacp::util::internal_error(message))
258 }
259}
260
261pub trait AcpAgentCounterpartRole: Role {}
263
264impl AcpAgentCounterpartRole for Client {}
265
266impl AcpAgentCounterpartRole for Conductor {}
267
268impl<Counterpart: AcpAgentCounterpartRole> sacp::ConnectTo<Counterpart> for AcpAgent {
269 async fn connect_to(
270 self,
271 client: impl sacp::ConnectTo<Counterpart::Counterpart>,
272 ) -> Result<(), sacp::Error> {
273 use futures::AsyncBufReadExt;
274 use futures::AsyncWriteExt;
275 use futures::StreamExt;
276 use futures::io::BufReader;
277
278 let (child_stdin, child_stdout, child_stderr, child) = self.spawn_process()?;
279
280 let (stderr_tx, stderr_rx) = tokio::sync::oneshot::channel::<String>();
282
283 let debug_callback = self.debug_callback.clone();
285 tokio::spawn(async move {
286 let stderr_reader = BufReader::new(child_stderr.compat());
287 let mut stderr_lines = stderr_reader.lines();
288 let mut collected = String::new();
289 while let Some(line_result) = stderr_lines.next().await {
290 if let Ok(line) = line_result {
291 if let Some(ref callback) = debug_callback {
293 callback(&line, LineDirection::Stderr);
294 }
295 if !collected.is_empty() {
297 collected.push('\n');
298 }
299 collected.push_str(&line);
300 }
301 }
302 let _ = stderr_tx.send(collected);
303 });
304
305 let child_monitor = monitor_child(child, stderr_rx);
307
308 let incoming_lines = if let Some(callback) = self.debug_callback.clone() {
310 Box::pin(
311 BufReader::new(child_stdout.compat())
312 .lines()
313 .inspect(move |result| {
314 if let Ok(line) = result {
315 callback(line, LineDirection::Stdout);
316 }
317 }),
318 )
319 as std::pin::Pin<Box<dyn futures::Stream<Item = std::io::Result<String>> + Send>>
320 } else {
321 Box::pin(BufReader::new(child_stdout.compat()).lines())
322 };
323
324 let outgoing_sink = if let Some(callback) = self.debug_callback.clone() {
326 Box::pin(futures::sink::unfold(
327 (child_stdin.compat_write(), callback),
328 async move |(mut writer, callback), line: String| {
329 callback(&line, LineDirection::Stdin);
330 let mut bytes = line.into_bytes();
331 bytes.push(b'\n');
332 writer.write_all(&bytes).await?;
333 Ok::<_, std::io::Error>((writer, callback))
334 },
335 ))
336 as std::pin::Pin<Box<dyn futures::Sink<String, Error = std::io::Error> + Send>>
337 } else {
338 Box::pin(futures::sink::unfold(
339 child_stdin.compat_write(),
340 async move |mut writer, line: String| {
341 let mut bytes = line.into_bytes();
342 bytes.push(b'\n');
343 writer.write_all(&bytes).await?;
344 Ok::<_, std::io::Error>(writer)
345 },
346 ))
347 };
348
349 let protocol_future = sacp::ConnectTo::<Counterpart>::connect_to(
352 sacp::Lines::new(outgoing_sink, incoming_lines),
353 client,
354 );
355
356 tokio::select! {
357 result = protocol_future => result,
358 result = child_monitor => result,
359 }
360 }
361}
362
363impl AcpAgent {
364 pub fn from_args<I, T>(args: I) -> Result<Self, sacp::Error>
382 where
383 I: IntoIterator<Item = T>,
384 T: ToString,
385 {
386 let args: Vec<String> = args.into_iter().map(|s| s.to_string()).collect();
387
388 if args.is_empty() {
389 return Err(sacp::util::internal_error("Arguments cannot be empty"));
390 }
391
392 let mut env = vec![];
393 let mut command_idx = 0;
394
395 for (i, arg) in args.iter().enumerate() {
397 if let Some((name, value)) = parse_env_var(arg) {
398 env.push(sacp::schema::EnvVariable::new(name, value));
399 command_idx = i + 1;
400 } else {
401 break;
402 }
403 }
404
405 if command_idx >= args.len() {
406 return Err(sacp::util::internal_error(
407 "No command found (only environment variables provided)",
408 ));
409 }
410
411 let command = PathBuf::from(&args[command_idx]);
412 let cmd_args = args[command_idx + 1..].to_vec();
413
414 let name = command
416 .file_name()
417 .and_then(|n| n.to_str())
418 .unwrap_or("agent")
419 .to_string();
420
421 Ok(AcpAgent {
422 server: sacp::schema::McpServer::Stdio(
423 sacp::schema::McpServerStdio::new(name, command)
424 .args(cmd_args)
425 .env(env),
426 ),
427 debug_callback: None,
428 })
429 }
430}
431
432fn parse_env_var(s: &str) -> Option<(String, String)> {
435 let eq_pos = s.find('=')?;
437 if eq_pos == 0 {
438 return None;
439 }
440
441 let name = &s[..eq_pos];
442 let value = &s[eq_pos + 1..];
443
444 let mut chars = name.chars();
447 let first = chars.next()?;
448 if !first.is_ascii_alphabetic() && first != '_' {
449 return None;
450 }
451 if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
452 return None;
453 }
454
455 Some((name.to_string(), value.to_string()))
456}
457
458impl FromStr for AcpAgent {
459 type Err = sacp::Error;
460
461 fn from_str(s: &str) -> Result<Self, Self::Err> {
462 let trimmed = s.trim();
463
464 if trimmed.starts_with('{') {
466 let server: sacp::schema::McpServer = serde_json::from_str(trimmed)
467 .map_err(|e| sacp::util::internal_error(format!("Failed to parse JSON: {}", e)))?;
468 return Ok(Self {
469 server,
470 debug_callback: None,
471 });
472 }
473
474 let parts = shell_words::split(trimmed)
476 .map_err(|e| sacp::util::internal_error(format!("Failed to parse command: {}", e)))?;
477
478 Self::from_args(parts)
479 }
480}
481
482#[cfg(test)]
483mod tests {
484 use super::*;
485
486 #[test]
487 fn test_parse_simple_command() {
488 let agent = AcpAgent::from_str("python agent.py").unwrap();
489 match agent.server {
490 sacp::schema::McpServer::Stdio(stdio) => {
491 assert_eq!(stdio.name, "python");
492 assert_eq!(stdio.command, PathBuf::from("python"));
493 assert_eq!(stdio.args, vec!["agent.py"]);
494 assert!(stdio.env.is_empty());
495 }
496 _ => panic!("Expected Stdio variant"),
497 }
498 }
499
500 #[test]
501 fn test_parse_command_with_args() {
502 let agent = AcpAgent::from_str("node server.js --port 8080 --verbose").unwrap();
503 match agent.server {
504 sacp::schema::McpServer::Stdio(stdio) => {
505 assert_eq!(stdio.name, "node");
506 assert_eq!(stdio.command, PathBuf::from("node"));
507 assert_eq!(stdio.args, vec!["server.js", "--port", "8080", "--verbose"]);
508 assert!(stdio.env.is_empty());
509 }
510 _ => panic!("Expected Stdio variant"),
511 }
512 }
513
514 #[test]
515 fn test_parse_command_with_quotes() {
516 let agent = AcpAgent::from_str(r#"python "my agent.py" --name "Test Agent""#).unwrap();
517 match agent.server {
518 sacp::schema::McpServer::Stdio(stdio) => {
519 assert_eq!(stdio.name, "python");
520 assert_eq!(stdio.command, PathBuf::from("python"));
521 assert_eq!(stdio.args, vec!["my agent.py", "--name", "Test Agent"]);
522 assert!(stdio.env.is_empty());
523 }
524 _ => panic!("Expected Stdio variant"),
525 }
526 }
527
528 #[test]
529 fn test_parse_json_stdio() {
530 let json = r#"{
531 "type": "stdio",
532 "name": "my-agent",
533 "command": "/usr/bin/python",
534 "args": ["agent.py", "--verbose"],
535 "env": []
536 }"#;
537 let agent = AcpAgent::from_str(json).unwrap();
538 match agent.server {
539 sacp::schema::McpServer::Stdio(stdio) => {
540 assert_eq!(stdio.name, "my-agent");
541 assert_eq!(stdio.command, PathBuf::from("/usr/bin/python"));
542 assert_eq!(stdio.args, vec!["agent.py", "--verbose"]);
543 assert!(stdio.env.is_empty());
544 }
545 _ => panic!("Expected Stdio variant"),
546 }
547 }
548
549 #[test]
550 fn test_parse_json_http() {
551 let json = r#"{
552 "type": "http",
553 "name": "remote-agent",
554 "url": "https://example.com/agent",
555 "headers": []
556 }"#;
557 let agent = AcpAgent::from_str(json).unwrap();
558 match agent.server {
559 sacp::schema::McpServer::Http(http) => {
560 assert_eq!(http.name, "remote-agent");
561 assert_eq!(http.url, "https://example.com/agent");
562 assert!(http.headers.is_empty());
563 }
564 _ => panic!("Expected Http variant"),
565 }
566 }
567}