1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6 BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19pub struct AcpBridge {
25 cmd_tx: mpsc::Sender<BridgeCommand>,
26 pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27 handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30#[derive(Clone)]
35pub struct BridgeCancelHandle {
36 cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40 pub async fn cancel(&self) -> Result<()> {
42 let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43 Ok(())
44 }
45}
46
47impl AcpBridge {
48 pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53 let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54 let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56 let handle = tokio::task::spawn_blocking(move || {
57 let rt = tokio::runtime::Builder::new_current_thread()
58 .enable_all()
59 .build()
60 .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61 let local = tokio::task::LocalSet::new();
62 local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63 });
64
65 Ok(Self {
66 cmd_tx,
67 evt_rx,
68 handle,
69 })
70 }
71
72 pub fn cancel_handle(&self) -> BridgeCancelHandle {
75 BridgeCancelHandle {
76 cmd_tx: self.cmd_tx.clone(),
77 }
78 }
79
80 pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86 let reply_rx = self.send_prompt(messages).await?;
87 reply_rx
88 .await
89 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90 }
91
92 pub async fn send_prompt(
98 &self,
99 messages: Vec<String>,
100 ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102 self.cmd_tx
103 .send(BridgeCommand::Prompt {
104 messages,
105 reply: reply_tx,
106 })
107 .await
108 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109 Ok(reply_rx)
110 }
111
112 pub async fn cancel(&self) -> Result<()> {
114 let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115 Ok(())
116 }
117
118 pub async fn set_mode(&self, mode: String) -> Result<()> {
120 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121 self.cmd_tx
122 .send(BridgeCommand::SetMode {
123 mode,
124 reply: reply_tx,
125 })
126 .await
127 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128 reply_rx
129 .await
130 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131 }
132
133 pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136 self.cmd_tx
137 .send(BridgeCommand::SetConfig {
138 key,
139 value,
140 reply: reply_tx,
141 })
142 .await
143 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144 reply_rx
145 .await
146 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147 }
148
149 pub async fn shutdown(self) -> Result<()> {
152 let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153 match self.handle.await {
154 Ok(Ok(())) => Ok(()),
155 Ok(Err(e)) => Err(e),
156 Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157 }
158 }
159}
160
161async fn acp_thread_main(
167 mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168 evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169 command: String,
170 args: Vec<String>,
171 cwd: PathBuf,
172) -> Result<()> {
173 let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175 let mut cmd = tokio::process::Command::new(&command);
176 cmd.args(&args_refs)
177 .stdin(std::process::Stdio::piped())
178 .stdout(std::process::Stdio::piped())
179 .stderr(std::process::Stdio::inherit())
180 .kill_on_drop(true);
181
182 cmd.env_remove("ANTHROPIC_API_KEY");
185
186 if let Some(token) = resolve_claude_auth_token()
192 && !token.starts_with("sk-ant-oat01-")
193 {
194 cmd.env("ANTHROPIC_AUTH_TOKEN", &token);
195 }
196
197 let mut child = cmd
198 .spawn()
199 .map_err(|e| AcpCliError::Agent(format!("{command}: {e}")))?;
200
201 let stdin = child
202 .stdin
203 .take()
204 .ok_or_else(|| AcpCliError::Agent("no stdin".into()))?;
205 let stdout = child
206 .stdout
207 .take()
208 .ok_or_else(|| AcpCliError::Agent("no stdout".into()))?;
209
210 let client = BridgedAcpClient {
211 evt_tx: evt_tx.clone(),
212 };
213
214 let (conn, handle_io) =
215 acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
216 tokio::task::spawn_local(fut);
217 });
218
219 tokio::task::spawn_local(async move {
221 if let Err(e) = handle_io.await {
222 eprintln!("[acp-cli] I/O error: {e}");
223 }
224 });
225
226 let result = async {
227 conn.initialize(
229 acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
230 acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
231 ),
232 )
233 .await
234 .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
235
236 let session = conn
238 .new_session(acp::NewSessionRequest::new(cwd))
239 .await
240 .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
241
242 let session_id = session.session_id;
243 let _ = evt_tx.send(BridgeEvent::SessionCreated {
244 session_id: session_id.0.to_string(),
245 });
246
247 while let Some(cmd) = cmd_rx.recv().await {
249 match cmd {
250 BridgeCommand::Prompt { messages, reply } => {
251 let content_blocks: Vec<acp::ContentBlock> =
252 messages.into_iter().map(|m| m.into()).collect();
253 let result = conn
254 .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
255 .await;
256 match result {
257 Ok(response) => {
258 let stop_reason = serde_json::to_value(response.stop_reason)
259 .ok()
260 .and_then(|v| v.as_str().map(String::from))
261 .unwrap_or_else(|| "unknown".to_string());
262 let _ = evt_tx.send(BridgeEvent::PromptDone {
263 stop_reason: stop_reason.clone(),
264 });
265 let _ = reply.send(Ok(PromptResult {
268 content: String::new(),
269 stop_reason,
270 }));
271 }
272 Err(e) => {
273 let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
274 }
275 }
276 }
277 BridgeCommand::Cancel => {
278 }
280 BridgeCommand::SetMode { mode, reply } => {
281 let mode_id = acp::SessionModeId::new(mode);
282 let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
283 match conn.set_session_mode(request).await {
284 Ok(_) => {
285 let _ = reply.send(Ok(()));
286 }
287 Err(e) => {
288 let _ = reply
289 .send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
290 }
291 }
292 }
293 BridgeCommand::SetConfig { key, value, reply } => {
294 let config_id = acp::SessionConfigId::new(key);
295 let value_id = acp::SessionConfigValueId::new(value);
296 let request = acp::SetSessionConfigOptionRequest::new(
297 session_id.clone(),
298 config_id,
299 value_id,
300 );
301 match conn.set_session_config_option(request).await {
302 Ok(_) => {
303 let _ = reply.send(Ok(()));
304 }
305 Err(e) => {
306 let _ = reply.send(Err(AcpCliError::Agent(format!(
307 "set_session_config_option: {e}"
308 ))));
309 }
310 }
311 }
312 BridgeCommand::Shutdown => break,
313 }
314 }
315 Ok(())
316 }
317 .await;
318
319 reap_child_process(&mut child).await;
321 result
322}
323
324async fn reap_child_process(child: &mut tokio::process::Child) {
325 if !matches!(child.try_wait(), Ok(Some(_))) {
326 let _ = child.start_kill();
327 }
328 let _ = child.wait().await;
329}
330
331fn resolve_claude_auth_token() -> Option<String> {
341 if let Some(t) = std::env::var("ANTHROPIC_AUTH_TOKEN")
343 .ok()
344 .filter(|t| !t.is_empty())
345 {
346 return Some(t);
347 }
348
349 let config = crate::config::AcpCliConfig::load();
351 if let Some(token) = config.auth_token.filter(|t| !t.is_empty()) {
352 return Some(token);
353 }
354
355 if let Some(token) = read_claude_json_token() {
357 return Some(token);
358 }
359
360 #[cfg(target_os = "macos")]
362 if let Some(token) = read_keychain_token() {
363 return Some(token);
364 }
365
366 None
367}
368
369fn read_claude_json_token() -> Option<String> {
371 let path = dirs::home_dir()?.join(".claude.json");
372 let content = std::fs::read_to_string(path).ok()?;
373 let json: serde_json::Value = serde_json::from_str(&content).ok()?;
374
375 json.pointer("/oauthAccount/accessToken")
376 .or_else(|| json.get("accessToken"))
377 .and_then(|v| v.as_str())
378 .filter(|s| !s.is_empty())
379 .map(|s| s.to_string())
380}
381
/// Queries the macOS keychain through the `security` command-line tool and
/// returns the first non-empty password found.
///
/// The service names `Claude Code`, `claude.ai` and `anthropic.claude` are
/// tried in that order. A service is skipped when the command cannot be run,
/// exits unsuccessfully, prints non-UTF-8 output, or prints only whitespace.
/// A failure for one service never prevents the remaining ones from being
/// tried.
#[cfg(target_os = "macos")]
fn read_keychain_token() -> Option<String> {
    const SERVICES: [&str; 3] = ["Claude Code", "claude.ai", "anthropic.claude"];

    SERVICES.into_iter().find_map(|service| {
        // Inside this closure `?` only gives up on the current service.
        let output = std::process::Command::new("security")
            .args(["find-generic-password", "-s", service, "-w"])
            .stderr(std::process::Stdio::null())
            .output()
            .ok()?;
        if !output.status.success() {
            return None;
        }

        let stdout = String::from_utf8(output.stdout).ok()?;
        let token = stdout.trim();
        (!token.is_empty()).then(|| token.to_string())
    })
}
401
#[cfg(test)]
mod tests {
    use super::{AcpCliError, BridgeCommand, BridgeEvent, acp_thread_main, reap_child_process};
    use tokio::sync::mpsc;

