1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6 BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19pub struct AcpBridge {
25 cmd_tx: mpsc::Sender<BridgeCommand>,
26 pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27 handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30#[derive(Clone)]
35pub struct BridgeCancelHandle {
36 cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40 pub async fn cancel(&self) -> Result<()> {
42 let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43 Ok(())
44 }
45}
46
47impl AcpBridge {
48 pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53 let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54 let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56 let handle = tokio::task::spawn_blocking(move || {
57 let rt = tokio::runtime::Builder::new_current_thread()
58 .enable_all()
59 .build()
60 .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61 let local = tokio::task::LocalSet::new();
62 local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63 });
64
65 Ok(Self {
66 cmd_tx,
67 evt_rx,
68 handle,
69 })
70 }
71
72 pub fn cancel_handle(&self) -> BridgeCancelHandle {
75 BridgeCancelHandle {
76 cmd_tx: self.cmd_tx.clone(),
77 }
78 }
79
80 pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86 let reply_rx = self.send_prompt(messages).await?;
87 reply_rx
88 .await
89 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90 }
91
92 pub async fn send_prompt(
98 &self,
99 messages: Vec<String>,
100 ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102 self.cmd_tx
103 .send(BridgeCommand::Prompt {
104 messages,
105 reply: reply_tx,
106 })
107 .await
108 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109 Ok(reply_rx)
110 }
111
112 pub async fn cancel(&self) -> Result<()> {
114 let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115 Ok(())
116 }
117
118 pub async fn set_mode(&self, mode: String) -> Result<()> {
120 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121 self.cmd_tx
122 .send(BridgeCommand::SetMode {
123 mode,
124 reply: reply_tx,
125 })
126 .await
127 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128 reply_rx
129 .await
130 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131 }
132
133 pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136 self.cmd_tx
137 .send(BridgeCommand::SetConfig {
138 key,
139 value,
140 reply: reply_tx,
141 })
142 .await
143 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144 reply_rx
145 .await
146 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147 }
148
149 pub async fn shutdown(self) -> Result<()> {
152 let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153 match self.handle.await {
154 Ok(Ok(())) => Ok(()),
155 Ok(Err(e)) => Err(e),
156 Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157 }
158 }
159}
160
161async fn acp_thread_main(
167 mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168 evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169 command: String,
170 args: Vec<String>,
171 cwd: PathBuf,
172) -> Result<()> {
173 let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175 let mut cmd = tokio::process::Command::new(&command);
176 cmd.args(&args_refs)
177 .stdin(std::process::Stdio::piped())
178 .stdout(std::process::Stdio::piped())
179 .stderr(std::process::Stdio::inherit())
180 .kill_on_drop(true);
181
182 cmd.env_remove("ANTHROPIC_API_KEY");
185
186 if let Some(token) = resolve_claude_auth_token()
192 && !token.starts_with("sk-ant-oat01-")
193 {
194 cmd.env("ANTHROPIC_AUTH_TOKEN", &token);
195 }
196
197 let mut child = cmd
198 .spawn()
199 .map_err(|e| AcpCliError::Connection(format!("spawn {command}: {e}")))?;
200
201 let stdin = child
202 .stdin
203 .take()
204 .ok_or_else(|| AcpCliError::Connection("agent has no stdin".into()))?;
205 let stdout = child
206 .stdout
207 .take()
208 .ok_or_else(|| AcpCliError::Connection("agent has no stdout".into()))?;
209
210 let client = BridgedAcpClient::new(evt_tx.clone());
211
212 let (conn, handle_io) =
213 acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
214 tokio::task::spawn_local(fut);
215 });
216
217 tokio::task::spawn_local(async move {
219 if let Err(e) = handle_io.await {
220 eprintln!("[acp-cli] I/O error: {e}");
221 }
222 });
223
224 let result = async {
225 conn.initialize(
227 acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
228 acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
229 ),
230 )
231 .await
232 .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
233
234 let session = conn
236 .new_session(acp::NewSessionRequest::new(cwd))
237 .await
238 .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
239
240 let session_id = session.session_id;
241 let _ = evt_tx.send(BridgeEvent::SessionCreated {
242 session_id: session_id.0.to_string(),
243 });
244
245 while let Some(cmd) = cmd_rx.recv().await {
247 match cmd {
248 BridgeCommand::Prompt { messages, reply } => {
249 let content_blocks: Vec<acp::ContentBlock> =
250 messages.into_iter().map(|m| m.into()).collect();
251 let result = conn
252 .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
253 .await;
254 match result {
255 Ok(response) => {
256 let stop_reason = serde_json::to_value(response.stop_reason)
257 .ok()
258 .and_then(|v| v.as_str().map(String::from))
259 .unwrap_or_else(|| "unknown".to_string());
260 let _ = evt_tx.send(BridgeEvent::PromptDone {
261 stop_reason: stop_reason.clone(),
262 });
263 let _ = reply.send(Ok(PromptResult {
266 content: String::new(),
267 stop_reason,
268 }));
269 }
270 Err(e) => {
271 let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
272 }
273 }
274 }
275 BridgeCommand::Cancel => {
276 }
278 BridgeCommand::SetMode { mode, reply } => {
279 let mode_id = acp::SessionModeId::new(mode);
280 let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
281 match conn.set_session_mode(request).await {
282 Ok(_) => {
283 let _ = reply.send(Ok(()));
284 }
285 Err(e) => {
286 let _ = reply
287 .send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
288 }
289 }
290 }
291 BridgeCommand::SetConfig { key, value, reply } => {
292 let config_id = acp::SessionConfigId::new(key);
293 let value_id = acp::SessionConfigValueId::new(value);
294 let request = acp::SetSessionConfigOptionRequest::new(
295 session_id.clone(),
296 config_id,
297 value_id,
298 );
299 match conn.set_session_config_option(request).await {
300 Ok(_) => {
301 let _ = reply.send(Ok(()));
302 }
303 Err(e) => {
304 let _ = reply.send(Err(AcpCliError::Agent(format!(
305 "set_session_config_option: {e}"
306 ))));
307 }
308 }
309 }
310 BridgeCommand::Shutdown => break,
311 }
312 }
313 Ok(())
314 }
315 .await;
316
317 reap_child_process(&mut child).await;
319 result
320}
321
322async fn reap_child_process(child: &mut tokio::process::Child) {
323 if !matches!(child.try_wait(), Ok(Some(_))) {
324 let _ = child.start_kill();
325 }
326 let _ = child.wait().await;
327}
328
329fn resolve_claude_auth_token() -> Option<String> {
339 if let Some(t) = std::env::var("ANTHROPIC_AUTH_TOKEN")
341 .ok()
342 .filter(|t| !t.is_empty())
343 {
344 return Some(t);
345 }
346
347 let config = crate::config::AcpCliConfig::load();
349 if let Some(token) = config.auth_token.filter(|t| !t.is_empty()) {
350 return Some(token);
351 }
352
353 if let Some(token) = read_claude_json_token() {
355 return Some(token);
356 }
357
358 #[cfg(target_os = "macos")]
360 if let Some(token) = read_keychain_token() {
361 return Some(token);
362 }
363
364 None
365}
366
367fn read_claude_json_token() -> Option<String> {
369 let path = dirs::home_dir()?.join(".claude.json");
370 let content = std::fs::read_to_string(path).ok()?;
371 let json: serde_json::Value = serde_json::from_str(&content).ok()?;
372
373 json.pointer("/oauthAccount/accessToken")
374 .or_else(|| json.get("accessToken"))
375 .and_then(|v| v.as_str())
376 .filter(|s| !s.is_empty())
377 .map(|s| s.to_string())
378}
379
/// Queries the macOS keychain, via the `security` command-line tool, for a
/// generic password stored under one of the known service names.
///
/// Services are tried in order and the first non-empty, trimmed password is
/// returned. Each lookup is independent: a service whose lookup fails to run,
/// exits unsuccessfully, or yields non-UTF-8 or empty output is skipped and
/// the next one is tried, rather than aborting the whole search.
#[cfg(target_os = "macos")]
fn read_keychain_token() -> Option<String> {
    const SERVICES: [&str; 3] = ["Claude Code", "claude.ai", "anthropic.claude"];

    SERVICES.iter().find_map(|&service| {
        // `?` inside this closure only skips the current service.
        let output = std::process::Command::new("security")
            .args(["find-generic-password", "-s", service, "-w"])
            .stderr(std::process::Stdio::null())
            .output()
            .ok()?;
        if !output.status.success() {
            return None;
        }
        let raw = String::from_utf8(output.stdout).ok()?;
        let token = raw.trim();
        (!token.is_empty()).then(|| token.to_string())
    })
}
399
#[cfg(test)]
mod tests {
    use super::{AcpCliError, BridgeCommand, BridgeEvent, acp_thread_main, reap_child_process};
    use tokio::sync::mpsc;

