claude_codes/
client_async.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput, ContentBlock};
6use crate::protocol::Protocol;
7use log::{debug, error, info};
8use serde::{Deserialize, Serialize};
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufReader as AsyncBufReader};
10use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
11use uuid::Uuid;
12
13pub struct AsyncClient {
15 child: Child,
16 stdin: ChildStdin,
17 stdout: BufReader<ChildStdout>,
18 stderr: Option<BufReader<ChildStderr>>,
19 session_uuid: Option<Uuid>,
20}
21
22impl AsyncClient {
23 pub fn new(mut child: Child) -> Result<Self> {
25 let stdin = child
26 .stdin
27 .take()
28 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
29
30 let stdout = BufReader::new(
31 child
32 .stdout
33 .take()
34 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
35 );
36
37 let stderr = child.stderr.take().map(BufReader::new);
38
39 Ok(Self {
40 child,
41 stdin,
42 stdout,
43 stderr,
44 session_uuid: None,
45 })
46 }
47
48 pub async fn with_defaults() -> Result<Self> {
50 crate::version::check_claude_version_async().await?;
56 Self::with_model("sonnet").await
57 }
58
59 pub async fn with_model(model: &str) -> Result<Self> {
61 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
62
63 info!("Started Claude process with model: {}", model);
64 Self::new(child)
65 }
66
67 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
69 let child = builder.spawn().await?;
70 info!("Started Claude process from custom builder");
71 Self::new(child)
72 }
73
74 pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
77 let child = ClaudeCliBuilder::new()
78 .resume(Some(session_uuid.to_string()))
79 .spawn()
80 .await?;
81
82 info!("Resuming Claude session with UUID: {}", session_uuid);
83 let mut client = Self::new(child)?;
84 client.session_uuid = Some(session_uuid);
86 Ok(client)
87 }
88
89 pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
91 let child = ClaudeCliBuilder::new()
92 .model(model)
93 .resume(Some(session_uuid.to_string()))
94 .spawn()
95 .await?;
96
97 info!(
98 "Resuming Claude session with UUID: {} and model: {}",
99 session_uuid, model
100 );
101 let mut client = Self::new(child)?;
102 client.session_uuid = Some(session_uuid);
104 Ok(client)
105 }
106
107 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
110 let session_id = Uuid::new_v4();
111 self.query_with_session(text, session_id).await
112 }
113
114 pub async fn query_with_session(
116 &mut self,
117 text: &str,
118 session_id: Uuid,
119 ) -> Result<Vec<ClaudeOutput>> {
120 let input = ClaudeInput::user_message(text, session_id);
122 self.send(&input).await?;
123
124 let mut responses = Vec::new();
126
127 loop {
128 let output = self.receive().await?;
129 let is_result = matches!(&output, ClaudeOutput::Result(_));
130 responses.push(output);
131
132 if is_result {
133 break;
134 }
135 }
136
137 Ok(responses)
138 }
139
140 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
143 let session_id = Uuid::new_v4();
144 self.query_stream_with_session(text, session_id).await
145 }
146
147 pub async fn query_stream_with_session(
149 &mut self,
150 text: &str,
151 session_id: Uuid,
152 ) -> Result<ResponseStream<'_>> {
153 let input = ClaudeInput::user_message(text, session_id);
155 self.send(&input).await?;
156
157 Ok(ResponseStream {
159 client: self,
160 finished: false,
161 })
162 }
163
164 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
166 let json_line = Protocol::serialize(input)?;
167 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
168
169 self.stdin
170 .write_all(json_line.as_bytes())
171 .await
172 .map_err(Error::Io)?;
173
174 self.stdin.flush().await.map_err(Error::Io)?;
175 Ok(())
176 }
177
178 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
180 let mut line = String::new();
181
182 loop {
183 line.clear();
184 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
185
186 if bytes_read == 0 {
187 return Err(Error::ConnectionClosed);
188 }
189
190 let trimmed = line.trim();
191 if trimmed.is_empty() {
192 continue;
193 }
194
195 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
196
197 match ClaudeOutput::parse_json_tolerant(trimmed) {
199 Ok(output) => {
200 debug!("[INCOMING] Parsed output type: {}", output.message_type());
201
202 if self.session_uuid.is_none() {
204 if let ClaudeOutput::Assistant(ref msg) = output {
205 if let Some(ref uuid_str) = msg.uuid {
206 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
207 debug!("[INCOMING] Captured session UUID: {}", uuid);
208 self.session_uuid = Some(uuid);
209 }
210 }
211 } else if let ClaudeOutput::Result(ref msg) = output {
212 if let Some(ref uuid_str) = msg.uuid {
213 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
214 debug!("[INCOMING] Captured session UUID: {}", uuid);
215 self.session_uuid = Some(uuid);
216 }
217 }
218 }
219 }
220
221 return Ok(output);
222 }
223 Err(parse_error) => {
224 error!("[INCOMING] Failed to deserialize: {}", parse_error);
225 error!("[INCOMING] Raw JSON that failed: {}", trimmed);
226 return Err(Error::Deserialization(format!(
228 "{} (raw: {})",
229 parse_error.error_message, trimmed
230 )));
231 }
232 }
233 }
234 }
235
236 pub fn is_alive(&mut self) -> bool {
238 self.child.try_wait().ok().flatten().is_none()
239 }
240
241 pub async fn shutdown(mut self) -> Result<()> {
243 info!("Shutting down Claude process...");
244 self.child.kill().await.map_err(Error::Io)?;
245 Ok(())
246 }
247
248 pub fn pid(&self) -> Option<u32> {
250 self.child.id()
251 }
252
253 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
255 self.stderr.take()
256 }
257
258 pub fn session_uuid(&self) -> Result<Uuid> {
261 self.session_uuid.ok_or(Error::SessionNotInitialized)
262 }
263
264 pub async fn ping(&mut self) -> bool {
267 let ping_input = ClaudeInput::user_message(
269 "ping - respond with just the word 'pong' and nothing else",
270 self.session_uuid.unwrap_or_else(Uuid::new_v4),
271 );
272
273 if let Err(e) = self.send(&ping_input).await {
275 debug!("Ping failed to send: {}", e);
276 return false;
277 }
278
279 let mut found_pong = false;
281 let mut message_count = 0;
282 const MAX_MESSAGES: usize = 10;
283
284 loop {
285 match self.receive().await {
286 Ok(output) => {
287 message_count += 1;
288
289 if let ClaudeOutput::Assistant(msg) = &output {
291 for content in &msg.message.content {
292 if let ContentBlock::Text(text) = content {
293 if text.text.to_lowercase().contains("pong") {
294 found_pong = true;
295 }
296 }
297 }
298 }
299
300 if matches!(output, ClaudeOutput::Result(_)) {
302 break;
303 }
304
305 if message_count >= MAX_MESSAGES {
307 debug!("Ping exceeded message limit");
308 break;
309 }
310 }
311 Err(e) => {
312 debug!("Ping failed to receive response: {}", e);
313 break;
314 }
315 }
316 }
317
318 found_pong
319 }
320}
321
322pub struct ResponseStream<'a> {
325 client: &'a mut AsyncClient,
326 finished: bool,
327}
328
329impl ResponseStream<'_> {
330 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
332 let mut responses = Vec::new();
333
334 while !self.finished {
335 let output = self.client.receive().await?;
336 let is_result = matches!(&output, ClaudeOutput::Result(_));
337 responses.push(output);
338
339 if is_result {
340 self.finished = true;
341 break;
342 }
343 }
344
345 Ok(responses)
346 }
347
348 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
350 if self.finished {
351 return None;
352 }
353
354 match self.client.receive().await {
355 Ok(output) => {
356 if matches!(&output, ClaudeOutput::Result(_)) {
357 self.finished = true;
358 }
359 Some(Ok(output))
360 }
361 Err(e) => {
362 self.finished = true;
363 Some(Err(e))
364 }
365 }
366 }
367}
368
369impl Drop for AsyncClient {
370 fn drop(&mut self) {
371 if self.is_alive() {
372 if let Err(e) = self.child.start_kill() {
374 error!("Failed to kill Claude process on drop: {}", e);
375 }
376 }
377 }
378}
379
380impl Protocol {
382 pub async fn write_async<W: AsyncWriteExt + Unpin, T: Serialize>(
384 writer: &mut W,
385 message: &T,
386 ) -> Result<()> {
387 let line = Self::serialize(message)?;
388 debug!("[PROTOCOL] Sending async: {}", line.trim());
389 writer.write_all(line.as_bytes()).await?;
390 writer.flush().await?;
391 Ok(())
392 }
393
394 pub async fn read_async<R: AsyncBufReadExt + Unpin, T: for<'de> Deserialize<'de>>(
396 reader: &mut R,
397 ) -> Result<T> {
398 let mut line = String::new();
399 let bytes_read = reader.read_line(&mut line).await?;
400 if bytes_read == 0 {
401 return Err(Error::ConnectionClosed);
402 }
403 debug!("[PROTOCOL] Received async: {}", line.trim());
404 Self::deserialize(&line)
405 }
406}
407
408pub struct AsyncStreamProcessor<R> {
410 reader: AsyncBufReader<R>,
411}
412
413impl<R: tokio::io::AsyncRead + Unpin> AsyncStreamProcessor<R> {
414 pub fn new(reader: R) -> Self {
416 Self {
417 reader: AsyncBufReader::new(reader),
418 }
419 }
420
421 pub async fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
423 Protocol::read_async(&mut self.reader).await
424 }
425
426 pub async fn process_all<T, F, Fut>(&mut self, mut handler: F) -> Result<()>
428 where
429 T: for<'de> Deserialize<'de>,
430 F: FnMut(T) -> Fut,
431 Fut: std::future::Future<Output = Result<()>>,
432 {
433 loop {
434 match self.next_message().await {
435 Ok(message) => handler(message).await?,
436 Err(Error::ConnectionClosed) => break,
437 Err(e) => return Err(e),
438 }
439 }
440 Ok(())
441 }
442}