Skip to main content

claude_codes/
client_sync.rs

1//! Synchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6    ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7    ControlResponseMessage,
8};
9use crate::protocol::Protocol;
10use log::{debug, warn};
11use serde::{Deserialize, Serialize};
12use std::io::{BufRead, BufReader, Write};
13use std::process::{Child, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
/// Synchronous client for communicating with Claude
///
/// Owns a spawned child process together with its stdin and stdout pipes.
/// Messages are written to `stdin` and responses are read line by line from
/// `stdout`.
pub struct SyncClient {
    /// Handle to the spawned process; killed and waited on by `shutdown()`.
    child: Child,
    /// Write end of the child's stdin, used to send serialized messages.
    stdin: ChildStdin,
    /// Buffered read end of the child's stdout, from which response lines are read.
    stdout: BufReader<ChildStdout>,
    /// Session UUID: pre-populated when resuming a session, otherwise captured
    /// from the first assistant/result message whose `uuid` parses.
    session_uuid: Option<Uuid>,
    /// Whether tool approval protocol has been initialized
    tool_approval_enabled: bool,
}
25
/// Capacity, in bytes, of the `BufReader` wrapped around Claude's stdout (10 MiB).
const STDOUT_BUFFER_SIZE: usize = 10 * (1 << 20);
28
29impl SyncClient {
30    /// Create a new synchronous client from an existing child process
31    pub fn new(mut child: Child) -> Result<Self> {
32        let stdin = child
33            .stdin
34            .take()
35            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
36        let stdout = child
37            .stdout
38            .take()
39            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
40
41        Ok(Self {
42            child,
43            stdin,
44            stdout: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
45            session_uuid: None,
46            tool_approval_enabled: false,
47        })
48    }
49
50    /// Create a new synchronous client with default settings
51    pub fn with_defaults() -> Result<Self> {
52        // Check Claude version (only warns once per session)
53        // NOTE: The claude-codes API is in high flux. If you wish to work around
54        // this version check, you can use SyncClient::new() directly with:
55        //   let child = ClaudeCliBuilder::new().spawn_sync()?;
56        //   SyncClient::new(child)
57        crate::version::check_claude_version()?;
58        let child = ClaudeCliBuilder::new().spawn_sync()?;
59        Self::new(child)
60    }
61
62    /// Resume a previous session by UUID
63    /// This creates a new client that resumes an existing session
64    pub fn resume_session(session_uuid: Uuid) -> Result<Self> {
65        let child = ClaudeCliBuilder::new()
66            .resume(Some(session_uuid.to_string()))
67            .spawn_sync()?;
68
69        debug!("Resuming Claude session with UUID: {}", session_uuid);
70        let mut client = Self::new(child)?;
71        // Pre-populate the session UUID since we're resuming
72        client.session_uuid = Some(session_uuid);
73        Ok(client)
74    }
75
76    /// Resume a previous session with a specific model
77    pub fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
78        let child = ClaudeCliBuilder::new()
79            .model(model)
80            .resume(Some(session_uuid.to_string()))
81            .spawn_sync()?;
82
83        debug!(
84            "Resuming Claude session with UUID: {} and model: {}",
85            session_uuid, model
86        );
87        let mut client = Self::new(child)?;
88        // Pre-populate the session UUID since we're resuming
89        client.session_uuid = Some(session_uuid);
90        Ok(client)
91    }
92
93    /// Send a query and collect all responses
94    pub fn query(&mut self, input: ClaudeInput) -> Result<Vec<ClaudeOutput>> {
95        let mut responses = Vec::new();
96        for response in self.query_stream(input)? {
97            responses.push(response?);
98        }
99        Ok(responses)
100    }
101
102    /// Send a query and return an iterator over responses
103    pub fn query_stream(&mut self, input: ClaudeInput) -> Result<ResponseIterator<'_>> {
104        // Send the input
105        Protocol::write_sync(&mut self.stdin, &input)?;
106
107        Ok(ResponseIterator {
108            client: self,
109            finished: false,
110        })
111    }
112
113    /// Read the next response from Claude
114    fn read_next(&mut self) -> Result<Option<ClaudeOutput>> {
115        let mut line = String::new();
116        match self.stdout.read_line(&mut line) {
117            Ok(0) => {
118                debug!("[CLIENT] Stream closed");
119                Ok(None)
120            }
121            Ok(_) => {
122                let trimmed = line.trim();
123                if trimmed.is_empty() {
124                    debug!("[CLIENT] Skipping empty line");
125                    return self.read_next();
126                }
127
128                debug!("[CLIENT] Received: {}", trimmed);
129                match ClaudeOutput::parse_json_tolerant(trimmed) {
130                    Ok(output) => {
131                        // Capture UUID from first response if not already set
132                        if self.session_uuid.is_none() {
133                            if let ClaudeOutput::Assistant(ref msg) = output {
134                                if let Some(ref uuid_str) = msg.uuid {
135                                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
136                                        debug!("[CLIENT] Captured session UUID: {}", uuid);
137                                        self.session_uuid = Some(uuid);
138                                    }
139                                }
140                            } else if let ClaudeOutput::Result(ref msg) = output {
141                                if let Some(ref uuid_str) = msg.uuid {
142                                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
143                                        debug!("[CLIENT] Captured session UUID: {}", uuid);
144                                        self.session_uuid = Some(uuid);
145                                    }
146                                }
147                            }
148                        }
149
150                        // Check if this is a result message
151                        if matches!(output, ClaudeOutput::Result(_)) {
152                            debug!("[CLIENT] Received result message, stream complete");
153                            Ok(Some(output))
154                        } else {
155                            Ok(Some(output))
156                        }
157                    }
158                    Err(parse_error) => {
159                        warn!("[CLIENT] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
160                        warn!("[CLIENT] Parse error: {}", parse_error.error_message);
161                        warn!("[CLIENT] Raw message: {}", trimmed);
162                        Err(parse_error.into())
163                    }
164                }
165            }
166            Err(e) => {
167                debug!("[CLIENT] Error reading from stdout: {}", e);
168                Err(Error::Io(e))
169            }
170        }
171    }
172
173    /// Shutdown the client and wait for the process to exit
174    pub fn shutdown(&mut self) -> Result<()> {
175        debug!("[CLIENT] Shutting down client");
176        self.child.kill().map_err(Error::Io)?;
177        self.child.wait().map_err(Error::Io)?;
178        Ok(())
179    }
180
181    /// Get the session UUID if available
182    /// Returns an error if no response has been received yet
183    pub fn session_uuid(&self) -> Result<Uuid> {
184        self.session_uuid.ok_or(Error::SessionNotInitialized)
185    }
186
187    /// Test if the Claude connection is working by sending a ping message
188    /// Returns true if Claude responds with "pong", false otherwise
189    pub fn ping(&mut self) -> bool {
190        // Send a simple ping request
191        let ping_input = ClaudeInput::user_message(
192            "ping - respond with just the word 'pong' and nothing else",
193            self.session_uuid.unwrap_or_else(Uuid::new_v4),
194        );
195
196        // Try to send the ping and get responses
197        match self.query(ping_input) {
198            Ok(responses) => {
199                // Check all responses for "pong"
200                for output in responses {
201                    if let ClaudeOutput::Assistant(msg) = &output {
202                        for content in &msg.message.content {
203                            if let ContentBlock::Text(text) = content {
204                                if text.text.to_lowercase().contains("pong") {
205                                    return true;
206                                }
207                            }
208                        }
209                    }
210                }
211                false
212            }
213            Err(e) => {
214                debug!("Ping failed: {}", e);
215                false
216            }
217        }
218    }
219
220    // =========================================================================
221    // Tool Approval Protocol
222    // =========================================================================
223
224    /// Enable the tool approval protocol by performing the initialization handshake.
225    ///
226    /// After calling this method, the CLI will send `ControlRequest` messages when
227    /// Claude wants to use a tool. You must handle these by calling
228    /// `send_control_response()` with an appropriate response.
229    ///
230    /// **Important**: The client must have been created with
231    /// `ClaudeCliBuilder::permission_prompt_tool("stdio")` for this to work.
232    ///
233    /// # Example
234    ///
235    /// ```no_run
236    /// use claude_codes::{SyncClient, ClaudeCliBuilder, ClaudeOutput, ControlRequestPayload};
237    ///
238    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
239    /// let child = ClaudeCliBuilder::new()
240    ///     .model("sonnet")
241    ///     .permission_prompt_tool("stdio")
242    ///     .spawn_sync()?;
243    ///
244    /// let mut client = SyncClient::new(child)?;
245    /// client.enable_tool_approval()?;
246    ///
247    /// // Now when you receive messages, you may get ControlRequest messages
248    /// // that need responses
249    /// # Ok(())
250    /// # }
251    /// ```
252    pub fn enable_tool_approval(&mut self) -> Result<()> {
253        if self.tool_approval_enabled {
254            debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
255            return Ok(());
256        }
257
258        let request_id = format!("init-{}", Uuid::new_v4());
259        let init_request = ControlRequestMessage::initialize(&request_id);
260
261        debug!("[TOOL_APPROVAL] Sending initialization handshake");
262        Protocol::write_sync(&mut self.stdin, &init_request)?;
263
264        // Wait for the initialization response
265        loop {
266            let mut line = String::new();
267            let bytes_read = self.stdout.read_line(&mut line).map_err(Error::Io)?;
268
269            if bytes_read == 0 {
270                return Err(Error::ConnectionClosed);
271            }
272
273            let trimmed = line.trim();
274            if trimmed.is_empty() {
275                continue;
276            }
277
278            debug!("[TOOL_APPROVAL] Received: {}", trimmed);
279
280            // Try to parse as ClaudeOutput
281            match ClaudeOutput::parse_json_tolerant(trimmed) {
282                Ok(ClaudeOutput::ControlResponse(resp)) => {
283                    use crate::io::ControlResponsePayload;
284                    match &resp.response {
285                        ControlResponsePayload::Success {
286                            request_id: rid, ..
287                        } if rid == &request_id => {
288                            debug!("[TOOL_APPROVAL] Initialization successful");
289                            self.tool_approval_enabled = true;
290                            return Ok(());
291                        }
292                        ControlResponsePayload::Error { error, .. } => {
293                            return Err(Error::Protocol(format!(
294                                "Tool approval initialization failed: {}",
295                                error
296                            )));
297                        }
298                        _ => {
299                            // Different request_id, keep waiting
300                            continue;
301                        }
302                    }
303                }
304                Ok(_) => {
305                    // Got a different message type (system, etc.), keep waiting
306                    continue;
307                }
308                Err(e) => {
309                    return Err(e.into());
310                }
311            }
312        }
313    }
314
315    /// Send a control response back to the CLI.
316    ///
317    /// Use this to respond to `ControlRequest` messages received during tool approval.
318    /// The easiest way to create responses is using the helper methods on
319    /// `ToolPermissionRequest`:
320    ///
321    /// # Example
322    ///
323    /// ```ignore
324    /// use claude_codes::{SyncClient, ControlRequestPayload, ControlResponse, ToolPermissionRequest};
325    ///
326    /// fn handle_permission(
327    ///     client: &mut SyncClient,
328    ///     perm_req: &ToolPermissionRequest,
329    ///     request_id: &str,
330    /// ) -> claude_codes::Result<()> {
331    ///     let response = if perm_req.tool_name == "Bash" {
332    ///         perm_req.deny("Bash commands not allowed", request_id)
333    ///     } else {
334    ///         perm_req.allow(request_id)
335    ///     };
336    ///     client.send_control_response(response)
337    /// }
338    /// ```
339    pub fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
340        let message: ControlResponseMessage = response.into();
341        debug!(
342            "[TOOL_APPROVAL] Sending control response: {:?}",
343            serde_json::to_string(&message)
344        );
345        Protocol::write_sync(&mut self.stdin, &message)?;
346        Ok(())
347    }
348
349    /// Send an interrupt to gracefully stop the current response.
350    ///
351    /// This writes `{ "subtype": "interrupt" }` to stdin, telling Claude
352    /// to stop without killing the session.
353    pub fn interrupt(&mut self) -> Result<()> {
354        let input = ClaudeInput::interrupt();
355        Protocol::write_sync(&mut self.stdin, &input)?;
356        Ok(())
357    }
358
    /// Check if tool approval protocol is enabled
    ///
    /// Returns `true` only after `enable_tool_approval()` has completed its
    /// handshake successfully on this client.
    pub fn is_tool_approval_enabled(&self) -> bool {
        self.tool_approval_enabled
    }
