1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6 ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7 ControlResponseMessage,
8};
9use crate::protocol::Protocol;
10use log::{debug, error, info, warn};
11use serde::{Deserialize, Serialize};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufReader as AsyncBufReader};
13use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16pub struct AsyncClient {
18 child: Child,
19 stdin: ChildStdin,
20 stdout: BufReader<ChildStdout>,
21 stderr: Option<BufReader<ChildStderr>>,
22 session_uuid: Option<Uuid>,
23 tool_approval_enabled: bool,
25}
26
27const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
29
30impl AsyncClient {
31 pub fn new(mut child: Child) -> Result<Self> {
33 let stdin = child
34 .stdin
35 .take()
36 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
37
38 let stdout = BufReader::with_capacity(
39 STDOUT_BUFFER_SIZE,
40 child
41 .stdout
42 .take()
43 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
44 );
45
46 let stderr = child.stderr.take().map(BufReader::new);
47
48 Ok(Self {
49 child,
50 stdin,
51 stdout,
52 stderr,
53 session_uuid: None,
54 tool_approval_enabled: false,
55 })
56 }
57
58 pub async fn with_defaults() -> Result<Self> {
60 crate::version::check_claude_version_async().await?;
66 Self::with_model("sonnet").await
67 }
68
69 pub async fn with_model(model: &str) -> Result<Self> {
71 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
72
73 info!("Started Claude process with model: {}", model);
74 Self::new(child)
75 }
76
77 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
79 let child = builder.spawn().await?;
80 info!("Started Claude process from custom builder");
81 Self::new(child)
82 }
83
84 pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
87 let child = ClaudeCliBuilder::new()
88 .resume(Some(session_uuid.to_string()))
89 .spawn()
90 .await?;
91
92 info!("Resuming Claude session with UUID: {}", session_uuid);
93 let mut client = Self::new(child)?;
94 client.session_uuid = Some(session_uuid);
96 Ok(client)
97 }
98
99 pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
101 let child = ClaudeCliBuilder::new()
102 .model(model)
103 .resume(Some(session_uuid.to_string()))
104 .spawn()
105 .await?;
106
107 info!(
108 "Resuming Claude session with UUID: {} and model: {}",
109 session_uuid, model
110 );
111 let mut client = Self::new(child)?;
112 client.session_uuid = Some(session_uuid);
114 Ok(client)
115 }
116
117 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
120 let session_id = Uuid::new_v4();
121 self.query_with_session(text, session_id).await
122 }
123
124 pub async fn query_with_session(
126 &mut self,
127 text: &str,
128 session_id: Uuid,
129 ) -> Result<Vec<ClaudeOutput>> {
130 let input = ClaudeInput::user_message(text, session_id);
132 self.send(&input).await?;
133
134 let mut responses = Vec::new();
136
137 loop {
138 let output = self.receive().await?;
139 let is_result = matches!(&output, ClaudeOutput::Result(_));
140 responses.push(output);
141
142 if is_result {
143 break;
144 }
145 }
146
147 Ok(responses)
148 }
149
150 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
153 let session_id = Uuid::new_v4();
154 self.query_stream_with_session(text, session_id).await
155 }
156
157 pub async fn query_stream_with_session(
159 &mut self,
160 text: &str,
161 session_id: Uuid,
162 ) -> Result<ResponseStream<'_>> {
163 let input = ClaudeInput::user_message(text, session_id);
165 self.send(&input).await?;
166
167 Ok(ResponseStream {
169 client: self,
170 finished: false,
171 })
172 }
173
174 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
176 let json_line = Protocol::serialize(input)?;
177 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
178
179 self.stdin
180 .write_all(json_line.as_bytes())
181 .await
182 .map_err(Error::Io)?;
183
184 self.stdin.flush().await.map_err(Error::Io)?;
185 Ok(())
186 }
187
188 pub async fn interrupt(&mut self) -> Result<()> {
193 self.send(&ClaudeInput::interrupt()).await
194 }
195
196 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
214 let trimmed = self.read_frame_line().await?;
215 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
216
217 match ClaudeOutput::parse_json_tolerant(&trimmed) {
219 Ok(output) => {
220 debug!("[INCOMING] Parsed output type: {}", output.message_type());
221
222 if self.session_uuid.is_none() {
224 if let ClaudeOutput::Assistant(ref msg) = output {
225 if let Some(ref uuid_str) = msg.uuid {
226 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
227 debug!("[INCOMING] Captured session UUID: {}", uuid);
228 self.session_uuid = Some(uuid);
229 }
230 }
231 } else if let ClaudeOutput::Result(ref msg) = output {
232 if let Some(ref uuid_str) = msg.uuid {
233 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
234 debug!("[INCOMING] Captured session UUID: {}", uuid);
235 self.session_uuid = Some(uuid);
236 }
237 }
238 }
239 }
240
241 Ok(output)
242 }
243 Err(parse_error) => {
244 warn!("[INCOMING] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
245 warn!("[INCOMING] Parse error: {}", parse_error.error_message);
246 warn!("[INCOMING] Raw message: {}", trimmed);
247 Err(parse_error.into())
248 }
249 }
250 }
251
252 async fn read_frame_line(&mut self) -> Result<String> {
260 let mut line = String::new();
261 loop {
262 line.clear();
263 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
264 if bytes_read == 0 {
265 return Err(Error::ConnectionClosed);
266 }
267 let trimmed = line.trim();
268 if trimmed.is_empty() {
269 continue;
270 }
271 return Ok(trimmed.to_string());
272 }
273 }
274
275 pub async fn receive_raw(&mut self) -> Result<serde_json::Value> {
283 let trimmed = self.read_frame_line().await?;
284 match serde_json::from_str::<serde_json::Value>(&trimmed) {
285 Ok(value) => Ok(value),
286 Err(e) => match trimmed.find('{') {
287 Some(start) => Ok(serde_json::from_str::<serde_json::Value>(&trimmed[start..])
288 .map_err(|_| Error::Json(e))?),
289 None => Err(Error::Json(e)),
290 },
291 }
292 }
293
294 pub fn is_alive(&mut self) -> bool {
296 self.child.try_wait().ok().flatten().is_none()
297 }
298
299 pub async fn shutdown(mut self) -> Result<()> {
301 info!("Shutting down Claude process...");
302 self.child.kill().await.map_err(Error::Io)?;
303 Ok(())
304 }
305
306 pub fn pid(&self) -> Option<u32> {
308 self.child.id()
309 }
310
311 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
313 self.stderr.take()
314 }
315
316 pub fn session_uuid(&self) -> Result<Uuid> {
319 self.session_uuid.ok_or(Error::SessionNotInitialized)
320 }
321
322 pub async fn ping(&mut self) -> bool {
325 let ping_input = ClaudeInput::user_message(
327 "ping - respond with just the word 'pong' and nothing else",
328 self.session_uuid.unwrap_or_else(Uuid::new_v4),
329 );
330
331 if let Err(e) = self.send(&ping_input).await {
333 debug!("Ping failed to send: {}", e);
334 return false;
335 }
336
337 let mut found_pong = false;
339 let mut message_count = 0;
340 const MAX_MESSAGES: usize = 10;
341
342 loop {
343 match self.receive().await {
344 Ok(output) => {
345 message_count += 1;
346
347 if let ClaudeOutput::Assistant(msg) = &output {
349 for content in &msg.message.content {
350 if let ContentBlock::Text(text) = content {
351 if text.text.to_lowercase().contains("pong") {
352 found_pong = true;
353 }
354 }
355 }
356 }
357
358 if matches!(output, ClaudeOutput::Result(_)) {
360 break;
361 }
362
363 if message_count >= MAX_MESSAGES {
365 debug!("Ping exceeded message limit");
366 break;
367 }
368 }
369 Err(e) => {
370 debug!("Ping failed to receive response: {}", e);
371 break;
372 }
373 }
374 }
375
376 found_pong
377 }
378
379 pub async fn enable_tool_approval(&mut self) -> Result<()> {
413 if self.tool_approval_enabled {
414 debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
415 return Ok(());
416 }
417
418 let request_id = format!("init-{}", Uuid::new_v4());
419 let init_request = ControlRequestMessage::initialize(&request_id);
420
421 debug!("[TOOL_APPROVAL] Sending initialization handshake");
422 let json_line = Protocol::serialize(&init_request)?;
423 self.stdin
424 .write_all(json_line.as_bytes())
425 .await
426 .map_err(Error::Io)?;
427 self.stdin.flush().await.map_err(Error::Io)?;
428
429 loop {
431 let mut line = String::new();
432 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
433
434 if bytes_read == 0 {
435 return Err(Error::ConnectionClosed);
436 }
437
438 let trimmed = line.trim();
439 if trimmed.is_empty() {
440 continue;
441 }
442
443 debug!("[TOOL_APPROVAL] Received: {}", trimmed);
444
445 match ClaudeOutput::parse_json_tolerant(trimmed) {
447 Ok(ClaudeOutput::ControlResponse(resp)) => {
448 use crate::io::ControlResponsePayload;
449 match &resp.response {
450 ControlResponsePayload::Success {
451 request_id: rid, ..
452 } if rid == &request_id => {
453 debug!("[TOOL_APPROVAL] Initialization successful");
454 self.tool_approval_enabled = true;
455 return Ok(());
456 }
457 ControlResponsePayload::Error { error, .. } => {
458 return Err(Error::Protocol(format!(
459 "Tool approval initialization failed: {}",
460 error
461 )));
462 }
463 _ => {
464 continue;
466 }
467 }
468 }
469 Ok(_) => {
470 continue;
472 }
473 Err(e) => {
474 return Err(e.into());
475 }
476 }
477 }
478 }
479
480 pub async fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
508 let message: ControlResponseMessage = response.into();
509 let json_line = Protocol::serialize(&message)?;
510 debug!(
511 "[TOOL_APPROVAL] Sending control response: {}",
512 json_line.trim()
513 );
514
515 self.stdin
516 .write_all(json_line.as_bytes())
517 .await
518 .map_err(Error::Io)?;
519 self.stdin.flush().await.map_err(Error::Io)?;
520 Ok(())
521 }
522
523 pub fn is_tool_approval_enabled(&self) -> bool {
525 self.tool_approval_enabled
526 }
527}
528
529pub struct ResponseStream<'a> {
532 client: &'a mut AsyncClient,
533 finished: bool,
534}
535
536impl ResponseStream<'_> {
537 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
539 let mut responses = Vec::new();
540
541 while !self.finished {
542 let output = self.client.receive().await?;
543 let is_result = matches!(&output, ClaudeOutput::Result(_));
544 responses.push(output);
545
546 if is_result {
547 self.finished = true;
548 break;
549 }
550 }
551
552 Ok(responses)
553 }
554
555 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
557 if self.finished {
558 return None;
559 }
560
561 match self.client.receive().await {
562 Ok(output) => {
563 if matches!(&output, ClaudeOutput::Result(_)) {
564 self.finished = true;
565 }
566 Some(Ok(output))
567 }
568 Err(e) => {
569 self.finished = true;
570 Some(Err(e))
571 }
572 }
573 }
574}
575
576impl Drop for AsyncClient {
577 fn drop(&mut self) {
578 if self.is_alive() {
579 if let Err(e) = self.child.start_kill() {
581 error!("Failed to kill Claude process on drop: {}", e);
582 }
583 }
584 }
585}
586
587impl Protocol {
589 pub async fn write_async<W: AsyncWriteExt + Unpin, T: Serialize>(
591 writer: &mut W,
592 message: &T,
593 ) -> Result<()> {
594 let line = Self::serialize(message)?;
595 debug!("[PROTOCOL] Sending async: {}", line.trim());
596 writer.write_all(line.as_bytes()).await?;
597 writer.flush().await?;
598 Ok(())
599 }
600
601 pub async fn read_async<R: AsyncBufReadExt + Unpin, T: for<'de> Deserialize<'de>>(
603 reader: &mut R,
604 ) -> Result<T> {
605 let mut line = String::new();
606 let bytes_read = reader.read_line(&mut line).await?;
607 if bytes_read == 0 {
608 return Err(Error::ConnectionClosed);
609 }
610 debug!("[PROTOCOL] Received async: {}", line.trim());
611 Self::deserialize(&line)
612 }
613}
614
615pub struct AsyncStreamProcessor<R> {
617 reader: AsyncBufReader<R>,
618}
619
620impl<R: tokio::io::AsyncRead + Unpin> AsyncStreamProcessor<R> {
621 pub fn new(reader: R) -> Self {
623 Self {
624 reader: AsyncBufReader::new(reader),
625 }
626 }
627
628 pub async fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
630 Protocol::read_async(&mut self.reader).await
631 }
632
633 pub async fn process_all<T, F, Fut>(&mut self, mut handler: F) -> Result<()>
635 where
636 T: for<'de> Deserialize<'de>,
637 F: FnMut(T) -> Fut,
638 Fut: std::future::Future<Output = Result<()>>,
639 {
640 loop {
641 match self.next_message().await {
642 Ok(message) => handler(message).await?,
643 Err(Error::ConnectionClosed) => break,
644 Err(e) => return Err(e),
645 }
646 }
647 Ok(())
648 }
649}