Skip to main content

acp_cli/bridge/
mod.rs

1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6    BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19/// Core bridge that manages the ACP agent process lifecycle.
20///
21/// Spawns the agent in a dedicated blocking thread with a `LocalSet` (required
22/// because ACP futures are `!Send`), and exposes an async command/event API
23/// for the main thread to drive prompts and receive streamed output.
24pub struct AcpBridge {
25    cmd_tx: mpsc::Sender<BridgeCommand>,
26    pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27    handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30/// Handle used to send cancel/shutdown commands independently of `evt_rx`.
31///
32/// Obtained via `AcpBridge::cancel_handle()` so the caller can borrow `evt_rx`
33/// mutably while still being able to send cancel commands.
34#[derive(Clone)]
35pub struct BridgeCancelHandle {
36    cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40    /// Request cancellation of the current prompt (best-effort).
41    pub async fn cancel(&self) -> Result<()> {
42        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43        Ok(())
44    }
45}
46
47impl AcpBridge {
48    /// Start the ACP bridge by spawning the agent process in a background thread.
49    ///
50    /// The agent is launched via `command` with the given `args`, and the working
51    /// directory for the ACP session is set to `cwd`.
52    pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53        let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54        let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56        let handle = tokio::task::spawn_blocking(move || {
57            let rt = tokio::runtime::Builder::new_current_thread()
58                .enable_all()
59                .build()
60                .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61            let local = tokio::task::LocalSet::new();
62            local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63        });
64
65        Ok(Self {
66            cmd_tx,
67            evt_rx,
68            handle,
69        })
70    }
71
72    /// Obtain a lightweight cancel handle that can be used while `evt_rx` is
73    /// borrowed mutably.
74    pub fn cancel_handle(&self) -> BridgeCancelHandle {
75        BridgeCancelHandle {
76            cmd_tx: self.cmd_tx.clone(),
77        }
78    }
79
80    /// Send a prompt to the agent and wait for the result.
81    ///
82    /// Text content is streamed in real-time via `BridgeEvent::TextChunk` events
83    /// on `evt_rx`. The returned `PromptResult.content` will be empty because
84    /// the main thread is expected to collect content from those events.
85    pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86        let reply_rx = self.send_prompt(messages).await?;
87        reply_rx
88            .await
89            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90    }
91
92    /// Send a prompt command without awaiting the reply.
93    ///
94    /// Returns a oneshot receiver that will resolve when the prompt completes.
95    /// This allows the caller to drive `evt_rx` concurrently with waiting for
96    /// the prompt result, avoiding borrow conflicts on `self`.
97    pub async fn send_prompt(
98        &self,
99        messages: Vec<String>,
100    ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102        self.cmd_tx
103            .send(BridgeCommand::Prompt {
104                messages,
105                reply: reply_tx,
106            })
107            .await
108            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109        Ok(reply_rx)
110    }
111
112    /// Request cancellation of the current prompt (best-effort).
113    pub async fn cancel(&self) -> Result<()> {
114        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115        Ok(())
116    }
117
118    /// Set the session mode on the ACP connection.
119    pub async fn set_mode(&self, mode: String) -> Result<()> {
120        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121        self.cmd_tx
122            .send(BridgeCommand::SetMode {
123                mode,
124                reply: reply_tx,
125            })
126            .await
127            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128        reply_rx
129            .await
130            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131    }
132
133    /// Set a session config option on the ACP connection.
134    pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136        self.cmd_tx
137            .send(BridgeCommand::SetConfig {
138                key,
139                value,
140                reply: reply_tx,
141            })
142            .await
143            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144        reply_rx
145            .await
146            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147    }
148
149    /// Gracefully shut down the bridge, killing the agent process and joining
150    /// the background thread.
151    pub async fn shutdown(self) -> Result<()> {
152        let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153        match self.handle.await {
154            Ok(Ok(())) => Ok(()),
155            Ok(Err(e)) => Err(e),
156            Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157        }
158    }
159}
160
161/// Main loop running inside `spawn_blocking` + `LocalSet`.
162///
163/// Spawns the agent child process, establishes the ACP connection, initializes
164/// the protocol, creates a session, then enters a command loop that processes
165/// `BridgeCommand` messages from the main thread.
166async fn acp_thread_main(
167    mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168    evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169    command: String,
170    args: Vec<String>,
171    cwd: PathBuf,
172) -> Result<()> {
173    // 1. Spawn agent process
174    let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175    let mut cmd = tokio::process::Command::new(&command);
176    cmd.args(&args_refs)
177        .stdin(std::process::Stdio::piped())
178        .stdout(std::process::Stdio::piped())
179        .stderr(std::process::Stdio::inherit())
180        .kill_on_drop(true);
181
182    // Remove ANTHROPIC_API_KEY to prevent OAuth setup tokens (sk-ant-oat01-*)
183    // from being misidentified as API keys by the Claude Agent SDK.
184    cmd.env_remove("ANTHROPIC_API_KEY");
185
186    // Inject auth token for non-OAuth tokens only.
187    // OAuth tokens (sk-ant-oat01-*) must NOT be injected via env var —
188    // the Claude Agent SDK's auth flow doesn't support the oauth-2025-04-20
189    // beta header, causing 401. Let the SDK resolve OAuth tokens itself
190    // from Keychain / ~/.claude.json.
191    if let Some(token) = resolve_claude_auth_token()
192        && !token.starts_with("sk-ant-oat01-")
193    {
194        cmd.env("ANTHROPIC_AUTH_TOKEN", &token);
195    }
196
197    let mut child = cmd
198        .spawn()
199        .map_err(|e| AcpCliError::Agent(format!("{command}: {e}")))?;
200
201    let stdin = child
202        .stdin
203        .take()
204        .ok_or_else(|| AcpCliError::Agent("no stdin".into()))?;
205    let stdout = child
206        .stdout
207        .take()
208        .ok_or_else(|| AcpCliError::Agent("no stdout".into()))?;
209
210    let client = BridgedAcpClient {
211        evt_tx: evt_tx.clone(),
212    };
213
214    let (conn, handle_io) =
215        acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
216            tokio::task::spawn_local(fut);
217        });
218
219    // Drive the I/O loop in the background on the local task set.
220    tokio::task::spawn_local(async move {
221        if let Err(e) = handle_io.await {
222            eprintln!("[acp-cli] I/O error: {e}");
223        }
224    });
225
226    let result = async {
227        // 2. Initialize
228        conn.initialize(
229            acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
230                acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
231            ),
232        )
233        .await
234        .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
235
236        // 3. Create session
237        let session = conn
238            .new_session(acp::NewSessionRequest::new(cwd))
239            .await
240            .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
241
242        let session_id = session.session_id;
243        let _ = evt_tx.send(BridgeEvent::SessionCreated {
244            session_id: session_id.0.to_string(),
245        });
246
247        // 4. Command loop
248        while let Some(cmd) = cmd_rx.recv().await {
249            match cmd {
250                BridgeCommand::Prompt { messages, reply } => {
251                    let content_blocks: Vec<acp::ContentBlock> =
252                        messages.into_iter().map(|m| m.into()).collect();
253                    let result = conn
254                        .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
255                        .await;
256                    match result {
257                        Ok(response) => {
258                            let stop_reason = serde_json::to_value(response.stop_reason)
259                                .ok()
260                                .and_then(|v| v.as_str().map(String::from))
261                                .unwrap_or_else(|| "unknown".to_string());
262                            let _ = evt_tx.send(BridgeEvent::PromptDone {
263                                stop_reason: stop_reason.clone(),
264                            });
265                            // Content was already streamed via session_notification -> TextChunk.
266                            // The main thread collects content from BridgeEvent::TextChunk events.
267                            let _ = reply.send(Ok(PromptResult {
268                                content: String::new(),
269                                stop_reason,
270                            }));
271                        }
272                        Err(e) => {
273                            let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
274                        }
275                    }
276                }
277                BridgeCommand::Cancel => {
278                    // ACP cancel not yet implemented in SDK
279                }
280                BridgeCommand::SetMode { mode, reply } => {
281                    let mode_id = acp::SessionModeId::new(mode);
282                    let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
283                    match conn.set_session_mode(request).await {
284                        Ok(_) => {
285                            let _ = reply.send(Ok(()));
286                        }
287                        Err(e) => {
288                            let _ = reply
289                                .send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
290                        }
291                    }
292                }
293                BridgeCommand::SetConfig { key, value, reply } => {
294                    let config_id = acp::SessionConfigId::new(key);
295                    let value_id = acp::SessionConfigValueId::new(value);
296                    let request = acp::SetSessionConfigOptionRequest::new(
297                        session_id.clone(),
298                        config_id,
299                        value_id,
300                    );
301                    match conn.set_session_config_option(request).await {
302                        Ok(_) => {
303                            let _ = reply.send(Ok(()));
304                        }
305                        Err(e) => {
306                            let _ = reply.send(Err(AcpCliError::Agent(format!(
307                                "set_session_config_option: {e}"
308                            ))));
309                        }
310                    }
311                }
312                BridgeCommand::Shutdown => break,
313            }
314        }
315        Ok(())
316    }
317    .await;
318
319    // Cleanup: always reap child to avoid zombie accumulation.
320    reap_child_process(&mut child).await;
321    result
322}
323
324async fn reap_child_process(child: &mut tokio::process::Child) {
325    if !matches!(child.try_wait(), Ok(Some(_))) {
326        let _ = child.start_kill();
327    }
328    let _ = child.wait().await;
329}
330
331// ---------------------------------------------------------------------------
332// Claude auth token resolution
333// ---------------------------------------------------------------------------
334
335/// Resolve an OAuth token for claude-agent-acp, checking (in order):
336/// 1. `ANTHROPIC_AUTH_TOKEN` env var (already set externally)
337/// 2. `~/.acp-cli/config.json` → `auth_token` (set via `acp-cli init`)
338/// 3. `~/.claude.json` → `oauthAccount.accessToken`
339/// 4. macOS Keychain (`security find-generic-password`)
340fn resolve_claude_auth_token() -> Option<String> {
341    // 1. Already set externally
342    if let Some(t) = std::env::var("ANTHROPIC_AUTH_TOKEN")
343        .ok()
344        .filter(|t| !t.is_empty())
345    {
346        return Some(t);
347    }
348
349    // 2. acp-cli config
350    let config = crate::config::AcpCliConfig::load();
351    if let Some(token) = config.auth_token.filter(|t| !t.is_empty()) {
352        return Some(token);
353    }
354
355    // 3. ~/.claude.json
356    if let Some(token) = read_claude_json_token() {
357        return Some(token);
358    }
359
360    // 4. macOS Keychain
361    #[cfg(target_os = "macos")]
362    if let Some(token) = read_keychain_token() {
363        return Some(token);
364    }
365
366    None
367}
368
369/// Read the OAuth access token from `~/.claude.json`.
370fn read_claude_json_token() -> Option<String> {
371    let path = dirs::home_dir()?.join(".claude.json");
372    let content = std::fs::read_to_string(path).ok()?;
373    let json: serde_json::Value = serde_json::from_str(&content).ok()?;
374
375    json.pointer("/oauthAccount/accessToken")
376        .or_else(|| json.get("accessToken"))
377        .and_then(|v| v.as_str())
378        .filter(|s| !s.is_empty())
379        .map(|s| s.to_string())
380}
381
382/// Read the Claude Code OAuth token from the macOS Keychain.
383#[cfg(target_os = "macos")]
384fn read_keychain_token() -> Option<String> {
385    // Try known service names used by Claude Code
386    for service in &["Claude Code", "claude.ai", "anthropic.claude"] {
387        let output = std::process::Command::new("security")
388            .args(["find-generic-password", "-s", service, "-w"])
389            .stderr(std::process::Stdio::null())
390            .output()
391            .ok()?;
392        if output.status.success() {
393            let token = String::from_utf8(output.stdout).ok()?.trim().to_string();
394            if !token.is_empty() {
395                return Some(token);
396            }
397        }
398    }
399    None
400}
401
402#[cfg(test)]
403mod tests {
404    use super::{AcpCliError, BridgeCommand, BridgeEvent, acp_thread_main, reap_child_process};
405    use tokio::sync::mpsc;
406
407    fn exited_child_command() -> tokio::process::Command {
408        if cfg!(windows) {
409            let mut c = tokio::process::Command::new("cmd");
410            c.arg("/C").arg("exit 0");
411            c
412        } else {
413            let mut c = tokio::process::Command::new("sh");
414            c.arg("-c").arg("exit 0");
415            c
416        }
417    }
418
419    fn running_child_command() -> tokio::process::Command {
420        if cfg!(windows) {
421            let mut c = tokio::process::Command::new("cmd");
422            c.arg("/C").arg("ping -n 30 127.0.0.1 >NUL");
423            c
424        } else {
425            let mut c = tokio::process::Command::new("sh");
426            c.arg("-c").arg("sleep 10");
427            c
428        }
429    }
430
431    #[tokio::test]
432    async fn reap_child_process_handles_already_exited_child() {
433        let mut child = exited_child_command().spawn().expect("spawn child");
434
435        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
436        reap_child_process(&mut child).await;
437
438        let status = child.wait().await.expect("wait after reap");
439        assert!(status.success());
440    }
441
442    #[tokio::test]
443    async fn reap_child_process_kills_running_child() {
444        let mut child = running_child_command().spawn().expect("spawn child");
445
446        reap_child_process(&mut child).await;
447        let status = child.wait().await.expect("wait after kill");
448        assert!(!status.success());
449    }
450
451    #[cfg(unix)]
452    #[tokio::test]
453    async fn initialize_error_still_reaps_child_process() {
454        use std::os::unix::fs::PermissionsExt;
455        use tempfile::tempdir;
456
457        let temp = tempdir().expect("create tempdir");
458        let pid_file = temp.path().join("agent.pid");
459        let script = temp.path().join("fake-acp-agent.sh");
460        std::fs::write(
461            &script,
462            format!(
463                "#!/bin/sh\necho $$ > \"{}\"\nexec 1>&-\nsleep 30\n",
464                pid_file.display()
465            ),
466        )
467        .expect("write script");
468        let mut perms = std::fs::metadata(&script)
469            .expect("script metadata")
470            .permissions();
471        perms.set_mode(0o755);
472        std::fs::set_permissions(&script, perms).expect("chmod script");
473
474        let (_cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(2);
475        let (evt_tx, _evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
476
477        let local = tokio::task::LocalSet::new();
478        let result = local
479            .run_until(tokio::time::timeout(
480                std::time::Duration::from_secs(5),
481                acp_thread_main(
482                    cmd_rx,
483                    evt_tx,
484                    script.to_string_lossy().to_string(),
485                    vec![],
486                    temp.path().to_path_buf(),
487                ),
488            ))
489            .await
490            .expect("acp_thread_main should not hang")
491            .expect_err("initialize should fail for non-ACP output");
492
493        assert!(
494            matches!(result, AcpCliError::Connection(_)),
495            "expected connection error, got: {result:?}"
496        );
497
498        let pid_raw = std::fs::read_to_string(&pid_file).expect("pid file");
499        let pid = pid_raw.trim().parse::<i32>().expect("parse pid");
500
501        // Verify child is gone; if still alive, kill it to avoid test leak.
502        let alive = unsafe { libc::kill(pid, 0) == 0 };
503        if alive {
504            let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
505        }
506        assert!(!alive, "child process {pid} should have been reaped");
507    }
508}