claude_codes/client/
async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10use uuid::Uuid;
11
12/// Asynchronous client for communicating with Claude
13pub struct AsyncClient {
14    child: Child,
15    stdin: ChildStdin,
16    stdout: BufReader<ChildStdout>,
17    stderr: Option<BufReader<ChildStderr>>,
18    session_uuid: Option<Uuid>,
19}
20
21impl AsyncClient {
22    /// Create a new async client from a tokio Child process
23    pub fn new(mut child: Child) -> Result<Self> {
24        let stdin = child
25            .stdin
26            .take()
27            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
28
29        let stdout = BufReader::new(
30            child
31                .stdout
32                .take()
33                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
34        );
35
36        let stderr = child.stderr.take().map(BufReader::new);
37
38        Ok(Self {
39            child,
40            stdin,
41            stdout,
42            stderr,
43            session_uuid: None,
44        })
45    }
46
47    /// Create a client with default settings (using logic from start_claude)
48    pub async fn with_defaults() -> Result<Self> {
49        // Check Claude version (only warns once per session)
50        // NOTE: The claude-codes API is in high flux. If you wish to work around
51        // this version check, you can use AsyncClient::new() directly with:
52        //   let child = ClaudeCliBuilder::new().model("sonnet").spawn().await?;
53        //   AsyncClient::new(child)
54        crate::version::check_claude_version_async().await?;
55        Self::with_model("sonnet").await
56    }
57
58    /// Create a client with a specific model
59    pub async fn with_model(model: &str) -> Result<Self> {
60        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
61
62        info!("Started Claude process with model: {}", model);
63        Self::new(child)
64    }
65
66    /// Create a client from a custom builder
67    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
68        let child = builder.spawn().await?;
69        info!("Started Claude process from custom builder");
70        Self::new(child)
71    }
72
73    /// Resume a previous session by UUID
74    /// This creates a new client that resumes an existing session
75    pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
76        let child = ClaudeCliBuilder::new()
77            .resume(Some(session_uuid.to_string()))
78            .spawn()
79            .await?;
80
81        info!("Resuming Claude session with UUID: {}", session_uuid);
82        let mut client = Self::new(child)?;
83        // Pre-populate the session UUID since we're resuming
84        client.session_uuid = Some(session_uuid);
85        Ok(client)
86    }
87
88    /// Resume a previous session with a specific model
89    pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
90        let child = ClaudeCliBuilder::new()
91            .model(model)
92            .resume(Some(session_uuid.to_string()))
93            .spawn()
94            .await?;
95
96        info!(
97            "Resuming Claude session with UUID: {} and model: {}",
98            session_uuid, model
99        );
100        let mut client = Self::new(child)?;
101        // Pre-populate the session UUID since we're resuming
102        client.session_uuid = Some(session_uuid);
103        Ok(client)
104    }
105
106    /// Send a query and collect all responses until Result message
107    /// This is the simplified version that collects all responses
108    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
109        let session_id = Uuid::new_v4();
110        self.query_with_session(text, session_id).await
111    }
112
113    /// Send a query with a custom session ID and collect all responses
114    pub async fn query_with_session(
115        &mut self,
116        text: &str,
117        session_id: Uuid,
118    ) -> Result<Vec<ClaudeOutput>> {
119        // Send the query
120        let input = ClaudeInput::user_message(text, session_id);
121        self.send(&input).await?;
122
123        // Collect responses until we get a Result message
124        let mut responses = Vec::new();
125
126        loop {
127            let output = self.receive().await?;
128            let is_result = matches!(&output, ClaudeOutput::Result(_));
129            responses.push(output);
130
131            if is_result {
132                break;
133            }
134        }
135
136        Ok(responses)
137    }
138
139    /// Send a query and return an async iterator over responses
140    /// Returns a stream that yields ClaudeOutput until Result message is received
141    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
142        let session_id = Uuid::new_v4();
143        self.query_stream_with_session(text, session_id).await
144    }
145
146    /// Send a query with session ID and return an async iterator over responses
147    pub async fn query_stream_with_session(
148        &mut self,
149        text: &str,
150        session_id: Uuid,
151    ) -> Result<ResponseStream<'_>> {
152        // Send the query first
153        let input = ClaudeInput::user_message(text, session_id);
154        self.send(&input).await?;
155
156        // Return a stream that will read responses
157        Ok(ResponseStream {
158            client: self,
159            finished: false,
160        })
161    }
162
163    /// Send a ClaudeInput directly
164    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
165        let json_line = Protocol::serialize(input)?;
166        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
167
168        self.stdin
169            .write_all(json_line.as_bytes())
170            .await
171            .map_err(Error::Io)?;
172
173        self.stdin.flush().await.map_err(Error::Io)?;
174        Ok(())
175    }
176
177    /// Try to receive a single response
178    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
179        let mut line = String::new();
180
181        loop {
182            line.clear();
183            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
184
185            if bytes_read == 0 {
186                return Err(Error::ConnectionClosed);
187            }
188
189            let trimmed = line.trim();
190            if trimmed.is_empty() {
191                continue;
192            }
193
194            debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
195
196            // Use the parse_json method which returns ParseError
197            match ClaudeOutput::parse_json(trimmed) {
198                Ok(output) => {
199                    debug!("[INCOMING] Parsed output type: {}", output.message_type());
200
201                    // Capture UUID from first response if not already set
202                    if self.session_uuid.is_none() {
203                        if let ClaudeOutput::Assistant(ref msg) = output {
204                            if let Some(ref uuid_str) = msg.uuid {
205                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
206                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
207                                    self.session_uuid = Some(uuid);
208                                }
209                            }
210                        } else if let ClaudeOutput::Result(ref msg) = output {
211                            if let Some(ref uuid_str) = msg.uuid {
212                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
213                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
214                                    self.session_uuid = Some(uuid);
215                                }
216                            }
217                        }
218                    }
219
220                    return Ok(output);
221                }
222                Err(parse_error) => {
223                    error!("[INCOMING] Failed to deserialize: {}", parse_error);
224                    // Convert ParseError to our Error type
225                    return Err(Error::Deserialization(parse_error.error_message));
226                }
227            }
228        }
229    }
230
231    /// Check if the Claude process is still running
232    pub fn is_alive(&mut self) -> bool {
233        self.child.try_wait().ok().flatten().is_none()
234    }
235
236    /// Gracefully shutdown the client
237    pub async fn shutdown(mut self) -> Result<()> {
238        info!("Shutting down Claude process...");
239        self.child.kill().await.map_err(Error::Io)?;
240        Ok(())
241    }
242
243    /// Get the process ID
244    pub fn pid(&self) -> Option<u32> {
245        self.child.id()
246    }
247
248    /// Take the stderr reader (can only be called once)
249    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
250        self.stderr.take()
251    }
252
253    /// Get the session UUID if available
254    /// Returns an error if no response has been received yet
255    pub fn session_uuid(&self) -> Result<Uuid> {
256        self.session_uuid.ok_or(Error::SessionNotInitialized)
257    }
258}
259
260/// A response stream that yields ClaudeOutput messages
261/// Holds a reference to the client to read from
262pub struct ResponseStream<'a> {
263    client: &'a mut AsyncClient,
264    finished: bool,
265}
266
267impl ResponseStream<'_> {
268    /// Convert to a vector by collecting all responses
269    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
270        let mut responses = Vec::new();
271
272        while !self.finished {
273            let output = self.client.receive().await?;
274            let is_result = matches!(&output, ClaudeOutput::Result(_));
275            responses.push(output);
276
277            if is_result {
278                self.finished = true;
279                break;
280            }
281        }
282
283        Ok(responses)
284    }
285
286    /// Get the next response
287    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
288        if self.finished {
289            return None;
290        }
291
292        match self.client.receive().await {
293            Ok(output) => {
294                if matches!(&output, ClaudeOutput::Result(_)) {
295                    self.finished = true;
296                }
297                Some(Ok(output))
298            }
299            Err(e) => {
300                self.finished = true;
301                Some(Err(e))
302            }
303        }
304    }
305}
306
307impl Drop for AsyncClient {
308    fn drop(&mut self) {
309        if self.is_alive() {
310            // Try to kill the process
311            if let Err(e) = self.child.start_kill() {
312                error!("Failed to kill Claude process on drop: {}", e);
313            }
314        }
315    }
316}