claude_codes/client/
async.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10
11pub struct AsyncClient {
13 child: Child,
14 stdin: ChildStdin,
15 stdout: BufReader<ChildStdout>,
16 stderr: Option<BufReader<ChildStderr>>,
17}
18
19impl AsyncClient {
20 pub fn new(mut child: Child) -> Result<Self> {
22 let stdin = child
23 .stdin
24 .take()
25 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
26
27 let stdout = BufReader::new(
28 child
29 .stdout
30 .take()
31 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
32 );
33
34 let stderr = child.stderr.take().map(BufReader::new);
35
36 Ok(Self {
37 child,
38 stdin,
39 stdout,
40 stderr,
41 })
42 }
43
44 pub async fn with_defaults() -> Result<Self> {
46 crate::version::check_claude_version_async().await?;
52 Self::with_model("sonnet").await
53 }
54
55 pub async fn with_model(model: &str) -> Result<Self> {
57 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
58
59 info!("Started Claude process with model: {}", model);
60 Self::new(child)
61 }
62
63 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
65 let child = builder.spawn().await?;
66 info!("Started Claude process from custom builder");
67 Self::new(child)
68 }
69
70 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
73 self.query_with_session(text, "default").await
74 }
75
76 pub async fn query_with_session(
78 &mut self,
79 text: &str,
80 session_id: &str,
81 ) -> Result<Vec<ClaudeOutput>> {
82 let input = ClaudeInput::user_message(text, session_id);
84 self.send(&input).await?;
85
86 let mut responses = Vec::new();
88
89 loop {
90 let output = self.receive().await?;
91 let is_result = matches!(&output, ClaudeOutput::Result(_));
92 responses.push(output);
93
94 if is_result {
95 break;
96 }
97 }
98
99 Ok(responses)
100 }
101
102 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
105 self.query_stream_with_session(text, "default").await
106 }
107
108 pub async fn query_stream_with_session(
110 &mut self,
111 text: &str,
112 session_id: &str,
113 ) -> Result<ResponseStream<'_>> {
114 let input = ClaudeInput::user_message(text, session_id);
116 self.send(&input).await?;
117
118 Ok(ResponseStream {
120 client: self,
121 finished: false,
122 })
123 }
124
125 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
127 let json_line = Protocol::serialize(input)?;
128 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
129
130 self.stdin
131 .write_all(json_line.as_bytes())
132 .await
133 .map_err(Error::Io)?;
134
135 self.stdin.flush().await.map_err(Error::Io)?;
136 Ok(())
137 }
138
139 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
141 let mut line = String::new();
142
143 loop {
144 line.clear();
145 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
146
147 if bytes_read == 0 {
148 return Err(Error::ConnectionClosed);
149 }
150
151 let trimmed = line.trim();
152 if trimmed.is_empty() {
153 continue;
154 }
155
156 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
157
158 match ClaudeOutput::parse_json(trimmed) {
160 Ok(output) => {
161 debug!("[INCOMING] Parsed output type: {}", output.message_type());
162 return Ok(output);
163 }
164 Err(parse_error) => {
165 error!("[INCOMING] Failed to deserialize: {}", parse_error);
166 return Err(Error::Deserialization(parse_error.error_message));
168 }
169 }
170 }
171 }
172
173 pub fn is_alive(&mut self) -> bool {
175 self.child.try_wait().ok().flatten().is_none()
176 }
177
178 pub async fn shutdown(mut self) -> Result<()> {
180 info!("Shutting down Claude process...");
181 self.child.kill().await.map_err(Error::Io)?;
182 Ok(())
183 }
184
185 pub fn pid(&self) -> Option<u32> {
187 self.child.id()
188 }
189
190 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
192 self.stderr.take()
193 }
194}
195
196pub struct ResponseStream<'a> {
199 client: &'a mut AsyncClient,
200 finished: bool,
201}
202
203impl ResponseStream<'_> {
204 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
206 let mut responses = Vec::new();
207
208 while !self.finished {
209 let output = self.client.receive().await?;
210 let is_result = matches!(&output, ClaudeOutput::Result(_));
211 responses.push(output);
212
213 if is_result {
214 self.finished = true;
215 break;
216 }
217 }
218
219 Ok(responses)
220 }
221
222 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
224 if self.finished {
225 return None;
226 }
227
228 match self.client.receive().await {
229 Ok(output) => {
230 if matches!(&output, ClaudeOutput::Result(_)) {
231 self.finished = true;
232 }
233 Some(Ok(output))
234 }
235 Err(e) => {
236 self.finished = true;
237 Some(Err(e))
238 }
239 }
240 }
241}
242
243impl Drop for AsyncClient {
244 fn drop(&mut self) {
245 if self.is_alive() {
246 if let Err(e) = self.child.start_kill() {
248 error!("Failed to kill Claude process on drop: {}", e);
249 }
250 }
251 }
252}