claude_codes/client/
async.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10use uuid::Uuid;
11
12pub struct AsyncClient {
14 child: Child,
15 stdin: ChildStdin,
16 stdout: BufReader<ChildStdout>,
17 stderr: Option<BufReader<ChildStderr>>,
18 session_uuid: Option<Uuid>,
19}
20
21impl AsyncClient {
22 pub fn new(mut child: Child) -> Result<Self> {
24 let stdin = child
25 .stdin
26 .take()
27 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
28
29 let stdout = BufReader::new(
30 child
31 .stdout
32 .take()
33 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
34 );
35
36 let stderr = child.stderr.take().map(BufReader::new);
37
38 Ok(Self {
39 child,
40 stdin,
41 stdout,
42 stderr,
43 session_uuid: None,
44 })
45 }
46
47 pub async fn with_defaults() -> Result<Self> {
49 crate::version::check_claude_version_async().await?;
55 Self::with_model("sonnet").await
56 }
57
58 pub async fn with_model(model: &str) -> Result<Self> {
60 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
61
62 info!("Started Claude process with model: {}", model);
63 Self::new(child)
64 }
65
66 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
68 let child = builder.spawn().await?;
69 info!("Started Claude process from custom builder");
70 Self::new(child)
71 }
72
73 pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
76 let child = ClaudeCliBuilder::new()
77 .resume(Some(session_uuid.to_string()))
78 .spawn()
79 .await?;
80
81 info!("Resuming Claude session with UUID: {}", session_uuid);
82 let mut client = Self::new(child)?;
83 client.session_uuid = Some(session_uuid);
85 Ok(client)
86 }
87
88 pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
90 let child = ClaudeCliBuilder::new()
91 .model(model)
92 .resume(Some(session_uuid.to_string()))
93 .spawn()
94 .await?;
95
96 info!(
97 "Resuming Claude session with UUID: {} and model: {}",
98 session_uuid, model
99 );
100 let mut client = Self::new(child)?;
101 client.session_uuid = Some(session_uuid);
103 Ok(client)
104 }
105
106 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
109 let session_id = Uuid::new_v4();
110 self.query_with_session(text, session_id).await
111 }
112
113 pub async fn query_with_session(
115 &mut self,
116 text: &str,
117 session_id: Uuid,
118 ) -> Result<Vec<ClaudeOutput>> {
119 let input = ClaudeInput::user_message(text, session_id);
121 self.send(&input).await?;
122
123 let mut responses = Vec::new();
125
126 loop {
127 let output = self.receive().await?;
128 let is_result = matches!(&output, ClaudeOutput::Result(_));
129 responses.push(output);
130
131 if is_result {
132 break;
133 }
134 }
135
136 Ok(responses)
137 }
138
139 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
142 let session_id = Uuid::new_v4();
143 self.query_stream_with_session(text, session_id).await
144 }
145
146 pub async fn query_stream_with_session(
148 &mut self,
149 text: &str,
150 session_id: Uuid,
151 ) -> Result<ResponseStream<'_>> {
152 let input = ClaudeInput::user_message(text, session_id);
154 self.send(&input).await?;
155
156 Ok(ResponseStream {
158 client: self,
159 finished: false,
160 })
161 }
162
163 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
165 let json_line = Protocol::serialize(input)?;
166 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
167
168 self.stdin
169 .write_all(json_line.as_bytes())
170 .await
171 .map_err(Error::Io)?;
172
173 self.stdin.flush().await.map_err(Error::Io)?;
174 Ok(())
175 }
176
177 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
179 let mut line = String::new();
180
181 loop {
182 line.clear();
183 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
184
185 if bytes_read == 0 {
186 return Err(Error::ConnectionClosed);
187 }
188
189 let trimmed = line.trim();
190 if trimmed.is_empty() {
191 continue;
192 }
193
194 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
195
196 match ClaudeOutput::parse_json(trimmed) {
198 Ok(output) => {
199 debug!("[INCOMING] Parsed output type: {}", output.message_type());
200
201 if self.session_uuid.is_none() {
203 if let ClaudeOutput::Assistant(ref msg) = output {
204 if let Some(ref uuid_str) = msg.uuid {
205 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
206 debug!("[INCOMING] Captured session UUID: {}", uuid);
207 self.session_uuid = Some(uuid);
208 }
209 }
210 } else if let ClaudeOutput::Result(ref msg) = output {
211 if let Some(ref uuid_str) = msg.uuid {
212 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
213 debug!("[INCOMING] Captured session UUID: {}", uuid);
214 self.session_uuid = Some(uuid);
215 }
216 }
217 }
218 }
219
220 return Ok(output);
221 }
222 Err(parse_error) => {
223 error!("[INCOMING] Failed to deserialize: {}", parse_error);
224 return Err(Error::Deserialization(parse_error.error_message));
226 }
227 }
228 }
229 }
230
231 pub fn is_alive(&mut self) -> bool {
233 self.child.try_wait().ok().flatten().is_none()
234 }
235
236 pub async fn shutdown(mut self) -> Result<()> {
238 info!("Shutting down Claude process...");
239 self.child.kill().await.map_err(Error::Io)?;
240 Ok(())
241 }
242
243 pub fn pid(&self) -> Option<u32> {
245 self.child.id()
246 }
247
248 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
250 self.stderr.take()
251 }
252
253 pub fn session_uuid(&self) -> Result<Uuid> {
256 self.session_uuid.ok_or(Error::SessionNotInitialized)
257 }
258}
259
260pub struct ResponseStream<'a> {
263 client: &'a mut AsyncClient,
264 finished: bool,
265}
266
267impl ResponseStream<'_> {
268 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
270 let mut responses = Vec::new();
271
272 while !self.finished {
273 let output = self.client.receive().await?;
274 let is_result = matches!(&output, ClaudeOutput::Result(_));
275 responses.push(output);
276
277 if is_result {
278 self.finished = true;
279 break;
280 }
281 }
282
283 Ok(responses)
284 }
285
286 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
288 if self.finished {
289 return None;
290 }
291
292 match self.client.receive().await {
293 Ok(output) => {
294 if matches!(&output, ClaudeOutput::Result(_)) {
295 self.finished = true;
296 }
297 Some(Ok(output))
298 }
299 Err(e) => {
300 self.finished = true;
301 Some(Err(e))
302 }
303 }
304 }
305}
306
307impl Drop for AsyncClient {
308 fn drop(&mut self) {
309 if self.is_alive() {
310 if let Err(e) = self.child.start_kill() {
312 error!("Failed to kill Claude process on drop: {}", e);
313 }
314 }
315 }
316}