    /// Builds a shell invocation: `cmd /C <windows_script>` on Windows,
    /// `sh -c <unix_script>` everywhere else.
    fn shell_command(windows_script: &str, unix_script: &str) -> tokio::process::Command {
        let (shell, flag, script) = if cfg!(windows) {
            ("cmd", "/C", windows_script)
        } else {
            ("sh", "-c", unix_script)
        };
        let mut command = tokio::process::Command::new(shell);
        command.arg(flag).arg(script);
        command
    }

    /// Command for a child that exits successfully right away.
    fn exited_child_command() -> tokio::process::Command {
        shell_command("exit 0", "exit 0")
    }

    /// Command for a child that keeps running long enough to need killing.
    fn running_child_command() -> tokio::process::Command {
        shell_command("ping -n 30 127.0.0.1 >NUL", "sleep 10")
    }

    #[tokio::test]
    async fn reap_child_process_handles_already_exited_child() {
        let mut finished = exited_child_command().spawn().expect("spawn child");

        // Give the child a moment to exit before reaping it.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        reap_child_process(&mut finished).await;

        let exit_status = finished.wait().await.expect("wait after reap");
        assert!(exit_status.success());
    }

    #[tokio::test]
    async fn reap_child_process_kills_running_child() {
        let mut running = running_child_command().spawn().expect("spawn child");

        reap_child_process(&mut running).await;

        let exit_status = running.wait().await.expect("wait after kill");
        assert!(!exit_status.success());
    }

