claude_codes/client/
async.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput, ContentBlock};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10use uuid::Uuid;
11
12pub struct AsyncClient {
14 child: Child,
15 stdin: ChildStdin,
16 stdout: BufReader<ChildStdout>,
17 stderr: Option<BufReader<ChildStderr>>,
18 session_uuid: Option<Uuid>,
19}
20
21impl AsyncClient {
22 pub fn new(mut child: Child) -> Result<Self> {
24 let stdin = child
25 .stdin
26 .take()
27 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
28
29 let stdout = BufReader::new(
30 child
31 .stdout
32 .take()
33 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
34 );
35
36 let stderr = child.stderr.take().map(BufReader::new);
37
38 Ok(Self {
39 child,
40 stdin,
41 stdout,
42 stderr,
43 session_uuid: None,
44 })
45 }
46
47 pub async fn with_defaults() -> Result<Self> {
49 crate::version::check_claude_version_async().await?;
55 Self::with_model("sonnet").await
56 }
57
58 pub async fn with_model(model: &str) -> Result<Self> {
60 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
61
62 info!("Started Claude process with model: {}", model);
63 Self::new(child)
64 }
65
66 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
68 let child = builder.spawn().await?;
69 info!("Started Claude process from custom builder");
70 Self::new(child)
71 }
72
73 pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
76 let child = ClaudeCliBuilder::new()
77 .resume(Some(session_uuid.to_string()))
78 .spawn()
79 .await?;
80
81 info!("Resuming Claude session with UUID: {}", session_uuid);
82 let mut client = Self::new(child)?;
83 client.session_uuid = Some(session_uuid);
85 Ok(client)
86 }
87
88 pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
90 let child = ClaudeCliBuilder::new()
91 .model(model)
92 .resume(Some(session_uuid.to_string()))
93 .spawn()
94 .await?;
95
96 info!(
97 "Resuming Claude session with UUID: {} and model: {}",
98 session_uuid, model
99 );
100 let mut client = Self::new(child)?;
101 client.session_uuid = Some(session_uuid);
103 Ok(client)
104 }
105
106 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
109 let session_id = Uuid::new_v4();
110 self.query_with_session(text, session_id).await
111 }
112
113 pub async fn query_with_session(
115 &mut self,
116 text: &str,
117 session_id: Uuid,
118 ) -> Result<Vec<ClaudeOutput>> {
119 let input = ClaudeInput::user_message(text, session_id);
121 self.send(&input).await?;
122
123 let mut responses = Vec::new();
125
126 loop {
127 let output = self.receive().await?;
128 let is_result = matches!(&output, ClaudeOutput::Result(_));
129 responses.push(output);
130
131 if is_result {
132 break;
133 }
134 }
135
136 Ok(responses)
137 }
138
139 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
142 let session_id = Uuid::new_v4();
143 self.query_stream_with_session(text, session_id).await
144 }
145
146 pub async fn query_stream_with_session(
148 &mut self,
149 text: &str,
150 session_id: Uuid,
151 ) -> Result<ResponseStream<'_>> {
152 let input = ClaudeInput::user_message(text, session_id);
154 self.send(&input).await?;
155
156 Ok(ResponseStream {
158 client: self,
159 finished: false,
160 })
161 }
162
163 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
165 let json_line = Protocol::serialize(input)?;
166 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
167
168 self.stdin
169 .write_all(json_line.as_bytes())
170 .await
171 .map_err(Error::Io)?;
172
173 self.stdin.flush().await.map_err(Error::Io)?;
174 Ok(())
175 }
176
177 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
179 let mut line = String::new();
180
181 loop {
182 line.clear();
183 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
184
185 if bytes_read == 0 {
186 return Err(Error::ConnectionClosed);
187 }
188
189 let trimmed = line.trim();
190 if trimmed.is_empty() {
191 continue;
192 }
193
194 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
195
196 match ClaudeOutput::parse_json_tolerant(trimmed) {
198 Ok(output) => {
199 debug!("[INCOMING] Parsed output type: {}", output.message_type());
200
201 if self.session_uuid.is_none() {
203 if let ClaudeOutput::Assistant(ref msg) = output {
204 if let Some(ref uuid_str) = msg.uuid {
205 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
206 debug!("[INCOMING] Captured session UUID: {}", uuid);
207 self.session_uuid = Some(uuid);
208 }
209 }
210 } else if let ClaudeOutput::Result(ref msg) = output {
211 if let Some(ref uuid_str) = msg.uuid {
212 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
213 debug!("[INCOMING] Captured session UUID: {}", uuid);
214 self.session_uuid = Some(uuid);
215 }
216 }
217 }
218 }
219
220 return Ok(output);
221 }
222 Err(parse_error) => {
223 error!("[INCOMING] Failed to deserialize: {}", parse_error);
224 error!("[INCOMING] Raw JSON that failed: {}", trimmed);
225 return Err(Error::Deserialization(format!(
227 "{} (raw: {})",
228 parse_error.error_message, trimmed
229 )));
230 }
231 }
232 }
233 }
234
235 pub fn is_alive(&mut self) -> bool {
237 self.child.try_wait().ok().flatten().is_none()
238 }
239
240 pub async fn shutdown(mut self) -> Result<()> {
242 info!("Shutting down Claude process...");
243 self.child.kill().await.map_err(Error::Io)?;
244 Ok(())
245 }
246
247 pub fn pid(&self) -> Option<u32> {
249 self.child.id()
250 }
251
252 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
254 self.stderr.take()
255 }
256
257 pub fn session_uuid(&self) -> Result<Uuid> {
260 self.session_uuid.ok_or(Error::SessionNotInitialized)
261 }
262
263 pub async fn ping(&mut self) -> bool {
266 let ping_input = ClaudeInput::user_message(
268 "ping - respond with just the word 'pong' and nothing else",
269 self.session_uuid.unwrap_or_else(Uuid::new_v4),
270 );
271
272 if let Err(e) = self.send(&ping_input).await {
274 debug!("Ping failed to send: {}", e);
275 return false;
276 }
277
278 let mut found_pong = false;
280 let mut message_count = 0;
281 const MAX_MESSAGES: usize = 10;
282
283 loop {
284 match self.receive().await {
285 Ok(output) => {
286 message_count += 1;
287
288 if let ClaudeOutput::Assistant(msg) = &output {
290 for content in &msg.message.content {
291 if let ContentBlock::Text(text) = content {
292 if text.text.to_lowercase().contains("pong") {
293 found_pong = true;
294 }
295 }
296 }
297 }
298
299 if matches!(output, ClaudeOutput::Result(_)) {
301 break;
302 }
303
304 if message_count >= MAX_MESSAGES {
306 debug!("Ping exceeded message limit");
307 break;
308 }
309 }
310 Err(e) => {
311 debug!("Ping failed to receive response: {}", e);
312 break;
313 }
314 }
315 }
316
317 found_pong
318 }
319}
320
321pub struct ResponseStream<'a> {
324 client: &'a mut AsyncClient,
325 finished: bool,
326}
327
328impl ResponseStream<'_> {
329 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
331 let mut responses = Vec::new();
332
333 while !self.finished {
334 let output = self.client.receive().await?;
335 let is_result = matches!(&output, ClaudeOutput::Result(_));
336 responses.push(output);
337
338 if is_result {
339 self.finished = true;
340 break;
341 }
342 }
343
344 Ok(responses)
345 }
346
347 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
349 if self.finished {
350 return None;
351 }
352
353 match self.client.receive().await {
354 Ok(output) => {
355 if matches!(&output, ClaudeOutput::Result(_)) {
356 self.finished = true;
357 }
358 Some(Ok(output))
359 }
360 Err(e) => {
361 self.finished = true;
362 Some(Err(e))
363 }
364 }
365 }
366}
367
368impl Drop for AsyncClient {
369 fn drop(&mut self) {
370 if self.is_alive() {
371 if let Err(e) = self.child.start_kill() {
373 error!("Failed to kill Claude process on drop: {}", e);
374 }
375 }
376 }
377}