pub mod commands;
pub mod events;
pub use commands::BridgeCommand;
pub use events::{
BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
};
use std::path::PathBuf;
use agent_client_protocol::{self as acp, Agent as _};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::client::BridgedAcpClient;
use crate::error::{AcpCliError, Result};
/// Caller-side handle to the background ACP bridge task.
///
/// Commands are sent to the task over a bounded channel, and the task reports
/// progress as `BridgeEvent`s on `evt_rx`.
pub struct AcpBridge {
// Sending half of the command channel; the bridge task owns the receiver.
cmd_tx: mpsc::Sender<BridgeCommand>,
// Events emitted by the bridge task; public so callers can poll it directly.
pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
// Join handle of the blocking task created in `start`; awaited by `shutdown`.
handle: JoinHandle<std::result::Result<(), AcpCliError>>,
}
/// Clonable handle that can queue a cancel request without borrowing the
/// `AcpBridge` it was created from.
#[derive(Clone)]
pub struct BridgeCancelHandle {
    /// Clone of the bridge's command sender.
    cmd_tx: mpsc::Sender<BridgeCommand>,
}

impl BridgeCancelHandle {
    /// Queues a `BridgeCommand::Cancel` on the bridge's command channel.
    ///
    /// Delivery is best-effort: a send error (the receiving side is closed)
    /// is deliberately discarded, so this always returns `Ok(())`.
    pub async fn cancel(&self) -> Result<()> {
        let request = BridgeCommand::Cancel;
        let _ = self.cmd_tx.send(request).await;
        Ok(())
    }
}
impl AcpBridge {
    /// Creates the command/event channels and launches the bridge task.
    ///
    /// The task runs `acp_thread_main` on a blocking thread, inside its own
    /// current-thread runtime and `LocalSet`. This function only spawns the
    /// task: failures inside it (for example failing to spawn `command`) are
    /// not reported here but are returned later by [`AcpBridge::shutdown`].
    pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
        // Capacity of the bounded command channel.
        const COMMAND_QUEUE_DEPTH: usize = 16;

        let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(COMMAND_QUEUE_DEPTH);
        let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();

        let handle = tokio::task::spawn_blocking(move || {
            let mut builder = tokio::runtime::Builder::new_current_thread();
            builder.enable_all();
            let runtime = builder
                .build()
                .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;

            let local_set = tokio::task::LocalSet::new();
            local_set.block_on(
                &runtime,
                acp_thread_main(cmd_rx, evt_tx, command, args, cwd),
            )
        });