    /// A fake agent that closes stdout without speaking ACP must make
    /// `acp_thread_main` fail with a connection error and still be reaped.
    #[cfg(unix)]
    #[tokio::test]
    async fn initialize_error_still_reaps_child_process() {
        use std::os::unix::fs::PermissionsExt;
        use tempfile::tempdir;

        let temp = tempdir().expect("create tempdir");
        let pid_file = temp.path().join("agent.pid");
        let script = temp.path().join("fake-acp-agent.sh");

        // The script records its own pid, closes stdout, then lingers.
        let script_body = format!(
            "#!/bin/sh\necho $$ > \"{}\"\nexec 1>&-\nsleep 30\n",
            pid_file.display()
        );
        std::fs::write(&script, script_body).expect("write script");
        // Confirm the script exists before making it executable.
        std::fs::metadata(&script).expect("script metadata");
        std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755))
            .expect("chmod script");

        // Keep the sender alive so the command channel stays open.
        let (_cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(2);
        let (evt_tx, _evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();

        let worker = acp_thread_main(
            cmd_rx,
            evt_tx,
            script.to_string_lossy().to_string(),
            vec![],
            temp.path().to_path_buf(),
        );
        let guarded = tokio::time::timeout(std::time::Duration::from_secs(5), worker);

        let local = tokio::task::LocalSet::new();
        let result = local
            .run_until(guarded)
            .await
            .expect("acp_thread_main should not hang")
            .expect_err("initialize should fail for non-ACP output");

        assert!(
            matches!(result, AcpCliError::Connection(_)),
            "expected connection error, got: {result:?}"
        );

        let pid_raw = std::fs::read_to_string(&pid_file).expect("pid file");
        let pid = pid_raw.trim().parse::<i32>().expect("parse pid");

        // SAFETY: `kill` with signal 0 only checks whether the pid exists; it
        // delivers no signal and touches no memory.
        let alive = unsafe { libc::kill(pid, 0) == 0 };
        if alive {
            // SAFETY: plain syscall on an integer pid; used only to clean up
            // the leaked child before the assertion below fails.
            let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
        }
        assert!(!alive, "child process {pid} should have been reaped");
    }
}