claude_codes/client/
async.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10use uuid::Uuid;
11
12pub struct AsyncClient {
14 child: Child,
15 stdin: ChildStdin,
16 stdout: BufReader<ChildStdout>,
17 stderr: Option<BufReader<ChildStderr>>,
18 session_uuid: Option<Uuid>,
19}
20
21impl AsyncClient {
22 pub fn new(mut child: Child) -> Result<Self> {
24 let stdin = child
25 .stdin
26 .take()
27 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
28
29 let stdout = BufReader::new(
30 child
31 .stdout
32 .take()
33 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
34 );
35
36 let stderr = child.stderr.take().map(BufReader::new);
37
38 Ok(Self {
39 child,
40 stdin,
41 stdout,
42 stderr,
43 session_uuid: None,
44 })
45 }
46
47 pub async fn with_defaults() -> Result<Self> {
49 crate::version::check_claude_version_async().await?;
55 Self::with_model("sonnet").await
56 }
57
58 pub async fn with_model(model: &str) -> Result<Self> {
60 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
61
62 info!("Started Claude process with model: {}", model);
63 Self::new(child)
64 }
65
66 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
68 let child = builder.spawn().await?;
69 info!("Started Claude process from custom builder");
70 Self::new(child)
71 }
72
73 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
76 self.query_with_session(text, "default").await
77 }
78
79 pub async fn query_with_session(
81 &mut self,
82 text: &str,
83 session_id: &str,
84 ) -> Result<Vec<ClaudeOutput>> {
85 let input = ClaudeInput::user_message(text, session_id);
87 self.send(&input).await?;
88
89 let mut responses = Vec::new();
91
92 loop {
93 let output = self.receive().await?;
94 let is_result = matches!(&output, ClaudeOutput::Result(_));
95 responses.push(output);
96
97 if is_result {
98 break;
99 }
100 }
101
102 Ok(responses)
103 }
104
105 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
108 self.query_stream_with_session(text, "default").await
109 }
110
111 pub async fn query_stream_with_session(
113 &mut self,
114 text: &str,
115 session_id: &str,
116 ) -> Result<ResponseStream<'_>> {
117 let input = ClaudeInput::user_message(text, session_id);
119 self.send(&input).await?;
120
121 Ok(ResponseStream {
123 client: self,
124 finished: false,
125 })
126 }
127
128 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
130 let json_line = Protocol::serialize(input)?;
131 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
132
133 self.stdin
134 .write_all(json_line.as_bytes())
135 .await
136 .map_err(Error::Io)?;
137
138 self.stdin.flush().await.map_err(Error::Io)?;
139 Ok(())
140 }
141
142 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
144 let mut line = String::new();
145
146 loop {
147 line.clear();
148 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
149
150 if bytes_read == 0 {
151 return Err(Error::ConnectionClosed);
152 }
153
154 let trimmed = line.trim();
155 if trimmed.is_empty() {
156 continue;
157 }
158
159 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
160
161 match ClaudeOutput::parse_json(trimmed) {
163 Ok(output) => {
164 debug!("[INCOMING] Parsed output type: {}", output.message_type());
165
166 if self.session_uuid.is_none() {
168 if let ClaudeOutput::Assistant(ref msg) = output {
169 if let Some(ref uuid_str) = msg.uuid {
170 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
171 debug!("[INCOMING] Captured session UUID: {}", uuid);
172 self.session_uuid = Some(uuid);
173 }
174 }
175 } else if let ClaudeOutput::Result(ref msg) = output {
176 if let Some(ref uuid_str) = msg.uuid {
177 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
178 debug!("[INCOMING] Captured session UUID: {}", uuid);
179 self.session_uuid = Some(uuid);
180 }
181 }
182 }
183 }
184
185 return Ok(output);
186 }
187 Err(parse_error) => {
188 error!("[INCOMING] Failed to deserialize: {}", parse_error);
189 return Err(Error::Deserialization(parse_error.error_message));
191 }
192 }
193 }
194 }
195
196 pub fn is_alive(&mut self) -> bool {
198 self.child.try_wait().ok().flatten().is_none()
199 }
200
201 pub async fn shutdown(mut self) -> Result<()> {
203 info!("Shutting down Claude process...");
204 self.child.kill().await.map_err(Error::Io)?;
205 Ok(())
206 }
207
208 pub fn pid(&self) -> Option<u32> {
210 self.child.id()
211 }
212
213 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
215 self.stderr.take()
216 }
217
218 pub fn session_uuid(&self) -> Result<Uuid> {
221 self.session_uuid.ok_or(Error::SessionNotInitialized)
222 }
223}
224
225pub struct ResponseStream<'a> {
228 client: &'a mut AsyncClient,
229 finished: bool,
230}
231
232impl ResponseStream<'_> {
233 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
235 let mut responses = Vec::new();
236
237 while !self.finished {
238 let output = self.client.receive().await?;
239 let is_result = matches!(&output, ClaudeOutput::Result(_));
240 responses.push(output);
241
242 if is_result {
243 self.finished = true;
244 break;
245 }
246 }
247
248 Ok(responses)
249 }
250
251 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
253 if self.finished {
254 return None;
255 }
256
257 match self.client.receive().await {
258 Ok(output) => {
259 if matches!(&output, ClaudeOutput::Result(_)) {
260 self.finished = true;
261 }
262 Some(Ok(output))
263 }
264 Err(e) => {
265 self.finished = true;
266 Some(Err(e))
267 }
268 }
269 }
270}
271
272impl Drop for AsyncClient {
273 fn drop(&mut self) {
274 if self.is_alive() {
275 if let Err(e) = self.child.start_kill() {
277 error!("Failed to kill Claude process on drop: {}", e);
278 }
279 }
280 }
281}