Skip to main content

claude_codes/
client_async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6    ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7    ControlResponseMessage,
8};
9use crate::protocol::Protocol;
10use log::{debug, error, info, warn};
11use serde::{Deserialize, Serialize};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufReader as AsyncBufReader};
13use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16/// Asynchronous client for communicating with Claude
17pub struct AsyncClient {
18    child: Child,
19    stdin: ChildStdin,
20    stdout: BufReader<ChildStdout>,
21    stderr: Option<BufReader<ChildStderr>>,
22    session_uuid: Option<Uuid>,
23    /// Whether tool approval protocol has been initialized
24    tool_approval_enabled: bool,
25}
26
27/// Buffer size for reading Claude's stdout (10MB).
28const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
29
30impl AsyncClient {
31    /// Create a new async client from a tokio Child process
32    pub fn new(mut child: Child) -> Result<Self> {
33        let stdin = child
34            .stdin
35            .take()
36            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
37
38        let stdout = BufReader::with_capacity(
39            STDOUT_BUFFER_SIZE,
40            child
41                .stdout
42                .take()
43                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
44        );
45
46        let stderr = child.stderr.take().map(BufReader::new);
47
48        Ok(Self {
49            child,
50            stdin,
51            stdout,
52            stderr,
53            session_uuid: None,
54            tool_approval_enabled: false,
55        })
56    }
57
58    /// Create a client with default settings (using logic from start_claude)
59    pub async fn with_defaults() -> Result<Self> {
60        // Check Claude version (only warns once per session)
61        // NOTE: The claude-codes API is in high flux. If you wish to work around
62        // this version check, you can use AsyncClient::new() directly with:
63        //   let child = ClaudeCliBuilder::new().model("sonnet").spawn().await?;
64        //   AsyncClient::new(child)
65        crate::version::check_claude_version_async().await?;
66        Self::with_model("sonnet").await
67    }
68
69    /// Create a client with a specific model
70    pub async fn with_model(model: &str) -> Result<Self> {
71        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
72
73        info!("Started Claude process with model: {}", model);
74        Self::new(child)
75    }
76
77    /// Create a client from a custom builder
78    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
79        let child = builder.spawn().await?;
80        info!("Started Claude process from custom builder");
81        Self::new(child)
82    }
83
84    /// Resume a previous session by UUID
85    /// This creates a new client that resumes an existing session
86    pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
87        let child = ClaudeCliBuilder::new()
88            .resume(Some(session_uuid.to_string()))
89            .spawn()
90            .await?;
91
92        info!("Resuming Claude session with UUID: {}", session_uuid);
93        let mut client = Self::new(child)?;
94        // Pre-populate the session UUID since we're resuming
95        client.session_uuid = Some(session_uuid);
96        Ok(client)
97    }
98
99    /// Resume a previous session with a specific model
100    pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
101        let child = ClaudeCliBuilder::new()
102            .model(model)
103            .resume(Some(session_uuid.to_string()))
104            .spawn()
105            .await?;
106
107        info!(
108            "Resuming Claude session with UUID: {} and model: {}",
109            session_uuid, model
110        );
111        let mut client = Self::new(child)?;
112        // Pre-populate the session UUID since we're resuming
113        client.session_uuid = Some(session_uuid);
114        Ok(client)
115    }
116
117    /// Send a query and collect all responses until Result message
118    /// This is the simplified version that collects all responses
119    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
120        let session_id = Uuid::new_v4();
121        self.query_with_session(text, session_id).await
122    }
123
124    /// Send a query with a custom session ID and collect all responses
125    pub async fn query_with_session(
126        &mut self,
127        text: &str,
128        session_id: Uuid,
129    ) -> Result<Vec<ClaudeOutput>> {
130        // Send the query
131        let input = ClaudeInput::user_message(text, session_id);
132        self.send(&input).await?;
133
134        // Collect responses until we get a Result message
135        let mut responses = Vec::new();
136
137        loop {
138            let output = self.receive().await?;
139            let is_result = matches!(&output, ClaudeOutput::Result(_));
140            responses.push(output);
141
142            if is_result {
143                break;
144            }
145        }
146
147        Ok(responses)
148    }
149
150    /// Send a query and return an async iterator over responses
151    /// Returns a stream that yields ClaudeOutput until Result message is received
152    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
153        let session_id = Uuid::new_v4();
154        self.query_stream_with_session(text, session_id).await
155    }
156
157    /// Send a query with session ID and return an async iterator over responses
158    pub async fn query_stream_with_session(
159        &mut self,
160        text: &str,
161        session_id: Uuid,
162    ) -> Result<ResponseStream<'_>> {
163        // Send the query first
164        let input = ClaudeInput::user_message(text, session_id);
165        self.send(&input).await?;
166
167        // Return a stream that will read responses
168        Ok(ResponseStream {
169            client: self,
170            finished: false,
171        })
172    }
173
174    /// Send a ClaudeInput directly
175    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
176        let json_line = Protocol::serialize(input)?;
177        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
178
179        self.stdin
180            .write_all(json_line.as_bytes())
181            .await
182            .map_err(Error::Io)?;
183
184        self.stdin.flush().await.map_err(Error::Io)?;
185        Ok(())
186    }
187
188    /// Send an interrupt to gracefully stop the current response.
189    ///
190    /// This writes `{ "subtype": "interrupt" }` to stdin, telling Claude
191    /// to stop without killing the session.
192    pub async fn interrupt(&mut self) -> Result<()> {
193        self.send(&ClaudeInput::interrupt()).await
194    }
195
196    /// Receive a single response from Claude.
197    ///
198    /// # Important: Polling Frequency
199    ///
200    /// This method should be polled frequently to prevent the OS pipe buffer from
201    /// filling up. Claude can emit very large JSON messages (hundreds of KB), and
202    /// if the pipe buffer overflows, data may be truncated.
203    ///
204    /// In a `tokio::select!` loop with other async operations, ensure `receive()`
205    /// is given priority or called frequently. For high-throughput scenarios,
206    /// consider spawning a dedicated task to drain stdout into an unbounded channel.
207    ///
208    /// # Returns
209    ///
210    /// - `Ok(ClaudeOutput)` - A parsed message from Claude
211    /// - `Err(Error::ConnectionClosed)` - Claude process has exited
212    /// - `Err(Error::Deserialization)` - Failed to parse the message
213    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
214        let mut line = String::new();
215
216        loop {
217            line.clear();
218            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
219
220            if bytes_read == 0 {
221                return Err(Error::ConnectionClosed);
222            }
223
224            let trimmed = line.trim();
225            if trimmed.is_empty() {
226                continue;
227            }
228
229            debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
230
231            // Use the parse_json_tolerant method which handles ANSI escape codes
232            match ClaudeOutput::parse_json_tolerant(trimmed) {
233                Ok(output) => {
234                    debug!("[INCOMING] Parsed output type: {}", output.message_type());
235
236                    // Capture UUID from first response if not already set
237                    if self.session_uuid.is_none() {
238                        if let ClaudeOutput::Assistant(ref msg) = output {
239                            if let Some(ref uuid_str) = msg.uuid {
240                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
241                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
242                                    self.session_uuid = Some(uuid);
243                                }
244                            }
245                        } else if let ClaudeOutput::Result(ref msg) = output {
246                            if let Some(ref uuid_str) = msg.uuid {
247                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
248                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
249                                    self.session_uuid = Some(uuid);
250                                }
251                            }
252                        }
253                    }
254
255                    return Ok(output);
256                }
257                Err(parse_error) => {
258                    warn!("[INCOMING] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
259                    warn!("[INCOMING] Parse error: {}", parse_error.error_message);
260                    warn!("[INCOMING] Raw message: {}", trimmed);
261                    return Err(parse_error.into());
262                }
263            }
264        }
265    }
266
267    /// Check if the Claude process is still running
268    pub fn is_alive(&mut self) -> bool {
269        self.child.try_wait().ok().flatten().is_none()
270    }
271
272    /// Gracefully shutdown the client
273    pub async fn shutdown(mut self) -> Result<()> {
274        info!("Shutting down Claude process...");
275        self.child.kill().await.map_err(Error::Io)?;
276        Ok(())
277    }
278
279    /// Get the process ID
280    pub fn pid(&self) -> Option<u32> {
281        self.child.id()
282    }
283
284    /// Take the stderr reader (can only be called once)
285    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
286        self.stderr.take()
287    }
288
289    /// Get the session UUID if available
290    /// Returns an error if no response has been received yet
291    pub fn session_uuid(&self) -> Result<Uuid> {
292        self.session_uuid.ok_or(Error::SessionNotInitialized)
293    }
294
295    /// Test if the Claude connection is working by sending a ping message
296    /// Returns true if Claude responds with "pong", false otherwise
297    pub async fn ping(&mut self) -> bool {
298        // Send a simple ping request
299        let ping_input = ClaudeInput::user_message(
300            "ping - respond with just the word 'pong' and nothing else",
301            self.session_uuid.unwrap_or_else(Uuid::new_v4),
302        );
303
304        // Try to send the ping
305        if let Err(e) = self.send(&ping_input).await {
306            debug!("Ping failed to send: {}", e);
307            return false;
308        }
309
310        // Try to receive responses until we get a result or error
311        let mut found_pong = false;
312        let mut message_count = 0;
313        const MAX_MESSAGES: usize = 10;
314
315        loop {
316            match self.receive().await {
317                Ok(output) => {
318                    message_count += 1;
319
320                    // Check if it's an assistant message containing "pong"
321                    if let ClaudeOutput::Assistant(msg) = &output {
322                        for content in &msg.message.content {
323                            if let ContentBlock::Text(text) = content {
324                                if text.text.to_lowercase().contains("pong") {
325                                    found_pong = true;
326                                }
327                            }
328                        }
329                    }
330
331                    // Stop on result message
332                    if matches!(output, ClaudeOutput::Result(_)) {
333                        break;
334                    }
335
336                    // Safety limit
337                    if message_count >= MAX_MESSAGES {
338                        debug!("Ping exceeded message limit");
339                        break;
340                    }
341                }
342                Err(e) => {
343                    debug!("Ping failed to receive response: {}", e);
344                    break;
345                }
346            }
347        }
348
349        found_pong
350    }
351
352    // =========================================================================
353    // Tool Approval Protocol
354    // =========================================================================
355
356    /// Enable the tool approval protocol by performing the initialization handshake.
357    ///
358    /// After calling this method, the CLI will send `ControlRequest` messages when
359    /// Claude wants to use a tool. You must handle these by calling
360    /// `send_control_response()` with an appropriate response.
361    ///
362    /// **Important**: The client must have been created with
363    /// `ClaudeCliBuilder::permission_prompt_tool("stdio")` for this to work.
364    ///
365    /// # Example
366    ///
367    /// ```no_run
368    /// use claude_codes::{AsyncClient, ClaudeCliBuilder, ClaudeOutput, ControlRequestPayload};
369    ///
370    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
371    /// let child = ClaudeCliBuilder::new()
372    ///     .model("sonnet")
373    ///     .permission_prompt_tool("stdio")
374    ///     .spawn()
375    ///     .await?;
376    ///
377    /// let mut client = AsyncClient::new(child)?;
378    /// client.enable_tool_approval().await?;
379    ///
380    /// // Now when you receive messages, you may get ControlRequest messages
381    /// // that need responses
382    /// # Ok(())
383    /// # }
384    /// ```
385    pub async fn enable_tool_approval(&mut self) -> Result<()> {
386        if self.tool_approval_enabled {
387            debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
388            return Ok(());
389        }
390
391        let request_id = format!("init-{}", Uuid::new_v4());
392        let init_request = ControlRequestMessage::initialize(&request_id);
393
394        debug!("[TOOL_APPROVAL] Sending initialization handshake");
395        let json_line = Protocol::serialize(&init_request)?;
396        self.stdin
397            .write_all(json_line.as_bytes())
398            .await
399            .map_err(Error::Io)?;
400        self.stdin.flush().await.map_err(Error::Io)?;
401
402        // Wait for the initialization response
403        loop {
404            let mut line = String::new();
405            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
406
407            if bytes_read == 0 {
408                return Err(Error::ConnectionClosed);
409            }
410
411            let trimmed = line.trim();
412            if trimmed.is_empty() {
413                continue;
414            }
415
416            debug!("[TOOL_APPROVAL] Received: {}", trimmed);
417
418            // Try to parse as ClaudeOutput
419            match ClaudeOutput::parse_json_tolerant(trimmed) {
420                Ok(ClaudeOutput::ControlResponse(resp)) => {
421                    use crate::io::ControlResponsePayload;
422                    match &resp.response {
423                        ControlResponsePayload::Success {
424                            request_id: rid, ..
425                        } if rid == &request_id => {
426                            debug!("[TOOL_APPROVAL] Initialization successful");
427                            self.tool_approval_enabled = true;
428                            return Ok(());
429                        }
430                        ControlResponsePayload::Error { error, .. } => {
431                            return Err(Error::Protocol(format!(
432                                "Tool approval initialization failed: {}",
433                                error
434                            )));
435                        }
436                        _ => {
437                            // Different request_id, keep waiting
438                            continue;
439                        }
440                    }
441                }
442                Ok(_) => {
443                    // Got a different message type (system, etc.), keep waiting
444                    continue;
445                }
446                Err(e) => {
447                    return Err(e.into());
448                }
449            }
450        }
451    }
452
453    /// Send a control response back to the CLI.
454    ///
455    /// Use this to respond to `ControlRequest` messages received during tool approval.
456    /// The easiest way to create responses is using the helper methods on
457    /// `ToolPermissionRequest`:
458    ///
459    /// # Example
460    ///
461    /// ```no_run
462    /// use claude_codes::{AsyncClient, ClaudeOutput, ControlRequestPayload};
463    ///
464    /// # async fn example(client: &mut AsyncClient) -> Result<(), Box<dyn std::error::Error>> {
465    /// # let output = client.receive().await?;
466    /// if let ClaudeOutput::ControlRequest(req) = output {
467    ///     if let ControlRequestPayload::CanUseTool(perm_req) = &req.request {
468    ///         // Use the ergonomic helpers on ToolPermissionRequest
469    ///         let response = if perm_req.tool_name == "Bash" {
470    ///             perm_req.deny("Bash commands not allowed", &req.request_id)
471    ///         } else {
472    ///             perm_req.allow(&req.request_id)
473    ///         };
474    ///         client.send_control_response(response).await?;
475    ///     }
476    /// }
477    /// # Ok(())
478    /// # }
479    /// ```
480    pub async fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
481        let message: ControlResponseMessage = response.into();
482        let json_line = Protocol::serialize(&message)?;
483        debug!(
484            "[TOOL_APPROVAL] Sending control response: {}",
485            json_line.trim()
486        );
487
488        self.stdin
489            .write_all(json_line.as_bytes())
490            .await
491            .map_err(Error::Io)?;
492        self.stdin.flush().await.map_err(Error::Io)?;
493        Ok(())
494    }
495
496    /// Check if tool approval protocol is enabled
497    pub fn is_tool_approval_enabled(&self) -> bool {
498        self.tool_approval_enabled
499    }
500}
501
502/// A response stream that yields ClaudeOutput messages
503/// Holds a reference to the client to read from
504pub struct ResponseStream<'a> {
505    client: &'a mut AsyncClient,
506    finished: bool,
507}
508
509impl ResponseStream<'_> {
510    /// Convert to a vector by collecting all responses
511    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
512        let mut responses = Vec::new();
513
514        while !self.finished {
515            let output = self.client.receive().await?;
516            let is_result = matches!(&output, ClaudeOutput::Result(_));
517            responses.push(output);
518
519            if is_result {
520                self.finished = true;
521                break;
522            }
523        }
524
525        Ok(responses)
526    }
527
528    /// Get the next response
529    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
530        if self.finished {
531            return None;
532        }
533
534        match self.client.receive().await {
535            Ok(output) => {
536                if matches!(&output, ClaudeOutput::Result(_)) {
537                    self.finished = true;
538                }
539                Some(Ok(output))
540            }
541            Err(e) => {
542                self.finished = true;
543                Some(Err(e))
544            }
545        }
546    }
547}
548
549impl Drop for AsyncClient {
550    fn drop(&mut self) {
551        if self.is_alive() {
552            // Try to kill the process
553            if let Err(e) = self.child.start_kill() {
554                error!("Failed to kill Claude process on drop: {}", e);
555            }
556        }
557    }
558}
559
560// Protocol extension methods for asynchronous I/O
561impl Protocol {
562    /// Write a message to an async writer
563    pub async fn write_async<W: AsyncWriteExt + Unpin, T: Serialize>(
564        writer: &mut W,
565        message: &T,
566    ) -> Result<()> {
567        let line = Self::serialize(message)?;
568        debug!("[PROTOCOL] Sending async: {}", line.trim());
569        writer.write_all(line.as_bytes()).await?;
570        writer.flush().await?;
571        Ok(())
572    }
573
574    /// Read a message from an async reader
575    pub async fn read_async<R: AsyncBufReadExt + Unpin, T: for<'de> Deserialize<'de>>(
576        reader: &mut R,
577    ) -> Result<T> {
578        let mut line = String::new();
579        let bytes_read = reader.read_line(&mut line).await?;
580        if bytes_read == 0 {
581            return Err(Error::ConnectionClosed);
582        }
583        debug!("[PROTOCOL] Received async: {}", line.trim());
584        Self::deserialize(&line)
585    }
586}
587
588/// Async stream processor for handling continuous message streams
589pub struct AsyncStreamProcessor<R> {
590    reader: AsyncBufReader<R>,
591}
592
593impl<R: tokio::io::AsyncRead + Unpin> AsyncStreamProcessor<R> {
594    /// Create a new async stream processor
595    pub fn new(reader: R) -> Self {
596        Self {
597            reader: AsyncBufReader::new(reader),
598        }
599    }
600
601    /// Process the next message from the stream
602    pub async fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
603        Protocol::read_async(&mut self.reader).await
604    }
605
606    /// Process all messages in the stream
607    pub async fn process_all<T, F, Fut>(&mut self, mut handler: F) -> Result<()>
608    where
609        T: for<'de> Deserialize<'de>,
610        F: FnMut(T) -> Fut,
611        Fut: std::future::Future<Output = Result<()>>,
612    {
613        loop {
614            match self.next_message().await {
615                Ok(message) => handler(message).await?,
616                Err(Error::ConnectionClosed) => break,
617                Err(e) => return Err(e),
618            }
619        }
620        Ok(())
621    }
622}