1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6 ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7 ControlResponseMessage,
8};
9use crate::protocol::Protocol;
10use log::{debug, error, info, warn};
11use serde::{Deserialize, Serialize};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufReader as AsyncBufReader};
13use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16pub struct AsyncClient {
18 child: Child,
19 stdin: ChildStdin,
20 stdout: BufReader<ChildStdout>,
21 stderr: Option<BufReader<ChildStderr>>,
22 session_uuid: Option<Uuid>,
23 tool_approval_enabled: bool,
25}
26
27const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
29
30impl AsyncClient {
31 pub fn new(mut child: Child) -> Result<Self> {
33 let stdin = child
34 .stdin
35 .take()
36 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
37
38 let stdout = BufReader::with_capacity(
39 STDOUT_BUFFER_SIZE,
40 child
41 .stdout
42 .take()
43 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
44 );
45
46 let stderr = child.stderr.take().map(BufReader::new);
47
48 Ok(Self {
49 child,
50 stdin,
51 stdout,
52 stderr,
53 session_uuid: None,
54 tool_approval_enabled: false,
55 })
56 }
57
58 pub async fn with_defaults() -> Result<Self> {
60 crate::version::check_claude_version_async().await?;
66 Self::with_model("sonnet").await
67 }
68
69 pub async fn with_model(model: &str) -> Result<Self> {
71 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
72
73 info!("Started Claude process with model: {}", model);
74 Self::new(child)
75 }
76
77 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
79 let child = builder.spawn().await?;
80 info!("Started Claude process from custom builder");
81 Self::new(child)
82 }
83
84 pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
87 let child = ClaudeCliBuilder::new()
88 .resume(Some(session_uuid.to_string()))
89 .spawn()
90 .await?;
91
92 info!("Resuming Claude session with UUID: {}", session_uuid);
93 let mut client = Self::new(child)?;
94 client.session_uuid = Some(session_uuid);
96 Ok(client)
97 }
98
99 pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
101 let child = ClaudeCliBuilder::new()
102 .model(model)
103 .resume(Some(session_uuid.to_string()))
104 .spawn()
105 .await?;
106
107 info!(
108 "Resuming Claude session with UUID: {} and model: {}",
109 session_uuid, model
110 );
111 let mut client = Self::new(child)?;
112 client.session_uuid = Some(session_uuid);
114 Ok(client)
115 }
116
117 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
120 let session_id = Uuid::new_v4();
121 self.query_with_session(text, session_id).await
122 }
123
124 pub async fn query_with_session(
126 &mut self,
127 text: &str,
128 session_id: Uuid,
129 ) -> Result<Vec<ClaudeOutput>> {
130 let input = ClaudeInput::user_message(text, session_id);
132 self.send(&input).await?;
133
134 let mut responses = Vec::new();
136
137 loop {
138 let output = self.receive().await?;
139 let is_result = matches!(&output, ClaudeOutput::Result(_));
140 responses.push(output);
141
142 if is_result {
143 break;
144 }
145 }
146
147 Ok(responses)
148 }
149
150 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
153 let session_id = Uuid::new_v4();
154 self.query_stream_with_session(text, session_id).await
155 }
156
157 pub async fn query_stream_with_session(
159 &mut self,
160 text: &str,
161 session_id: Uuid,
162 ) -> Result<ResponseStream<'_>> {
163 let input = ClaudeInput::user_message(text, session_id);
165 self.send(&input).await?;
166
167 Ok(ResponseStream {
169 client: self,
170 finished: false,
171 })
172 }
173
174 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
176 let json_line = Protocol::serialize(input)?;
177 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
178
179 self.stdin
180 .write_all(json_line.as_bytes())
181 .await
182 .map_err(Error::Io)?;
183
184 self.stdin.flush().await.map_err(Error::Io)?;
185 Ok(())
186 }
187
188 pub async fn interrupt(&mut self) -> Result<()> {
193 self.send(&ClaudeInput::interrupt()).await
194 }
195
196 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
214 let mut line = String::new();
215
216 loop {
217 line.clear();
218 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
219
220 if bytes_read == 0 {
221 return Err(Error::ConnectionClosed);
222 }
223
224 let trimmed = line.trim();
225 if trimmed.is_empty() {
226 continue;
227 }
228
229 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
230
231 match ClaudeOutput::parse_json_tolerant(trimmed) {
233 Ok(output) => {
234 debug!("[INCOMING] Parsed output type: {}", output.message_type());
235
236 if self.session_uuid.is_none() {
238 if let ClaudeOutput::Assistant(ref msg) = output {
239 if let Some(ref uuid_str) = msg.uuid {
240 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
241 debug!("[INCOMING] Captured session UUID: {}", uuid);
242 self.session_uuid = Some(uuid);
243 }
244 }
245 } else if let ClaudeOutput::Result(ref msg) = output {
246 if let Some(ref uuid_str) = msg.uuid {
247 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
248 debug!("[INCOMING] Captured session UUID: {}", uuid);
249 self.session_uuid = Some(uuid);
250 }
251 }
252 }
253 }
254
255 return Ok(output);
256 }
257 Err(parse_error) => {
258 warn!("[INCOMING] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
259 warn!("[INCOMING] Parse error: {}", parse_error.error_message);
260 warn!("[INCOMING] Raw message: {}", trimmed);
261 return Err(parse_error.into());
262 }
263 }
264 }
265 }
266
267 pub fn is_alive(&mut self) -> bool {
269 self.child.try_wait().ok().flatten().is_none()
270 }
271
272 pub async fn shutdown(mut self) -> Result<()> {
274 info!("Shutting down Claude process...");
275 self.child.kill().await.map_err(Error::Io)?;
276 Ok(())
277 }
278
279 pub fn pid(&self) -> Option<u32> {
281 self.child.id()
282 }
283
284 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
286 self.stderr.take()
287 }
288
289 pub fn session_uuid(&self) -> Result<Uuid> {
292 self.session_uuid.ok_or(Error::SessionNotInitialized)
293 }
294
295 pub async fn ping(&mut self) -> bool {
298 let ping_input = ClaudeInput::user_message(
300 "ping - respond with just the word 'pong' and nothing else",
301 self.session_uuid.unwrap_or_else(Uuid::new_v4),
302 );
303
304 if let Err(e) = self.send(&ping_input).await {
306 debug!("Ping failed to send: {}", e);
307 return false;
308 }
309
310 let mut found_pong = false;
312 let mut message_count = 0;
313 const MAX_MESSAGES: usize = 10;
314
315 loop {
316 match self.receive().await {
317 Ok(output) => {
318 message_count += 1;
319
320 if let ClaudeOutput::Assistant(msg) = &output {
322 for content in &msg.message.content {
323 if let ContentBlock::Text(text) = content {
324 if text.text.to_lowercase().contains("pong") {
325 found_pong = true;
326 }
327 }
328 }
329 }
330
331 if matches!(output, ClaudeOutput::Result(_)) {
333 break;
334 }
335
336 if message_count >= MAX_MESSAGES {
338 debug!("Ping exceeded message limit");
339 break;
340 }
341 }
342 Err(e) => {
343 debug!("Ping failed to receive response: {}", e);
344 break;
345 }
346 }
347 }
348
349 found_pong
350 }
351
352 pub async fn enable_tool_approval(&mut self) -> Result<()> {
386 if self.tool_approval_enabled {
387 debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
388 return Ok(());
389 }
390
391 let request_id = format!("init-{}", Uuid::new_v4());
392 let init_request = ControlRequestMessage::initialize(&request_id);
393
394 debug!("[TOOL_APPROVAL] Sending initialization handshake");
395 let json_line = Protocol::serialize(&init_request)?;
396 self.stdin
397 .write_all(json_line.as_bytes())
398 .await
399 .map_err(Error::Io)?;
400 self.stdin.flush().await.map_err(Error::Io)?;
401
402 loop {
404 let mut line = String::new();
405 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
406
407 if bytes_read == 0 {
408 return Err(Error::ConnectionClosed);
409 }
410
411 let trimmed = line.trim();
412 if trimmed.is_empty() {
413 continue;
414 }
415
416 debug!("[TOOL_APPROVAL] Received: {}", trimmed);
417
418 match ClaudeOutput::parse_json_tolerant(trimmed) {
420 Ok(ClaudeOutput::ControlResponse(resp)) => {
421 use crate::io::ControlResponsePayload;
422 match &resp.response {
423 ControlResponsePayload::Success {
424 request_id: rid, ..
425 } if rid == &request_id => {
426 debug!("[TOOL_APPROVAL] Initialization successful");
427 self.tool_approval_enabled = true;
428 return Ok(());
429 }
430 ControlResponsePayload::Error { error, .. } => {
431 return Err(Error::Protocol(format!(
432 "Tool approval initialization failed: {}",
433 error
434 )));
435 }
436 _ => {
437 continue;
439 }
440 }
441 }
442 Ok(_) => {
443 continue;
445 }
446 Err(e) => {
447 return Err(e.into());
448 }
449 }
450 }
451 }
452
453 pub async fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
481 let message: ControlResponseMessage = response.into();
482 let json_line = Protocol::serialize(&message)?;
483 debug!(
484 "[TOOL_APPROVAL] Sending control response: {}",
485 json_line.trim()
486 );
487
488 self.stdin
489 .write_all(json_line.as_bytes())
490 .await
491 .map_err(Error::Io)?;
492 self.stdin.flush().await.map_err(Error::Io)?;
493 Ok(())
494 }
495
496 pub fn is_tool_approval_enabled(&self) -> bool {
498 self.tool_approval_enabled
499 }
500}
501
502pub struct ResponseStream<'a> {
505 client: &'a mut AsyncClient,
506 finished: bool,
507}
508
509impl ResponseStream<'_> {
510 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
512 let mut responses = Vec::new();
513
514 while !self.finished {
515 let output = self.client.receive().await?;
516 let is_result = matches!(&output, ClaudeOutput::Result(_));
517 responses.push(output);
518
519 if is_result {
520 self.finished = true;
521 break;
522 }
523 }
524
525 Ok(responses)
526 }
527
528 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
530 if self.finished {
531 return None;
532 }
533
534 match self.client.receive().await {
535 Ok(output) => {
536 if matches!(&output, ClaudeOutput::Result(_)) {
537 self.finished = true;
538 }
539 Some(Ok(output))
540 }
541 Err(e) => {
542 self.finished = true;
543 Some(Err(e))
544 }
545 }
546 }
547}
548
549impl Drop for AsyncClient {
550 fn drop(&mut self) {
551 if self.is_alive() {
552 if let Err(e) = self.child.start_kill() {
554 error!("Failed to kill Claude process on drop: {}", e);
555 }
556 }
557 }
558}
559
560impl Protocol {
562 pub async fn write_async<W: AsyncWriteExt + Unpin, T: Serialize>(
564 writer: &mut W,
565 message: &T,
566 ) -> Result<()> {
567 let line = Self::serialize(message)?;
568 debug!("[PROTOCOL] Sending async: {}", line.trim());
569 writer.write_all(line.as_bytes()).await?;
570 writer.flush().await?;
571 Ok(())
572 }
573
574 pub async fn read_async<R: AsyncBufReadExt + Unpin, T: for<'de> Deserialize<'de>>(
576 reader: &mut R,
577 ) -> Result<T> {
578 let mut line = String::new();
579 let bytes_read = reader.read_line(&mut line).await?;
580 if bytes_read == 0 {
581 return Err(Error::ConnectionClosed);
582 }
583 debug!("[PROTOCOL] Received async: {}", line.trim());
584 Self::deserialize(&line)
585 }
586}
587
588pub struct AsyncStreamProcessor<R> {
590 reader: AsyncBufReader<R>,
591}
592
593impl<R: tokio::io::AsyncRead + Unpin> AsyncStreamProcessor<R> {
594 pub fn new(reader: R) -> Self {
596 Self {
597 reader: AsyncBufReader::new(reader),
598 }
599 }
600
601 pub async fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
603 Protocol::read_async(&mut self.reader).await
604 }
605
606 pub async fn process_all<T, F, Fut>(&mut self, mut handler: F) -> Result<()>
608 where
609 T: for<'de> Deserialize<'de>,
610 F: FnMut(T) -> Fut,
611 Fut: std::future::Future<Output = Result<()>>,
612 {
613 loop {
614 match self.next_message().await {
615 Ok(message) => handler(message).await?,
616 Err(Error::ConnectionClosed) => break,
617 Err(e) => return Err(e),
618 }
619 }
620 Ok(())
621 }
622}