Skip to main content

acp_cli/bridge/
mod.rs

1pub mod commands;
2pub mod events;
3
4pub use commands::BridgeCommand;
5pub use events::{
6    BridgeEvent, PermissionKind, PermissionOption, PermissionOutcome, PromptResult, ToolCallInfo,
7};
8
9use std::path::PathBuf;
10
11use agent_client_protocol::{self as acp, Agent as _};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
15
16use crate::client::BridgedAcpClient;
17use crate::error::{AcpCliError, Result};
18
19/// Core bridge that manages the ACP agent process lifecycle.
20///
21/// Spawns the agent in a dedicated blocking thread with a `LocalSet` (required
22/// because ACP futures are `!Send`), and exposes an async command/event API
23/// for the main thread to drive prompts and receive streamed output.
24pub struct AcpBridge {
25    cmd_tx: mpsc::Sender<BridgeCommand>,
26    pub evt_rx: mpsc::UnboundedReceiver<BridgeEvent>,
27    handle: JoinHandle<std::result::Result<(), AcpCliError>>,
28}
29
30/// Handle used to send cancel/shutdown commands independently of `evt_rx`.
31///
32/// Obtained via `AcpBridge::cancel_handle()` so the caller can borrow `evt_rx`
33/// mutably while still being able to send cancel commands.
34#[derive(Clone)]
35pub struct BridgeCancelHandle {
36    cmd_tx: mpsc::Sender<BridgeCommand>,
37}
38
39impl BridgeCancelHandle {
40    /// Request cancellation of the current prompt (best-effort).
41    pub async fn cancel(&self) -> Result<()> {
42        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
43        Ok(())
44    }
45}
46
47impl AcpBridge {
48    /// Start the ACP bridge by spawning the agent process in a background thread.
49    ///
50    /// The agent is launched via `command` with the given `args`, and the working
51    /// directory for the ACP session is set to `cwd`.
52    pub async fn start(command: String, args: Vec<String>, cwd: PathBuf) -> Result<Self> {
53        let (cmd_tx, cmd_rx) = mpsc::channel::<BridgeCommand>(16);
54        let (evt_tx, evt_rx) = mpsc::unbounded_channel::<BridgeEvent>();
55
56        let handle = tokio::task::spawn_blocking(move || {
57            let rt = tokio::runtime::Builder::new_current_thread()
58                .enable_all()
59                .build()
60                .map_err(|e| AcpCliError::Connection(format!("runtime: {e}")))?;
61            let local = tokio::task::LocalSet::new();
62            local.block_on(&rt, acp_thread_main(cmd_rx, evt_tx, command, args, cwd))
63        });
64
65        Ok(Self {
66            cmd_tx,
67            evt_rx,
68            handle,
69        })
70    }
71
72    /// Obtain a lightweight cancel handle that can be used while `evt_rx` is
73    /// borrowed mutably.
74    pub fn cancel_handle(&self) -> BridgeCancelHandle {
75        BridgeCancelHandle {
76            cmd_tx: self.cmd_tx.clone(),
77        }
78    }
79
80    /// Send a prompt to the agent and wait for the result.
81    ///
82    /// Text content is streamed in real-time via `BridgeEvent::TextChunk` events
83    /// on `evt_rx`. The returned `PromptResult.content` will be empty because
84    /// the main thread is expected to collect content from those events.
85    pub async fn prompt(&self, messages: Vec<String>) -> Result<PromptResult> {
86        let reply_rx = self.send_prompt(messages).await?;
87        reply_rx
88            .await
89            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
90    }
91
92    /// Send a prompt command without awaiting the reply.
93    ///
94    /// Returns a oneshot receiver that will resolve when the prompt completes.
95    /// This allows the caller to drive `evt_rx` concurrently with waiting for
96    /// the prompt result, avoiding borrow conflicts on `self`.
97    pub async fn send_prompt(
98        &self,
99        messages: Vec<String>,
100    ) -> Result<tokio::sync::oneshot::Receiver<Result<PromptResult>>> {
101        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
102        self.cmd_tx
103            .send(BridgeCommand::Prompt {
104                messages,
105                reply: reply_tx,
106            })
107            .await
108            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
109        Ok(reply_rx)
110    }
111
112    /// Request cancellation of the current prompt (best-effort).
113    pub async fn cancel(&self) -> Result<()> {
114        let _ = self.cmd_tx.send(BridgeCommand::Cancel).await;
115        Ok(())
116    }
117
118    /// Set the session mode on the ACP connection.
119    pub async fn set_mode(&self, mode: String) -> Result<()> {
120        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
121        self.cmd_tx
122            .send(BridgeCommand::SetMode {
123                mode,
124                reply: reply_tx,
125            })
126            .await
127            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
128        reply_rx
129            .await
130            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
131    }
132
133    /// Set a session config option on the ACP connection.
134    pub async fn set_config(&self, key: String, value: String) -> Result<()> {
135        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
136        self.cmd_tx
137            .send(BridgeCommand::SetConfig {
138                key,
139                value,
140                reply: reply_tx,
141            })
142            .await
143            .map_err(|_| AcpCliError::Connection("bridge channel closed".into()))?;
144        reply_rx
145            .await
146            .map_err(|_| AcpCliError::Connection("bridge reply dropped".into()))?
147    }
148
149    /// Gracefully shut down the bridge, killing the agent process and joining
150    /// the background thread.
151    pub async fn shutdown(self) -> Result<()> {
152        let _ = self.cmd_tx.send(BridgeCommand::Shutdown).await;
153        match self.handle.await {
154            Ok(Ok(())) => Ok(()),
155            Ok(Err(e)) => Err(e),
156            Err(e) => Err(AcpCliError::Connection(format!("join: {e}"))),
157        }
158    }
159}
160
161/// Main loop running inside `spawn_blocking` + `LocalSet`.
162///
163/// Spawns the agent child process, establishes the ACP connection, initializes
164/// the protocol, creates a session, then enters a command loop that processes
165/// `BridgeCommand` messages from the main thread.
166async fn acp_thread_main(
167    mut cmd_rx: mpsc::Receiver<BridgeCommand>,
168    evt_tx: mpsc::UnboundedSender<BridgeEvent>,
169    command: String,
170    args: Vec<String>,
171    cwd: PathBuf,
172) -> Result<()> {
173    // 1. Spawn agent process
174    let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
175    let mut child = tokio::process::Command::new(&command)
176        .args(&args_refs)
177        .stdin(std::process::Stdio::piped())
178        .stdout(std::process::Stdio::piped())
179        .stderr(std::process::Stdio::inherit())
180        .kill_on_drop(true)
181        .spawn()
182        .map_err(|e| AcpCliError::Agent(format!("{command}: {e}")))?;
183
184    let stdin = child
185        .stdin
186        .take()
187        .ok_or_else(|| AcpCliError::Agent("no stdin".into()))?;
188    let stdout = child
189        .stdout
190        .take()
191        .ok_or_else(|| AcpCliError::Agent("no stdout".into()))?;
192
193    let client = BridgedAcpClient {
194        evt_tx: evt_tx.clone(),
195    };
196
197    let (conn, handle_io) =
198        acp::ClientSideConnection::new(client, stdin.compat_write(), stdout.compat(), |fut| {
199            tokio::task::spawn_local(fut);
200        });
201
202    // Drive the I/O loop in the background on the local task set.
203    tokio::task::spawn_local(async move {
204        if let Err(e) = handle_io.await {
205            eprintln!("[acp-cli] I/O error: {e}");
206        }
207    });
208
209    // 2. Initialize
210    conn.initialize(
211        acp::InitializeRequest::new(acp::ProtocolVersion::V1).client_info(
212            acp::Implementation::new("acp-cli", env!("CARGO_PKG_VERSION")),
213        ),
214    )
215    .await
216    .map_err(|e| AcpCliError::Connection(format!("initialize: {e}")))?;
217
218    // 3. Create session
219    let session = conn
220        .new_session(acp::NewSessionRequest::new(cwd))
221        .await
222        .map_err(|e| AcpCliError::Connection(format!("new_session: {e}")))?;
223
224    let session_id = session.session_id;
225    let _ = evt_tx.send(BridgeEvent::SessionCreated {
226        session_id: session_id.0.to_string(),
227    });
228
229    // 4. Command loop
230    while let Some(cmd) = cmd_rx.recv().await {
231        match cmd {
232            BridgeCommand::Prompt { messages, reply } => {
233                let content_blocks: Vec<acp::ContentBlock> =
234                    messages.into_iter().map(|m| m.into()).collect();
235                let result = conn
236                    .prompt(acp::PromptRequest::new(session_id.clone(), content_blocks))
237                    .await;
238                match result {
239                    Ok(response) => {
240                        let stop_reason = serde_json::to_value(response.stop_reason)
241                            .ok()
242                            .and_then(|v| v.as_str().map(String::from))
243                            .unwrap_or_else(|| "unknown".to_string());
244                        let _ = evt_tx.send(BridgeEvent::PromptDone {
245                            stop_reason: stop_reason.clone(),
246                        });
247                        // Content was already streamed via session_notification -> TextChunk.
248                        // The main thread collects content from BridgeEvent::TextChunk events.
249                        let _ = reply.send(Ok(PromptResult {
250                            content: String::new(),
251                            stop_reason,
252                        }));
253                    }
254                    Err(e) => {
255                        let _ = reply.send(Err(AcpCliError::Agent(format!("{e}"))));
256                    }
257                }
258            }
259            BridgeCommand::Cancel => {
260                // ACP cancel not yet implemented in SDK
261            }
262            BridgeCommand::SetMode { mode, reply } => {
263                let mode_id = acp::SessionModeId::new(mode);
264                let request = acp::SetSessionModeRequest::new(session_id.clone(), mode_id);
265                match conn.set_session_mode(request).await {
266                    Ok(_) => {
267                        let _ = reply.send(Ok(()));
268                    }
269                    Err(e) => {
270                        let _ =
271                            reply.send(Err(AcpCliError::Agent(format!("set_session_mode: {e}"))));
272                    }
273                }
274            }
275            BridgeCommand::SetConfig { key, value, reply } => {
276                let config_id = acp::SessionConfigId::new(key);
277                let value_id = acp::SessionConfigValueId::new(value);
278                let request = acp::SetSessionConfigOptionRequest::new(
279                    session_id.clone(),
280                    config_id,
281                    value_id,
282                );
283                match conn.set_session_config_option(request).await {
284                    Ok(_) => {
285                        let _ = reply.send(Ok(()));
286                    }
287                    Err(e) => {
288                        let _ = reply.send(Err(AcpCliError::Agent(format!(
289                            "set_session_config_option: {e}"
290                        ))));
291                    }
292                }
293            }
294            BridgeCommand::Shutdown => break,
295        }
296    }
297
298    // Cleanup
299    child.kill().await.ok();
300    Ok(())
301}