1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6 BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19pub struct AcpBridge {
25 cmd_tx: mpsc::Sender<BridgeCommand>,
26 pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27 handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30#[derive(Clone)]
35pub struct BridgeCancelHandle {
36 cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40 pub async fn cancel(&self) -> Result<()> {
42 let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43 Ok(())
44 }
45}
46
47impl AcpBridge {
48 pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53 let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54 let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56 let handle = tokio::task::spawn_blocking(move || {
57 let rt = tokio::runtime::Builder::new_current_thread()
58 .enable_all()
59 .build()
60 .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61 let local = tokio::task::LocalSet::new();
62 local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63 });
64
65 Ok(Self {
66 cmd_tx,
67 evt_rx,
68 handle,
69 })
70 }
71
72 pub fn cancel_handle(&self) -> BridgeCancelHandle {
75 BridgeCancelHandle {
76 cmd_tx: self.cmd_tx.clone(),
77 }
78 }
79
80 pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86 let reply_rx = self.send_prompt(messages).await?;
87 reply_rx
88 .await
89 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90 }
91
92 pub async fn send_prompt(
98 &self,
99 messages: Vec<String>,
100 ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102 self.cmd_tx
103 .send(BridgeCommand::Prompt {
104 messages,
105 reply: reply_tx,
106 })
107 .await
108 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109 Ok(reply_rx)
110 }
111
112 pub async fn cancel(&self) -> Result<()> {
114 let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115 Ok(())
116 }
117
118 pub async fn set_mode(&self, mode: String) -> Result<()> {
120 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121 self.cmd_tx
122 .send(BridgeCommand::SetMode {
123 mode,
124 reply: reply_tx,
125 })
126 .await
127 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128 reply_rx
129 .await
130 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131 }
132
133 pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136 self.cmd_tx
137 .send(BridgeCommand::SetConfig {
138 key,
139 value,
140 reply: reply_tx,
141 })
142 .await
143 .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144 reply_rx
145 .await
146 .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147 }
148
149 pub async fn shutdown(self) -> Result<()> {
152 let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153 match self.handle.await {
154 Ok(Ok(())) => Ok(()),
155 Ok(Err(e)) => Err(e),
156 Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157 }
158 }
159}
160
161async fn acp_thread_main(
167 mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168 evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169 command: String,
170 args: Vec<String>,
171 cwd: PathBuf,
172) -> Result<()> {
173 let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175 let mut child = tokio::process::Command::new(&command)
176 .args(&args_refs)
177 .stdin(std::process::Stdio::piped())
178 .stdout(std::process::Stdio::piped())
179 .stderr(std::process::Stdio::inherit())
180 .kill_on_drop(true)
181 .spawn()
182 .map_err(|e| AcpCliError::Agent(format!("{command}: {e}")))?;
183
184 let stdin = child
185 .stdin
186 .take()
187 .ok_or_else(|| AcpCliError::Agent("no stdin".into()))?;
188 let stdout = child
189 .stdout
190 .take()
191 .ok_or_else(|| AcpCliError::Agent("no stdout".into()))?;
192
193 let client = BridgedAcpClient {
194 evt_tx: evt_tx.clone(),
195 };
196
197 let (conn, handle_io) =
198 acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
199 tokio::task::spawn_local(fut);
200 });
201
202 tokio::task::spawn_local(async move {
204 if let Err(e) = handle_io.await {
205 eprintln!("[acp-cli] I/O error: {e}");
206 }
207 });
208
209 conn.initialize(
211 acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
212 acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
213 ),
214 )
215 .await
216 .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
217
218 let session = conn
220 .new_session(acp::NewSessionRequest::new(cwd))
221 .await
222 .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
223
224 let session_id = session.session_id;
225 let _ = evt_tx.send(BridgeEvent::SessionCreated {
226 session_id: session_id.0.to_string(),
227 });
228
229 while let Some(cmd) = cmd_rx.recv().await {
231 match cmd {
232 BridgeCommand::Prompt { messages, reply } => {
233 let content_blocks: Vec<acp::ContentBlock> =
234 messages.into_iter().map(|m| m.into()).collect();
235 let result = conn
236 .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
237 .await;
238 match result {
239 Ok(response) => {
240 let stop_reason = serde_json::to_value(response.stop_reason)
241 .ok()
242 .and_then(|v| v.as_str().map(String::from))
243 .unwrap_or_else(|| "unknown".to_string());
244 let _ = evt_tx.send(BridgeEvent::PromptDone {
245 stop_reason: stop_reason.clone(),
246 });
247 let _ = reply.send(Ok(PromptResult {
250 content: String::new(),
251 stop_reason,
252 }));
253 }
254 Err(e) => {
255 let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
256 }
257 }
258 }
259 BridgeCommand::Cancel => {
260 }
262 BridgeCommand::SetMode { mode, reply } => {
263 let mode_id = acp::SessionModeId::new(mode);
264 let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
265 match conn.set_session_mode(request).await {
266 Ok(_) => {
267 let _ = reply.send(Ok(()));
268 }
269 Err(e) => {
270 let _ =
271 reply.send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
272 }
273 }
274 }
275 BridgeCommand::SetConfig { key, value, reply } => {
276 let config_id = acp::SessionConfigId::new(key);
277 let value_id = acp::SessionConfigValueId::new(value);
278 let request = acp::SetSessionConfigOptionRequest::new(
279 session_id.clone(),
280 config_id,
281 value_id,
282 );
283 match conn.set_session_config_option(request).await {
284 Ok(_) => {
285 let _ = reply.send(Ok(()));
286 }
287 Err(e) => {
288 let _ = reply.send(Err(AcpCliError::Agent(format!(
289 "set_session_config_option: {e}"
290 ))));
291 }
292 }
293 }
294 BridgeCommand::Shutdown => break,
295 }
296 }
297
298 child.kill().await.ok();
300 Ok(())
301}