        Ok(Self {
            cmd_tx,
            evt_rx,
            handle,
        })
    }

    /// Returns a clonable handle that can queue cancel requests independently
    /// of this bridge value.
    pub fn cancel_handle(&self) -> BridgeCancelHandle {
        let cmd_tx = self.cmd_tx.clone();
        BridgeCancelHandle { cmd_tx }
    }

    /// Sends `messages` as one prompt and waits for the bridge's reply.
    ///
    /// # Errors
    /// Returns `AcpCliError::Connection` if the command channel is closed or
    /// the reply sender is dropped; otherwise forwards the bridge's result.
    pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
        let reply_rx = self.send_prompt(messages).await?;
        Self::receive_reply(reply_rx).await
    }

    /// Queues a prompt and returns the receiver on which the result will
    /// arrive, without waiting for the result itself.
    ///
    /// # Errors
    /// Returns `AcpCliError::Connection` if the command channel is closed.
    pub async fn send_prompt(
        &self,
        messages: Vec<String>,
    ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
        let (reply, reply_rx) = tokio::sync::oneshot::channel();
        self.dispatch_command(BridgeCommand::Prompt { messages, reply })
            .await?;
        Ok(reply_rx)
    }

    /// Queues a `BridgeCommand::Cancel`.
    ///
    /// Best-effort: a closed command channel is ignored and `Ok(())` is
    /// always returned.
    pub async fn cancel(&self) -> Result<()> {
        let request = BridgeCommand::Cancel;
        let _ = self.cmd_tx.send(request).await;
        Ok(())
    }

    /// Requests a session mode change to `mode` and waits for the outcome.
    ///
    /// # Errors
    /// Returns `AcpCliError::Connection` if the command channel is closed or
    /// the reply sender is dropped; otherwise forwards the bridge's result.
    pub async fn set_mode(&self, mode: String) -> Result<()> {
        let (reply, reply_rx) = tokio::sync::oneshot::channel();
        self.dispatch_command(BridgeCommand::SetMode { mode, reply })
            .await?;
        Self::receive_reply(reply_rx).await
    }

    /// Requests that session config option `key` be set to `value` and waits
    /// for the outcome.
    ///
    /// # Errors
    /// Returns `AcpCliError::Connection` if the command channel is closed or
    /// the reply sender is dropped; otherwise forwards the bridge's result.
    pub async fn set_config(&self, key: String, value: String) -> Result<()> {
        let (reply, reply_rx) = tokio::sync::oneshot::channel();
        self.dispatch_command(BridgeCommand::SetConfig { key, value, reply })
            .await?;
        Self::receive_reply(reply_rx).await
    }

    /// Asks the bridge task to stop and waits for it to finish.
    ///
    /// The shutdown command is sent best-effort (the task may already have
    /// exited); the task's own result is then returned.
    ///
    /// # Errors
    /// Returns the bridge task's error, or `AcpCliError::Connection` if the
    /// task could not be joined.
    pub async fn shutdown(self) -> Result<()> {
        let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
        let joined = self.handle.await;
        joined.map_err(|e| AcpCliError::Connection(format!("join: {e}")))?
    }

    /// Pushes `command` onto the command channel, mapping a closed channel to
    /// a connection error.
    async fn dispatch_command(&self, command: BridgeCommand) -> Result<()> {
        self.cmd_tx
            .send(command)
            .await
            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))
    }

    /// Waits for the bridge's answer on `reply_rx`, mapping a dropped sender
    /// to a connection error.
    async fn receive_reply<T>(
        reply_rx: tokio::sync::oneshot::Receiver<Result<T>>,
    ) -> Result<T> {
        match reply_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(AcpCliError::Connection("bridge reply dropped".into())),
        }
    }
}
/// Spawns the ACP agent subprocess, opens a session, and serves bridge
/// commands until shutdown or until the command channel closes.
///
/// Must run inside a `LocalSet`, because the connection's tasks are started
/// with `tokio::task::spawn_local`.
///
/// * `cmd_rx` – commands from the `AcpBridge` / `BridgeCancelHandle` side.
/// * `evt_tx` – sink for `BridgeEvent`s (`SessionCreated`, `PromptDone`, and
///   whatever `BridgedAcpClient` emits).
/// * `command`, `args` – program and arguments of the agent subprocess.
/// * `cwd` – passed to the agent in the new-session request.
///
/// While a prompt is in flight the command channel keeps being polled:
/// `Cancel` is forwarded to the agent as a cancel notification, and any other
/// command is deferred and replayed in arrival order once the prompt has
/// finished. A `Cancel` that arrives while no prompt is running is ignored.
///
/// # Errors
/// Returns `AcpCliError::Connection` if the agent cannot be spawned, has no
/// piped stdin/stdout, or if `initialize` / `new_session` fail. Per-command
/// failures are reported through that command's reply channel instead.
async fn acp_thread_main(
    mut cmd_rx: mpsc::Receiver<BridgeCommand>,
    evt_tx: mpsc::UnboundedSender<BridgeEvent>,
    command: String,
    args: Vec<String>,
    cwd: PathBuf,
) -> Result<()> {
    let mut cmd = tokio::process::Command::new(&command);
    cmd.args(&args)
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::inherit())
        // Safety net for early returns that never reach `reap_child_process`.
        .kill_on_drop(true);

    // The agent never inherits ANTHROPIC_API_KEY; a resolved auth token is
    // forwarded unless it carries the `sk-ant-oat01-` prefix.
    cmd.env_remove("ANTHROPIC_API_KEY");
    if let Some(token) = resolve_claude_auth_token()
        && !token.starts_with("sk-ant-oat01-")
    {
        cmd.env("ANTHROPIC_AUTH_TOKEN", &token);
    }

    let mut child = cmd
        .spawn()
        .map_err(|e| AcpCliError::Connection(format!("spawn {command}: {e}")))?;
    let stdin = child
        .stdin
        .take()
        .ok_or_else(|| AcpCliError::Connection("agent has no stdin".into()))?;
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| AcpCliError::Connection("agent has no stdout".into()))?;

    let client = BridgedAcpClient::new(evt_tx.clone());
    let (conn, handle_io) =
        acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
            tokio::task::spawn_local(fut);
        });
    // Drive the connection's I/O in the background for the session lifetime.
    tokio::task::spawn_local(async move {
        if let Err(e) = handle_io.await {
            eprintln!("[acp-cli] I/O error: {e}");
        }
    });

    // The session runs in an inner block so that every exit path, success or
    // error, falls through to `reap_child_process` below.
    let result = async {
        conn.initialize(
            acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
                acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
            ),
        )
        .await
        .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;

        let session = conn
            .new_session(acp::NewSessionRequest::new(cwd))
            .await
            .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
        let session_id = session.session_id;
        let _ = evt_tx.send(BridgeEvent::SessionCreated {
            session_id: session_id.0.to_string(),
        });

        // Commands received while a prompt was in flight, replayed in order.
        let mut deferred: std::collections::VecDeque<BridgeCommand> =
            std::collections::VecDeque::new();

        loop {
            let next = match deferred.pop_front() {
                Some(queued) => Some(queued),
                None => cmd_rx.recv().await,
            };
            // `None` means every sender is gone: nothing more can arrive.
            let Some(request) = next else { break };

            match request {
                BridgeCommand::Prompt { messages, reply } => {
                    let content_blocks: Vec<acp::ContentBlock> =
                        messages.into_iter().map(|m| m.into()).collect();
                    let mut prompt_fut = std::pin::pin!(
                        conn.prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
                    );

                    // Keep listening for commands while the prompt runs so a
                    // cancel request can actually reach the agent.
                    let mut channel_open = true;
                    let outcome = loop {
                        tokio::select! {
                            outcome = &mut prompt_fut => break outcome,
                            incoming = cmd_rx.recv(), if channel_open => {
                                match incoming {
                                    Some(BridgeCommand::Cancel) => {
                                        let notice =
                                            acp::CancelNotification::new(session_id.clone());
                                        if let Err(e) = conn.cancel(notice).await {
                                            eprintln!("[acp-cli] cancel failed: {e}");
                                        }
                                    }
                                    Some(other) => deferred.push_back(other),
                                    // All senders dropped: stop polling the
                                    // channel, but let the prompt finish.
                                    None => channel_open = false,
                                }
                            }
                        }
                    };

                    match outcome {
                        Ok(response) => {
                            // The stop reason is reported as its serialized
                            // string form, or "unknown" if that is not a string.
                            let stop_reason = serde_json::to_value(response.stop_reason)
                                .ok()
                                .and_then(|v| v.as_str().map(String::from))
                                .unwrap_or_else(|| "unknown".to_string());
                            let _ = evt_tx.send(BridgeEvent::PromptDone {
                                stop_reason: stop_reason.clone(),
                            });
                            // `content` is always empty in the reply.
                            let _ = reply.send(Ok(PromptResult {
                                content: String::new(),
                                stop_reason,
                            }));
                        }
                        Err(e) => {
                            let _ = reply.send(Err(AcpCliError::Agent(e.to_string())));
                        }
                    }
                }
                BridgeCommand::Cancel => {
                    // No prompt is in flight here, so there is nothing to cancel.
                }
                BridgeCommand::SetMode { mode, reply } => {
                    let mode_id = acp::SessionModeId::new(mode);
                    let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
                    match conn.set_session_mode(request).await {
                        Ok(_) => {
                            let _ = reply.send(Ok(()));
                        }
                        Err(e) => {
                            let _ = reply
                                .send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
                        }
                    }
                }
                BridgeCommand::SetConfig { key, value, reply } => {
                    let config_id = acp::SessionConfigId::new(key);
                    let value_id = acp::SessionConfigValueId::new(value);
                    let request = acp::SetSessionConfigOptionRequest::new(
                        session_id.clone(),
                        config_id,
                        value_id,
                    );
                    match conn.set_session_config_option(request).await {
                        Ok(_) => {
                            let _ = reply.send(Ok(()));
                        }
                        Err(e) => {
                            let _ = reply.send(Err(AcpCliError::Agent(format!(
                                "set_session_config_option: {e}"
                            ))));
                        }
                    }
                }
                BridgeCommand::Shutdown => break,
            }
        }
        Ok(())
    }
    .await;

    reap_child_process(&mut child).await;
    result
}
/// Makes sure `child` is terminated and waited on.
///
/// If `try_wait` does not report an exit status (still running, or the query
/// failed), a kill is requested first. Errors from the kill and from the
/// final wait are ignored.
async fn reap_child_process(child: &mut tokio::process::Child) {
    match child.try_wait() {
        // Already exited: nothing to kill, only the wait below remains.
        Ok(Some(_)) => {}
        Ok(None) | Err(_) => {
            let _ = child.start_kill();
        }
    }
    let _ = child.wait().await;
}
fn resolve_claude_auth_token() -> Option<String> {
if let Some(t) = std::env::var("ANTHROPIC_AUTH_TOKEN")
.ok()
.filter(|t| !t.is_empty())
{
return Some(t);
}
let config = crate::config::AcpCliConfig::load();
if let Some(token) = config.auth_token.filter(|t| !t.is_empty()) {
return Some(token);
}
if let Some(token) = read_claude_json_token() {
return Some(token);
}
#[cfg(target_os = "macos")]
if let Some(token) = read_keychain_token() {
return Some(token);
}
None
}
/// Reads an access token from `.claude.json` in the user's home directory.
///
/// Looks at `/oauthAccount/accessToken` first and, only if that path is
/// absent, at the top-level `accessToken` key. Returns `None` when the home
/// directory is unknown, the file cannot be read or parsed, or the selected
/// value is not a non-empty string.
fn read_claude_json_token() -> Option<String> {
    let home = dirs::home_dir()?;
    let raw = std::fs::read_to_string(home.join(".claude.json")).ok()?;
    let document = serde_json::from_str::<serde_json::Value>(&raw).ok()?;

    let node = match document.pointer("/oauthAccount/accessToken") {
        Some(nested) => nested,
        None => document.get("accessToken")?,
    };

    match node.as_str() {
        Some(token) if !token.is_empty() => Some(token.to_string()),
        _ => None,
    }
}
/// Looks up a token in the macOS keychain via the `security` command-line
/// tool, trying each known service name in order.
///
/// Returns the first non-empty, trimmed secret. An entry that is missing,
/// empty, or not valid UTF-8 is skipped so the remaining service names are
/// still tried. Returns `None` if no service yields a token or if `security`
/// cannot be run at all.
#[cfg(target_os = "macos")]
fn read_keychain_token() -> Option<String> {
    const SERVICES: [&str; 3] = ["Claude Code", "claude.ai", "anthropic.claude"];

    for service in SERVICES {
        // If the tool itself cannot be launched, later attempts would fail the
        // same way, so give up immediately.
        let output = std::process::Command::new("security")
            .args(["find-generic-password", "-s", service, "-w"])
            .stderr(std::process::Stdio::null())
            .output()
            .ok()?;
        if !output.status.success() {
            continue;
        }
        // A non-UTF-8 secret only disqualifies this entry, not the whole search.
        let Ok(secret) = String::from_utf8(output.stdout) else {
            continue;
        };
        let token = secret.trim();
        if !token.is_empty() {
            return Some(token.to_string());
        }
    }
    None
}
#[cfg(test)]
mod tests {
    use super::{AcpCliError, BridgeCommand, BridgeEvent, acp_thread_main, reap_child_process};
    use tokio::sync::mpsc;

