1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6 BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19pub struct AcpBridge {
25 cmd_tx: mpsc::Sender<BridgeCommand>,
26 pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27 handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30#[derive(Clone)]
35pub struct BridgeCancelHandle {
36 cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40 pub async fn cancel(&self) -> Result<()> {
42 let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43 Ok(())
44 }
45}
46
47impl AcpBridge {
48 pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53 let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54 let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56 let handle = tokio::task::spawn_blocking(move || {
57 let rt = tokio::runtime::Builder::new_current_thread()
58 .enable_all()
59 .build()
60 .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61 let local = tokio::task::LocalSet::new();
62 local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63 });
64
65 Ok(Self {
66 cmd_tx,
67 evt_rx,
68 handle,
69 })
70 }
71
72 pub fn cancel_handle(&self) -> BridgeCancelHandle {
75 BridgeCancelHandle {
76 cmd_tx: self.cmd_tx.clone(),
77 }
78 }
79
80 pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86 let reply_rx = self.send_prompt(messages).await?;
87 reply_rx
88 .await
89 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90 }
91
92 pub async fn send_prompt(
98 &self,
99 messages: Vec<String>,
100 ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102 self.cmd_tx
103 .send(BridgeCommand::Prompt {
104 messages,
105 reply: reply_tx,
106 })
107 .await
108 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109 Ok(reply_rx)
110 }
111
112 pub async fn cancel(&self) -> Result<()> {
114 let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115 Ok(())
116 }
117
118 pub async fn set_mode(&self, mode: String) -> Result<()> {
120 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121 self.cmd_tx
122 .send(BridgeCommand::SetMode {
123 mode,
124 reply: reply_tx,
125 })
126 .await
127 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128 reply_rx
129 .await
130 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131 }
132
133 pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136 self.cmd_tx
137 .send(BridgeCommand::SetConfig {
138 key,
139 value,
140 reply: reply_tx,
141 })
142 .await
143 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144 reply_rx
145 .await
146 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147 }
148
149 pub async fn shutdown(self) -> Result<()> {
152 let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153 match self.handle.await {
154 Ok(Ok(())) => Ok(()),
155 Ok(Err(e)) => Err(e),
156 Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157 }
158 }
159}
160
161async fn acp_thread_main(
167 mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168 evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169 command: String,
170 args: Vec<String>,
171 cwd: PathBuf,
172) -> Result<()> {
173 let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175 let mut cmd = tokio::process::Command::new(&command);
176 cmd.args(&args_refs)
177 .stdin(std::process::Stdio::piped())
178 .stdout(std::process::Stdio::piped())
179 .stderr(std::process::Stdio::inherit())
180 .kill_on_drop(true);
181
182 cmd.env_remove("ANTHROPIC_API_KEY");
185
186 if let Some(token) = resolve_claude_auth_token() {
189 cmd.env("ANTHROPIC_AUTH_TOKEN", &token);
190 }
191
192 let mut child = cmd
193 .spawn()
194 .map_err(|e| AcpCliError::Agent(format!("{command}: {e}")))?;
195
196 let stdin = child
197 .stdin
198 .take()
199 .ok_or_else(|| AcpCliError::Agent("no stdin".into()))?;
200 let stdout = child
201 .stdout
202 .take()
203 .ok_or_else(|| AcpCliError::Agent("no stdout".into()))?;
204
205 let client = BridgedAcpClient {
206 evt_tx: evt_tx.clone(),
207 };
208
209 let (conn, handle_io) =
210 acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
211 tokio::task::spawn_local(fut);
212 });
213
214 tokio::task::spawn_local(async move {
216 if let Err(e) = handle_io.await {
217 eprintln!("[acp-cli] I/O error: {e}");
218 }
219 });
220
221 conn.initialize(
223 acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
224 acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
225 ),
226 )
227 .await
228 .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
229
230 let session = conn
232 .new_session(acp::NewSessionRequest::new(cwd))
233 .await
234 .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
235
236 let session_id = session.session_id;
237 let _ = evt_tx.send(BridgeEvent::SessionCreated {
238 session_id: session_id.0.to_string(),
239 });
240
241 while let Some(cmd) = cmd_rx.recv().await {
243 match cmd {
244 BridgeCommand::Prompt { messages, reply } => {
245 let content_blocks: Vec<acp::ContentBlock> =
246 messages.into_iter().map(|m| m.into()).collect();
247 let result = conn
248 .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
249 .await;
250 match result {
251 Ok(response) => {
252 let stop_reason = serde_json::to_value(response.stop_reason)
253 .ok()
254 .and_then(|v| v.as_str().map(String::from))
255 .unwrap_or_else(|| "unknown".to_string());
256 let _ = evt_tx.send(BridgeEvent::PromptDone {
257 stop_reason: stop_reason.clone(),
258 });
259 let _ = reply.send(Ok(PromptResult {
262 content: String::new(),
263 stop_reason,
264 }));
265 }
266 Err(e) => {
267 let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
268 }
269 }
270 }
271 BridgeCommand::Cancel => {
272 }
274 BridgeCommand::SetMode { mode, reply } => {
275 let mode_id = acp::SessionModeId::new(mode);
276 let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
277 match conn.set_session_mode(request).await {
278 Ok(_) => {
279 let _ = reply.send(Ok(()));
280 }
281 Err(e) => {
282 let _ =
283 reply.send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
284 }
285 }
286 }
287 BridgeCommand::SetConfig { key, value, reply } => {
288 let config_id = acp::SessionConfigId::new(key);
289 let value_id = acp::SessionConfigValueId::new(value);
290 let request = acp::SetSessionConfigOptionRequest::new(
291 session_id.clone(),
292 config_id,
293 value_id,
294 );
295 match conn.set_session_config_option(request).await {
296 Ok(_) => {
297 let _ = reply.send(Ok(()));
298 }
299 Err(e) => {
300 let _ = reply.send(Err(AcpCliError::Agent(format!(
301 "set_session_config_option: {e}"
302 ))));
303 }
304 }
305 }
306 BridgeCommand::Shutdown => break,
307 }
308 }
309
310 child.kill().await.ok();
312 Ok(())
313}
314
315fn resolve_claude_auth_token() -> Option<String> {
325 if let Some(t) = std::env::var("ANTHROPIC_AUTH_TOKEN")
327 .ok()
328 .filter(|t| !t.is_empty())
329 {
330 return Some(t);
331 }
332
333 let config = crate::config::AcpCliConfig::load();
335 if let Some(token) = config.auth_token.filter(|t| !t.is_empty()) {
336 return Some(token);
337 }
338
339 if let Some(token) = read_claude_json_token() {
341 return Some(token);
342 }
343
344 #[cfg(target_os = "macos")]
346 if let Some(token) = read_keychain_token() {
347 return Some(token);
348 }
349
350 None
351}
352
353fn read_claude_json_token() -> Option<String> {
355 let path = dirs::home_dir()?.join(".claude.json");
356 let content = std::fs::read_to_string(path).ok()?;
357 let json: serde_json::Value = serde_json::from_str(&content).ok()?;
358
359 json.pointer("/oauthAccount/accessToken")
360 .or_else(|| json.get("accessToken"))
361 .and_then(|v| v.as_str())
362 .filter(|s| !s.is_empty())
363 .map(|s| s.to_string())
364}
365
366#[cfg(target_os = "macos")]
368fn read_keychain_token() -> Option<String> {
369 for service in &["Claude Code", "claude.ai", "anthropic.claude"] {
371 let output = std::process::Command::new("security")
372 .args(["find-generic-password", "-s", service, "-w"])
373 .stderr(std::process::Stdio::null())
374 .output()
375 .ok()?;
376 if output.status.success() {
377 let token = String::from_utf8(output.stdout).ok()?.trim().to_string();
378 if !token.is_empty() {
379 return Some(token);
380 }
381 }
382 }
383 None
384}