claude_codes/client/
async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10use uuid::Uuid;
11
12/// Asynchronous client for communicating with Claude
13pub struct AsyncClient {
14    child: Child,
15    stdin: ChildStdin,
16    stdout: BufReader<ChildStdout>,
17    stderr: Option<BufReader<ChildStderr>>,
18    session_uuid: Option<Uuid>,
19}
20
21impl AsyncClient {
22    /// Create a new async client from a tokio Child process
23    pub fn new(mut child: Child) -> Result<Self> {
24        let stdin = child
25            .stdin
26            .take()
27            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
28
29        let stdout = BufReader::new(
30            child
31                .stdout
32                .take()
33                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
34        );
35
36        let stderr = child.stderr.take().map(BufReader::new);
37
38        Ok(Self {
39            child,
40            stdin,
41            stdout,
42            stderr,
43            session_uuid: None,
44        })
45    }
46
47    /// Create a client with default settings (using logic from start_claude)
48    pub async fn with_defaults() -> Result<Self> {
49        // Check Claude version (only warns once per session)
50        // NOTE: The claude-codes API is in high flux. If you wish to work around
51        // this version check, you can use AsyncClient::new() directly with:
52        //   let child = ClaudeCliBuilder::new().model("sonnet").spawn().await?;
53        //   AsyncClient::new(child)
54        crate::version::check_claude_version_async().await?;
55        Self::with_model("sonnet").await
56    }
57
58    /// Create a client with a specific model
59    pub async fn with_model(model: &str) -> Result<Self> {
60        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
61
62        info!("Started Claude process with model: {}", model);
63        Self::new(child)
64    }
65
66    /// Create a client from a custom builder
67    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
68        let child = builder.spawn().await?;
69        info!("Started Claude process from custom builder");
70        Self::new(child)
71    }
72
73    /// Send a query and collect all responses until Result message
74    /// This is the simplified version that collects all responses
75    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
76        self.query_with_session(text, "default").await
77    }
78
79    /// Send a query with a custom session ID and collect all responses
80    pub async fn query_with_session(
81        &mut self,
82        text: &str,
83        session_id: &str,
84    ) -> Result<Vec<ClaudeOutput>> {
85        // Send the query
86        let input = ClaudeInput::user_message(text, session_id);
87        self.send(&input).await?;
88
89        // Collect responses until we get a Result message
90        let mut responses = Vec::new();
91
92        loop {
93            let output = self.receive().await?;
94            let is_result = matches!(&output, ClaudeOutput::Result(_));
95            responses.push(output);
96
97            if is_result {
98                break;
99            }
100        }
101
102        Ok(responses)
103    }
104
105    /// Send a query and return an async iterator over responses
106    /// Returns a stream that yields ClaudeOutput until Result message is received
107    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
108        self.query_stream_with_session(text, "default").await
109    }
110
111    /// Send a query with session ID and return an async iterator over responses
112    pub async fn query_stream_with_session(
113        &mut self,
114        text: &str,
115        session_id: &str,
116    ) -> Result<ResponseStream<'_>> {
117        // Send the query first
118        let input = ClaudeInput::user_message(text, session_id);
119        self.send(&input).await?;
120
121        // Return a stream that will read responses
122        Ok(ResponseStream {
123            client: self,
124            finished: false,
125        })
126    }
127
128    /// Send a ClaudeInput directly
129    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
130        let json_line = Protocol::serialize(input)?;
131        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
132
133        self.stdin
134            .write_all(json_line.as_bytes())
135            .await
136            .map_err(Error::Io)?;
137
138        self.stdin.flush().await.map_err(Error::Io)?;
139        Ok(())
140    }
141
142    /// Try to receive a single response
143    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
144        let mut line = String::new();
145
146        loop {
147            line.clear();
148            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
149
150            if bytes_read == 0 {
151                return Err(Error::ConnectionClosed);
152            }
153
154            let trimmed = line.trim();
155            if trimmed.is_empty() {
156                continue;
157            }
158
159            debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
160
161            // Use the parse_json method which returns ParseError
162            match ClaudeOutput::parse_json(trimmed) {
163                Ok(output) => {
164                    debug!("[INCOMING] Parsed output type: {}", output.message_type());
165
166                    // Capture UUID from first response if not already set
167                    if self.session_uuid.is_none() {
168                        if let ClaudeOutput::Assistant(ref msg) = output {
169                            if let Some(ref uuid_str) = msg.uuid {
170                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
171                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
172                                    self.session_uuid = Some(uuid);
173                                }
174                            }
175                        } else if let ClaudeOutput::Result(ref msg) = output {
176                            if let Some(ref uuid_str) = msg.uuid {
177                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
178                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
179                                    self.session_uuid = Some(uuid);
180                                }
181                            }
182                        }
183                    }
184
185                    return Ok(output);
186                }
187                Err(parse_error) => {
188                    error!("[INCOMING] Failed to deserialize: {}", parse_error);
189                    // Convert ParseError to our Error type
190                    return Err(Error::Deserialization(parse_error.error_message));
191                }
192            }
193        }
194    }
195
196    /// Check if the Claude process is still running
197    pub fn is_alive(&mut self) -> bool {
198        self.child.try_wait().ok().flatten().is_none()
199    }
200
201    /// Gracefully shutdown the client
202    pub async fn shutdown(mut self) -> Result<()> {
203        info!("Shutting down Claude process...");
204        self.child.kill().await.map_err(Error::Io)?;
205        Ok(())
206    }
207
208    /// Get the process ID
209    pub fn pid(&self) -> Option<u32> {
210        self.child.id()
211    }
212
213    /// Take the stderr reader (can only be called once)
214    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
215        self.stderr.take()
216    }
217
218    /// Get the session UUID if available
219    /// Returns an error if no response has been received yet
220    pub fn session_uuid(&self) -> Result<Uuid> {
221        self.session_uuid.ok_or(Error::SessionNotInitialized)
222    }
223}
224
225/// A response stream that yields ClaudeOutput messages
226/// Holds a reference to the client to read from
227pub struct ResponseStream<'a> {
228    client: &'a mut AsyncClient,
229    finished: bool,
230}
231
232impl ResponseStream<'_> {
233    /// Convert to a vector by collecting all responses
234    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
235        let mut responses = Vec::new();
236
237        while !self.finished {
238            let output = self.client.receive().await?;
239            let is_result = matches!(&output, ClaudeOutput::Result(_));
240            responses.push(output);
241
242            if is_result {
243                self.finished = true;
244                break;
245            }
246        }
247
248        Ok(responses)
249    }
250
251    /// Get the next response
252    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
253        if self.finished {
254            return None;
255        }
256
257        match self.client.receive().await {
258            Ok(output) => {
259                if matches!(&output, ClaudeOutput::Result(_)) {
260                    self.finished = true;
261                }
262                Some(Ok(output))
263            }
264            Err(e) => {
265                self.finished = true;
266                Some(Err(e))
267            }
268        }
269    }
270}
271
272impl Drop for AsyncClient {
273    fn drop(&mut self) {
274        if self.is_alive() {
275            // Try to kill the process
276            if let Err(e) = self.child.start_kill() {
277                error!("Failed to kill Claude process on drop: {}", e);
278            }
279        }
280    }
281}