363}
364
365// Protocol extension methods for synchronous I/O
366impl Protocol {
367    /// Write a message to a synchronous writer
368    pub fn write_sync<W: Write, T: Serialize>(writer: &mut W, message: &T) -> Result<()> {
369        let line = Self::serialize(message)?;
370        debug!("[PROTOCOL] Sending: {}", line.trim());
371        writer.write_all(line.as_bytes())?;
372        writer.flush()?;
373        Ok(())
374    }
375
376    /// Read a message from a synchronous reader
377    pub fn read_sync<R: BufRead, T: for<'de> Deserialize<'de>>(reader: &mut R) -> Result<T> {
378        let mut line = String::new();
379        let bytes_read = reader.read_line(&mut line)?;
380        if bytes_read == 0 {
381            return Err(Error::ConnectionClosed);
382        }
383        debug!("[PROTOCOL] Received: {}", line.trim());
384        Self::deserialize(&line)
385    }
386}
387
388/// Iterator over responses from Claude
389pub struct ResponseIterator<'a> {
390    client: &'a mut SyncClient,
391    finished: bool,
392}
393
394impl Iterator for ResponseIterator<'_> {
395    type Item = Result<ClaudeOutput>;
396
397    fn next(&mut self) -> Option<Self::Item> {
398        if self.finished {
399            return None;
400        }
401
402        match self.client.read_next() {
403            Ok(Some(output)) => {
404                // Check if this is a result message
405                if matches!(output, ClaudeOutput::Result(_)) {
406                    self.finished = true;
407                }
408                Some(Ok(output))
409            }
410            Ok(None) => {
411                self.finished = true;
412                None
413            }
414            Err(e) => {
415                self.finished = true;
416                Some(Err(e))
417            }
418        }
419    }
420}
421
422impl Drop for SyncClient {
423    fn drop(&mut self) {
424        if let Err(e) = self.shutdown() {
425            debug!("[CLIENT] Error during shutdown: {}", e);
426        }
427    }
428}
429
430/// Stream processor for handling continuous message streams
431pub struct StreamProcessor<R> {
432    reader: BufReader<R>,
433}
434
435impl<R: std::io::Read> StreamProcessor<R> {
436    /// Create a new stream processor
437    pub fn new(reader: R) -> Self {
438        Self {
439            reader: BufReader::new(reader),
440        }
441    }
442
443    /// Process the next message from the stream
444    pub fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
445        Protocol::read_sync(&mut self.reader)
446    }
447
448    /// Process all messages in the stream
449    pub fn process_all<T, F>(&mut self, mut handler: F) -> Result<()>
450    where
451        T: for<'de> Deserialize<'de>,
452        F: FnMut(T) -> Result<()>,
453    {
454        loop {
455            match self.next_message() {
456                Ok(message) => handler(message)?,
457                Err(Error::ConnectionClosed) => break,
458                Err(e) => return Err(e),
459            }
460        }
461        Ok(())
462    }
463}