    /// Builds a command that runs `windows_script` through `cmd /C` on
    /// Windows and `unix_script` through `sh -c` everywhere else.
    fn shell_command(windows_script: &str, unix_script: &str) -> tokio::process::Command {
        let (shell, flag, script) = if cfg!(windows) {
            ("cmd", "/C", windows_script)
        } else {
            ("sh", "-c", unix_script)
        };
        let mut command = tokio::process::Command::new(shell);
        command.arg(flag).arg(script);
        command
    }

    /// A child that exits successfully right away.
    fn exited_child_command() -> tokio::process::Command {
        shell_command("exit 0", "exit 0")
    }

    /// A child that keeps running long enough to need killing.
    fn running_child_command() -> tokio::process::Command {
        shell_command("ping -n 30 127.0.0.1 >NUL", "sleep 10")
    }

    /// Reaping a child that has already exited must keep its exit status.
    #[tokio::test]
    async fn reap_child_process_handles_already_exited_child() {
        let mut finished = exited_child_command().spawn().expect("spawn child");
        // Give the child time to exit before reaping.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        reap_child_process(&mut finished).await;

        let status = finished.wait().await.expect("wait after reap");
        assert!(status.success());
    }

    /// Reaping a running child must kill it, yielding a non-success status.
    #[tokio::test]
    async fn reap_child_process_kills_running_child() {
        let mut running = running_child_command().spawn().expect("spawn child");

        reap_child_process(&mut running).await;

        let status = running.wait().await.expect("wait after kill");
        assert!(!status.success());
    }

