1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6 ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7 ControlResponseMessage,
8};
9use crate::protocol::Protocol;
10use log::{debug, error, info, warn};
11use serde::{Deserialize, Serialize};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufReader as AsyncBufReader};
13use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16pub struct AsyncClient {
18 child: Child,
19 stdin: ChildStdin,
20 stdout: BufReader<ChildStdout>,
21 stderr: Option<BufReader<ChildStderr>>,
22 session_uuid: Option<Uuid>,
23 tool_approval_enabled: bool,
25}
26
/// Capacity, in bytes, of the buffered reader that `AsyncClient::new` wraps
/// around the child's stdout (10 MiB).
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
29
30impl AsyncClient {
31 pub fn new(mut child: Child) -> Result<Self> {
33 let stdin = child
34 .stdin
35 .take()
36 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
37
38 let stdout = BufReader::with_capacity(
39 STDOUT_BUFFER_SIZE,
40 child
41 .stdout
42 .take()
43 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
44 );
45
46 let stderr = child.stderr.take().map(BufReader::new);
47
48 Ok(Self {
49 child,
50 stdin,
51 stdout,
52 stderr,
53 session_uuid: None,
54 tool_approval_enabled: false,
55 })
56 }
57
58 pub async fn with_defaults() -> Result<Self> {
60 crate::version::check_claude_version_async().await?;
66 Self::with_model("sonnet").await
67 }
68
69 pub async fn with_model(model: &str) -> Result<Self> {
71 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
72
73 info!("Started Claude process with model: {}", model);
74 Self::new(child)
75 }
76
77 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
79 let child = builder.spawn().await?;
80 info!("Started Claude process from custom builder");
81 Self::new(child)
82 }
83
84 pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
87 let child = ClaudeCliBuilder::new()
88 .resume(Some(session_uuid.to_string()))
89 .spawn()
90 .await?;
91
92 info!("Resuming Claude session with UUID: {}", session_uuid);
93 let mut client = Self::new(child)?;
94 client.session_uuid = Some(session_uuid);
96 Ok(client)
97 }
98
99 pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
101 let child = ClaudeCliBuilder::new()
102 .model(model)
103 .resume(Some(session_uuid.to_string()))
104 .spawn()
105 .await?;
106
107 info!(
108 "Resuming Claude session with UUID: {} and model: {}",
109 session_uuid, model
110 );
111 let mut client = Self::new(child)?;
112 client.session_uuid = Some(session_uuid);
114 Ok(client)
115 }
116
117 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
120 let session_id = Uuid::new_v4();
121 self.query_with_session(text, session_id).await
122 }
123
124 pub async fn query_with_session(
126 &mut self,
127 text: &str,
128 session_id: Uuid,
129 ) -> Result<Vec<ClaudeOutput>> {
130 let input = ClaudeInput::user_message(text, session_id);
132 self.send(&input).await?;
133
134 let mut responses = Vec::new();
136
137 loop {
138 let output = self.receive().await?;
139 let is_result = matches!(&output, ClaudeOutput::Result(_));
140 responses.push(output);
141
142 if is_result {
143 break;
144 }
145 }
146
147 Ok(responses)
148 }
149
150 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
153 let session_id = Uuid::new_v4();
154 self.query_stream_with_session(text, session_id).await
155 }
156
157 pub async fn query_stream_with_session(
159 &mut self,
160 text: &str,
161 session_id: Uuid,
162 ) -> Result<ResponseStream<'_>> {
163 let input = ClaudeInput::user_message(text, session_id);
165 self.send(&input).await?;
166
167 Ok(ResponseStream {
169 client: self,
170 finished: false,
171 })
172 }
173
174 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
176 let json_line = Protocol::serialize(input)?;
177 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
178
179 self.stdin
180 .write_all(json_line.as_bytes())
181 .await
182 .map_err(Error::Io)?;
183
184 self.stdin.flush().await.map_err(Error::Io)?;
185 Ok(())
186 }
187
188 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
206 let mut line = String::new();
207
208 loop {
209 line.clear();
210 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
211
212 if bytes_read == 0 {
213 return Err(Error::ConnectionClosed);
214 }
215
216 let trimmed = line.trim();
217 if trimmed.is_empty() {
218 continue;
219 }
220
221 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
222
223 match ClaudeOutput::parse_json_tolerant(trimmed) {
225 Ok(output) => {
226 debug!("[INCOMING] Parsed output type: {}", output.message_type());
227
228 if self.session_uuid.is_none() {
230 if let ClaudeOutput::Assistant(ref msg) = output {
231 if let Some(ref uuid_str) = msg.uuid {
232 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
233 debug!("[INCOMING] Captured session UUID: {}", uuid);
234 self.session_uuid = Some(uuid);
235 }
236 }
237 } else if let ClaudeOutput::Result(ref msg) = output {
238 if let Some(ref uuid_str) = msg.uuid {
239 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
240 debug!("[INCOMING] Captured session UUID: {}", uuid);
241 self.session_uuid = Some(uuid);
242 }
243 }
244 }
245 }
246
247 return Ok(output);
248 }
249 Err(parse_error) => {
250 warn!("[INCOMING] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
251 warn!("[INCOMING] Parse error: {}", parse_error.error_message);
252 warn!("[INCOMING] Raw message: {}", trimmed);
253 return Err(parse_error.into());
254 }
255 }
256 }
257 }
258
259 pub fn is_alive(&mut self) -> bool {
261 self.child.try_wait().ok().flatten().is_none()
262 }
263
264 pub async fn shutdown(mut self) -> Result<()> {
266 info!("Shutting down Claude process...");
267 self.child.kill().await.map_err(Error::Io)?;
268 Ok(())
269 }
270
271 pub fn pid(&self) -> Option<u32> {
273 self.child.id()
274 }
275
276 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
278 self.stderr.take()
279 }
280
281 pub fn session_uuid(&self) -> Result<Uuid> {
284 self.session_uuid.ok_or(Error::SessionNotInitialized)
285 }
286
287 pub async fn ping(&mut self) -> bool {
290 let ping_input = ClaudeInput::user_message(
292 "ping - respond with just the word 'pong' and nothing else",
293 self.session_uuid.unwrap_or_else(Uuid::new_v4),
294 );
295
296 if let Err(e) = self.send(&ping_input).await {
298 debug!("Ping failed to send: {}", e);
299 return false;
300 }
301
302 let mut found_pong = false;
304 let mut message_count = 0;
305 const MAX_MESSAGES: usize = 10;
306
307 loop {
308 match self.receive().await {
309 Ok(output) => {
310 message_count += 1;
311
312 if let ClaudeOutput::Assistant(msg) = &output {
314 for content in &msg.message.content {
315 if let ContentBlock::Text(text) = content {
316 if text.text.to_lowercase().contains("pong") {
317 found_pong = true;
318 }
319 }
320 }
321 }
322
323 if matches!(output, ClaudeOutput::Result(_)) {
325 break;
326 }
327
328 if message_count >= MAX_MESSAGES {
330 debug!("Ping exceeded message limit");
331 break;
332 }
333 }
334 Err(e) => {
335 debug!("Ping failed to receive response: {}", e);
336 break;
337 }
338 }
339 }
340
341 found_pong
342 }
343
344 pub async fn enable_tool_approval(&mut self) -> Result<()> {
378 if self.tool_approval_enabled {
379 debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
380 return Ok(());
381 }
382
383 let request_id = format!("init-{}", Uuid::new_v4());
384 let init_request = ControlRequestMessage::initialize(&request_id);
385
386 debug!("[TOOL_APPROVAL] Sending initialization handshake");
387 let json_line = Protocol::serialize(&init_request)?;
388 self.stdin
389 .write_all(json_line.as_bytes())
390 .await
391 .map_err(Error::Io)?;
392 self.stdin.flush().await.map_err(Error::Io)?;
393
394 loop {
396 let mut line = String::new();
397 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
398
399 if bytes_read == 0 {
400 return Err(Error::ConnectionClosed);
401 }
402
403 let trimmed = line.trim();
404 if trimmed.is_empty() {
405 continue;
406 }
407
408 debug!("[TOOL_APPROVAL] Received: {}", trimmed);
409
410 match ClaudeOutput::parse_json_tolerant(trimmed) {
412 Ok(ClaudeOutput::ControlResponse(resp)) => {
413 use crate::io::ControlResponsePayload;
414 match &resp.response {
415 ControlResponsePayload::Success {
416 request_id: rid, ..
417 } if rid == &request_id => {
418 debug!("[TOOL_APPROVAL] Initialization successful");
419 self.tool_approval_enabled = true;
420 return Ok(());
421 }
422 ControlResponsePayload::Error { error, .. } => {
423 return Err(Error::Protocol(format!(
424 "Tool approval initialization failed: {}",
425 error
426 )));
427 }
428 _ => {
429 continue;
431 }
432 }
433 }
434 Ok(_) => {
435 continue;
437 }
438 Err(e) => {
439 return Err(e.into());
440 }
441 }
442 }
443 }
444
445 pub async fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
473 let message: ControlResponseMessage = response.into();
474 let json_line = Protocol::serialize(&message)?;
475 debug!(
476 "[TOOL_APPROVAL] Sending control response: {}",
477 json_line.trim()
478 );
479
480 self.stdin
481 .write_all(json_line.as_bytes())
482 .await
483 .map_err(Error::Io)?;
484 self.stdin.flush().await.map_err(Error::Io)?;
485 Ok(())
486 }
487
488 pub fn is_tool_approval_enabled(&self) -> bool {
490 self.tool_approval_enabled
491 }
492}
493
494pub struct ResponseStream<'a> {
497 client: &'a mut AsyncClient,
498 finished: bool,
499}
500
501impl ResponseStream<'_> {
502 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
504 let mut responses = Vec::new();
505
506 while !self.finished {
507 let output = self.client.receive().await?;
508 let is_result = matches!(&output, ClaudeOutput::Result(_));
509 responses.push(output);
510
511 if is_result {
512 self.finished = true;
513 break;
514 }
515 }
516
517 Ok(responses)
518 }
519
520 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
522 if self.finished {
523 return None;
524 }
525
526 match self.client.receive().await {
527 Ok(output) => {
528 if matches!(&output, ClaudeOutput::Result(_)) {
529 self.finished = true;
530 }
531 Some(Ok(output))
532 }
533 Err(e) => {
534 self.finished = true;
535 Some(Err(e))
536 }
537 }
538 }
539}
540
541impl Drop for AsyncClient {
542 fn drop(&mut self) {
543 if self.is_alive() {
544 if let Err(e) = self.child.start_kill() {
546 error!("Failed to kill Claude process on drop: {}", e);
547 }
548 }
549 }
550}
551
552impl Protocol {
554 pub async fn write_async<W: AsyncWriteExt + Unpin, T: Serialize>(
556 writer: &mut W,
557 message: &T,
558 ) -> Result<()> {
559 let line = Self::serialize(message)?;
560 debug!("[PROTOCOL] Sending async: {}", line.trim());
561 writer.write_all(line.as_bytes()).await?;
562 writer.flush().await?;
563 Ok(())
564 }
565
566 pub async fn read_async<R: AsyncBufReadExt + Unpin, T: for<'de> Deserialize<'de>>(
568 reader: &mut R,
569 ) -> Result<T> {
570 let mut line = String::new();
571 let bytes_read = reader.read_line(&mut line).await?;
572 if bytes_read == 0 {
573 return Err(Error::ConnectionClosed);
574 }
575 debug!("[PROTOCOL] Received async: {}", line.trim());
576 Self::deserialize(&line)
577 }
578}
579
580pub struct AsyncStreamProcessor<R> {
582 reader: AsyncBufReader<R>,
583}
584
585impl<R: tokio::io::AsyncRead + Unpin> AsyncStreamProcessor<R> {
586 pub fn new(reader: R) -> Self {
588 Self {
589 reader: AsyncBufReader::new(reader),
590 }
591 }
592
593 pub async fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
595 Protocol::read_async(&mut self.reader).await
596 }
597
598 pub async fn process_all<T, F, Fut>(&mut self, mut handler: F) -> Result<()>
600 where
601 T: for<'de> Deserialize<'de>,
602 F: FnMut(T) -> Fut,
603 Fut: std::future::Future<Output = Result<()>>,
604 {
605 loop {
606 match self.next_message().await {
607 Ok(message) => handler(message).await?,
608 Err(Error::ConnectionClosed) => break,
609 Err(e) => return Err(e),
610 }
611 }
612 Ok(())
613 }
614}