    /// Builds a command that runs the platform's shell with the script that
    /// matches the current platform.
    fn shell_command(windows_script: &str, unix_script: &str) -> tokio::process::Command {
        let (program, flag, script) = if cfg!(windows) {
            ("cmd", "/C", windows_script)
        } else {
            ("sh", "-c", unix_script)
        };
        let mut command = tokio::process::Command::new(program);
        command.arg(flag).arg(script);
        command
    }

    /// A child that exits successfully right away.
    fn exited_child_command() -> tokio::process::Command {
        shell_command("exit 0", "exit 0")
    }

    /// A child that keeps running long enough to still be alive when reaped.
    fn running_child_command() -> tokio::process::Command {
        shell_command("ping -n 30 127.0.0.1 >NUL", "sleep 10")
    }

    #[tokio::test]
    async fn reap_child_process_handles_already_exited_child() {
        let mut child = exited_child_command().spawn().expect("spawn child");

        // Give the short-lived child time to exit before reaping it.
        let grace = std::time::Duration::from_millis(50);
        tokio::time::sleep(grace).await;
        reap_child_process(&mut child).await;

        let status = child.wait().await.expect("wait after reap");
        assert!(status.success());
    }

    #[tokio::test]
    async fn reap_child_process_kills_running_child() {
        let mut child = running_child_command().spawn().expect("spawn child");

        reap_child_process(&mut child).await;

        // A killed child must not report a successful exit.
        let status = child.wait().await.expect("wait after kill");
        assert!(!status.success());
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn initialize_error_still_reaps_child_process() {
        use std::os::unix::fs::PermissionsExt;
        use tempfile::tempdir;

        let temp = tempdir().expect("create tempdir");
        let pid_file = temp.path().join("agent.pid");
        let script = temp.path().join("fake-acp-agent.sh");

        // Fake agent: records its pid, closes stdout, then lingers.
        let script_body = format!(
            "#!/bin/sh\necho $$ > \"{}\"\nexec 1>&-\nsleep 30\n",
            pid_file.display()
        );
        std::fs::write(&script, script_body).expect("write script");

        let mut perms = std::fs::metadata(&script)
            .expect("script metadata")
            .permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&script, perms).expect("chmod script");

        // The sender is kept alive so the command channel stays open.
        let (_cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(2);
        let (evt_tx, _evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();

        let local = tokio::task::LocalSet::new();
        let bridge_run = acp_thread_main(
            cmd_rx,
            evt_tx,
            script.to_string_lossy().to_string(),
            vec![],
            temp.path().to_path_buf(),
        );
        let bounded_run = tokio::time::timeout(std::time::Duration::from_secs(5), bridge_run);
        let result = local
            .run_until(bounded_run)
            .await
            .expect("acp_thread_main should not hang")
            .expect_err("initialize should fail for non-ACP output");

        assert!(
            matches!(result, AcpCliError::Connection(_)),
            "expected connection error, got: {result:?}"
        );

        let pid_raw = std::fs::read_to_string(&pid_file).expect("pid file");
        let pid = pid_raw.trim().parse::<i32>().expect("parse pid");

        // SAFETY: `kill` with signal 0 only checks whether the pid exists; it
        // touches no memory owned by this process.
        let alive = unsafe { libc::kill(pid, 0) == 0 };
        if alive {
            // SAFETY: plain syscall on a pid; cleans up the leaked child so
            // the failing assertion below does not leave it running.
            let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
        }
        assert!(!alive, "child process {pid} should have been reaped");
    }
}