Skip to main content

acp_cli/bridge/
mod.rs

1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6    BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19/// Core bridge that manages the ACP agent process lifecycle.
20///
21/// Spawns the agent in a dedicated blocking thread with a `LocalSet` (required
22/// because ACP futures are `!Send`), and exposes an async command/event API
23/// for the main thread to drive prompts and receive streamed output.
24pub struct AcpBridge {
25    cmd_tx: mpsc::Sender<BridgeCommand>,
26    pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27    handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30/// Handle used to send cancel/shutdown commands independently of `evt_rx`.
31///
32/// Obtained via `AcpBridge::cancel_handle()` so the caller can borrow `evt_rx`
33/// mutably while still being able to send cancel commands.
34#[derive(Clone)]
35pub struct BridgeCancelHandle {
36    cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40    /// Request cancellation of the current prompt (best-effort).
41    pub async fn cancel(&self) -> Result<()> {
42        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43        Ok(())
44    }
45}
46
47impl AcpBridge {
48    /// Start the ACP bridge by spawning the agent process in a background thread.
49    ///
50    /// The agent is launched via `command` with the given `args`, and the working
51    /// directory for the ACP session is set to `cwd`.
52    pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53        let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54        let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56        let handle = tokio::task::spawn_blocking(move || {
57            let rt = tokio::runtime::Builder::new_current_thread()
58                .enable_all()
59                .build()
60                .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61            let local = tokio::task::LocalSet::new();
62            local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63        });
64
65        Ok(Self {
66            cmd_tx,
67            evt_rx,
68            handle,
69        })
70    }
71
72    /// Obtain a lightweight cancel handle that can be used while `evt_rx` is
73    /// borrowed mutably.
74    pub fn cancel_handle(&self) -> BridgeCancelHandle {
75        BridgeCancelHandle {
76            cmd_tx: self.cmd_tx.clone(),
77        }
78    }
79
80    /// Send a prompt to the agent and wait for the result.
81    ///
82    /// Text content is streamed in real-time via `BridgeEvent::TextChunk` events
83    /// on `evt_rx`. The returned `PromptResult.content` will be empty because
84    /// the main thread is expected to collect content from those events.
85    pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86        let reply_rx = self.send_prompt(messages).await?;
87        reply_rx
88            .await
89            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90    }
91
92    /// Send a prompt command without awaiting the reply.
93    ///
94    /// Returns a oneshot receiver that will resolve when the prompt completes.
95    /// This allows the caller to drive `evt_rx` concurrently with waiting for
96    /// the prompt result, avoiding borrow conflicts on `self`.
97    pub async fn send_prompt(
98        &self,
99        messages: Vec<String>,
100    ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102        self.cmd_tx
103            .send(BridgeCommand::Prompt {
104                messages,
105                reply: reply_tx,
106            })
107            .await
108            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109        Ok(reply_rx)
110    }
111
112    /// Request cancellation of the current prompt (best-effort).
113    pub async fn cancel(&self) -> Result<()> {
114        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115        Ok(())
116    }
117
118    /// Set the session mode on the ACP connection.
119    pub async fn set_mode(&self, mode: String) -> Result<()> {
120        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121        self.cmd_tx
122            .send(BridgeCommand::SetMode {
123                mode,
124                reply: reply_tx,
125            })
126            .await
127            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128        reply_rx
129            .await
130            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131    }
132
133    /// Set a session config option on the ACP connection.
134    pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136        self.cmd_tx
137            .send(BridgeCommand::SetConfig {
138                key,
139                value,
140                reply: reply_tx,
141            })
142            .await
143            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144        reply_rx
145            .await
146            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147    }
148
149    /// Gracefully shut down the bridge, killing the agent process and joining
150    /// the background thread.
151    pub async fn shutdown(self) -> Result<()> {
152        let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153        match self.handle.await {
154            Ok(Ok(())) => Ok(()),
155            Ok(Err(e)) => Err(e),
156            Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157        }
158    }
159}
160
161/// Main loop running inside `spawn_blocking` + `LocalSet`.
162///
163/// Spawns the agent child process, establishes the ACP connection, initializes
164/// the protocol, creates a session, then enters a command loop that processes
165/// `BridgeCommand` messages from the main thread.
166async fn acp_thread_main(
167    mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168    evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169    command: String,
170    args: Vec<String>,
171    cwd: PathBuf,
172) -> Result<()> {
173    // 1. Spawn agent process
174    let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175    let mut cmd = tokio::process::Command::new(&command);
176    cmd.args(&args_refs)
177        .stdin(std::process::Stdio::piped())
178        .stdout(std::process::Stdio::piped())
179        .stderr(std::process::Stdio::inherit())
180        .kill_on_drop(true);
181
182    // Remove ANTHROPIC_API_KEY to prevent OAuth setup tokens (sk-ant-oat01-*)
183    // from being misidentified as API keys by the Claude Agent SDK.
184    cmd.env_remove("ANTHROPIC_API_KEY");
185
186    // Inject auth token for non-OAuth tokens only.
187    // OAuth tokens (sk-ant-oat01-*) must NOT be injected via env var —
188    // the Claude Agent SDK's auth flow doesn't support the oauth-2025-04-20
189    // beta header, causing 401. Let the SDK resolve OAuth tokens itself
190    // from Keychain / ~/.claude.json.
191    if let Some(token) = resolve_claude_auth_token() {
192        if !token.starts_with("sk-ant-oat01-") {
193            cmd.env("ANTHROPIC_AUTH_TOKEN", &token);
194        }
195    }
196
197    let mut child = cmd
198        .spawn()
199        .map_err(|e| AcpCliError::Agent(format!("{command}: {e}")))?;
200
201    let stdin = child
202        .stdin
203        .take()
204        .ok_or_else(|| AcpCliError::Agent("no stdin".into()))?;
205    let stdout = child
206        .stdout
207        .take()
208        .ok_or_else(|| AcpCliError::Agent("no stdout".into()))?;
209
210    let client = BridgedAcpClient {
211        evt_tx: evt_tx.clone(),
212    };
213
214    let (conn, handle_io) =
215        acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
216            tokio::task::spawn_local(fut);
217        });
218
219    // Drive the I/O loop in the background on the local task set.
220    tokio::task::spawn_local(async move {
221        if let Err(e) = handle_io.await {
222            eprintln!("[acp-cli] I/O error: {e}");
223        }
224    });
225
226    // 2. Initialize
227    conn.initialize(
228        acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
229            acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
230        ),
231    )
232    .await
233    .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
234
235    // 3. Create session
236    let session = conn
237        .new_session(acp::NewSessionRequest::new(cwd))
238        .await
239        .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
240
241    let session_id = session.session_id;
242    let _ = evt_tx.send(BridgeEvent::SessionCreated {
243        session_id: session_id.0.to_string(),
244    });
245
246    // 4. Command loop
247    while let Some(cmd) = cmd_rx.recv().await {
248        match cmd {
249            BridgeCommand::Prompt { messages, reply } => {
250                let content_blocks: Vec<acp::ContentBlock> =
251                    messages.into_iter().map(|m| m.into()).collect();
252                let result = conn
253                    .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
254                    .await;
255                match result {
256                    Ok(response) => {
257                        let stop_reason = serde_json::to_value(response.stop_reason)
258                            .ok()
259                            .and_then(|v| v.as_str().map(String::from))
260                            .unwrap_or_else(|| "unknown".to_string());
261                        let _ = evt_tx.send(BridgeEvent::PromptDone {
262                            stop_reason: stop_reason.clone(),
263                        });
264                        // Content was already streamed via session_notification -> TextChunk.
265                        // The main thread collects content from BridgeEvent::TextChunk events.
266                        let _ = reply.send(Ok(PromptResult {
267                            content: String::new(),
268                            stop_reason,
269                        }));
270                    }
271                    Err(e) => {
272                        let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
273                    }
274                }
275            }
276            BridgeCommand::Cancel => {
277                // ACP cancel not yet implemented in SDK
278            }
279            BridgeCommand::SetMode { mode, reply } => {
280                let mode_id = acp::SessionModeId::new(mode);
281                let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
282                match conn.set_session_mode(request).await {
283                    Ok(_) => {
284                        let _ = reply.send(Ok(()));
285                    }
286                    Err(e) => {
287                        let _ =
288                            reply.send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
289                    }
290                }
291            }
292            BridgeCommand::SetConfig { key, value, reply } => {
293                let config_id = acp::SessionConfigId::new(key);
294                let value_id = acp::SessionConfigValueId::new(value);
295                let request = acp::SetSessionConfigOptionRequest::new(
296                    session_id.clone(),
297                    config_id,
298                    value_id,
299                );
300                match conn.set_session_config_option(request).await {
301                    Ok(_) => {
302                        let _ = reply.send(Ok(()));
303                    }
304                    Err(e) => {
305                        let _ = reply.send(Err(AcpCliError::Agent(format!(
306                            "set_session_config_option: {e}"
307                        ))));
308                    }
309                }
310            }
311            BridgeCommand::Shutdown => break,
312        }
313    }
314
315    // Cleanup
316    child.kill().await.ok();
317    Ok(())
318}
319
320// ---------------------------------------------------------------------------
321// Claude auth token resolution
322// ---------------------------------------------------------------------------
323
324/// Resolve an OAuth token for claude-agent-acp, checking (in order):
325/// 1. `ANTHROPIC_AUTH_TOKEN` env var (already set externally)
326/// 2. `~/.acp-cli/config.json` → `auth_token` (set via `acp-cli init`)
327/// 3. `~/.claude.json` → `oauthAccount.accessToken`
328/// 4. macOS Keychain (`security find-generic-password`)
329fn resolve_claude_auth_token() -> Option<String> {
330    // 1. Already set externally
331    if let Some(t) = std::env::var("ANTHROPIC_AUTH_TOKEN")
332        .ok()
333        .filter(|t| !t.is_empty())
334    {
335        return Some(t);
336    }
337
338    // 2. acp-cli config
339    let config = crate::config::AcpCliConfig::load();
340    if let Some(token) = config.auth_token.filter(|t| !t.is_empty()) {
341        return Some(token);
342    }
343
344    // 3. ~/.claude.json
345    if let Some(token) = read_claude_json_token() {
346        return Some(token);
347    }
348
349    // 4. macOS Keychain
350    #[cfg(target_os = "macos")]
351    if let Some(token) = read_keychain_token() {
352        return Some(token);
353    }
354
355    None
356}
357
358/// Read the OAuth access token from `~/.claude.json`.
359fn read_claude_json_token() -> Option<String> {
360    let path = dirs::home_dir()?.join(".claude.json");
361    let content = std::fs::read_to_string(path).ok()?;
362    let json: serde_json::Value = serde_json::from_str(&content).ok()?;
363
364    json.pointer("/oauthAccount/accessToken")
365        .or_else(|| json.get("accessToken"))
366        .and_then(|v| v.as_str())
367        .filter(|s| !s.is_empty())
368        .map(|s| s.to_string())
369}
370
371/// Read the Claude Code OAuth token from the macOS Keychain.
372#[cfg(target_os = "macos")]
373fn read_keychain_token() -> Option<String> {
374    // Try known service names used by Claude Code
375    for service in &["Claude Code", "claude.ai", "anthropic.claude"] {
376        let output = std::process::Command::new("security")
377            .args(["find-generic-password", "-s", service, "-w"])
378            .stderr(std::process::Stdio::null())
379            .output()
380            .ok()?;
381        if output.status.success() {
382            let token = String::from_utf8(output.stdout).ok()?.trim().to_string();
383            if !token.is_empty() {
384                return Some(token);
385            }
386        }
387    }
388    None
389}