Skip to main content

claude_code_rust/acp/
client.rs

1// Claude Code Rust - A native Rust terminal interface for Claude Code
2// Copyright (C) 2025  Simon Peter Rothgang
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as
6// published by the Free Software Foundation, either version 3 of the
7// License, or (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17use agent_client_protocol::{self as acp};
18use std::cell::RefCell;
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::rc::Rc;
22use std::sync::{Arc, Mutex};
23use tokio::io::AsyncReadExt;
24use tokio::sync::mpsc;
25
26/// Convert an `std::io::Error` into an `acp::Error` with the appropriate JSON-RPC
27/// error code and the original message attached as data.
28#[allow(clippy::needless_pass_by_value)]
29fn io_err(e: std::io::Error) -> acp::Error {
30    acp::Error::internal_error().data(serde_json::Value::String(e.to_string()))
31}
32
33/// Messages sent from the ACP Client impl to the App/UI layer.
34pub enum ClientEvent {
35    /// Session update notification (streaming text, tool calls, etc.)
36    SessionUpdate(acp::SessionUpdate),
37    /// Permission request that needs user input.
38    PermissionRequest {
39        request: acp::RequestPermissionRequest,
40        response_tx: tokio::sync::oneshot::Sender<acp::RequestPermissionResponse>,
41    },
42    /// A prompt turn completed successfully.
43    TurnComplete,
44    /// `cancel` notification was accepted by the adapter.
45    TurnCancelled,
46    /// A prompt turn failed with an error.
47    TurnError(String),
48    /// Background connection completed successfully.
49    Connected {
50        session_id: acp::SessionId,
51        model_name: String,
52        mode: Option<crate::app::ModeState>,
53    },
54    /// Background connection failed.
55    ConnectionFailed(String),
56    /// Authentication is required before a session can be created.
57    AuthRequired { method_name: String, method_description: String },
58    /// Slash-command execution failed with a user-facing error.
59    SlashCommandError(String),
60    /// Custom slash command replaced the active ACP session.
61    SessionReplaced {
62        session_id: acp::SessionId,
63        model_name: String,
64        mode: Option<crate::app::ModeState>,
65    },
66    /// Startup update check found a newer published version.
67    UpdateAvailable { latest_version: String, current_version: String },
68}
69
70/// Shared handle to all spawned terminal processes.
71/// Kept accessible so the app can kill them on exit.
72pub type TerminalMap = Rc<RefCell<HashMap<String, TerminalProcess>>>;
73
74pub struct ClaudeClient {
75    event_tx: mpsc::UnboundedSender<ClientEvent>,
76    auto_approve: bool,
77    terminals: TerminalMap,
78    cwd: PathBuf,
79}
80
81pub struct TerminalProcess {
82    child: tokio::process::Child,
83    /// Accumulated stdout+stderr - append-only, never cleared.
84    /// Shared with background reader tasks via Arc.
85    pub(crate) output_buffer: Arc<Mutex<Vec<u8>>>,
86    /// Byte offset: how much of `output_buffer` has already been returned
87    /// by `terminal_output` polls. Only the adapter advances this.
88    output_cursor: usize,
89    /// The shell command that was executed (e.g. "echo hello && ls -la").
90    pub(crate) command: String,
91}
92
93/// Spawn a background task that reads from an async reader into a shared buffer.
94fn spawn_output_reader(
95    mut reader: impl tokio::io::AsyncRead + Unpin + 'static,
96    buffer: Arc<Mutex<Vec<u8>>>,
97) {
98    tokio::task::spawn_local(async move {
99        let mut chunk = [0u8; 4096];
100        loop {
101            match reader.read(&mut chunk).await {
102                Ok(0) => break,
103                Ok(n) => {
104                    if let Ok(mut buf) = buffer.lock() {
105                        buf.extend_from_slice(&chunk[..n]);
106                    } else {
107                        break;
108                    }
109                }
110                Err(e) => {
111                    tracing::warn!("terminal output reader error: {e}");
112                    break;
113                }
114            }
115        }
116    });
117}
118
119impl ClaudeClient {
120    pub fn new(
121        event_tx: mpsc::UnboundedSender<ClientEvent>,
122        auto_approve: bool,
123        cwd: PathBuf,
124    ) -> (Self, TerminalMap) {
125        let terminals = Rc::new(RefCell::new(HashMap::new()));
126        (Self { event_tx, auto_approve, terminals: Rc::clone(&terminals), cwd }, terminals)
127    }
128
129    /// Create a `ClaudeClient` that shares an existing `TerminalMap`.
130    /// Used by the background connection task to reuse `App`'s terminal map.
131    pub fn with_terminals(
132        event_tx: mpsc::UnboundedSender<ClientEvent>,
133        auto_approve: bool,
134        cwd: PathBuf,
135        terminals: TerminalMap,
136    ) -> Self {
137        Self { event_tx, auto_approve, terminals, cwd }
138    }
139}
140
141/// Kill all spawned terminal child processes. Call on app exit.
142pub fn kill_all_terminals(terminals: &TerminalMap) {
143    let mut map = terminals.borrow_mut();
144    for (_, terminal) in map.iter_mut() {
145        // start_kill is synchronous - sends the kill signal without awaiting
146        let _ = terminal.child.start_kill();
147    }
148    map.clear();
149}
150
151#[async_trait::async_trait(?Send)]
152impl acp::Client for ClaudeClient {
153    async fn request_permission(
154        &self,
155        req: acp::RequestPermissionRequest,
156    ) -> acp::Result<acp::RequestPermissionResponse> {
157        if self.auto_approve {
158            let allow_option = req
159                .options
160                .iter()
161                .find(|o| {
162                    matches!(
163                        o.kind,
164                        acp::PermissionOptionKind::AllowOnce
165                            | acp::PermissionOptionKind::AllowAlways
166                    )
167                })
168                .ok_or_else(|| {
169                    acp::Error::internal_error()
170                        .data(serde_json::Value::String("No allow option found".into()))
171                })?;
172
173            return Ok(acp::RequestPermissionResponse::new(
174                acp::RequestPermissionOutcome::Selected(acp::SelectedPermissionOutcome::new(
175                    allow_option.option_id.clone(),
176                )),
177            ));
178        }
179
180        // Send to UI and wait for user response
181        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
182        self.event_tx.send(ClientEvent::PermissionRequest { request: req, response_tx }).map_err(
183            |_| {
184                acp::Error::internal_error()
185                    .data(serde_json::Value::String("Event channel closed".into()))
186            },
187        )?;
188
189        response_rx.await.map_err(|_| {
190            acp::Error::internal_error()
191                .data(serde_json::Value::String("Permission dialog cancelled".into()))
192        })
193    }
194
195    async fn session_notification(
196        &self,
197        notification: acp::SessionNotification,
198    ) -> acp::Result<()> {
199        self.event_tx.send(ClientEvent::SessionUpdate(notification.update)).map_err(|_| {
200            acp::Error::internal_error()
201                .data(serde_json::Value::String("Event channel closed".into()))
202        })?;
203        Ok(())
204    }
205
206    async fn read_text_file(
207        &self,
208        req: acp::ReadTextFileRequest,
209    ) -> acp::Result<acp::ReadTextFileResponse> {
210        let content = tokio::fs::read_to_string(&req.path).await.map_err(io_err)?;
211
212        let filtered = if req.line.is_some() || req.limit.is_some() {
213            let lines: Vec<&str> = content.lines().collect();
214            let start = req.line.map_or(0, |l| (l as usize).saturating_sub(1));
215            let end = req.limit.map_or(lines.len(), |l| (start + l as usize).min(lines.len()));
216            lines[start..end].join("\n")
217        } else {
218            content
219        };
220
221        Ok(acp::ReadTextFileResponse::new(filtered))
222    }
223
224    async fn write_text_file(
225        &self,
226        req: acp::WriteTextFileRequest,
227    ) -> acp::Result<acp::WriteTextFileResponse> {
228        tokio::fs::write(&req.path, &req.content).await.map_err(io_err)?;
229        Ok(acp::WriteTextFileResponse::new())
230    }
231
232    async fn create_terminal(
233        &self,
234        req: acp::CreateTerminalRequest,
235    ) -> acp::Result<acp::CreateTerminalResponse> {
236        let cwd = req.cwd.unwrap_or_else(|| self.cwd.clone());
237
238        // The ACP adapter sends the full shell command as req.command
239        // (e.g. "echo hello && ls -la"). We must wrap it in a shell.
240        let mut command = if cfg!(windows) {
241            let mut c = tokio::process::Command::new("cmd.exe");
242            c.arg("/C").arg(&req.command);
243            c
244        } else {
245            let mut c = tokio::process::Command::new("sh");
246            c.arg("-c").arg(&req.command);
247            c
248        };
249        // Append any extra args the adapter may send (typically empty)
250        command.args(&req.args);
251
252        let mut child = command
253            .current_dir(&cwd)
254            .stdin(std::process::Stdio::null())
255            .stdout(std::process::Stdio::piped())
256            .stderr(std::process::Stdio::piped())
257            .envs(req.env.iter().map(|e| (&e.name, &e.value)))
258            // Force colored output - programs disable colors when stdout is piped.
259            // These env vars cover most CLI tools across ecosystems.
260            .env("FORCE_COLOR", "1")
261            .env("CLICOLOR_FORCE", "1")
262            .env("CARGO_TERM_COLOR", "always")
263            .spawn()
264            .map_err(io_err)?;
265
266        let output_buffer = Arc::new(Mutex::new(Vec::new()));
267
268        // Spawn background tasks to drain stdout and stderr into the shared buffer
269        if let Some(stdout) = child.stdout.take() {
270            spawn_output_reader(stdout, Arc::clone(&output_buffer));
271        }
272        if let Some(stderr) = child.stderr.take() {
273            spawn_output_reader(stderr, Arc::clone(&output_buffer));
274        }
275
276        let terminal_id = uuid::Uuid::new_v4().to_string();
277        self.terminals.borrow_mut().insert(
278            terminal_id.clone(),
279            TerminalProcess {
280                child,
281                output_buffer,
282                output_cursor: 0,
283                command: req.command.clone(),
284            },
285        );
286
287        Ok(acp::CreateTerminalResponse::new(terminal_id))
288    }
289
290    async fn terminal_output(
291        &self,
292        req: acp::TerminalOutputRequest,
293    ) -> acp::Result<acp::TerminalOutputResponse> {
294        let tid = req.terminal_id.to_string();
295        let mut terminals = self.terminals.borrow_mut();
296        let terminal = terminals.get_mut(tid.as_str()).ok_or_else(|| {
297            acp::Error::internal_error()
298                .data(serde_json::Value::String(format!("Terminal not found: {tid}")))
299        })?;
300
301        // Return new output since last poll (advance cursor, never clear buffer)
302        let output = {
303            if let Ok(buf) = terminal.output_buffer.lock() {
304                let new_data = &buf[terminal.output_cursor..];
305                let data = String::from_utf8_lossy(new_data).to_string();
306                terminal.output_cursor = buf.len();
307                data
308            } else {
309                String::new()
310            }
311        };
312
313        let exit_status = match terminal.child.try_wait().map_err(io_err)? {
314            Some(status) => {
315                let mut es = acp::TerminalExitStatus::new();
316                if let Some(code) = status.code() {
317                    es = es.exit_code(code.unsigned_abs());
318                }
319                Some(es)
320            }
321            None => None,
322        };
323
324        let mut response = acp::TerminalOutputResponse::new(output, false);
325        if let Some(es) = exit_status {
326            response = response.exit_status(es);
327        }
328        Ok(response)
329    }
330
331    async fn kill_terminal_command(
332        &self,
333        req: acp::KillTerminalCommandRequest,
334    ) -> acp::Result<acp::KillTerminalCommandResponse> {
335        let tid = req.terminal_id.to_string();
336        let mut terminals = self.terminals.borrow_mut();
337        if let Some(terminal) = terminals.get_mut(tid.as_str()) {
338            // start_kill sends the signal synchronously - no await needed
339            terminal.child.start_kill().map_err(io_err)?;
340        }
341        Ok(acp::KillTerminalCommandResponse::new())
342    }
343
344    async fn wait_for_terminal_exit(
345        &self,
346        req: acp::WaitForTerminalExitRequest,
347    ) -> acp::Result<acp::WaitForTerminalExitResponse> {
348        let tid = req.terminal_id.to_string();
349
350        // Poll with try_wait to avoid holding borrow_mut across .await
351        loop {
352            {
353                let mut terminals = self.terminals.borrow_mut();
354                let terminal = terminals.get_mut(tid.as_str()).ok_or_else(|| {
355                    acp::Error::internal_error()
356                        .data(serde_json::Value::String("Terminal not found".into()))
357                })?;
358
359                if let Some(status) = terminal.child.try_wait().map_err(io_err)? {
360                    let mut exit_status = acp::TerminalExitStatus::new();
361                    if let Some(code) = status.code() {
362                        exit_status = exit_status.exit_code(code.unsigned_abs());
363                    }
364                    return Ok(acp::WaitForTerminalExitResponse::new(exit_status));
365                }
366            } // borrow_mut dropped here
367
368            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
369        }
370    }
371
372    async fn release_terminal(
373        &self,
374        req: acp::ReleaseTerminalRequest,
375    ) -> acp::Result<acp::ReleaseTerminalResponse> {
376        let tid = req.terminal_id.to_string();
377        self.terminals.borrow_mut().remove(tid.as_str());
378        Ok(acp::ReleaseTerminalResponse::new())
379    }
380}