Skip to main content

claude_codes/
client_async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6    ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7    ControlResponseMessage,
8};
9use crate::protocol::Protocol;
10use log::{debug, error, info, warn};
11use serde::{Deserialize, Serialize};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufReader as AsyncBufReader};
13use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16/// Asynchronous client for communicating with Claude
17pub struct AsyncClient {
18    child: Child,
19    stdin: ChildStdin,
20    stdout: BufReader<ChildStdout>,
21    stderr: Option<BufReader<ChildStderr>>,
22    session_uuid: Option<Uuid>,
23    /// Whether tool approval protocol has been initialized
24    tool_approval_enabled: bool,
25}
26
/// Capacity, in bytes, of the buffered reader wrapped around Claude's stdout (10 MiB).
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
29
30impl AsyncClient {
31    /// Create a new async client from a tokio Child process
32    pub fn new(mut child: Child) -> Result<Self> {
33        let stdin = child
34            .stdin
35            .take()
36            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
37
38        let stdout = BufReader::with_capacity(
39            STDOUT_BUFFER_SIZE,
40            child
41                .stdout
42                .take()
43                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
44        );
45
46        let stderr = child.stderr.take().map(BufReader::new);
47
48        Ok(Self {
49            child,
50            stdin,
51            stdout,
52            stderr,
53            session_uuid: None,
54            tool_approval_enabled: false,
55        })
56    }
57
58    /// Create a client with default settings (using logic from start_claude)
59    pub async fn with_defaults() -> Result<Self> {
60        // Check Claude version (only warns once per session)
61        // NOTE: The claude-codes API is in high flux. If you wish to work around
62        // this version check, you can use AsyncClient::new() directly with:
63        //   let child = ClaudeCliBuilder::new().model("sonnet").spawn().await?;
64        //   AsyncClient::new(child)
65        crate::version::check_claude_version_async().await?;
66        Self::with_model("sonnet").await
67    }
68
69    /// Create a client with a specific model
70    pub async fn with_model(model: &str) -> Result<Self> {
71        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
72
73        info!("Started Claude process with model: {}", model);
74        Self::new(child)
75    }
76
77    /// Create a client from a custom builder
78    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
79        let child = builder.spawn().await?;
80        info!("Started Claude process from custom builder");
81        Self::new(child)
82    }
83
84    /// Resume a previous session by UUID
85    /// This creates a new client that resumes an existing session
86    pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
87        let child = ClaudeCliBuilder::new()
88            .resume(Some(session_uuid.to_string()))
89            .spawn()
90            .await?;
91
92        info!("Resuming Claude session with UUID: {}", session_uuid);
93        let mut client = Self::new(child)?;
94        // Pre-populate the session UUID since we're resuming
95        client.session_uuid = Some(session_uuid);
96        Ok(client)
97    }
98
99    /// Resume a previous session with a specific model
100    pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
101        let child = ClaudeCliBuilder::new()
102            .model(model)
103            .resume(Some(session_uuid.to_string()))
104            .spawn()
105            .await?;
106
107        info!(
108            "Resuming Claude session with UUID: {} and model: {}",
109            session_uuid, model
110        );
111        let mut client = Self::new(child)?;
112        // Pre-populate the session UUID since we're resuming
113        client.session_uuid = Some(session_uuid);
114        Ok(client)
115    }
116
117    /// Send a query and collect all responses until Result message
118    /// This is the simplified version that collects all responses
119    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
120        let session_id = Uuid::new_v4();
121        self.query_with_session(text, session_id).await
122    }
123
124    /// Send a query with a custom session ID and collect all responses
125    pub async fn query_with_session(
126        &mut self,
127        text: &str,
128        session_id: Uuid,
129    ) -> Result<Vec<ClaudeOutput>> {
130        // Send the query
131        let input = ClaudeInput::user_message(text, session_id);
132        self.send(&input).await?;
133
134        // Collect responses until we get a Result message
135        let mut responses = Vec::new();
136
137        loop {
138            let output = self.receive().await?;
139            let is_result = matches!(&output, ClaudeOutput::Result(_));
140            responses.push(output);
141
142            if is_result {
143                break;
144            }
145        }
146
147        Ok(responses)
148    }
149
150    /// Send a query and return an async iterator over responses
151    /// Returns a stream that yields ClaudeOutput until Result message is received
152    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
153        let session_id = Uuid::new_v4();
154        self.query_stream_with_session(text, session_id).await
155    }
156
157    /// Send a query with session ID and return an async iterator over responses
158    pub async fn query_stream_with_session(
159        &mut self,
160        text: &str,
161        session_id: Uuid,
162    ) -> Result<ResponseStream<'_>> {
163        // Send the query first
164        let input = ClaudeInput::user_message(text, session_id);
165        self.send(&input).await?;
166
167        // Return a stream that will read responses
168        Ok(ResponseStream {
169            client: self,
170            finished: false,
171        })
172    }
173
174    /// Send a ClaudeInput directly
175    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
176        let json_line = Protocol::serialize(input)?;
177        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
178
179        self.stdin
180            .write_all(json_line.as_bytes())
181            .await
182            .map_err(Error::Io)?;
183
184        self.stdin.flush().await.map_err(Error::Io)?;
185        Ok(())
186    }
187
188    /// Send an interrupt to gracefully stop the current response.
189    ///
190    /// This writes `{ "subtype": "interrupt" }` to stdin, telling Claude
191    /// to stop without killing the session.
192    pub async fn interrupt(&mut self) -> Result<()> {
193        self.send(&ClaudeInput::interrupt()).await
194    }
195
196    /// Receive a single response from Claude.
197    ///
198    /// # Important: Polling Frequency
199    ///
200    /// This method should be polled frequently to prevent the OS pipe buffer from
201    /// filling up. Claude can emit very large JSON messages (hundreds of KB), and
202    /// if the pipe buffer overflows, data may be truncated.
203    ///
204    /// In a `tokio::select!` loop with other async operations, ensure `receive()`
205    /// is given priority or called frequently. For high-throughput scenarios,
206    /// consider spawning a dedicated task to drain stdout into an unbounded channel.
207    ///
208    /// # Returns
209    ///
210    /// - `Ok(ClaudeOutput)` - A parsed message from Claude
211    /// - `Err(Error::ConnectionClosed)` - Claude process has exited
212    /// - `Err(Error::Deserialization)` - Failed to parse the message
213    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
214        let trimmed = self.read_frame_line().await?;
215        debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
216
217        // Use the parse_json_tolerant method which handles ANSI escape codes
218        match ClaudeOutput::parse_json_tolerant(&trimmed) {
219            Ok(output) => {
220                debug!("[INCOMING] Parsed output type: {}", output.message_type());
221
222                // Capture UUID from first response if not already set
223                if self.session_uuid.is_none() {
224                    if let ClaudeOutput::Assistant(ref msg) = output {
225                        if let Some(ref uuid_str) = msg.uuid {
226                            if let Ok(uuid) = Uuid::parse_str(uuid_str) {
227                                debug!("[INCOMING] Captured session UUID: {}", uuid);
228                                self.session_uuid = Some(uuid);
229                            }
230                        }
231                    } else if let ClaudeOutput::Result(ref msg) = output {
232                        if let Some(ref uuid_str) = msg.uuid {
233                            if let Ok(uuid) = Uuid::parse_str(uuid_str) {
234                                debug!("[INCOMING] Captured session UUID: {}", uuid);
235                                self.session_uuid = Some(uuid);
236                            }
237                        }
238                    }
239                }
240
241                Ok(output)
242            }
243            Err(parse_error) => {
244                warn!("[INCOMING] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
245                warn!("[INCOMING] Parse error: {}", parse_error.error_message);
246                warn!("[INCOMING] Raw message: {}", trimmed);
247                Err(parse_error.into())
248            }
249        }
250    }
251
252    /// Read the next non-empty line from Claude's stdout, trimmed.
253    ///
254    /// Returns `Err(Error::ConnectionClosed)` at EOF. Shared by [`receive`] and
255    /// [`receive_raw`].
256    ///
257    /// [`receive`]: Self::receive
258    /// [`receive_raw`]: Self::receive_raw
259    async fn read_frame_line(&mut self) -> Result<String> {
260        let mut line = String::new();
261        loop {
262            line.clear();
263            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
264            if bytes_read == 0 {
265                return Err(Error::ConnectionClosed);
266            }
267            let trimmed = line.trim();
268            if trimmed.is_empty() {
269                continue;
270            }
271            return Ok(trimmed.to_string());
272        }
273    }
274
275    /// Receive the next frame as a raw `serde_json::Value`, before it is mapped
276    /// into a typed [`ClaudeOutput`].
277    ///
278    /// Useful for auditing wire fidelity: pair it with
279    /// [`audit_frame`](crate::io::audit_frame) to confirm the typed model
280    /// captures every field the CLI emitted. Applies the same leading-prefix
281    /// tolerance as [`receive`](Self::receive).
282    pub async fn receive_raw(&mut self) -> Result<serde_json::Value> {
283        let trimmed = self.read_frame_line().await?;
284        match serde_json::from_str::<serde_json::Value>(&trimmed) {
285            Ok(value) => Ok(value),
286            Err(e) => match trimmed.find('{') {
287                Some(start) => Ok(serde_json::from_str::<serde_json::Value>(&trimmed[start..])
288                    .map_err(|_| Error::Json(e))?),
289                None => Err(Error::Json(e)),
290            },
291        }
292    }
293
294    /// Check if the Claude process is still running
295    pub fn is_alive(&mut self) -> bool {
296        self.child.try_wait().ok().flatten().is_none()
297    }
298
299    /// Gracefully shutdown the client
300    pub async fn shutdown(mut self) -> Result<()> {
301        info!("Shutting down Claude process...");
302        self.child.kill().await.map_err(Error::Io)?;
303        Ok(())
304    }
305
306    /// Get the process ID
307    pub fn pid(&self) -> Option<u32> {
308        self.child.id()
309    }
310
311    /// Take the stderr reader (can only be called once)
312    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
313        self.stderr.take()
314    }
315
316    /// Get the session UUID if available
317    /// Returns an error if no response has been received yet
318    pub fn session_uuid(&self) -> Result<Uuid> {
319        self.session_uuid.ok_or(Error::SessionNotInitialized)
320    }
321
322    /// Test if the Claude connection is working by sending a ping message
323    /// Returns true if Claude responds with "pong", false otherwise
324    pub async fn ping(&mut self) -> bool {
325        // Send a simple ping request
326        let ping_input = ClaudeInput::user_message(
327            "ping - respond with just the word 'pong' and nothing else",
328            self.session_uuid.unwrap_or_else(Uuid::new_v4),
329        );
330
331        // Try to send the ping
332        if let Err(e) = self.send(&ping_input).await {
333            debug!("Ping failed to send: {}", e);
334            return false;
335        }
336
337        // Try to receive responses until we get a result or error
338        let mut found_pong = false;
339        let mut message_count = 0;
340        const MAX_MESSAGES: usize = 10;
341
342        loop {
343            match self.receive().await {
344                Ok(output) => {
345                    message_count += 1;
346
347                    // Check if it's an assistant message containing "pong"
348                    if let ClaudeOutput::Assistant(msg) = &output {
349                        for content in &msg.message.content {
350                            if let ContentBlock::Text(text) = content {
351                                if text.text.to_lowercase().contains("pong") {
352                                    found_pong = true;
353                                }
354                            }
355                        }
356                    }
357
358                    // Stop on result message
359                    if matches!(output, ClaudeOutput::Result(_)) {
360                        break;
361                    }
362
363                    // Safety limit
364                    if message_count >= MAX_MESSAGES {
365                        debug!("Ping exceeded message limit");
366                        break;
367                    }
368                }
369                Err(e) => {
370                    debug!("Ping failed to receive response: {}", e);
371                    break;
372                }
373            }
374        }
375
376        found_pong
377    }
378
379    // =========================================================================
380    // Tool Approval Protocol
381    // =========================================================================
382
383    /// Enable the tool approval protocol by performing the initialization handshake.
384    ///
385    /// After calling this method, the CLI will send `ControlRequest` messages when
386    /// Claude wants to use a tool. You must handle these by calling
387    /// `send_control_response()` with an appropriate response.
388    ///
389    /// **Important**: The client must have been created with
390    /// `ClaudeCliBuilder::permission_prompt_tool("stdio")` for this to work.
391    ///
392    /// # Example
393    ///
394    /// ```no_run
395    /// use claude_codes::{AsyncClient, ClaudeCliBuilder, ClaudeOutput, ControlRequestPayload};
396    ///
397    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
398    /// let child = ClaudeCliBuilder::new()
399    ///     .model("sonnet")
400    ///     .permission_prompt_tool("stdio")
401    ///     .spawn()
402    ///     .await?;
403    ///
404    /// let mut client = AsyncClient::new(child)?;
405    /// client.enable_tool_approval().await?;
406    ///
407    /// // Now when you receive messages, you may get ControlRequest messages
408    /// // that need responses
409    /// # Ok(())
410    /// # }
411    /// ```
412    pub async fn enable_tool_approval(&mut self) -> Result<()> {
413        if self.tool_approval_enabled {
414            debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
415            return Ok(());
416        }
417
418        let request_id = format!("init-{}", Uuid::new_v4());
419        let init_request = ControlRequestMessage::initialize(&request_id);
420
421        debug!("[TOOL_APPROVAL] Sending initialization handshake");
422        let json_line = Protocol::serialize(&init_request)?;
423        self.stdin
424            .write_all(json_line.as_bytes())
425            .await
426            .map_err(Error::Io)?;
427        self.stdin.flush().await.map_err(Error::Io)?;
428
429        // Wait for the initialization response
430        loop {
431            let mut line = String::new();
432            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
433
434            if bytes_read == 0 {
435                return Err(Error::ConnectionClosed);
436            }
437
438            let trimmed = line.trim();
439            if trimmed.is_empty() {
440                continue;
441            }
442
443            debug!("[TOOL_APPROVAL] Received: {}", trimmed);
444
445            // Try to parse as ClaudeOutput
446            match ClaudeOutput::parse_json_tolerant(trimmed) {
447                Ok(ClaudeOutput::ControlResponse(resp)) => {
448                    use crate::io::ControlResponsePayload;
449                    match &resp.response {
450                        ControlResponsePayload::Success {
451                            request_id: rid, ..
452                        } if rid == &request_id => {
453                            debug!("[TOOL_APPROVAL] Initialization successful");
454                            self.tool_approval_enabled = true;
455                            return Ok(());
456                        }
457                        ControlResponsePayload::Error { error, .. } => {
458                            return Err(Error::Protocol(format!(
459                                "Tool approval initialization failed: {}",
460                                error
461                            )));
462                        }
463                        _ => {
464                            // Different request_id, keep waiting
465                            continue;
466                        }
467                    }
468                }
469                Ok(_) => {
470                    // Got a different message type (system, etc.), keep waiting
471                    continue;
472                }
473                Err(e) => {
474                    return Err(e.into());
475                }
476            }
477        }
478    }
479
480    /// Send a control response back to the CLI.
481    ///
482    /// Use this to respond to `ControlRequest` messages received during tool approval.
483    /// The easiest way to create responses is using the helper methods on
484    /// `ToolPermissionRequest`:
485    ///
486    /// # Example
487    ///
488    /// ```no_run
489    /// use claude_codes::{AsyncClient, ClaudeOutput, ControlRequestPayload};
490    ///
491    /// # async fn example(client: &mut AsyncClient) -> Result<(), Box<dyn std::error::Error>> {
492    /// # let output = client.receive().await?;
493    /// if let ClaudeOutput::ControlRequest(req) = output {
494    ///     if let ControlRequestPayload::CanUseTool(perm_req) = &req.request {
495    ///         // Use the ergonomic helpers on ToolPermissionRequest
496    ///         let response = if perm_req.tool_name == "Bash" {
497    ///             perm_req.deny("Bash commands not allowed", &req.request_id)
498    ///         } else {
499    ///             perm_req.allow(&req.request_id)
500    ///         };
501    ///         client.send_control_response(response).await?;
502    ///     }
503    /// }
504    /// # Ok(())
505    /// # }
506    /// ```
507    pub async fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
508        let message: ControlResponseMessage = response.into();
509        let json_line = Protocol::serialize(&message)?;
510        debug!(
511            "[TOOL_APPROVAL] Sending control response: {}",
512            json_line.trim()
513        );
514
515        self.stdin
516            .write_all(json_line.as_bytes())
517            .await
518            .map_err(Error::Io)?;
519        self.stdin.flush().await.map_err(Error::Io)?;
520        Ok(())
521    }
522
523    /// Check if tool approval protocol is enabled
524    pub fn is_tool_approval_enabled(&self) -> bool {
525        self.tool_approval_enabled
526    }
527}
528
529/// A response stream that yields ClaudeOutput messages
530/// Holds a reference to the client to read from
531pub struct ResponseStream<'a> {
532    client: &'a mut AsyncClient,
533    finished: bool,
534}
535
536impl ResponseStream<'_> {
537    /// Convert to a vector by collecting all responses
538    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
539        let mut responses = Vec::new();
540
541        while !self.finished {
542            let output = self.client.receive().await?;
543            let is_result = matches!(&output, ClaudeOutput::Result(_));
544            responses.push(output);
545
546            if is_result {
547                self.finished = true;
548                break;
549            }
550        }
551
552        Ok(responses)
553    }
554
555    /// Get the next response
556    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
557        if self.finished {
558            return None;
559        }
560
561        match self.client.receive().await {
562            Ok(output) => {
563                if matches!(&output, ClaudeOutput::Result(_)) {
564                    self.finished = true;
565                }
566                Some(Ok(output))
567            }
568            Err(e) => {
569                self.finished = true;
570                Some(Err(e))
571            }
572        }
573    }
574}
575
576impl Drop for AsyncClient {
577    fn drop(&mut self) {
578        if self.is_alive() {
579            // Try to kill the process
580            if let Err(e) = self.child.start_kill() {
581                error!("Failed to kill Claude process on drop: {}", e);
582            }
583        }
584    }
585}
586
587// Protocol extension methods for asynchronous I/O
588impl Protocol {
589    /// Write a message to an async writer
590    pub async fn write_async<W: AsyncWriteExt + Unpin, T: Serialize>(
591        writer: &mut W,
592        message: &T,
593    ) -> Result<()> {
594        let line = Self::serialize(message)?;
595        debug!("[PROTOCOL] Sending async: {}", line.trim());
596        writer.write_all(line.as_bytes()).await?;
597        writer.flush().await?;
598        Ok(())
599    }
600
601    /// Read a message from an async reader
602    pub async fn read_async<R: AsyncBufReadExt + Unpin, T: for<'de> Deserialize<'de>>(
603        reader: &mut R,
604    ) -> Result<T> {
605        let mut line = String::new();
606        let bytes_read = reader.read_line(&mut line).await?;
607        if bytes_read == 0 {
608            return Err(Error::ConnectionClosed);
609        }
610        debug!("[PROTOCOL] Received async: {}", line.trim());
611        Self::deserialize(&line)
612    }
613}
614
615/// Async stream processor for handling continuous message streams
616pub struct AsyncStreamProcessor<R> {
617    reader: AsyncBufReader<R>,
618}
619
620impl<R: tokio::io::AsyncRead + Unpin> AsyncStreamProcessor<R> {
621    /// Create a new async stream processor
622    pub fn new(reader: R) -> Self {
623        Self {
624            reader: AsyncBufReader::new(reader),
625        }
626    }
627
628    /// Process the next message from the stream
629    pub async fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
630        Protocol::read_async(&mut self.reader).await
631    }
632
633    /// Process all messages in the stream
634    pub async fn process_all<T, F, Fut>(&mut self, mut handler: F) -> Result<()>
635    where
636        T: for<'de> Deserialize<'de>,
637        F: FnMut(T) -> Fut,
638        Fut: std::future::Future<Output = Result<()>>,
639    {
640        loop {
641            match self.next_message().await {
642                Ok(message) => handler(message).await?,
643                Err(Error::ConnectionClosed) => break,
644                Err(e) => return Err(e),
645            }
646        }
647        Ok(())
648    }
649}