claude_codes/
client_sync.rs

1//! Synchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput, ContentBlock, ParseError};
6use crate::protocol::Protocol;
7use log::debug;
8use serde::{Deserialize, Serialize};
9use std::io::{BufRead, BufReader, Write};
10use std::process::{Child, ChildStdin, ChildStdout};
11use uuid::Uuid;
12
/// Synchronous client for communicating with Claude
///
/// Owns the spawned child process together with its stdin/stdout pipes.
/// Input is written to `stdin`; responses are read line by line from `stdout`.
pub struct SyncClient {
    /// Handle to the child process; killed and waited on in `shutdown`.
    child: Child,
    /// The child's stdin handle, taken out of `child` in `new`.
    stdin: ChildStdin,
    /// Buffered reader over the child's stdout, taken out of `child` in `new`.
    stdout: BufReader<ChildStdout>,
    /// Session UUID: pre-set when resuming a session, otherwise filled in from
    /// the first assistant/result message that carries a parseable `uuid`.
    session_uuid: Option<Uuid>,
}
20
21impl SyncClient {
22    /// Create a new synchronous client from an existing child process
23    pub fn new(mut child: Child) -> Result<Self> {
24        let stdin = child
25            .stdin
26            .take()
27            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
28        let stdout = child
29            .stdout
30            .take()
31            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
32
33        Ok(Self {
34            child,
35            stdin,
36            stdout: BufReader::new(stdout),
37            session_uuid: None,
38        })
39    }
40
41    /// Create a new synchronous client with default settings
42    pub fn with_defaults() -> Result<Self> {
43        // Check Claude version (only warns once per session)
44        // NOTE: The claude-codes API is in high flux. If you wish to work around
45        // this version check, you can use SyncClient::new() directly with:
46        //   let child = ClaudeCliBuilder::new().spawn_sync()?;
47        //   SyncClient::new(child)
48        crate::version::check_claude_version()?;
49        let child = ClaudeCliBuilder::new().spawn_sync().map_err(Error::Io)?;
50        Self::new(child)
51    }
52
53    /// Resume a previous session by UUID
54    /// This creates a new client that resumes an existing session
55    pub fn resume_session(session_uuid: Uuid) -> Result<Self> {
56        let child = ClaudeCliBuilder::new()
57            .resume(Some(session_uuid.to_string()))
58            .spawn_sync()
59            .map_err(Error::Io)?;
60
61        debug!("Resuming Claude session with UUID: {}", session_uuid);
62        let mut client = Self::new(child)?;
63        // Pre-populate the session UUID since we're resuming
64        client.session_uuid = Some(session_uuid);
65        Ok(client)
66    }
67
68    /// Resume a previous session with a specific model
69    pub fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
70        let child = ClaudeCliBuilder::new()
71            .model(model)
72            .resume(Some(session_uuid.to_string()))
73            .spawn_sync()
74            .map_err(Error::Io)?;
75
76        debug!(
77            "Resuming Claude session with UUID: {} and model: {}",
78            session_uuid, model
79        );
80        let mut client = Self::new(child)?;
81        // Pre-populate the session UUID since we're resuming
82        client.session_uuid = Some(session_uuid);
83        Ok(client)
84    }
85
86    /// Send a query and collect all responses
87    pub fn query(&mut self, input: ClaudeInput) -> Result<Vec<ClaudeOutput>> {
88        let mut responses = Vec::new();
89        for response in self.query_stream(input)? {
90            responses.push(response?);
91        }
92        Ok(responses)
93    }
94
95    /// Send a query and return an iterator over responses
96    pub fn query_stream(&mut self, input: ClaudeInput) -> Result<ResponseIterator<'_>> {
97        // Send the input
98        Protocol::write_sync(&mut self.stdin, &input)?;
99
100        Ok(ResponseIterator {
101            client: self,
102            finished: false,
103        })
104    }
105
106    /// Read the next response from Claude
107    fn read_next(&mut self) -> Result<Option<ClaudeOutput>> {
108        let mut line = String::new();
109        match self.stdout.read_line(&mut line) {
110            Ok(0) => {
111                debug!("[CLIENT] Stream closed");
112                Ok(None)
113            }
114            Ok(_) => {
115                let trimmed = line.trim();
116                if trimmed.is_empty() {
117                    debug!("[CLIENT] Skipping empty line");
118                    return self.read_next();
119                }
120
121                debug!("[CLIENT] Received: {}", trimmed);
122                match ClaudeOutput::parse_json_tolerant(trimmed) {
123                    Ok(output) => {
124                        // Capture UUID from first response if not already set
125                        if self.session_uuid.is_none() {
126                            if let ClaudeOutput::Assistant(ref msg) = output {
127                                if let Some(ref uuid_str) = msg.uuid {
128                                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
129                                        debug!("[CLIENT] Captured session UUID: {}", uuid);
130                                        self.session_uuid = Some(uuid);
131                                    }
132                                }
133                            } else if let ClaudeOutput::Result(ref msg) = output {
134                                if let Some(ref uuid_str) = msg.uuid {
135                                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
136                                        debug!("[CLIENT] Captured session UUID: {}", uuid);
137                                        self.session_uuid = Some(uuid);
138                                    }
139                                }
140                            }
141                        }
142
143                        // Check if this is a result message
144                        if matches!(output, ClaudeOutput::Result(_)) {
145                            debug!("[CLIENT] Received result message, stream complete");
146                            Ok(Some(output))
147                        } else {
148                            Ok(Some(output))
149                        }
150                    }
151                    Err(ParseError { error_message, .. }) => {
152                        debug!("[CLIENT] Failed to deserialize: {}", error_message);
153                        debug!("[CLIENT] Raw JSON that failed: {}", trimmed);
154                        Err(Error::Deserialization(format!(
155                            "{} (raw: {})",
156                            error_message, trimmed
157                        )))
158                    }
159                }
160            }
161            Err(e) => {
162                debug!("[CLIENT] Error reading from stdout: {}", e);
163                Err(Error::Io(e))
164            }
165        }
166    }
167
168    /// Shutdown the client and wait for the process to exit
169    pub fn shutdown(&mut self) -> Result<()> {
170        debug!("[CLIENT] Shutting down client");
171        self.child.kill().map_err(Error::Io)?;
172        self.child.wait().map_err(Error::Io)?;
173        Ok(())
174    }
175
176    /// Get the session UUID if available
177    /// Returns an error if no response has been received yet
178    pub fn session_uuid(&self) -> Result<Uuid> {
179        self.session_uuid.ok_or(Error::SessionNotInitialized)
180    }
181
182    /// Test if the Claude connection is working by sending a ping message
183    /// Returns true if Claude responds with "pong", false otherwise
184    pub fn ping(&mut self) -> bool {
185        // Send a simple ping request
186        let ping_input = ClaudeInput::user_message(
187            "ping - respond with just the word 'pong' and nothing else",
188            self.session_uuid.unwrap_or_else(Uuid::new_v4),
189        );
190
191        // Try to send the ping and get responses
192        match self.query(ping_input) {
193            Ok(responses) => {
194                // Check all responses for "pong"
195                for output in responses {
196                    if let ClaudeOutput::Assistant(msg) = &output {
197                        for content in &msg.message.content {
198                            if let ContentBlock::Text(text) = content {
199                                if text.text.to_lowercase().contains("pong") {
200                                    return true;
201                                }
202                            }
203                        }
204                    }
205                }
206                false
207            }
208            Err(e) => {
209                debug!("Ping failed: {}", e);
210                false
211            }
212        }
213    }
214}
215
216// Protocol extension methods for synchronous I/O
217impl Protocol {
218    /// Write a message to a synchronous writer
219    pub fn write_sync<W: Write, T: Serialize>(writer: &mut W, message: &T) -> Result<()> {
220        let line = Self::serialize(message)?;
221        debug!("[PROTOCOL] Sending: {}", line.trim());
222        writer.write_all(line.as_bytes())?;
223        writer.flush()?;
224        Ok(())
225    }
226
227    /// Read a message from a synchronous reader
228    pub fn read_sync<R: BufRead, T: for<'de> Deserialize<'de>>(reader: &mut R) -> Result<T> {
229        let mut line = String::new();
230        let bytes_read = reader.read_line(&mut line)?;
231        if bytes_read == 0 {
232            return Err(Error::ConnectionClosed);
233        }
234        debug!("[PROTOCOL] Received: {}", line.trim());
235        Self::deserialize(&line)
236    }
237}
238
239/// Iterator over responses from Claude
240pub struct ResponseIterator<'a> {
241    client: &'a mut SyncClient,
242    finished: bool,
243}
244
245impl Iterator for ResponseIterator<'_> {
246    type Item = Result<ClaudeOutput>;
247
248    fn next(&mut self) -> Option<Self::Item> {
249        if self.finished {
250            return None;
251        }
252
253        match self.client.read_next() {
254            Ok(Some(output)) => {
255                // Check if this is a result message
256                if matches!(output, ClaudeOutput::Result(_)) {
257                    self.finished = true;
258                }
259                Some(Ok(output))
260            }
261            Ok(None) => {
262                self.finished = true;
263                None
264            }
265            Err(e) => {
266                self.finished = true;
267                Some(Err(e))
268            }
269        }
270    }
271}
272
273impl Drop for SyncClient {
274    fn drop(&mut self) {
275        if let Err(e) = self.shutdown() {
276            debug!("[CLIENT] Error during shutdown: {}", e);
277        }
278    }
279}
280
281/// Stream processor for handling continuous message streams
282pub struct StreamProcessor<R> {
283    reader: BufReader<R>,
284}
285
286impl<R: std::io::Read> StreamProcessor<R> {
287    /// Create a new stream processor
288    pub fn new(reader: R) -> Self {
289        Self {
290            reader: BufReader::new(reader),
291        }
292    }
293
294    /// Process the next message from the stream
295    pub fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
296        Protocol::read_sync(&mut self.reader)
297    }
298
299    /// Process all messages in the stream
300    pub fn process_all<T, F>(&mut self, mut handler: F) -> Result<()>
301    where
302        T: for<'de> Deserialize<'de>,
303        F: FnMut(T) -> Result<()>,
304    {
305        loop {
306            match self.next_message() {
307                Ok(message) => handler(message)?,
308                Err(Error::ConnectionClosed) => break,
309                Err(e) => return Err(e),
310            }
311        }
312        Ok(())
313    }
314}