Skip to main content

acp_cli/bridge/
mod.rs

1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6    BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19/// Core bridge that manages the ACP agent process lifecycle.
20///
21/// Spawns the agent in a dedicated blocking thread with a `LocalSet` (required
22/// because ACP futures are `!Send`), and exposes an async command/event API
23/// for the main thread to drive prompts and receive streamed output.
24pub struct AcpBridge {
25    cmd_tx: mpsc::Sender<BridgeCommand>,
26    pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27    handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30/// Handle used to send cancel/shutdown commands independently of `evt_rx`.
31///
32/// Obtained via `AcpBridge::cancel_handle()` so the caller can borrow `evt_rx`
33/// mutably while still being able to send cancel commands.
34#[derive(Clone)]
35pub struct BridgeCancelHandle {
36    cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40    /// Request cancellation of the current prompt (best-effort).
41    pub async fn cancel(&self) -> Result<()> {
42        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43        Ok(())
44    }
45}
46
47impl AcpBridge {
48    /// Start the ACP bridge by spawning the agent process in a background thread.
49    ///
50    /// The agent is launched via `command` with the given `args`, and the working
51    /// directory for the ACP session is set to `cwd`.
52    pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53        let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54        let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56        let handle = tokio::task::spawn_blocking(move || {
57            let rt = tokio::runtime::Builder::new_current_thread()
58                .enable_all()
59                .build()
60                .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61            let local = tokio::task::LocalSet::new();
62            local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63        });
64
65        Ok(Self {
66            cmd_tx,
67            evt_rx,
68            handle,
69        })
70    }
71
72    /// Obtain a lightweight cancel handle that can be used while `evt_rx` is
73    /// borrowed mutably.
74    pub fn cancel_handle(&self) -> BridgeCancelHandle {
75        BridgeCancelHandle {
76            cmd_tx: self.cmd_tx.clone(),
77        }
78    }
79
80    /// Send a prompt to the agent and wait for the result.
81    ///
82    /// Text content is streamed in real-time via `BridgeEvent::TextChunk` events
83    /// on `evt_rx`. The returned `PromptResult.content` will be empty because
84    /// the main thread is expected to collect content from those events.
85    pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86        let reply_rx = self.send_prompt(messages).await?;
87        reply_rx
88            .await
89            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90    }
91
92    /// Send a prompt command without awaiting the reply.
93    ///
94    /// Returns a oneshot receiver that will resolve when the prompt completes.
95    /// This allows the caller to drive `evt_rx` concurrently with waiting for
96    /// the prompt result, avoiding borrow conflicts on `self`.
97    pub async fn send_prompt(
98        &self,
99        messages: Vec<String>,
100    ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102        self.cmd_tx
103            .send(BridgeCommand::Prompt {
104                messages,
105                reply: reply_tx,
106            })
107            .await
108            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109        Ok(reply_rx)
110    }
111
112    /// Request cancellation of the current prompt (best-effort).
113    pub async fn cancel(&self) -> Result<()> {
114        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115        Ok(())
116    }
117
118    /// Set the session mode on the ACP connection.
119    pub async fn set_mode(&self, mode: String) -> Result<()> {
120        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121        self.cmd_tx
122            .send(BridgeCommand::SetMode {
123                mode,
124                reply: reply_tx,
125            })
126            .await
127            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128        reply_rx
129            .await
130            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131    }
132
133    /// Set a session config option on the ACP connection.
134    pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136        self.cmd_tx
137            .send(BridgeCommand::SetConfig {
138                key,
139                value,
140                reply: reply_tx,
141            })
142            .await
143            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144        reply_rx
145            .await
146            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147    }
148
149    /// Gracefully shut down the bridge, killing the agent process and joining
150    /// the background thread.
151    pub async fn shutdown(self) -> Result<()> {
152        let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153        match self.handle.await {
154            Ok(Ok(())) => Ok(()),
155            Ok(Err(e)) => Err(e),
156            Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157        }
158    }
159}
160
161/// Main loop running inside `spawn_blocking` + `LocalSet`.
162///
163/// Spawns the agent child process, establishes the ACP connection, initializes
164/// the protocol, creates a session, then enters a command loop that processes
165/// `BridgeCommand` messages from the main thread.
166async fn acp_thread_main(
167    mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168    evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169    command: String,
170    args: Vec<String>,
171    cwd: PathBuf,
172) -> Result<()> {
173    // 1. Spawn agent process
174    let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175    let mut cmd = tokio::process::Command::new(&command);
176    cmd.args(&args_refs)
177        .stdin(std::process::Stdio::piped())
178        .stdout(std::process::Stdio::piped())
179        .stderr(std::process::Stdio::inherit())
180        .kill_on_drop(true);
181
182    // Remove ANTHROPIC_API_KEY to prevent OAuth setup tokens (sk-ant-oat01-*)
183    // from being misidentified as API keys by the Claude Agent SDK.
184    cmd.env_remove("ANTHROPIC_API_KEY");
185
186    // Actively inject the OAuth token so claude-agent-acp doesn't have to
187    // resolve it from ~/.claude.json or Keychain (which can be stale).
188    if let Some(token) = resolve_claude_auth_token() {
189        cmd.env("ANTHROPIC_AUTH_TOKEN", &token);
190    }
191
192    let mut child = cmd
193        .spawn()
194        .map_err(|e| AcpCliError::Agent(format!("{command}: {e}")))?;
195
196    let stdin = child
197        .stdin
198        .take()
199        .ok_or_else(|| AcpCliError::Agent("no stdin".into()))?;
200    let stdout = child
201        .stdout
202        .take()
203        .ok_or_else(|| AcpCliError::Agent("no stdout".into()))?;
204
205    let client = BridgedAcpClient {
206        evt_tx: evt_tx.clone(),
207    };
208
209    let (conn, handle_io) =
210        acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
211            tokio::task::spawn_local(fut);
212        });
213
214    // Drive the I/O loop in the background on the local task set.
215    tokio::task::spawn_local(async move {
216        if let Err(e) = handle_io.await {
217            eprintln!("[acp-cli] I/O error: {e}");
218        }
219    });
220
221    // 2. Initialize
222    conn.initialize(
223        acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
224            acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
225        ),
226    )
227    .await
228    .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
229
230    // 3. Create session
231    let session = conn
232        .new_session(acp::NewSessionRequest::new(cwd))
233        .await
234        .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
235
236    let session_id = session.session_id;
237    let _ = evt_tx.send(BridgeEvent::SessionCreated {
238        session_id: session_id.0.to_string(),
239    });
240
241    // 4. Command loop
242    while let Some(cmd) = cmd_rx.recv().await {
243        match cmd {
244            BridgeCommand::Prompt { messages, reply } => {
245                let content_blocks: Vec<acp::ContentBlock> =
246                    messages.into_iter().map(|m| m.into()).collect();
247                let result = conn
248                    .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
249                    .await;
250                match result {
251                    Ok(response) => {
252                        let stop_reason = serde_json::to_value(response.stop_reason)
253                            .ok()
254                            .and_then(|v| v.as_str().map(String::from))
255                            .unwrap_or_else(|| "unknown".to_string());
256                        let _ = evt_tx.send(BridgeEvent::PromptDone {
257                            stop_reason: stop_reason.clone(),
258                        });
259                        // Content was already streamed via session_notification -> TextChunk.
260                        // The main thread collects content from BridgeEvent::TextChunk events.
261                        let _ = reply.send(Ok(PromptResult {
262                            content: String::new(),
263                            stop_reason,
264                        }));
265                    }
266                    Err(e) => {
267                        let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
268                    }
269                }
270            }
271            BridgeCommand::Cancel => {
272                // ACP cancel not yet implemented in SDK
273            }
274            BridgeCommand::SetMode { mode, reply } => {
275                let mode_id = acp::SessionModeId::new(mode);
276                let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
277                match conn.set_session_mode(request).await {
278                    Ok(_) => {
279                        let _ = reply.send(Ok(()));
280                    }
281                    Err(e) => {
282                        let _ =
283                            reply.send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
284                    }
285                }
286            }
287            BridgeCommand::SetConfig { key, value, reply } => {
288                let config_id = acp::SessionConfigId::new(key);
289                let value_id = acp::SessionConfigValueId::new(value);
290                let request = acp::SetSessionConfigOptionRequest::new(
291                    session_id.clone(),
292                    config_id,
293                    value_id,
294                );
295                match conn.set_session_config_option(request).await {
296                    Ok(_) => {
297                        let _ = reply.send(Ok(()));
298                    }
299                    Err(e) => {
300                        let _ = reply.send(Err(AcpCliError::Agent(format!(
301                            "set_session_config_option: {e}"
302                        ))));
303                    }
304                }
305            }
306            BridgeCommand::Shutdown => break,
307        }
308    }
309
310    // Cleanup
311    child.kill().await.ok();
312    Ok(())
313}
314
315// ---------------------------------------------------------------------------
316// Claude auth token resolution
317// ---------------------------------------------------------------------------
318
319/// Resolve an OAuth token for claude-agent-acp, checking (in order):
320/// 1. `ANTHROPIC_AUTH_TOKEN` env var (already set externally)
321/// 2. `~/.acp-cli/config.json` → `auth_token` (set via `acp-cli init`)
322/// 3. `~/.claude.json` → `oauthAccount.accessToken`
323/// 4. macOS Keychain (`security find-generic-password`)
324fn resolve_claude_auth_token() -> Option<String> {
325    // 1. Already set externally
326    if let Some(t) = std::env::var("ANTHROPIC_AUTH_TOKEN")
327        .ok()
328        .filter(|t| !t.is_empty())
329    {
330        return Some(t);
331    }
332
333    // 2. acp-cli config
334    let config = crate::config::AcpCliConfig::load();
335    if let Some(token) = config.auth_token.filter(|t| !t.is_empty()) {
336        return Some(token);
337    }
338
339    // 3. ~/.claude.json
340    if let Some(token) = read_claude_json_token() {
341        return Some(token);
342    }
343
344    // 4. macOS Keychain
345    #[cfg(target_os = "macos")]
346    if let Some(token) = read_keychain_token() {
347        return Some(token);
348    }
349
350    None
351}
352
353/// Read the OAuth access token from `~/.claude.json`.
354fn read_claude_json_token() -> Option<String> {
355    let path = dirs::home_dir()?.join(".claude.json");
356    let content = std::fs::read_to_string(path).ok()?;
357    let json: serde_json::Value = serde_json::from_str(&content).ok()?;
358
359    json.pointer("/oauthAccount/accessToken")
360        .or_else(|| json.get("accessToken"))
361        .and_then(|v| v.as_str())
362        .filter(|s| !s.is_empty())
363        .map(|s| s.to_string())
364}
365
/// Read the Claude Code OAuth token from the macOS Keychain.
///
/// Runs `security find-generic-password -s <service> -w` for each candidate
/// service name and returns the first non-empty (trimmed) password printed.
/// A failure for one service — the command cannot be run, exits
/// unsuccessfully, or prints non-UTF-8 output — only skips that service; the
/// remaining names are still tried.
#[cfg(target_os = "macos")]
fn read_keychain_token() -> Option<String> {
    // Candidate service names; which one Claude Code actually uses is not
    // established here, so all are probed in order.
    const SERVICES: [&str; 3] = ["Claude Code", "claude.ai", "anthropic.claude"];

    SERVICES.iter().find_map(|&service| {
        let output = std::process::Command::new("security")
            .args(["find-generic-password", "-s", service, "-w"])
            .stderr(std::process::Stdio::null())
            .output()
            .ok()?;
        if !output.status.success() {
            return None;
        }

        // `?` here leaves only this closure, i.e. moves on to the next service.
        let raw = String::from_utf8(output.stdout).ok()?;
        let token = raw.trim();
        if token.is_empty() {
            None
        } else {
            Some(token.to_string())
        }
    })
}