claude_codes/
client_async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput, ContentBlock};
6use crate::protocol::Protocol;
7use log::{debug, error, info};
8use serde::{Deserialize, Serialize};
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufReader as AsyncBufReader};
10use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
11use uuid::Uuid;
12
13/// Asynchronous client for communicating with Claude
14pub struct AsyncClient {
15    child: Child,
16    stdin: ChildStdin,
17    stdout: BufReader<ChildStdout>,
18    stderr: Option<BufReader<ChildStderr>>,
19    session_uuid: Option<Uuid>,
20}
21
22impl AsyncClient {
23    /// Create a new async client from a tokio Child process
24    pub fn new(mut child: Child) -> Result<Self> {
25        let stdin = child
26            .stdin
27            .take()
28            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
29
30        let stdout = BufReader::new(
31            child
32                .stdout
33                .take()
34                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
35        );
36
37        let stderr = child.stderr.take().map(BufReader::new);
38
39        Ok(Self {
40            child,
41            stdin,
42            stdout,
43            stderr,
44            session_uuid: None,
45        })
46    }
47
48    /// Create a client with default settings (using logic from start_claude)
49    pub async fn with_defaults() -> Result<Self> {
50        // Check Claude version (only warns once per session)
51        // NOTE: The claude-codes API is in high flux. If you wish to work around
52        // this version check, you can use AsyncClient::new() directly with:
53        //   let child = ClaudeCliBuilder::new().model("sonnet").spawn().await?;
54        //   AsyncClient::new(child)
55        crate::version::check_claude_version_async().await?;
56        Self::with_model("sonnet").await
57    }
58
59    /// Create a client with a specific model
60    pub async fn with_model(model: &str) -> Result<Self> {
61        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
62
63        info!("Started Claude process with model: {}", model);
64        Self::new(child)
65    }
66
67    /// Create a client from a custom builder
68    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
69        let child = builder.spawn().await?;
70        info!("Started Claude process from custom builder");
71        Self::new(child)
72    }
73
74    /// Resume a previous session by UUID
75    /// This creates a new client that resumes an existing session
76    pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
77        let child = ClaudeCliBuilder::new()
78            .resume(Some(session_uuid.to_string()))
79            .spawn()
80            .await?;
81
82        info!("Resuming Claude session with UUID: {}", session_uuid);
83        let mut client = Self::new(child)?;
84        // Pre-populate the session UUID since we're resuming
85        client.session_uuid = Some(session_uuid);
86        Ok(client)
87    }
88
89    /// Resume a previous session with a specific model
90    pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
91        let child = ClaudeCliBuilder::new()
92            .model(model)
93            .resume(Some(session_uuid.to_string()))
94            .spawn()
95            .await?;
96
97        info!(
98            "Resuming Claude session with UUID: {} and model: {}",
99            session_uuid, model
100        );
101        let mut client = Self::new(child)?;
102        // Pre-populate the session UUID since we're resuming
103        client.session_uuid = Some(session_uuid);
104        Ok(client)
105    }
106
107    /// Send a query and collect all responses until Result message
108    /// This is the simplified version that collects all responses
109    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
110        let session_id = Uuid::new_v4();
111        self.query_with_session(text, session_id).await
112    }
113
114    /// Send a query with a custom session ID and collect all responses
115    pub async fn query_with_session(
116        &mut self,
117        text: &str,
118        session_id: Uuid,
119    ) -> Result<Vec<ClaudeOutput>> {
120        // Send the query
121        let input = ClaudeInput::user_message(text, session_id);
122        self.send(&input).await?;
123
124        // Collect responses until we get a Result message
125        let mut responses = Vec::new();
126
127        loop {
128            let output = self.receive().await?;
129            let is_result = matches!(&output, ClaudeOutput::Result(_));
130            responses.push(output);
131
132            if is_result {
133                break;
134            }
135        }
136
137        Ok(responses)
138    }
139
140    /// Send a query and return an async iterator over responses
141    /// Returns a stream that yields ClaudeOutput until Result message is received
142    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
143        let session_id = Uuid::new_v4();
144        self.query_stream_with_session(text, session_id).await
145    }
146
147    /// Send a query with session ID and return an async iterator over responses
148    pub async fn query_stream_with_session(
149        &mut self,
150        text: &str,
151        session_id: Uuid,
152    ) -> Result<ResponseStream<'_>> {
153        // Send the query first
154        let input = ClaudeInput::user_message(text, session_id);
155        self.send(&input).await?;
156
157        // Return a stream that will read responses
158        Ok(ResponseStream {
159            client: self,
160            finished: false,
161        })
162    }
163
164    /// Send a ClaudeInput directly
165    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
166        let json_line = Protocol::serialize(input)?;
167        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
168
169        self.stdin
170            .write_all(json_line.as_bytes())
171            .await
172            .map_err(Error::Io)?;
173
174        self.stdin.flush().await.map_err(Error::Io)?;
175        Ok(())
176    }
177
178    /// Try to receive a single response
179    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
180        let mut line = String::new();
181
182        loop {
183            line.clear();
184            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
185
186            if bytes_read == 0 {
187                return Err(Error::ConnectionClosed);
188            }
189
190            let trimmed = line.trim();
191            if trimmed.is_empty() {
192                continue;
193            }
194
195            debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
196
197            // Use the parse_json_tolerant method which handles ANSI escape codes
198            match ClaudeOutput::parse_json_tolerant(trimmed) {
199                Ok(output) => {
200                    debug!("[INCOMING] Parsed output type: {}", output.message_type());
201
202                    // Capture UUID from first response if not already set
203                    if self.session_uuid.is_none() {
204                        if let ClaudeOutput::Assistant(ref msg) = output {
205                            if let Some(ref uuid_str) = msg.uuid {
206                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
207                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
208                                    self.session_uuid = Some(uuid);
209                                }
210                            }
211                        } else if let ClaudeOutput::Result(ref msg) = output {
212                            if let Some(ref uuid_str) = msg.uuid {
213                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
214                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
215                                    self.session_uuid = Some(uuid);
216                                }
217                            }
218                        }
219                    }
220
221                    return Ok(output);
222                }
223                Err(parse_error) => {
224                    error!("[INCOMING] Failed to deserialize: {}", parse_error);
225                    error!("[INCOMING] Raw JSON that failed: {}", trimmed);
226                    // Convert ParseError to our Error type
227                    return Err(Error::Deserialization(format!(
228                        "{} (raw: {})",
229                        parse_error.error_message, trimmed
230                    )));
231                }
232            }
233        }
234    }
235
236    /// Check if the Claude process is still running
237    pub fn is_alive(&mut self) -> bool {
238        self.child.try_wait().ok().flatten().is_none()
239    }
240
241    /// Gracefully shutdown the client
242    pub async fn shutdown(mut self) -> Result<()> {
243        info!("Shutting down Claude process...");
244        self.child.kill().await.map_err(Error::Io)?;
245        Ok(())
246    }
247
248    /// Get the process ID
249    pub fn pid(&self) -> Option<u32> {
250        self.child.id()
251    }
252
253    /// Take the stderr reader (can only be called once)
254    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
255        self.stderr.take()
256    }
257
258    /// Get the session UUID if available
259    /// Returns an error if no response has been received yet
260    pub fn session_uuid(&self) -> Result<Uuid> {
261        self.session_uuid.ok_or(Error::SessionNotInitialized)
262    }
263
264    /// Test if the Claude connection is working by sending a ping message
265    /// Returns true if Claude responds with "pong", false otherwise
266    pub async fn ping(&mut self) -> bool {
267        // Send a simple ping request
268        let ping_input = ClaudeInput::user_message(
269            "ping - respond with just the word 'pong' and nothing else",
270            self.session_uuid.unwrap_or_else(Uuid::new_v4),
271        );
272
273        // Try to send the ping
274        if let Err(e) = self.send(&ping_input).await {
275            debug!("Ping failed to send: {}", e);
276            return false;
277        }
278
279        // Try to receive responses until we get a result or error
280        let mut found_pong = false;
281        let mut message_count = 0;
282        const MAX_MESSAGES: usize = 10;
283
284        loop {
285            match self.receive().await {
286                Ok(output) => {
287                    message_count += 1;
288
289                    // Check if it's an assistant message containing "pong"
290                    if let ClaudeOutput::Assistant(msg) = &output {
291                        for content in &msg.message.content {
292                            if let ContentBlock::Text(text) = content {
293                                if text.text.to_lowercase().contains("pong") {
294                                    found_pong = true;
295                                }
296                            }
297                        }
298                    }
299
300                    // Stop on result message
301                    if matches!(output, ClaudeOutput::Result(_)) {
302                        break;
303                    }
304
305                    // Safety limit
306                    if message_count >= MAX_MESSAGES {
307                        debug!("Ping exceeded message limit");
308                        break;
309                    }
310                }
311                Err(e) => {
312                    debug!("Ping failed to receive response: {}", e);
313                    break;
314                }
315            }
316        }
317
318        found_pong
319    }
320}
321
322/// A response stream that yields ClaudeOutput messages
323/// Holds a reference to the client to read from
324pub struct ResponseStream<'a> {
325    client: &'a mut AsyncClient,
326    finished: bool,
327}
328
329impl ResponseStream<'_> {
330    /// Convert to a vector by collecting all responses
331    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
332        let mut responses = Vec::new();
333
334        while !self.finished {
335            let output = self.client.receive().await?;
336            let is_result = matches!(&output, ClaudeOutput::Result(_));
337            responses.push(output);
338
339            if is_result {
340                self.finished = true;
341                break;
342            }
343        }
344
345        Ok(responses)
346    }
347
348    /// Get the next response
349    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
350        if self.finished {
351            return None;
352        }
353
354        match self.client.receive().await {
355            Ok(output) => {
356                if matches!(&output, ClaudeOutput::Result(_)) {
357                    self.finished = true;
358                }
359                Some(Ok(output))
360            }
361            Err(e) => {
362                self.finished = true;
363                Some(Err(e))
364            }
365        }
366    }
367}
368
369impl Drop for AsyncClient {
370    fn drop(&mut self) {
371        if self.is_alive() {
372            // Try to kill the process
373            if let Err(e) = self.child.start_kill() {
374                error!("Failed to kill Claude process on drop: {}", e);
375            }
376        }
377    }
378}
379
380// Protocol extension methods for asynchronous I/O
381impl Protocol {
382    /// Write a message to an async writer
383    pub async fn write_async<W: AsyncWriteExt + Unpin, T: Serialize>(
384        writer: &mut W,
385        message: &T,
386    ) -> Result<()> {
387        let line = Self::serialize(message)?;
388        debug!("[PROTOCOL] Sending async: {}", line.trim());
389        writer.write_all(line.as_bytes()).await?;
390        writer.flush().await?;
391        Ok(())
392    }
393
394    /// Read a message from an async reader
395    pub async fn read_async<R: AsyncBufReadExt + Unpin, T: for<'de> Deserialize<'de>>(
396        reader: &mut R,
397    ) -> Result<T> {
398        let mut line = String::new();
399        let bytes_read = reader.read_line(&mut line).await?;
400        if bytes_read == 0 {
401            return Err(Error::ConnectionClosed);
402        }
403        debug!("[PROTOCOL] Received async: {}", line.trim());
404        Self::deserialize(&line)
405    }
406}
407
408/// Async stream processor for handling continuous message streams
409pub struct AsyncStreamProcessor<R> {
410    reader: AsyncBufReader<R>,
411}
412
413impl<R: tokio::io::AsyncRead + Unpin> AsyncStreamProcessor<R> {
414    /// Create a new async stream processor
415    pub fn new(reader: R) -> Self {
416        Self {
417            reader: AsyncBufReader::new(reader),
418        }
419    }
420
421    /// Process the next message from the stream
422    pub async fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
423        Protocol::read_async(&mut self.reader).await
424    }
425
426    /// Process all messages in the stream
427    pub async fn process_all<T, F, Fut>(&mut self, mut handler: F) -> Result<()>
428    where
429        T: for<'de> Deserialize<'de>,
430        F: FnMut(T) -> Fut,
431        Fut: std::future::Future<Output = Result<()>>,
432    {
433        loop {
434            match self.next_message().await {
435                Ok(message) => handler(message).await?,
436                Err(Error::ConnectionClosed) => break,
437                Err(e) => return Err(e),
438            }
439        }
440        Ok(())
441    }
442}