Skip to main content

claude_code_rust/acp/
client.rs

1// Claude Code Rust - A native Rust terminal interface for Claude Code
2// Copyright (C) 2025  Simon Peter Rothgang
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as
6// published by the Free Software Foundation, either version 3 of the
7// License, or (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17use agent_client_protocol::{self as acp};
18use std::cell::RefCell;
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::rc::Rc;
22use std::sync::{Arc, Mutex};
23use tokio::io::AsyncReadExt;
24use tokio::sync::mpsc;
25
26/// Convert an `std::io::Error` into an `acp::Error` with the appropriate JSON-RPC
27/// error code and the original message attached as data.
28#[allow(clippy::needless_pass_by_value)]
29fn io_err(e: std::io::Error) -> acp::Error {
30    acp::Error::internal_error().data(serde_json::Value::String(e.to_string()))
31}
32
33/// Messages sent from the ACP Client impl to the App/UI layer.
34pub enum ClientEvent {
35    /// Session update notification (streaming text, tool calls, etc.)
36    SessionUpdate(acp::SessionUpdate),
37    /// Permission request that needs user input.
38    PermissionRequest {
39        request: acp::RequestPermissionRequest,
40        response_tx: tokio::sync::oneshot::Sender<acp::RequestPermissionResponse>,
41    },
42    /// A prompt turn completed successfully.
43    TurnComplete,
44    /// `cancel` notification was accepted by the adapter.
45    TurnCancelled,
46    /// A prompt turn failed with an error.
47    TurnError(String),
48    /// Background connection completed successfully.
49    Connected {
50        session_id: acp::SessionId,
51        model_name: String,
52        mode: Option<crate::app::ModeState>,
53    },
54    /// Background connection failed.
55    ConnectionFailed(String),
56    /// Authentication is required before a session can be created.
57    AuthRequired { method_name: String, method_description: String },
58    /// Slash-command execution failed with a user-facing error.
59    SlashCommandError(String),
60    /// Custom slash command replaced the active ACP session.
61    SessionReplaced {
62        session_id: acp::SessionId,
63        model_name: String,
64        mode: Option<crate::app::ModeState>,
65    },
66}
67
68/// Shared handle to all spawned terminal processes.
69/// Kept accessible so the app can kill them on exit.
70pub type TerminalMap = Rc<RefCell<HashMap<String, TerminalProcess>>>;
71
72pub struct ClaudeClient {
73    event_tx: mpsc::UnboundedSender<ClientEvent>,
74    auto_approve: bool,
75    terminals: TerminalMap,
76    cwd: PathBuf,
77}
78
79pub struct TerminalProcess {
80    child: tokio::process::Child,
81    /// Accumulated stdout+stderr - append-only, never cleared.
82    /// Shared with background reader tasks via Arc.
83    pub(crate) output_buffer: Arc<Mutex<Vec<u8>>>,
84    /// Byte offset: how much of `output_buffer` has already been returned
85    /// by `terminal_output` polls. Only the adapter advances this.
86    output_cursor: usize,
87    /// The shell command that was executed (e.g. "echo hello && ls -la").
88    pub(crate) command: String,
89}
90
91/// Spawn a background task that reads from an async reader into a shared buffer.
92fn spawn_output_reader(
93    mut reader: impl tokio::io::AsyncRead + Unpin + 'static,
94    buffer: Arc<Mutex<Vec<u8>>>,
95) {
96    tokio::task::spawn_local(async move {
97        let mut chunk = [0u8; 4096];
98        loop {
99            match reader.read(&mut chunk).await {
100                Ok(0) => break,
101                Ok(n) => {
102                    if let Ok(mut buf) = buffer.lock() {
103                        buf.extend_from_slice(&chunk[..n]);
104                    } else {
105                        break;
106                    }
107                }
108                Err(e) => {
109                    tracing::warn!("terminal output reader error: {e}");
110                    break;
111                }
112            }
113        }
114    });
115}
116
117impl ClaudeClient {
118    pub fn new(
119        event_tx: mpsc::UnboundedSender<ClientEvent>,
120        auto_approve: bool,
121        cwd: PathBuf,
122    ) -> (Self, TerminalMap) {
123        let terminals = Rc::new(RefCell::new(HashMap::new()));
124        (Self { event_tx, auto_approve, terminals: Rc::clone(&terminals), cwd }, terminals)
125    }
126
127    /// Create a `ClaudeClient` that shares an existing `TerminalMap`.
128    /// Used by the background connection task to reuse `App`'s terminal map.
129    pub fn with_terminals(
130        event_tx: mpsc::UnboundedSender<ClientEvent>,
131        auto_approve: bool,
132        cwd: PathBuf,
133        terminals: TerminalMap,
134    ) -> Self {
135        Self { event_tx, auto_approve, terminals, cwd }
136    }
137}
138
139/// Kill all spawned terminal child processes. Call on app exit.
140pub fn kill_all_terminals(terminals: &TerminalMap) {
141    let mut map = terminals.borrow_mut();
142    for (_, terminal) in map.iter_mut() {
143        // start_kill is synchronous - sends the kill signal without awaiting
144        let _ = terminal.child.start_kill();
145    }
146    map.clear();
147}
148
149#[async_trait::async_trait(?Send)]
150impl acp::Client for ClaudeClient {
151    async fn request_permission(
152        &self,
153        req: acp::RequestPermissionRequest,
154    ) -> acp::Result<acp::RequestPermissionResponse> {
155        if self.auto_approve {
156            let allow_option = req
157                .options
158                .iter()
159                .find(|o| {
160                    matches!(
161                        o.kind,
162                        acp::PermissionOptionKind::AllowOnce
163                            | acp::PermissionOptionKind::AllowAlways
164                    )
165                })
166                .ok_or_else(|| {
167                    acp::Error::internal_error()
168                        .data(serde_json::Value::String("No allow option found".into()))
169                })?;
170
171            return Ok(acp::RequestPermissionResponse::new(
172                acp::RequestPermissionOutcome::Selected(acp::SelectedPermissionOutcome::new(
173                    allow_option.option_id.clone(),
174                )),
175            ));
176        }
177
178        // Send to UI and wait for user response
179        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
180        self.event_tx.send(ClientEvent::PermissionRequest { request: req, response_tx }).map_err(
181            |_| {
182                acp::Error::internal_error()
183                    .data(serde_json::Value::String("Event channel closed".into()))
184            },
185        )?;
186
187        response_rx.await.map_err(|_| {
188            acp::Error::internal_error()
189                .data(serde_json::Value::String("Permission dialog cancelled".into()))
190        })
191    }
192
193    async fn session_notification(
194        &self,
195        notification: acp::SessionNotification,
196    ) -> acp::Result<()> {
197        self.event_tx.send(ClientEvent::SessionUpdate(notification.update)).map_err(|_| {
198            acp::Error::internal_error()
199                .data(serde_json::Value::String("Event channel closed".into()))
200        })?;
201        Ok(())
202    }
203
204    async fn read_text_file(
205        &self,
206        req: acp::ReadTextFileRequest,
207    ) -> acp::Result<acp::ReadTextFileResponse> {
208        let content = tokio::fs::read_to_string(&req.path).await.map_err(io_err)?;
209
210        let filtered = if req.line.is_some() || req.limit.is_some() {
211            let lines: Vec<&str> = content.lines().collect();
212            let start = req.line.map_or(0, |l| (l as usize).saturating_sub(1));
213            let end = req.limit.map_or(lines.len(), |l| (start + l as usize).min(lines.len()));
214            lines[start..end].join("\n")
215        } else {
216            content
217        };
218
219        Ok(acp::ReadTextFileResponse::new(filtered))
220    }
221
222    async fn write_text_file(
223        &self,
224        req: acp::WriteTextFileRequest,
225    ) -> acp::Result<acp::WriteTextFileResponse> {
226        tokio::fs::write(&req.path, &req.content).await.map_err(io_err)?;
227        Ok(acp::WriteTextFileResponse::new())
228    }
229
230    async fn create_terminal(
231        &self,
232        req: acp::CreateTerminalRequest,
233    ) -> acp::Result<acp::CreateTerminalResponse> {
234        let cwd = req.cwd.unwrap_or_else(|| self.cwd.clone());
235
236        // The ACP adapter sends the full shell command as req.command
237        // (e.g. "echo hello && ls -la"). We must wrap it in a shell.
238        let mut command = if cfg!(windows) {
239            let mut c = tokio::process::Command::new("cmd.exe");
240            c.arg("/C").arg(&req.command);
241            c
242        } else {
243            let mut c = tokio::process::Command::new("sh");
244            c.arg("-c").arg(&req.command);
245            c
246        };
247        // Append any extra args the adapter may send (typically empty)
248        command.args(&req.args);
249
250        let mut child = command
251            .current_dir(&cwd)
252            .stdin(std::process::Stdio::null())
253            .stdout(std::process::Stdio::piped())
254            .stderr(std::process::Stdio::piped())
255            .envs(req.env.iter().map(|e| (&e.name, &e.value)))
256            // Force colored output - programs disable colors when stdout is piped.
257            // These env vars cover most CLI tools across ecosystems.
258            .env("FORCE_COLOR", "1")
259            .env("CLICOLOR_FORCE", "1")
260            .env("CARGO_TERM_COLOR", "always")
261            .spawn()
262            .map_err(io_err)?;
263
264        let output_buffer = Arc::new(Mutex::new(Vec::new()));
265
266        // Spawn background tasks to drain stdout and stderr into the shared buffer
267        if let Some(stdout) = child.stdout.take() {
268            spawn_output_reader(stdout, Arc::clone(&output_buffer));
269        }
270        if let Some(stderr) = child.stderr.take() {
271            spawn_output_reader(stderr, Arc::clone(&output_buffer));
272        }
273
274        let terminal_id = uuid::Uuid::new_v4().to_string();
275        self.terminals.borrow_mut().insert(
276            terminal_id.clone(),
277            TerminalProcess {
278                child,
279                output_buffer,
280                output_cursor: 0,
281                command: req.command.clone(),
282            },
283        );
284
285        Ok(acp::CreateTerminalResponse::new(terminal_id))
286    }
287
288    async fn terminal_output(
289        &self,
290        req: acp::TerminalOutputRequest,
291    ) -> acp::Result<acp::TerminalOutputResponse> {
292        let tid = req.terminal_id.to_string();
293        let mut terminals = self.terminals.borrow_mut();
294        let terminal = terminals.get_mut(tid.as_str()).ok_or_else(|| {
295            acp::Error::internal_error()
296                .data(serde_json::Value::String(format!("Terminal not found: {tid}")))
297        })?;
298
299        // Return new output since last poll (advance cursor, never clear buffer)
300        let output = {
301            if let Ok(buf) = terminal.output_buffer.lock() {
302                let new_data = &buf[terminal.output_cursor..];
303                let data = String::from_utf8_lossy(new_data).to_string();
304                terminal.output_cursor = buf.len();
305                data
306            } else {
307                String::new()
308            }
309        };
310
311        let exit_status = match terminal.child.try_wait().map_err(io_err)? {
312            Some(status) => {
313                let mut es = acp::TerminalExitStatus::new();
314                if let Some(code) = status.code() {
315                    es = es.exit_code(code.unsigned_abs());
316                }
317                Some(es)
318            }
319            None => None,
320        };
321
322        let mut response = acp::TerminalOutputResponse::new(output, false);
323        if let Some(es) = exit_status {
324            response = response.exit_status(es);
325        }
326        Ok(response)
327    }
328
329    async fn kill_terminal_command(
330        &self,
331        req: acp::KillTerminalCommandRequest,
332    ) -> acp::Result<acp::KillTerminalCommandResponse> {
333        let tid = req.terminal_id.to_string();
334        let mut terminals = self.terminals.borrow_mut();
335        if let Some(terminal) = terminals.get_mut(tid.as_str()) {
336            // start_kill sends the signal synchronously - no await needed
337            terminal.child.start_kill().map_err(io_err)?;
338        }
339        Ok(acp::KillTerminalCommandResponse::new())
340    }
341
342    async fn wait_for_terminal_exit(
343        &self,
344        req: acp::WaitForTerminalExitRequest,
345    ) -> acp::Result<acp::WaitForTerminalExitResponse> {
346        let tid = req.terminal_id.to_string();
347
348        // Poll with try_wait to avoid holding borrow_mut across .await
349        loop {
350            {
351                let mut terminals = self.terminals.borrow_mut();
352                let terminal = terminals.get_mut(tid.as_str()).ok_or_else(|| {
353                    acp::Error::internal_error()
354                        .data(serde_json::Value::String("Terminal not found".into()))
355                })?;
356
357                if let Some(status) = terminal.child.try_wait().map_err(io_err)? {
358                    let mut exit_status = acp::TerminalExitStatus::new();
359                    if let Some(code) = status.code() {
360                        exit_status = exit_status.exit_code(code.unsigned_abs());
361                    }
362                    return Ok(acp::WaitForTerminalExitResponse::new(exit_status));
363                }
364            } // borrow_mut dropped here
365
366            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
367        }
368    }
369
370    async fn release_terminal(
371        &self,
372        req: acp::ReleaseTerminalRequest,
373    ) -> acp::Result<acp::ReleaseTerminalResponse> {
374        let tid = req.terminal_id.to_string();
375        self.terminals.borrow_mut().remove(tid.as_str());
376        Ok(acp::ReleaseTerminalResponse::new())
377    }
378}