Skip to main content

claude_codes/
client_async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6    ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7    ControlResponseMessage,
8};
9use crate::protocol::Protocol;
10use log::{debug, error, info, warn};
11use serde::{Deserialize, Serialize};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufReader as AsyncBufReader};
13use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16/// Asynchronous client for communicating with Claude
17pub struct AsyncClient {
18    child: Child,
19    stdin: ChildStdin,
20    stdout: BufReader<ChildStdout>,
21    stderr: Option<BufReader<ChildStderr>>,
22    session_uuid: Option<Uuid>,
23    /// Whether tool approval protocol has been initialized
24    tool_approval_enabled: bool,
25}
26
27/// Buffer size for reading Claude's stdout (10MB).
28const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
29
30impl AsyncClient {
31    /// Create a new async client from a tokio Child process
32    pub fn new(mut child: Child) -> Result<Self> {
33        let stdin = child
34            .stdin
35            .take()
36            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
37
38        let stdout = BufReader::with_capacity(
39            STDOUT_BUFFER_SIZE,
40            child
41                .stdout
42                .take()
43                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
44        );
45
46        let stderr = child.stderr.take().map(BufReader::new);
47
48        Ok(Self {
49            child,
50            stdin,
51            stdout,
52            stderr,
53            session_uuid: None,
54            tool_approval_enabled: false,
55        })
56    }
57
58    /// Create a client with default settings (using logic from start_claude)
59    pub async fn with_defaults() -> Result<Self> {
60        // Check Claude version (only warns once per session)
61        // NOTE: The claude-codes API is in high flux. If you wish to work around
62        // this version check, you can use AsyncClient::new() directly with:
63        //   let child = ClaudeCliBuilder::new().model("sonnet").spawn().await?;
64        //   AsyncClient::new(child)
65        crate::version::check_claude_version_async().await?;
66        Self::with_model("sonnet").await
67    }
68
69    /// Create a client with a specific model
70    pub async fn with_model(model: &str) -> Result<Self> {
71        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
72
73        info!("Started Claude process with model: {}", model);
74        Self::new(child)
75    }
76
77    /// Create a client from a custom builder
78    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
79        let child = builder.spawn().await?;
80        info!("Started Claude process from custom builder");
81        Self::new(child)
82    }
83
84    /// Resume a previous session by UUID
85    /// This creates a new client that resumes an existing session
86    pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
87        let child = ClaudeCliBuilder::new()
88            .resume(Some(session_uuid.to_string()))
89            .spawn()
90            .await?;
91
92        info!("Resuming Claude session with UUID: {}", session_uuid);
93        let mut client = Self::new(child)?;
94        // Pre-populate the session UUID since we're resuming
95        client.session_uuid = Some(session_uuid);
96        Ok(client)
97    }
98
99    /// Resume a previous session with a specific model
100    pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
101        let child = ClaudeCliBuilder::new()
102            .model(model)
103            .resume(Some(session_uuid.to_string()))
104            .spawn()
105            .await?;
106
107        info!(
108            "Resuming Claude session with UUID: {} and model: {}",
109            session_uuid, model
110        );
111        let mut client = Self::new(child)?;
112        // Pre-populate the session UUID since we're resuming
113        client.session_uuid = Some(session_uuid);
114        Ok(client)
115    }
116
117    /// Send a query and collect all responses until Result message
118    /// This is the simplified version that collects all responses
119    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
120        let session_id = Uuid::new_v4();
121        self.query_with_session(text, session_id).await
122    }
123
124    /// Send a query with a custom session ID and collect all responses
125    pub async fn query_with_session(
126        &mut self,
127        text: &str,
128        session_id: Uuid,
129    ) -> Result<Vec<ClaudeOutput>> {
130        // Send the query
131        let input = ClaudeInput::user_message(text, session_id);
132        self.send(&input).await?;
133
134        // Collect responses until we get a Result message
135        let mut responses = Vec::new();
136
137        loop {
138            let output = self.receive().await?;
139            let is_result = matches!(&output, ClaudeOutput::Result(_));
140            responses.push(output);
141
142            if is_result {
143                break;
144            }
145        }
146
147        Ok(responses)
148    }
149
150    /// Send a query and return an async iterator over responses
151    /// Returns a stream that yields ClaudeOutput until Result message is received
152    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
153        let session_id = Uuid::new_v4();
154        self.query_stream_with_session(text, session_id).await
155    }
156
157    /// Send a query with session ID and return an async iterator over responses
158    pub async fn query_stream_with_session(
159        &mut self,
160        text: &str,
161        session_id: Uuid,
162    ) -> Result<ResponseStream<'_>> {
163        // Send the query first
164        let input = ClaudeInput::user_message(text, session_id);
165        self.send(&input).await?;
166
167        // Return a stream that will read responses
168        Ok(ResponseStream {
169            client: self,
170            finished: false,
171        })
172    }
173
174    /// Send a ClaudeInput directly
175    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
176        let json_line = Protocol::serialize(input)?;
177        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
178
179        self.stdin
180            .write_all(json_line.as_bytes())
181            .await
182            .map_err(Error::Io)?;
183
184        self.stdin.flush().await.map_err(Error::Io)?;
185        Ok(())
186    }
187
188    /// Receive a single response from Claude.
189    ///
190    /// # Important: Polling Frequency
191    ///
192    /// This method should be polled frequently to prevent the OS pipe buffer from
193    /// filling up. Claude can emit very large JSON messages (hundreds of KB), and
194    /// if the pipe buffer overflows, data may be truncated.
195    ///
196    /// In a `tokio::select!` loop with other async operations, ensure `receive()`
197    /// is given priority or called frequently. For high-throughput scenarios,
198    /// consider spawning a dedicated task to drain stdout into an unbounded channel.
199    ///
200    /// # Returns
201    ///
202    /// - `Ok(ClaudeOutput)` - A parsed message from Claude
203    /// - `Err(Error::ConnectionClosed)` - Claude process has exited
204    /// - `Err(Error::Deserialization)` - Failed to parse the message
205    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
206        let mut line = String::new();
207
208        loop {
209            line.clear();
210            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
211
212            if bytes_read == 0 {
213                return Err(Error::ConnectionClosed);
214            }
215
216            let trimmed = line.trim();
217            if trimmed.is_empty() {
218                continue;
219            }
220
221            debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
222
223            // Use the parse_json_tolerant method which handles ANSI escape codes
224            match ClaudeOutput::parse_json_tolerant(trimmed) {
225                Ok(output) => {
226                    debug!("[INCOMING] Parsed output type: {}", output.message_type());
227
228                    // Capture UUID from first response if not already set
229                    if self.session_uuid.is_none() {
230                        if let ClaudeOutput::Assistant(ref msg) = output {
231                            if let Some(ref uuid_str) = msg.uuid {
232                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
233                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
234                                    self.session_uuid = Some(uuid);
235                                }
236                            }
237                        } else if let ClaudeOutput::Result(ref msg) = output {
238                            if let Some(ref uuid_str) = msg.uuid {
239                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
240                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
241                                    self.session_uuid = Some(uuid);
242                                }
243                            }
244                        }
245                    }
246
247                    return Ok(output);
248                }
249                Err(parse_error) => {
250                    warn!("[INCOMING] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
251                    warn!("[INCOMING] Parse error: {}", parse_error.error_message);
252                    warn!("[INCOMING] Raw message: {}", trimmed);
253                    return Err(parse_error.into());
254                }
255            }
256        }
257    }
258
259    /// Check if the Claude process is still running
260    pub fn is_alive(&mut self) -> bool {
261        self.child.try_wait().ok().flatten().is_none()
262    }
263
264    /// Gracefully shutdown the client
265    pub async fn shutdown(mut self) -> Result<()> {
266        info!("Shutting down Claude process...");
267        self.child.kill().await.map_err(Error::Io)?;
268        Ok(())
269    }
270
271    /// Get the process ID
272    pub fn pid(&self) -> Option<u32> {
273        self.child.id()
274    }
275
276    /// Take the stderr reader (can only be called once)
277    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
278        self.stderr.take()
279    }
280
281    /// Get the session UUID if available
282    /// Returns an error if no response has been received yet
283    pub fn session_uuid(&self) -> Result<Uuid> {
284        self.session_uuid.ok_or(Error::SessionNotInitialized)
285    }
286
287    /// Test if the Claude connection is working by sending a ping message
288    /// Returns true if Claude responds with "pong", false otherwise
289    pub async fn ping(&mut self) -> bool {
290        // Send a simple ping request
291        let ping_input = ClaudeInput::user_message(
292            "ping - respond with just the word 'pong' and nothing else",
293            self.session_uuid.unwrap_or_else(Uuid::new_v4),
294        );
295
296        // Try to send the ping
297        if let Err(e) = self.send(&ping_input).await {
298            debug!("Ping failed to send: {}", e);
299            return false;
300        }
301
302        // Try to receive responses until we get a result or error
303        let mut found_pong = false;
304        let mut message_count = 0;
305        const MAX_MESSAGES: usize = 10;
306
307        loop {
308            match self.receive().await {
309                Ok(output) => {
310                    message_count += 1;
311
312                    // Check if it's an assistant message containing "pong"
313                    if let ClaudeOutput::Assistant(msg) = &output {
314                        for content in &msg.message.content {
315                            if let ContentBlock::Text(text) = content {
316                                if text.text.to_lowercase().contains("pong") {
317                                    found_pong = true;
318                                }
319                            }
320                        }
321                    }
322
323                    // Stop on result message
324                    if matches!(output, ClaudeOutput::Result(_)) {
325                        break;
326                    }
327
328                    // Safety limit
329                    if message_count >= MAX_MESSAGES {
330                        debug!("Ping exceeded message limit");
331                        break;
332                    }
333                }
334                Err(e) => {
335                    debug!("Ping failed to receive response: {}", e);
336                    break;
337                }
338            }
339        }
340
341        found_pong
342    }
343
344    // =========================================================================
345    // Tool Approval Protocol
346    // =========================================================================
347
348    /// Enable the tool approval protocol by performing the initialization handshake.
349    ///
350    /// After calling this method, the CLI will send `ControlRequest` messages when
351    /// Claude wants to use a tool. You must handle these by calling
352    /// `send_control_response()` with an appropriate response.
353    ///
354    /// **Important**: The client must have been created with
355    /// `ClaudeCliBuilder::permission_prompt_tool("stdio")` for this to work.
356    ///
357    /// # Example
358    ///
359    /// ```no_run
360    /// use claude_codes::{AsyncClient, ClaudeCliBuilder, ClaudeOutput, ControlRequestPayload};
361    ///
362    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
363    /// let child = ClaudeCliBuilder::new()
364    ///     .model("sonnet")
365    ///     .permission_prompt_tool("stdio")
366    ///     .spawn()
367    ///     .await?;
368    ///
369    /// let mut client = AsyncClient::new(child)?;
370    /// client.enable_tool_approval().await?;
371    ///
372    /// // Now when you receive messages, you may get ControlRequest messages
373    /// // that need responses
374    /// # Ok(())
375    /// # }
376    /// ```
377    pub async fn enable_tool_approval(&mut self) -> Result<()> {
378        if self.tool_approval_enabled {
379            debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
380            return Ok(());
381        }
382
383        let request_id = format!("init-{}", Uuid::new_v4());
384        let init_request = ControlRequestMessage::initialize(&request_id);
385
386        debug!("[TOOL_APPROVAL] Sending initialization handshake");
387        let json_line = Protocol::serialize(&init_request)?;
388        self.stdin
389            .write_all(json_line.as_bytes())
390            .await
391            .map_err(Error::Io)?;
392        self.stdin.flush().await.map_err(Error::Io)?;
393
394        // Wait for the initialization response
395        loop {
396            let mut line = String::new();
397            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
398
399            if bytes_read == 0 {
400                return Err(Error::ConnectionClosed);
401            }
402
403            let trimmed = line.trim();
404            if trimmed.is_empty() {
405                continue;
406            }
407
408            debug!("[TOOL_APPROVAL] Received: {}", trimmed);
409
410            // Try to parse as ClaudeOutput
411            match ClaudeOutput::parse_json_tolerant(trimmed) {
412                Ok(ClaudeOutput::ControlResponse(resp)) => {
413                    use crate::io::ControlResponsePayload;
414                    match &resp.response {
415                        ControlResponsePayload::Success {
416                            request_id: rid, ..
417                        } if rid == &request_id => {
418                            debug!("[TOOL_APPROVAL] Initialization successful");
419                            self.tool_approval_enabled = true;
420                            return Ok(());
421                        }
422                        ControlResponsePayload::Error { error, .. } => {
423                            return Err(Error::Protocol(format!(
424                                "Tool approval initialization failed: {}",
425                                error
426                            )));
427                        }
428                        _ => {
429                            // Different request_id, keep waiting
430                            continue;
431                        }
432                    }
433                }
434                Ok(_) => {
435                    // Got a different message type (system, etc.), keep waiting
436                    continue;
437                }
438                Err(e) => {
439                    return Err(e.into());
440                }
441            }
442        }
443    }
444
445    /// Send a control response back to the CLI.
446    ///
447    /// Use this to respond to `ControlRequest` messages received during tool approval.
448    /// The easiest way to create responses is using the helper methods on
449    /// `ToolPermissionRequest`:
450    ///
451    /// # Example
452    ///
453    /// ```no_run
454    /// use claude_codes::{AsyncClient, ClaudeOutput, ControlRequestPayload};
455    ///
456    /// # async fn example(client: &mut AsyncClient) -> Result<(), Box<dyn std::error::Error>> {
457    /// # let output = client.receive().await?;
458    /// if let ClaudeOutput::ControlRequest(req) = output {
459    ///     if let ControlRequestPayload::CanUseTool(perm_req) = &req.request {
460    ///         // Use the ergonomic helpers on ToolPermissionRequest
461    ///         let response = if perm_req.tool_name == "Bash" {
462    ///             perm_req.deny("Bash commands not allowed", &req.request_id)
463    ///         } else {
464    ///             perm_req.allow(&req.request_id)
465    ///         };
466    ///         client.send_control_response(response).await?;
467    ///     }
468    /// }
469    /// # Ok(())
470    /// # }
471    /// ```
472    pub async fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
473        let message: ControlResponseMessage = response.into();
474        let json_line = Protocol::serialize(&message)?;
475        debug!(
476            "[TOOL_APPROVAL] Sending control response: {}",
477            json_line.trim()
478        );
479
480        self.stdin
481            .write_all(json_line.as_bytes())
482            .await
483            .map_err(Error::Io)?;
484        self.stdin.flush().await.map_err(Error::Io)?;
485        Ok(())
486    }
487
488    /// Check if tool approval protocol is enabled
489    pub fn is_tool_approval_enabled(&self) -> bool {
490        self.tool_approval_enabled
491    }
492}
493
494/// A response stream that yields ClaudeOutput messages
495/// Holds a reference to the client to read from
496pub struct ResponseStream<'a> {
497    client: &'a mut AsyncClient,
498    finished: bool,
499}
500
501impl ResponseStream<'_> {
502    /// Convert to a vector by collecting all responses
503    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
504        let mut responses = Vec::new();
505
506        while !self.finished {
507            let output = self.client.receive().await?;
508            let is_result = matches!(&output, ClaudeOutput::Result(_));
509            responses.push(output);
510
511            if is_result {
512                self.finished = true;
513                break;
514            }
515        }
516
517        Ok(responses)
518    }
519
520    /// Get the next response
521    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
522        if self.finished {
523            return None;
524        }
525
526        match self.client.receive().await {
527            Ok(output) => {
528                if matches!(&output, ClaudeOutput::Result(_)) {
529                    self.finished = true;
530                }
531                Some(Ok(output))
532            }
533            Err(e) => {
534                self.finished = true;
535                Some(Err(e))
536            }
537        }
538    }
539}
540
541impl Drop for AsyncClient {
542    fn drop(&mut self) {
543        if self.is_alive() {
544            // Try to kill the process
545            if let Err(e) = self.child.start_kill() {
546                error!("Failed to kill Claude process on drop: {}", e);
547            }
548        }
549    }
550}
551
552// Protocol extension methods for asynchronous I/O
553impl Protocol {
554    /// Write a message to an async writer
555    pub async fn write_async<W: AsyncWriteExt + Unpin, T: Serialize>(
556        writer: &mut W,
557        message: &T,
558    ) -> Result<()> {
559        let line = Self::serialize(message)?;
560        debug!("[PROTOCOL] Sending async: {}", line.trim());
561        writer.write_all(line.as_bytes()).await?;
562        writer.flush().await?;
563        Ok(())
564    }
565
566    /// Read a message from an async reader
567    pub async fn read_async<R: AsyncBufReadExt + Unpin, T: for<'de> Deserialize<'de>>(
568        reader: &mut R,
569    ) -> Result<T> {
570        let mut line = String::new();
571        let bytes_read = reader.read_line(&mut line).await?;
572        if bytes_read == 0 {
573            return Err(Error::ConnectionClosed);
574        }
575        debug!("[PROTOCOL] Received async: {}", line.trim());
576        Self::deserialize(&line)
577    }
578}
579
580/// Async stream processor for handling continuous message streams
581pub struct AsyncStreamProcessor<R> {
582    reader: AsyncBufReader<R>,
583}
584
585impl<R: tokio::io::AsyncRead + Unpin> AsyncStreamProcessor<R> {
586    /// Create a new async stream processor
587    pub fn new(reader: R) -> Self {
588        Self {
589            reader: AsyncBufReader::new(reader),
590        }
591    }
592
593    /// Process the next message from the stream
594    pub async fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
595        Protocol::read_async(&mut self.reader).await
596    }
597
598    /// Process all messages in the stream
599    pub async fn process_all<T, F, Fut>(&mut self, mut handler: F) -> Result<()>
600    where
601        T: for<'de> Deserialize<'de>,
602        F: FnMut(T) -> Fut,
603        Fut: std::future::Future<Output = Result<()>>,
604    {
605        loop {
606            match self.next_message().await {
607                Ok(message) => handler(message).await?,
608                Err(Error::ConnectionClosed) => break,
609                Err(e) => return Err(e),
610            }
611        }
612        Ok(())
613    }
614}