claude_codes/client/
async.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10
11pub struct AsyncClient {
13 child: Child,
14 stdin: ChildStdin,
15 stdout: BufReader<ChildStdout>,
16 stderr: Option<BufReader<ChildStderr>>,
17}
18
19impl AsyncClient {
20 pub fn new(mut child: Child) -> Result<Self> {
22 let stdin = child
23 .stdin
24 .take()
25 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
26
27 let stdout = BufReader::new(
28 child
29 .stdout
30 .take()
31 .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
32 );
33
34 let stderr = child.stderr.take().map(BufReader::new);
35
36 Ok(Self {
37 child,
38 stdin,
39 stdout,
40 stderr,
41 })
42 }
43
44 pub async fn default() -> Result<Self> {
46 Self::with_model("sonnet").await
47 }
48
49 pub async fn with_model(model: &str) -> Result<Self> {
51 let child = ClaudeCliBuilder::new().model(model).spawn().await?;
52
53 info!("Started Claude process with model: {}", model);
54 Self::new(child)
55 }
56
57 pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
59 let child = builder.spawn().await?;
60 info!("Started Claude process from custom builder");
61 Self::new(child)
62 }
63
64 pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
67 self.query_with_session(text, "default").await
68 }
69
70 pub async fn query_with_session(
72 &mut self,
73 text: &str,
74 session_id: &str,
75 ) -> Result<Vec<ClaudeOutput>> {
76 let input = ClaudeInput::user_message(text, session_id);
78 self.send(&input).await?;
79
80 let mut responses = Vec::new();
82
83 loop {
84 let output = self.receive().await?;
85 let is_result = matches!(&output, ClaudeOutput::Result(_));
86 responses.push(output);
87
88 if is_result {
89 break;
90 }
91 }
92
93 Ok(responses)
94 }
95
96 pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
99 self.query_stream_with_session(text, "default").await
100 }
101
102 pub async fn query_stream_with_session(
104 &mut self,
105 text: &str,
106 session_id: &str,
107 ) -> Result<ResponseStream<'_>> {
108 let input = ClaudeInput::user_message(text, session_id);
110 self.send(&input).await?;
111
112 Ok(ResponseStream {
114 client: self,
115 finished: false,
116 })
117 }
118
119 pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
121 let json_line = Protocol::serialize(input)?;
122 debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
123
124 self.stdin
125 .write_all(json_line.as_bytes())
126 .await
127 .map_err(Error::Io)?;
128
129 self.stdin.flush().await.map_err(Error::Io)?;
130 Ok(())
131 }
132
133 pub async fn receive(&mut self) -> Result<ClaudeOutput> {
135 let mut line = String::new();
136
137 loop {
138 line.clear();
139 let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
140
141 if bytes_read == 0 {
142 return Err(Error::ConnectionClosed);
143 }
144
145 let trimmed = line.trim();
146 if trimmed.is_empty() {
147 continue;
148 }
149
150 debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
151
152 match ClaudeOutput::parse_json(trimmed) {
154 Ok(output) => {
155 debug!("[INCOMING] Parsed output type: {}", output.message_type());
156 return Ok(output);
157 }
158 Err(parse_error) => {
159 error!("[INCOMING] Failed to deserialize: {}", parse_error);
160 return Err(Error::Deserialization(parse_error.error_message));
162 }
163 }
164 }
165 }
166
167 pub fn is_alive(&mut self) -> bool {
169 self.child.try_wait().ok().flatten().is_none()
170 }
171
172 pub async fn shutdown(mut self) -> Result<()> {
174 info!("Shutting down Claude process...");
175 self.child.kill().await.map_err(Error::Io)?;
176 Ok(())
177 }
178
179 pub fn pid(&self) -> Option<u32> {
181 self.child.id()
182 }
183
184 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
186 self.stderr.take()
187 }
188}
189
190pub struct ResponseStream<'a> {
193 client: &'a mut AsyncClient,
194 finished: bool,
195}
196
197impl ResponseStream<'_> {
198 pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
200 let mut responses = Vec::new();
201
202 while !self.finished {
203 let output = self.client.receive().await?;
204 let is_result = matches!(&output, ClaudeOutput::Result(_));
205 responses.push(output);
206
207 if is_result {
208 self.finished = true;
209 break;
210 }
211 }
212
213 Ok(responses)
214 }
215
216 pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
218 if self.finished {
219 return None;
220 }
221
222 match self.client.receive().await {
223 Ok(output) => {
224 if matches!(&output, ClaudeOutput::Result(_)) {
225 self.finished = true;
226 }
227 Some(Ok(output))
228 }
229 Err(e) => {
230 self.finished = true;
231 Some(Err(e))
232 }
233 }
234 }
235}
236
237impl Drop for AsyncClient {
238 fn drop(&mut self) {
239 if self.is_alive() {
240 if let Err(e) = self.child.start_kill() {
242 error!("Failed to kill Claude process on drop: {}", e);
243 }
244 }
245 }
246}