claude_codes/
client_sync.rs

1//! Synchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6    ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7    ControlResponseMessage, ParseError,
8};
9use crate::protocol::Protocol;
10use log::debug;
11use serde::{Deserialize, Serialize};
12use std::io::{BufRead, BufReader, Write};
13use std::process::{Child, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16/// Synchronous client for communicating with Claude
17pub struct SyncClient {
18    child: Child,
19    stdin: ChildStdin,
20    stdout: BufReader<ChildStdout>,
21    session_uuid: Option<Uuid>,
22    /// Whether tool approval protocol has been initialized
23    tool_approval_enabled: bool,
24}
25
26impl SyncClient {
27    /// Create a new synchronous client from an existing child process
28    pub fn new(mut child: Child) -> Result<Self> {
29        let stdin = child
30            .stdin
31            .take()
32            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
33        let stdout = child
34            .stdout
35            .take()
36            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
37
38        Ok(Self {
39            child,
40            stdin,
41            stdout: BufReader::new(stdout),
42            session_uuid: None,
43            tool_approval_enabled: false,
44        })
45    }
46
47    /// Create a new synchronous client with default settings
48    pub fn with_defaults() -> Result<Self> {
49        // Check Claude version (only warns once per session)
50        // NOTE: The claude-codes API is in high flux. If you wish to work around
51        // this version check, you can use SyncClient::new() directly with:
52        //   let child = ClaudeCliBuilder::new().spawn_sync()?;
53        //   SyncClient::new(child)
54        crate::version::check_claude_version()?;
55        let child = ClaudeCliBuilder::new().spawn_sync().map_err(Error::Io)?;
56        Self::new(child)
57    }
58
59    /// Resume a previous session by UUID
60    /// This creates a new client that resumes an existing session
61    pub fn resume_session(session_uuid: Uuid) -> Result<Self> {
62        let child = ClaudeCliBuilder::new()
63            .resume(Some(session_uuid.to_string()))
64            .spawn_sync()
65            .map_err(Error::Io)?;
66
67        debug!("Resuming Claude session with UUID: {}", session_uuid);
68        let mut client = Self::new(child)?;
69        // Pre-populate the session UUID since we're resuming
70        client.session_uuid = Some(session_uuid);
71        Ok(client)
72    }
73
74    /// Resume a previous session with a specific model
75    pub fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
76        let child = ClaudeCliBuilder::new()
77            .model(model)
78            .resume(Some(session_uuid.to_string()))
79            .spawn_sync()
80            .map_err(Error::Io)?;
81
82        debug!(
83            "Resuming Claude session with UUID: {} and model: {}",
84            session_uuid, model
85        );
86        let mut client = Self::new(child)?;
87        // Pre-populate the session UUID since we're resuming
88        client.session_uuid = Some(session_uuid);
89        Ok(client)
90    }
91
92    /// Send a query and collect all responses
93    pub fn query(&mut self, input: ClaudeInput) -> Result<Vec<ClaudeOutput>> {
94        let mut responses = Vec::new();
95        for response in self.query_stream(input)? {
96            responses.push(response?);
97        }
98        Ok(responses)
99    }
100
101    /// Send a query and return an iterator over responses
102    pub fn query_stream(&mut self, input: ClaudeInput) -> Result<ResponseIterator<'_>> {
103        // Send the input
104        Protocol::write_sync(&mut self.stdin, &input)?;
105
106        Ok(ResponseIterator {
107            client: self,
108            finished: false,
109        })
110    }
111
112    /// Read the next response from Claude
113    fn read_next(&mut self) -> Result<Option<ClaudeOutput>> {
114        let mut line = String::new();
115        match self.stdout.read_line(&mut line) {
116            Ok(0) => {
117                debug!("[CLIENT] Stream closed");
118                Ok(None)
119            }
120            Ok(_) => {
121                let trimmed = line.trim();
122                if trimmed.is_empty() {
123                    debug!("[CLIENT] Skipping empty line");
124                    return self.read_next();
125                }
126
127                debug!("[CLIENT] Received: {}", trimmed);
128                match ClaudeOutput::parse_json_tolerant(trimmed) {
129                    Ok(output) => {
130                        // Capture UUID from first response if not already set
131                        if self.session_uuid.is_none() {
132                            if let ClaudeOutput::Assistant(ref msg) = output {
133                                if let Some(ref uuid_str) = msg.uuid {
134                                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
135                                        debug!("[CLIENT] Captured session UUID: {}", uuid);
136                                        self.session_uuid = Some(uuid);
137                                    }
138                                }
139                            } else if let ClaudeOutput::Result(ref msg) = output {
140                                if let Some(ref uuid_str) = msg.uuid {
141                                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
142                                        debug!("[CLIENT] Captured session UUID: {}", uuid);
143                                        self.session_uuid = Some(uuid);
144                                    }
145                                }
146                            }
147                        }
148
149                        // Check if this is a result message
150                        if matches!(output, ClaudeOutput::Result(_)) {
151                            debug!("[CLIENT] Received result message, stream complete");
152                            Ok(Some(output))
153                        } else {
154                            Ok(Some(output))
155                        }
156                    }
157                    Err(ParseError { error_message, .. }) => {
158                        debug!("[CLIENT] Failed to deserialize: {}", error_message);
159                        debug!("[CLIENT] Raw JSON that failed: {}", trimmed);
160                        Err(Error::Deserialization(format!(
161                            "{} (raw: {})",
162                            error_message, trimmed
163                        )))
164                    }
165                }
166            }
167            Err(e) => {
168                debug!("[CLIENT] Error reading from stdout: {}", e);
169                Err(Error::Io(e))
170            }
171        }
172    }
173
174    /// Shutdown the client and wait for the process to exit
175    pub fn shutdown(&mut self) -> Result<()> {
176        debug!("[CLIENT] Shutting down client");
177        self.child.kill().map_err(Error::Io)?;
178        self.child.wait().map_err(Error::Io)?;
179        Ok(())
180    }
181
182    /// Get the session UUID if available
183    /// Returns an error if no response has been received yet
184    pub fn session_uuid(&self) -> Result<Uuid> {
185        self.session_uuid.ok_or(Error::SessionNotInitialized)
186    }
187
188    /// Test if the Claude connection is working by sending a ping message
189    /// Returns true if Claude responds with "pong", false otherwise
190    pub fn ping(&mut self) -> bool {
191        // Send a simple ping request
192        let ping_input = ClaudeInput::user_message(
193            "ping - respond with just the word 'pong' and nothing else",
194            self.session_uuid.unwrap_or_else(Uuid::new_v4),
195        );
196
197        // Try to send the ping and get responses
198        match self.query(ping_input) {
199            Ok(responses) => {
200                // Check all responses for "pong"
201                for output in responses {
202                    if let ClaudeOutput::Assistant(msg) = &output {
203                        for content in &msg.message.content {
204                            if let ContentBlock::Text(text) = content {
205                                if text.text.to_lowercase().contains("pong") {
206                                    return true;
207                                }
208                            }
209                        }
210                    }
211                }
212                false
213            }
214            Err(e) => {
215                debug!("Ping failed: {}", e);
216                false
217            }
218        }
219    }
220
221    // =========================================================================
222    // Tool Approval Protocol
223    // =========================================================================
224
225    /// Enable the tool approval protocol by performing the initialization handshake.
226    ///
227    /// After calling this method, the CLI will send `ControlRequest` messages when
228    /// Claude wants to use a tool. You must handle these by calling
229    /// `send_control_response()` with an appropriate response.
230    ///
231    /// **Important**: The client must have been created with
232    /// `ClaudeCliBuilder::permission_prompt_tool("stdio")` for this to work.
233    ///
234    /// # Example
235    ///
236    /// ```no_run
237    /// use claude_codes::{SyncClient, ClaudeCliBuilder, ClaudeOutput, ControlRequestPayload};
238    ///
239    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
240    /// let child = ClaudeCliBuilder::new()
241    ///     .model("sonnet")
242    ///     .permission_prompt_tool("stdio")
243    ///     .spawn_sync()?;
244    ///
245    /// let mut client = SyncClient::new(child)?;
246    /// client.enable_tool_approval()?;
247    ///
248    /// // Now when you receive messages, you may get ControlRequest messages
249    /// // that need responses
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub fn enable_tool_approval(&mut self) -> Result<()> {
254        if self.tool_approval_enabled {
255            debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
256            return Ok(());
257        }
258
259        let request_id = format!("init-{}", Uuid::new_v4());
260        let init_request = ControlRequestMessage::initialize(&request_id);
261
262        debug!("[TOOL_APPROVAL] Sending initialization handshake");
263        Protocol::write_sync(&mut self.stdin, &init_request)?;
264
265        // Wait for the initialization response
266        loop {
267            let mut line = String::new();
268            let bytes_read = self.stdout.read_line(&mut line).map_err(Error::Io)?;
269
270            if bytes_read == 0 {
271                return Err(Error::ConnectionClosed);
272            }
273
274            let trimmed = line.trim();
275            if trimmed.is_empty() {
276                continue;
277            }
278
279            debug!("[TOOL_APPROVAL] Received: {}", trimmed);
280
281            // Try to parse as ClaudeOutput
282            match ClaudeOutput::parse_json_tolerant(trimmed) {
283                Ok(ClaudeOutput::ControlResponse(resp)) => {
284                    use crate::io::ControlResponsePayload;
285                    match &resp.response {
286                        ControlResponsePayload::Success {
287                            request_id: rid, ..
288                        } if rid == &request_id => {
289                            debug!("[TOOL_APPROVAL] Initialization successful");
290                            self.tool_approval_enabled = true;
291                            return Ok(());
292                        }
293                        ControlResponsePayload::Error { error, .. } => {
294                            return Err(Error::Protocol(format!(
295                                "Tool approval initialization failed: {}",
296                                error
297                            )));
298                        }
299                        _ => {
300                            // Different request_id, keep waiting
301                            continue;
302                        }
303                    }
304                }
305                Ok(_) => {
306                    // Got a different message type (system, etc.), keep waiting
307                    continue;
308                }
309                Err(e) => {
310                    return Err(Error::Deserialization(e.to_string()));
311                }
312            }
313        }
314    }
315
316    /// Send a control response back to the CLI.
317    ///
318    /// Use this to respond to `ControlRequest` messages received during tool approval.
319    /// The easiest way to create responses is using the helper methods on
320    /// `ToolPermissionRequest`:
321    ///
322    /// # Example
323    ///
324    /// ```ignore
325    /// use claude_codes::{SyncClient, ControlRequestPayload, ControlResponse, ToolPermissionRequest};
326    ///
327    /// fn handle_permission(
328    ///     client: &mut SyncClient,
329    ///     perm_req: &ToolPermissionRequest,
330    ///     request_id: &str,
331    /// ) -> claude_codes::Result<()> {
332    ///     let response = if perm_req.tool_name == "Bash" {
333    ///         perm_req.deny("Bash commands not allowed", request_id)
334    ///     } else {
335    ///         perm_req.allow(request_id)
336    ///     };
337    ///     client.send_control_response(response)
338    /// }
339    /// ```
340    pub fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
341        let message: ControlResponseMessage = response.into();
342        debug!(
343            "[TOOL_APPROVAL] Sending control response: {:?}",
344            serde_json::to_string(&message)
345        );
346        Protocol::write_sync(&mut self.stdin, &message)?;
347        Ok(())
348    }
349
350    /// Check if tool approval protocol is enabled
351    pub fn is_tool_approval_enabled(&self) -> bool {
352        self.tool_approval_enabled
353    }
354}
355
356// Protocol extension methods for synchronous I/O
357impl Protocol {
358    /// Write a message to a synchronous writer
359    pub fn write_sync<W: Write, T: Serialize>(writer: &mut W, message: &T) -> Result<()> {
360        let line = Self::serialize(message)?;
361        debug!("[PROTOCOL] Sending: {}", line.trim());
362        writer.write_all(line.as_bytes())?;
363        writer.flush()?;
364        Ok(())
365    }
366
367    /// Read a message from a synchronous reader
368    pub fn read_sync<R: BufRead, T: for<'de> Deserialize<'de>>(reader: &mut R) -> Result<T> {
369        let mut line = String::new();
370        let bytes_read = reader.read_line(&mut line)?;
371        if bytes_read == 0 {
372            return Err(Error::ConnectionClosed);
373        }
374        debug!("[PROTOCOL] Received: {}", line.trim());
375        Self::deserialize(&line)
376    }
377}
378
379/// Iterator over responses from Claude
380pub struct ResponseIterator<'a> {
381    client: &'a mut SyncClient,
382    finished: bool,
383}
384
385impl Iterator for ResponseIterator<'_> {
386    type Item = Result<ClaudeOutput>;
387
388    fn next(&mut self) -> Option<Self::Item> {
389        if self.finished {
390            return None;
391        }
392
393        match self.client.read_next() {
394            Ok(Some(output)) => {
395                // Check if this is a result message
396                if matches!(output, ClaudeOutput::Result(_)) {
397                    self.finished = true;
398                }
399                Some(Ok(output))
400            }
401            Ok(None) => {
402                self.finished = true;
403                None
404            }
405            Err(e) => {
406                self.finished = true;
407                Some(Err(e))
408            }
409        }
410    }
411}
412
413impl Drop for SyncClient {
414    fn drop(&mut self) {
415        if let Err(e) = self.shutdown() {
416            debug!("[CLIENT] Error during shutdown: {}", e);
417        }
418    }
419}
420
421/// Stream processor for handling continuous message streams
422pub struct StreamProcessor<R> {
423    reader: BufReader<R>,
424}
425
426impl<R: std::io::Read> StreamProcessor<R> {
427    /// Create a new stream processor
428    pub fn new(reader: R) -> Self {
429        Self {
430            reader: BufReader::new(reader),
431        }
432    }
433
434    /// Process the next message from the stream
435    pub fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
436        Protocol::read_sync(&mut self.reader)
437    }
438
439    /// Process all messages in the stream
440    pub fn process_all<T, F>(&mut self, mut handler: F) -> Result<()>
441    where
442        T: for<'de> Deserialize<'de>,
443        F: FnMut(T) -> Result<()>,
444    {
445        loop {
446            match self.next_message() {
447                Ok(message) => handler(message)?,
448                Err(Error::ConnectionClosed) => break,
449                Err(e) => return Err(e),
450            }
451        }
452        Ok(())
453    }
454}