Skip to main content

acp_cli/bridge/
mod.rs

1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6    BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19/// Core bridge that manages the ACP agent process lifecycle.
20///
21/// Spawns the agent in a dedicated blocking thread with a `LocalSet` (required
22/// because ACP futures are `!Send`), and exposes an async command/event API
23/// for the main thread to drive prompts and receive streamed output.
24pub struct AcpBridge {
25    cmd_tx: mpsc::Sender<BridgeCommand>,
26    pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27    handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30/// Handle used to send cancel/shutdown commands independently of `evt_rx`.
31///
32/// Obtained via `AcpBridge::cancel_handle()` so the caller can borrow `evt_rx`
33/// mutably while still being able to send cancel commands.
34#[derive(Clone)]
35pub struct BridgeCancelHandle {
36    cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40    /// Request cancellation of the current prompt (best-effort).
41    pub async fn cancel(&self) -> Result<()> {
42        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43        Ok(())
44    }
45}
46
47impl AcpBridge {
48    /// Start the ACP bridge by spawning the agent process in a background thread.
49    ///
50    /// The agent is launched via `command` with the given `args`, and the working
51    /// directory for the ACP session is set to `cwd`.
52    pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53        let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54        let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56        let handle = tokio::task::spawn_blocking(move || {
57            let rt = tokio::runtime::Builder::new_current_thread()
58                .enable_all()
59                .build()
60                .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61            let local = tokio::task::LocalSet::new();
62            local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63        });
64
65        Ok(Self {
66            cmd_tx,
67            evt_rx,
68            handle,
69        })
70    }
71
72    /// Obtain a lightweight cancel handle that can be used while `evt_rx` is
73    /// borrowed mutably.
74    pub fn cancel_handle(&self) -> BridgeCancelHandle {
75        BridgeCancelHandle {
76            cmd_tx: self.cmd_tx.clone(),
77        }
78    }
79
80    /// Send a prompt to the agent and wait for the result.
81    ///
82    /// Text content is streamed in real-time via `BridgeEvent::TextChunk` events
83    /// on `evt_rx`. The returned `PromptResult.content` will be empty because
84    /// the main thread is expected to collect content from those events.
85    pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86        let reply_rx = self.send_prompt(messages).await?;
87        reply_rx
88            .await
89            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90    }
91
92    /// Send a prompt command without awaiting the reply.
93    ///
94    /// Returns a oneshot receiver that will resolve when the prompt completes.
95    /// This allows the caller to drive `evt_rx` concurrently with waiting for
96    /// the prompt result, avoiding borrow conflicts on `self`.
97    pub async fn send_prompt(
98        &self,
99        messages: Vec<String>,
100    ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102        self.cmd_tx
103            .send(BridgeCommand::Prompt {
104                messages,
105                reply: reply_tx,
106            })
107            .await
108            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109        Ok(reply_rx)
110    }
111
112    /// Request cancellation of the current prompt (best-effort).
113    pub async fn cancel(&self) -> Result<()> {
114        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115        Ok(())
116    }
117
118    /// Set the session mode on the ACP connection.
119    pub async fn set_mode(&self, mode: String) -> Result<()> {
120        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121        self.cmd_tx
122            .send(BridgeCommand::SetMode {
123                mode,
124                reply: reply_tx,
125            })
126            .await
127            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128        reply_rx
129            .await
130            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131    }
132
133    /// Set a session config option on the ACP connection.
134    pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136        self.cmd_tx
137            .send(BridgeCommand::SetConfig {
138                key,
139                value,
140                reply: reply_tx,
141            })
142            .await
143            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144        reply_rx
145            .await
146            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147    }
148
149    /// Gracefully shut down the bridge, killing the agent process and joining
150    /// the background thread.
151    pub async fn shutdown(self) -> Result<()> {
152        let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153        match self.handle.await {
154            Ok(Ok(())) => Ok(()),
155            Ok(Err(e)) => Err(e),
156            Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157        }
158    }
159}
160
161/// Main loop running inside `spawn_blocking` + `LocalSet`.
162///
163/// Spawns the agent child process, establishes the ACP connection, initializes
164/// the protocol, creates a session, then enters a command loop that processes
165/// `BridgeCommand` messages from the main thread.
166async fn acp_thread_main(
167    mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168    evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169    command: String,
170    args: Vec<String>,
171    cwd: PathBuf,
172) -> Result<()> {
173    // 1. Spawn agent process
174    let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175    let mut cmd = tokio::process::Command::new(&command);
176    cmd.args(&args_refs)
177        .stdin(std::process::Stdio::piped())
178        .stdout(std::process::Stdio::piped())
179        .stderr(std::process::Stdio::inherit())
180        .kill_on_drop(true);
181
182    // Remove ANTHROPIC_API_KEY to prevent OAuth setup tokens (sk-ant-oat01-*)
183    // from being misidentified as API keys by the Claude Agent SDK.
184    cmd.env_remove("ANTHROPIC_API_KEY");
185
186    // Inject auth token for non-OAuth tokens only.
187    // OAuth tokens (sk-ant-oat01-*) must NOT be injected via env var —
188    // the Claude Agent SDK's auth flow doesn't support the oauth-2025-04-20
189    // beta header, causing 401. Let the SDK resolve OAuth tokens itself
190    // from Keychain / ~/.claude.json.
191    if let Some(token) = resolve_claude_auth_token()
192        && !token.starts_with("sk-ant-oat01-")
193    {
194        cmd.env("ANTHROPIC_AUTH_TOKEN", &token);
195    }
196
197    let mut child = cmd
198        .spawn()
199        .map_err(|e| AcpCliError::Connection(format!("spawn {command}: {e}")))?;
200
201    let stdin = child
202        .stdin
203        .take()
204        .ok_or_else(|| AcpCliError::Connection("agent has no stdin".into()))?;
205    let stdout = child
206        .stdout
207        .take()
208        .ok_or_else(|| AcpCliError::Connection("agent has no stdout".into()))?;
209
210    let client = BridgedAcpClient::new(evt_tx.clone());
211
212    let (conn, handle_io) =
213        acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
214            tokio::task::spawn_local(fut);
215        });
216
217    // Drive the I/O loop in the background on the local task set.
218    tokio::task::spawn_local(async move {
219        if let Err(e) = handle_io.await {
220            eprintln!("[acp-cli] I/O error: {e}");
221        }
222    });
223
224    let result = async {
225        // 2. Initialize
226        conn.initialize(
227            acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
228                acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
229            ),
230        )
231        .await
232        .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
233
234        // 3. Create session
235        let session = conn
236            .new_session(acp::NewSessionRequest::new(cwd))
237            .await
238            .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
239
240        let session_id = session.session_id;
241        let _ = evt_tx.send(BridgeEvent::SessionCreated {
242            session_id: session_id.0.to_string(),
243        });
244
245        // 4. Command loop
246        while let Some(cmd) = cmd_rx.recv().await {
247            match cmd {
248                BridgeCommand::Prompt { messages, reply } => {
249                    let content_blocks: Vec<acp::ContentBlock> =
250                        messages.into_iter().map(|m| m.into()).collect();
251                    let result = conn
252                        .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
253                        .await;
254                    match result {
255                        Ok(response) => {
256                            let stop_reason = serde_json::to_value(response.stop_reason)
257                                .ok()
258                                .and_then(|v| v.as_str().map(String::from))
259                                .unwrap_or_else(|| "unknown".to_string());
260                            let _ = evt_tx.send(BridgeEvent::PromptDone {
261                                stop_reason: stop_reason.clone(),
262                            });
263                            // Content was already streamed via session_notification -> TextChunk.
264                            // The main thread collects content from BridgeEvent::TextChunk events.
265                            let _ = reply.send(Ok(PromptResult {
266                                content: String::new(),
267                                stop_reason,
268                            }));
269                        }
270                        Err(e) => {
271                            let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
272                        }
273                    }
274                }
275                BridgeCommand::Cancel => {
276                    // ACP cancel not yet implemented in SDK
277                }
278                BridgeCommand::SetMode { mode, reply } => {
279                    let mode_id = acp::SessionModeId::new(mode);
280                    let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
281                    match conn.set_session_mode(request).await {
282                        Ok(_) => {
283                            let _ = reply.send(Ok(()));
284                        }
285                        Err(e) => {
286                            let _ = reply
287                                .send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
288                        }
289                    }
290                }
291                BridgeCommand::SetConfig { key, value, reply } => {
292                    let config_id = acp::SessionConfigId::new(key);
293                    let value_id = acp::SessionConfigValueId::new(value);
294                    let request = acp::SetSessionConfigOptionRequest::new(
295                        session_id.clone(),
296                        config_id,
297                        value_id,
298                    );
299                    match conn.set_session_config_option(request).await {
300                        Ok(_) => {
301                            let _ = reply.send(Ok(()));
302                        }
303                        Err(e) => {
304                            let _ = reply.send(Err(AcpCliError::Agent(format!(
305                                "set_session_config_option: {e}"
306                            ))));
307                        }
308                    }
309                }
310                BridgeCommand::Shutdown => break,
311            }
312        }
313        Ok(())
314    }
315    .await;
316
317    // Cleanup: always reap child to avoid zombie accumulation.
318    reap_child_process(&mut child).await;
319    result
320}
321
322async fn reap_child_process(child: &mut tokio::process::Child) {
323    if !matches!(child.try_wait(), Ok(Some(_))) {
324        let _ = child.start_kill();
325    }
326    let _ = child.wait().await;
327}
328
329// ---------------------------------------------------------------------------
330// Claude auth token resolution
331// ---------------------------------------------------------------------------
332
333/// Resolve an OAuth token for claude-agent-acp, checking (in order):
334/// 1. `ANTHROPIC_AUTH_TOKEN` env var (already set externally)
335/// 2. `~/.acp-cli/config.json` → `auth_token` (set via `acp-cli init`)
336/// 3. `~/.claude.json` → `oauthAccount.accessToken`
337/// 4. macOS Keychain (`security find-generic-password`)
338fn resolve_claude_auth_token() -> Option<String> {
339    // 1. Already set externally
340    if let Some(t) = std::env::var("ANTHROPIC_AUTH_TOKEN")
341        .ok()
342        .filter(|t| !t.is_empty())
343    {
344        return Some(t);
345    }
346
347    // 2. acp-cli config
348    let config = crate::config::AcpCliConfig::load();
349    if let Some(token) = config.auth_token.filter(|t| !t.is_empty()) {
350        return Some(token);
351    }
352
353    // 3. ~/.claude.json
354    if let Some(token) = read_claude_json_token() {
355        return Some(token);
356    }
357
358    // 4. macOS Keychain
359    #[cfg(target_os = "macos")]
360    if let Some(token) = read_keychain_token() {
361        return Some(token);
362    }
363
364    None
365}
366
367/// Read the OAuth access token from `~/.claude.json`.
368fn read_claude_json_token() -> Option<String> {
369    let path = dirs::home_dir()?.join(".claude.json");
370    let content = std::fs::read_to_string(path).ok()?;
371    let json: serde_json::Value = serde_json::from_str(&content).ok()?;
372
373    json.pointer("/oauthAccount/accessToken")
374        .or_else(|| json.get("accessToken"))
375        .and_then(|v| v.as_str())
376        .filter(|s| !s.is_empty())
377        .map(|s| s.to_string())
378}
379
380/// Read the Claude Code OAuth token from the macOS Keychain.
381#[cfg(target_os = "macos")]
382fn read_keychain_token() -> Option<String> {
383    // Try known service names used by Claude Code
384    for service in &["Claude Code", "claude.ai", "anthropic.claude"] {
385        let output = std::process::Command::new("security")
386            .args(["find-generic-password", "-s", service, "-w"])
387            .stderr(std::process::Stdio::null())
388            .output()
389            .ok()?;
390        if output.status.success() {
391            let token = String::from_utf8(output.stdout).ok()?.trim().to_string();
392            if !token.is_empty() {
393                return Some(token);
394            }
395        }
396    }
397    None
398}
399
400#[cfg(test)]
401mod tests {
402    use super::{AcpCliError, BridgeCommand, BridgeEvent, acp_thread_main, reap_child_process};
403    use tokio::sync::mpsc;
404
405    fn exited_child_command() -> tokio::process::Command {
406        if cfg!(windows) {
407            let mut c = tokio::process::Command::new("cmd");
408            c.arg("/C").arg("exit 0");
409            c
410        } else {
411            let mut c = tokio::process::Command::new("sh");
412            c.arg("-c").arg("exit 0");
413            c
414        }
415    }
416
417    fn running_child_command() -> tokio::process::Command {
418        if cfg!(windows) {
419            let mut c = tokio::process::Command::new("cmd");
420            c.arg("/C").arg("ping -n 30 127.0.0.1 >NUL");
421            c
422        } else {
423            let mut c = tokio::process::Command::new("sh");
424            c.arg("-c").arg("sleep 10");
425            c
426        }
427    }
428
429    #[tokio::test]
430    async fn reap_child_process_handles_already_exited_child() {
431        let mut child = exited_child_command().spawn().expect("spawn child");
432
433        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
434        reap_child_process(&mut child).await;
435
436        let status = child.wait().await.expect("wait after reap");
437        assert!(status.success());
438    }
439
440    #[tokio::test]
441    async fn reap_child_process_kills_running_child() {
442        let mut child = running_child_command().spawn().expect("spawn child");
443
444        reap_child_process(&mut child).await;
445        let status = child.wait().await.expect("wait after kill");
446        assert!(!status.success());
447    }
448
449    #[cfg(unix)]
450    #[tokio::test]
451    async fn initialize_error_still_reaps_child_process() {
452        use std::os::unix::fs::PermissionsExt;
453        use tempfile::tempdir;
454
455        let temp = tempdir().expect("create tempdir");
456        let pid_file = temp.path().join("agent.pid");
457        let script = temp.path().join("fake-acp-agent.sh");
458        std::fs::write(
459            &script,
460            format!(
461                "#!/bin/sh\necho $$ > \"{}\"\nexec 1>&-\nsleep 30\n",
462                pid_file.display()
463            ),
464        )
465        .expect("write script");
466        let mut perms = std::fs::metadata(&script)
467            .expect("script metadata")
468            .permissions();
469        perms.set_mode(0o755);
470        std::fs::set_permissions(&script, perms).expect("chmod script");
471
472        let (_cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(2);
473        let (evt_tx, _evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
474
475        let local = tokio::task::LocalSet::new();
476        let result = local
477            .run_until(tokio::time::timeout(
478                std::time::Duration::from_secs(5),
479                acp_thread_main(
480                    cmd_rx,
481                    evt_tx,
482                    script.to_string_lossy().to_string(),
483                    vec![],
484                    temp.path().to_path_buf(),
485                ),
486            ))
487            .await
488            .expect("acp_thread_main should not hang")
489            .expect_err("initialize should fail for non-ACP output");
490
491        assert!(
492            matches!(result, AcpCliError::Connection(_)),
493            "expected connection error, got: {result:?}"
494        );
495
496        let pid_raw = std::fs::read_to_string(&pid_file).expect("pid file");
497        let pid = pid_raw.trim().parse::<i32>().expect("parse pid");
498
499        // Verify child is gone; if still alive, kill it to avoid test leak.
500        let alive = unsafe { libc::kill(pid, 0) == 0 };
501        if alive {
502            let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
503        }
504        assert!(!alive, "child process {pid} should have been reaped");
505    }
506}