    /// A failing `initialize` must still leave no agent process behind.
    #[cfg(unix)]
    #[tokio::test]
    async fn initialize_error_still_reaps_child_process() {
        use std::os::unix::fs::PermissionsExt;
        use tempfile::tempdir;

        let workdir = tempdir().expect("create tempdir");
        let pid_file = workdir.path().join("agent.pid");
        let script = workdir.path().join("fake-acp-agent.sh");

        // Fake agent: records its pid, closes stdout, then lingers.
        let script_body = format!(
            "#!/bin/sh\necho $$ > \"{}\"\nexec 1>&-\nsleep 30\n",
            pid_file.display()
        );
        std::fs::write(&script, script_body).expect("write script");

        let mut perms = std::fs::metadata(&script)
            .expect("script metadata")
            .permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&script, perms).expect("chmod script");

        // The sender and receiver are kept alive so neither channel closes
        // during the run.
        let (_cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(2);
        let (evt_tx, _evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();

        let bridge_run = acp_thread_main(
            cmd_rx,
            evt_tx,
            script.to_string_lossy().to_string(),
            vec![],
            workdir.path().to_path_buf(),
        );
        let bounded_run = tokio::time::timeout(std::time::Duration::from_secs(5), bridge_run);

        let local = tokio::task::LocalSet::new();
        let result = local
            .run_until(bounded_run)
            .await
            .expect("acp_thread_main should not hang")
            .expect_err("initialize should fail for non-ACP output");
        assert!(
            matches!(result, AcpCliError::Connection(_)),
            "expected connection error, got: {result:?}"
        );

        let pid_raw = std::fs::read_to_string(&pid_file).expect("pid file");
        let pid = pid_raw.trim().parse::<i32>().expect("parse pid");

        // SAFETY: `kill` with signal 0 only probes whether `pid` exists; no
        // signal is delivered and no memory is touched.
        let alive = unsafe { libc::kill(pid, 0) == 0 };
        if alive {
            // SAFETY: plain libc call with integer arguments; `pid` is the
            // process this test started and just observed to be alive.
            let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
        }
        assert!(!alive, "child process {pid} should have been reaped");
    }
}