claude_codes/client/
async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10
11/// Asynchronous client for communicating with Claude
12pub struct AsyncClient {
13    child: Child,
14    stdin: ChildStdin,
15    stdout: BufReader<ChildStdout>,
16    stderr: Option<BufReader<ChildStderr>>,
17}
18
19impl AsyncClient {
20    /// Create a new async client from a tokio Child process
21    pub fn new(mut child: Child) -> Result<Self> {
22        let stdin = child
23            .stdin
24            .take()
25            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
26
27        let stdout = BufReader::new(
28            child
29                .stdout
30                .take()
31                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
32        );
33
34        let stderr = child.stderr.take().map(BufReader::new);
35
36        Ok(Self {
37            child,
38            stdin,
39            stdout,
40            stderr,
41        })
42    }
43
44    /// Create a client with default settings (using logic from start_claude)
45    pub async fn with_defaults() -> Result<Self> {
46        // Check Claude version (only warns once per session)
47        // NOTE: The claude-codes API is in high flux. If you wish to work around
48        // this version check, you can use AsyncClient::new() directly with:
49        //   let child = ClaudeCliBuilder::new().model("sonnet").spawn().await?;
50        //   AsyncClient::new(child)
51        crate::version::check_claude_version_async().await?;
52        Self::with_model("sonnet").await
53    }
54
55    /// Create a client with a specific model
56    pub async fn with_model(model: &str) -> Result<Self> {
57        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
58
59        info!("Started Claude process with model: {}", model);
60        Self::new(child)
61    }
62
63    /// Create a client from a custom builder
64    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
65        let child = builder.spawn().await?;
66        info!("Started Claude process from custom builder");
67        Self::new(child)
68    }
69
70    /// Send a query and collect all responses until Result message
71    /// This is the simplified version that collects all responses
72    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
73        self.query_with_session(text, "default").await
74    }
75
76    /// Send a query with a custom session ID and collect all responses
77    pub async fn query_with_session(
78        &mut self,
79        text: &str,
80        session_id: &str,
81    ) -> Result<Vec<ClaudeOutput>> {
82        // Send the query
83        let input = ClaudeInput::user_message(text, session_id);
84        self.send(&input).await?;
85
86        // Collect responses until we get a Result message
87        let mut responses = Vec::new();
88
89        loop {
90            let output = self.receive().await?;
91            let is_result = matches!(&output, ClaudeOutput::Result(_));
92            responses.push(output);
93
94            if is_result {
95                break;
96            }
97        }
98
99        Ok(responses)
100    }
101
102    /// Send a query and return an async iterator over responses
103    /// Returns a stream that yields ClaudeOutput until Result message is received
104    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
105        self.query_stream_with_session(text, "default").await
106    }
107
108    /// Send a query with session ID and return an async iterator over responses
109    pub async fn query_stream_with_session(
110        &mut self,
111        text: &str,
112        session_id: &str,
113    ) -> Result<ResponseStream<'_>> {
114        // Send the query first
115        let input = ClaudeInput::user_message(text, session_id);
116        self.send(&input).await?;
117
118        // Return a stream that will read responses
119        Ok(ResponseStream {
120            client: self,
121            finished: false,
122        })
123    }
124
125    /// Send a ClaudeInput directly
126    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
127        let json_line = Protocol::serialize(input)?;
128        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
129
130        self.stdin
131            .write_all(json_line.as_bytes())
132            .await
133            .map_err(Error::Io)?;
134
135        self.stdin.flush().await.map_err(Error::Io)?;
136        Ok(())
137    }
138
139    /// Try to receive a single response
140    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
141        let mut line = String::new();
142
143        loop {
144            line.clear();
145            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
146
147            if bytes_read == 0 {
148                return Err(Error::ConnectionClosed);
149            }
150
151            let trimmed = line.trim();
152            if trimmed.is_empty() {
153                continue;
154            }
155
156            debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
157
158            // Use the parse_json method which returns ParseError
159            match ClaudeOutput::parse_json(trimmed) {
160                Ok(output) => {
161                    debug!("[INCOMING] Parsed output type: {}", output.message_type());
162                    return Ok(output);
163                }
164                Err(parse_error) => {
165                    error!("[INCOMING] Failed to deserialize: {}", parse_error);
166                    // Convert ParseError to our Error type
167                    return Err(Error::Deserialization(parse_error.error_message));
168                }
169            }
170        }
171    }
172
173    /// Check if the Claude process is still running
174    pub fn is_alive(&mut self) -> bool {
175        self.child.try_wait().ok().flatten().is_none()
176    }
177
178    /// Gracefully shutdown the client
179    pub async fn shutdown(mut self) -> Result<()> {
180        info!("Shutting down Claude process...");
181        self.child.kill().await.map_err(Error::Io)?;
182        Ok(())
183    }
184
185    /// Get the process ID
186    pub fn pid(&self) -> Option<u32> {
187        self.child.id()
188    }
189
190    /// Take the stderr reader (can only be called once)
191    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
192        self.stderr.take()
193    }
194}
195
196/// A response stream that yields ClaudeOutput messages
197/// Holds a reference to the client to read from
198pub struct ResponseStream<'a> {
199    client: &'a mut AsyncClient,
200    finished: bool,
201}
202
203impl ResponseStream<'_> {
204    /// Convert to a vector by collecting all responses
205    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
206        let mut responses = Vec::new();
207
208        while !self.finished {
209            let output = self.client.receive().await?;
210            let is_result = matches!(&output, ClaudeOutput::Result(_));
211            responses.push(output);
212
213            if is_result {
214                self.finished = true;
215                break;
216            }
217        }
218
219        Ok(responses)
220    }
221
222    /// Get the next response
223    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
224        if self.finished {
225            return None;
226        }
227
228        match self.client.receive().await {
229            Ok(output) => {
230                if matches!(&output, ClaudeOutput::Result(_)) {
231                    self.finished = true;
232                }
233                Some(Ok(output))
234            }
235            Err(e) => {
236                self.finished = true;
237                Some(Err(e))
238            }
239        }
240    }
241}
242
243impl Drop for AsyncClient {
244    fn drop(&mut self) {
245        if self.is_alive() {
246            // Try to kill the process
247            if let Err(e) = self.child.start_kill() {
248                error!("Failed to kill Claude process on drop: {}", e);
249            }
250        }
251    }
252}