Skip to main content

claude_codes/
client_sync.rs

1//! Synchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6    ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7    ControlResponseMessage, ParseError,
8};
9use crate::protocol::Protocol;
10use log::{debug, warn};
11use serde::{Deserialize, Serialize};
12use std::io::{BufRead, BufReader, Write};
13use std::process::{Child, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16/// Synchronous client for communicating with Claude
17pub struct SyncClient {
18    child: Child,
19    stdin: ChildStdin,
20    stdout: BufReader<ChildStdout>,
21    session_uuid: Option<Uuid>,
22    /// Whether tool approval protocol has been initialized
23    tool_approval_enabled: bool,
24}
25
/// Capacity, in bytes, of the buffered reader wrapped around Claude's stdout:
/// 10 MiB (10 * 2^20 = 10_485_760).
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
28
29impl SyncClient {
30    /// Create a new synchronous client from an existing child process
31    pub fn new(mut child: Child) -> Result<Self> {
32        let stdin = child
33            .stdin
34            .take()
35            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
36        let stdout = child
37            .stdout
38            .take()
39            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
40
41        Ok(Self {
42            child,
43            stdin,
44            stdout: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
45            session_uuid: None,
46            tool_approval_enabled: false,
47        })
48    }
49
50    /// Create a new synchronous client with default settings
51    pub fn with_defaults() -> Result<Self> {
52        // Check Claude version (only warns once per session)
53        // NOTE: The claude-codes API is in high flux. If you wish to work around
54        // this version check, you can use SyncClient::new() directly with:
55        //   let child = ClaudeCliBuilder::new().spawn_sync()?;
56        //   SyncClient::new(child)
57        crate::version::check_claude_version()?;
58        let child = ClaudeCliBuilder::new().spawn_sync().map_err(Error::Io)?;
59        Self::new(child)
60    }
61
62    /// Resume a previous session by UUID
63    /// This creates a new client that resumes an existing session
64    pub fn resume_session(session_uuid: Uuid) -> Result<Self> {
65        let child = ClaudeCliBuilder::new()
66            .resume(Some(session_uuid.to_string()))
67            .spawn_sync()
68            .map_err(Error::Io)?;
69
70        debug!("Resuming Claude session with UUID: {}", session_uuid);
71        let mut client = Self::new(child)?;
72        // Pre-populate the session UUID since we're resuming
73        client.session_uuid = Some(session_uuid);
74        Ok(client)
75    }
76
77    /// Resume a previous session with a specific model
78    pub fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
79        let child = ClaudeCliBuilder::new()
80            .model(model)
81            .resume(Some(session_uuid.to_string()))
82            .spawn_sync()
83            .map_err(Error::Io)?;
84
85        debug!(
86            "Resuming Claude session with UUID: {} and model: {}",
87            session_uuid, model
88        );
89        let mut client = Self::new(child)?;
90        // Pre-populate the session UUID since we're resuming
91        client.session_uuid = Some(session_uuid);
92        Ok(client)
93    }
94
95    /// Send a query and collect all responses
96    pub fn query(&mut self, input: ClaudeInput) -> Result<Vec<ClaudeOutput>> {
97        let mut responses = Vec::new();
98        for response in self.query_stream(input)? {
99            responses.push(response?);
100        }
101        Ok(responses)
102    }
103
104    /// Send a query and return an iterator over responses
105    pub fn query_stream(&mut self, input: ClaudeInput) -> Result<ResponseIterator<'_>> {
106        // Send the input
107        Protocol::write_sync(&mut self.stdin, &input)?;
108
109        Ok(ResponseIterator {
110            client: self,
111            finished: false,
112        })
113    }
114
115    /// Read the next response from Claude
116    fn read_next(&mut self) -> Result<Option<ClaudeOutput>> {
117        let mut line = String::new();
118        match self.stdout.read_line(&mut line) {
119            Ok(0) => {
120                debug!("[CLIENT] Stream closed");
121                Ok(None)
122            }
123            Ok(_) => {
124                let trimmed = line.trim();
125                if trimmed.is_empty() {
126                    debug!("[CLIENT] Skipping empty line");
127                    return self.read_next();
128                }
129
130                debug!("[CLIENT] Received: {}", trimmed);
131                match ClaudeOutput::parse_json_tolerant(trimmed) {
132                    Ok(output) => {
133                        // Capture UUID from first response if not already set
134                        if self.session_uuid.is_none() {
135                            if let ClaudeOutput::Assistant(ref msg) = output {
136                                if let Some(ref uuid_str) = msg.uuid {
137                                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
138                                        debug!("[CLIENT] Captured session UUID: {}", uuid);
139                                        self.session_uuid = Some(uuid);
140                                    }
141                                }
142                            } else if let ClaudeOutput::Result(ref msg) = output {
143                                if let Some(ref uuid_str) = msg.uuid {
144                                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
145                                        debug!("[CLIENT] Captured session UUID: {}", uuid);
146                                        self.session_uuid = Some(uuid);
147                                    }
148                                }
149                            }
150                        }
151
152                        // Check if this is a result message
153                        if matches!(output, ClaudeOutput::Result(_)) {
154                            debug!("[CLIENT] Received result message, stream complete");
155                            Ok(Some(output))
156                        } else {
157                            Ok(Some(output))
158                        }
159                    }
160                    Err(ParseError { error_message, .. }) => {
161                        warn!("[CLIENT] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
162                        warn!("[CLIENT] Parse error: {}", error_message);
163                        warn!("[CLIENT] Raw message: {}", trimmed);
164                        Err(Error::Deserialization(format!(
165                            "{} (raw: {})",
166                            error_message, trimmed
167                        )))
168                    }
169                }
170            }
171            Err(e) => {
172                debug!("[CLIENT] Error reading from stdout: {}", e);
173                Err(Error::Io(e))
174            }
175        }
176    }
177
178    /// Shutdown the client and wait for the process to exit
179    pub fn shutdown(&mut self) -> Result<()> {
180        debug!("[CLIENT] Shutting down client");
181        self.child.kill().map_err(Error::Io)?;
182        self.child.wait().map_err(Error::Io)?;
183        Ok(())
184    }
185
186    /// Get the session UUID if available
187    /// Returns an error if no response has been received yet
188    pub fn session_uuid(&self) -> Result<Uuid> {
189        self.session_uuid.ok_or(Error::SessionNotInitialized)
190    }
191
192    /// Test if the Claude connection is working by sending a ping message
193    /// Returns true if Claude responds with "pong", false otherwise
194    pub fn ping(&mut self) -> bool {
195        // Send a simple ping request
196        let ping_input = ClaudeInput::user_message(
197            "ping - respond with just the word 'pong' and nothing else",
198            self.session_uuid.unwrap_or_else(Uuid::new_v4),
199        );
200
201        // Try to send the ping and get responses
202        match self.query(ping_input) {
203            Ok(responses) => {
204                // Check all responses for "pong"
205                for output in responses {
206                    if let ClaudeOutput::Assistant(msg) = &output {
207                        for content in &msg.message.content {
208                            if let ContentBlock::Text(text) = content {
209                                if text.text.to_lowercase().contains("pong") {
210                                    return true;
211                                }
212                            }
213                        }
214                    }
215                }
216                false
217            }
218            Err(e) => {
219                debug!("Ping failed: {}", e);
220                false
221            }
222        }
223    }
224
225    // =========================================================================
226    // Tool Approval Protocol
227    // =========================================================================
228
229    /// Enable the tool approval protocol by performing the initialization handshake.
230    ///
231    /// After calling this method, the CLI will send `ControlRequest` messages when
232    /// Claude wants to use a tool. You must handle these by calling
233    /// `send_control_response()` with an appropriate response.
234    ///
235    /// **Important**: The client must have been created with
236    /// `ClaudeCliBuilder::permission_prompt_tool("stdio")` for this to work.
237    ///
238    /// # Example
239    ///
240    /// ```no_run
241    /// use claude_codes::{SyncClient, ClaudeCliBuilder, ClaudeOutput, ControlRequestPayload};
242    ///
243    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
244    /// let child = ClaudeCliBuilder::new()
245    ///     .model("sonnet")
246    ///     .permission_prompt_tool("stdio")
247    ///     .spawn_sync()?;
248    ///
249    /// let mut client = SyncClient::new(child)?;
250    /// client.enable_tool_approval()?;
251    ///
252    /// // Now when you receive messages, you may get ControlRequest messages
253    /// // that need responses
254    /// # Ok(())
255    /// # }
256    /// ```
257    pub fn enable_tool_approval(&mut self) -> Result<()> {
258        if self.tool_approval_enabled {
259            debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
260            return Ok(());
261        }
262
263        let request_id = format!("init-{}", Uuid::new_v4());
264        let init_request = ControlRequestMessage::initialize(&request_id);
265
266        debug!("[TOOL_APPROVAL] Sending initialization handshake");
267        Protocol::write_sync(&mut self.stdin, &init_request)?;
268
269        // Wait for the initialization response
270        loop {
271            let mut line = String::new();
272            let bytes_read = self.stdout.read_line(&mut line).map_err(Error::Io)?;
273
274            if bytes_read == 0 {
275                return Err(Error::ConnectionClosed);
276            }
277
278            let trimmed = line.trim();
279            if trimmed.is_empty() {
280                continue;
281            }
282
283            debug!("[TOOL_APPROVAL] Received: {}", trimmed);
284
285            // Try to parse as ClaudeOutput
286            match ClaudeOutput::parse_json_tolerant(trimmed) {
287                Ok(ClaudeOutput::ControlResponse(resp)) => {
288                    use crate::io::ControlResponsePayload;
289                    match &resp.response {
290                        ControlResponsePayload::Success {
291                            request_id: rid, ..
292                        } if rid == &request_id => {
293                            debug!("[TOOL_APPROVAL] Initialization successful");
294                            self.tool_approval_enabled = true;
295                            return Ok(());
296                        }
297                        ControlResponsePayload::Error { error, .. } => {
298                            return Err(Error::Protocol(format!(
299                                "Tool approval initialization failed: {}",
300                                error
301                            )));
302                        }
303                        _ => {
304                            // Different request_id, keep waiting
305                            continue;
306                        }
307                    }
308                }
309                Ok(_) => {
310                    // Got a different message type (system, etc.), keep waiting
311                    continue;
312                }
313                Err(e) => {
314                    return Err(Error::Deserialization(e.to_string()));
315                }
316            }
317        }
318    }
319
320    /// Send a control response back to the CLI.
321    ///
322    /// Use this to respond to `ControlRequest` messages received during tool approval.
323    /// The easiest way to create responses is using the helper methods on
324    /// `ToolPermissionRequest`:
325    ///
326    /// # Example
327    ///
328    /// ```ignore
329    /// use claude_codes::{SyncClient, ControlRequestPayload, ControlResponse, ToolPermissionRequest};
330    ///
331    /// fn handle_permission(
332    ///     client: &mut SyncClient,
333    ///     perm_req: &ToolPermissionRequest,
334    ///     request_id: &str,
335    /// ) -> claude_codes::Result<()> {
336    ///     let response = if perm_req.tool_name == "Bash" {
337    ///         perm_req.deny("Bash commands not allowed", request_id)
338    ///     } else {
339    ///         perm_req.allow(request_id)
340    ///     };
341    ///     client.send_control_response(response)
342    /// }
343    /// ```
344    pub fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
345        let message: ControlResponseMessage = response.into();
346        debug!(
347            "[TOOL_APPROVAL] Sending control response: {:?}",
348            serde_json::to_string(&message)
349        );
350        Protocol::write_sync(&mut self.stdin, &message)?;
351        Ok(())
352    }
353
354    /// Check if tool approval protocol is enabled
355    pub fn is_tool_approval_enabled(&self) -> bool {
356        self.tool_approval_enabled
357    }
358}
359
360// Protocol extension methods for synchronous I/O
361impl Protocol {
362    /// Write a message to a synchronous writer
363    pub fn write_sync<W: Write, T: Serialize>(writer: &mut W, message: &T) -> Result<()> {
364        let line = Self::serialize(message)?;
365        debug!("[PROTOCOL] Sending: {}", line.trim());
366        writer.write_all(line.as_bytes())?;
367        writer.flush()?;
368        Ok(())
369    }
370
371    /// Read a message from a synchronous reader
372    pub fn read_sync<R: BufRead, T: for<'de> Deserialize<'de>>(reader: &mut R) -> Result<T> {
373        let mut line = String::new();
374        let bytes_read = reader.read_line(&mut line)?;
375        if bytes_read == 0 {
376            return Err(Error::ConnectionClosed);
377        }
378        debug!("[PROTOCOL] Received: {}", line.trim());
379        Self::deserialize(&line)
380    }
381}
382
383/// Iterator over responses from Claude
384pub struct ResponseIterator<'a> {
385    client: &'a mut SyncClient,
386    finished: bool,
387}
388
389impl Iterator for ResponseIterator<'_> {
390    type Item = Result<ClaudeOutput>;
391
392    fn next(&mut self) -> Option<Self::Item> {
393        if self.finished {
394            return None;
395        }
396
397        match self.client.read_next() {
398            Ok(Some(output)) => {
399                // Check if this is a result message
400                if matches!(output, ClaudeOutput::Result(_)) {
401                    self.finished = true;
402                }
403                Some(Ok(output))
404            }
405            Ok(None) => {
406                self.finished = true;
407                None
408            }
409            Err(e) => {
410                self.finished = true;
411                Some(Err(e))
412            }
413        }
414    }
415}
416
417impl Drop for SyncClient {
418    fn drop(&mut self) {
419        if let Err(e) = self.shutdown() {
420            debug!("[CLIENT] Error during shutdown: {}", e);
421        }
422    }
423}
424
/// Reads a continuous stream of line-delimited messages from any reader.
pub struct StreamProcessor<R> {
    /// Buffered wrapper around the underlying reader, used for line reads.
    reader: BufReader<R>,
}
429
430impl<R: std::io::Read> StreamProcessor<R> {
431    /// Create a new stream processor
432    pub fn new(reader: R) -> Self {
433        Self {
434            reader: BufReader::new(reader),
435        }
436    }
437
438    /// Process the next message from the stream
439    pub fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
440        Protocol::read_sync(&mut self.reader)
441    }
442
443    /// Process all messages in the stream
444    pub fn process_all<T, F>(&mut self, mut handler: F) -> Result<()>
445    where
446        T: for<'de> Deserialize<'de>,
447        F: FnMut(T) -> Result<()>,
448    {
449        loop {
450            match self.next_message() {
451                Ok(message) => handler(message)?,
452                Err(Error::ConnectionClosed) => break,
453                Err(e) => return Err(e),
454            }
455        }
456        Ok(())
457    }
458}