claude_codes/client/
async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput, ContentBlock};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10use uuid::Uuid;
11
12/// Asynchronous client for communicating with Claude
13pub struct AsyncClient {
14    child: Child,
15    stdin: ChildStdin,
16    stdout: BufReader<ChildStdout>,
17    stderr: Option<BufReader<ChildStderr>>,
18    session_uuid: Option<Uuid>,
19}
20
21impl AsyncClient {
22    /// Create a new async client from a tokio Child process
23    pub fn new(mut child: Child) -> Result<Self> {
24        let stdin = child
25            .stdin
26            .take()
27            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
28
29        let stdout = BufReader::new(
30            child
31                .stdout
32                .take()
33                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
34        );
35
36        let stderr = child.stderr.take().map(BufReader::new);
37
38        Ok(Self {
39            child,
40            stdin,
41            stdout,
42            stderr,
43            session_uuid: None,
44        })
45    }
46
47    /// Create a client with default settings (using logic from start_claude)
48    pub async fn with_defaults() -> Result<Self> {
49        // Check Claude version (only warns once per session)
50        // NOTE: The claude-codes API is in high flux. If you wish to work around
51        // this version check, you can use AsyncClient::new() directly with:
52        //   let child = ClaudeCliBuilder::new().model("sonnet").spawn().await?;
53        //   AsyncClient::new(child)
54        crate::version::check_claude_version_async().await?;
55        Self::with_model("sonnet").await
56    }
57
58    /// Create a client with a specific model
59    pub async fn with_model(model: &str) -> Result<Self> {
60        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
61
62        info!("Started Claude process with model: {}", model);
63        Self::new(child)
64    }
65
66    /// Create a client from a custom builder
67    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
68        let child = builder.spawn().await?;
69        info!("Started Claude process from custom builder");
70        Self::new(child)
71    }
72
73    /// Resume a previous session by UUID
74    /// This creates a new client that resumes an existing session
75    pub async fn resume_session(session_uuid: Uuid) -> Result<Self> {
76        let child = ClaudeCliBuilder::new()
77            .resume(Some(session_uuid.to_string()))
78            .spawn()
79            .await?;
80
81        info!("Resuming Claude session with UUID: {}", session_uuid);
82        let mut client = Self::new(child)?;
83        // Pre-populate the session UUID since we're resuming
84        client.session_uuid = Some(session_uuid);
85        Ok(client)
86    }
87
88    /// Resume a previous session with a specific model
89    pub async fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
90        let child = ClaudeCliBuilder::new()
91            .model(model)
92            .resume(Some(session_uuid.to_string()))
93            .spawn()
94            .await?;
95
96        info!(
97            "Resuming Claude session with UUID: {} and model: {}",
98            session_uuid, model
99        );
100        let mut client = Self::new(child)?;
101        // Pre-populate the session UUID since we're resuming
102        client.session_uuid = Some(session_uuid);
103        Ok(client)
104    }
105
106    /// Send a query and collect all responses until Result message
107    /// This is the simplified version that collects all responses
108    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
109        let session_id = Uuid::new_v4();
110        self.query_with_session(text, session_id).await
111    }
112
113    /// Send a query with a custom session ID and collect all responses
114    pub async fn query_with_session(
115        &mut self,
116        text: &str,
117        session_id: Uuid,
118    ) -> Result<Vec<ClaudeOutput>> {
119        // Send the query
120        let input = ClaudeInput::user_message(text, session_id);
121        self.send(&input).await?;
122
123        // Collect responses until we get a Result message
124        let mut responses = Vec::new();
125
126        loop {
127            let output = self.receive().await?;
128            let is_result = matches!(&output, ClaudeOutput::Result(_));
129            responses.push(output);
130
131            if is_result {
132                break;
133            }
134        }
135
136        Ok(responses)
137    }
138
139    /// Send a query and return an async iterator over responses
140    /// Returns a stream that yields ClaudeOutput until Result message is received
141    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
142        let session_id = Uuid::new_v4();
143        self.query_stream_with_session(text, session_id).await
144    }
145
146    /// Send a query with session ID and return an async iterator over responses
147    pub async fn query_stream_with_session(
148        &mut self,
149        text: &str,
150        session_id: Uuid,
151    ) -> Result<ResponseStream<'_>> {
152        // Send the query first
153        let input = ClaudeInput::user_message(text, session_id);
154        self.send(&input).await?;
155
156        // Return a stream that will read responses
157        Ok(ResponseStream {
158            client: self,
159            finished: false,
160        })
161    }
162
163    /// Send a ClaudeInput directly
164    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
165        let json_line = Protocol::serialize(input)?;
166        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
167
168        self.stdin
169            .write_all(json_line.as_bytes())
170            .await
171            .map_err(Error::Io)?;
172
173        self.stdin.flush().await.map_err(Error::Io)?;
174        Ok(())
175    }
176
177    /// Try to receive a single response
178    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
179        let mut line = String::new();
180
181        loop {
182            line.clear();
183            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
184
185            if bytes_read == 0 {
186                return Err(Error::ConnectionClosed);
187            }
188
189            let trimmed = line.trim();
190            if trimmed.is_empty() {
191                continue;
192            }
193
194            debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
195
196            // Use the parse_json_tolerant method which handles ANSI escape codes
197            match ClaudeOutput::parse_json_tolerant(trimmed) {
198                Ok(output) => {
199                    debug!("[INCOMING] Parsed output type: {}", output.message_type());
200
201                    // Capture UUID from first response if not already set
202                    if self.session_uuid.is_none() {
203                        if let ClaudeOutput::Assistant(ref msg) = output {
204                            if let Some(ref uuid_str) = msg.uuid {
205                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
206                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
207                                    self.session_uuid = Some(uuid);
208                                }
209                            }
210                        } else if let ClaudeOutput::Result(ref msg) = output {
211                            if let Some(ref uuid_str) = msg.uuid {
212                                if let Ok(uuid) = Uuid::parse_str(uuid_str) {
213                                    debug!("[INCOMING] Captured session UUID: {}", uuid);
214                                    self.session_uuid = Some(uuid);
215                                }
216                            }
217                        }
218                    }
219
220                    return Ok(output);
221                }
222                Err(parse_error) => {
223                    error!("[INCOMING] Failed to deserialize: {}", parse_error);
224                    error!("[INCOMING] Raw JSON that failed: {}", trimmed);
225                    // Convert ParseError to our Error type
226                    return Err(Error::Deserialization(format!(
227                        "{} (raw: {})",
228                        parse_error.error_message, trimmed
229                    )));
230                }
231            }
232        }
233    }
234
235    /// Check if the Claude process is still running
236    pub fn is_alive(&mut self) -> bool {
237        self.child.try_wait().ok().flatten().is_none()
238    }
239
240    /// Gracefully shutdown the client
241    pub async fn shutdown(mut self) -> Result<()> {
242        info!("Shutting down Claude process...");
243        self.child.kill().await.map_err(Error::Io)?;
244        Ok(())
245    }
246
247    /// Get the process ID
248    pub fn pid(&self) -> Option<u32> {
249        self.child.id()
250    }
251
252    /// Take the stderr reader (can only be called once)
253    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
254        self.stderr.take()
255    }
256
257    /// Get the session UUID if available
258    /// Returns an error if no response has been received yet
259    pub fn session_uuid(&self) -> Result<Uuid> {
260        self.session_uuid.ok_or(Error::SessionNotInitialized)
261    }
262
263    /// Test if the Claude connection is working by sending a ping message
264    /// Returns true if Claude responds with "pong", false otherwise
265    pub async fn ping(&mut self) -> bool {
266        // Send a simple ping request
267        let ping_input = ClaudeInput::user_message(
268            "ping - respond with just the word 'pong' and nothing else",
269            self.session_uuid.unwrap_or_else(Uuid::new_v4),
270        );
271
272        // Try to send the ping
273        if let Err(e) = self.send(&ping_input).await {
274            debug!("Ping failed to send: {}", e);
275            return false;
276        }
277
278        // Try to receive responses until we get a result or error
279        let mut found_pong = false;
280        let mut message_count = 0;
281        const MAX_MESSAGES: usize = 10;
282
283        loop {
284            match self.receive().await {
285                Ok(output) => {
286                    message_count += 1;
287
288                    // Check if it's an assistant message containing "pong"
289                    if let ClaudeOutput::Assistant(msg) = &output {
290                        for content in &msg.message.content {
291                            if let ContentBlock::Text(text) = content {
292                                if text.text.to_lowercase().contains("pong") {
293                                    found_pong = true;
294                                }
295                            }
296                        }
297                    }
298
299                    // Stop on result message
300                    if matches!(output, ClaudeOutput::Result(_)) {
301                        break;
302                    }
303
304                    // Safety limit
305                    if message_count >= MAX_MESSAGES {
306                        debug!("Ping exceeded message limit");
307                        break;
308                    }
309                }
310                Err(e) => {
311                    debug!("Ping failed to receive response: {}", e);
312                    break;
313                }
314            }
315        }
316
317        found_pong
318    }
319}
320
321/// A response stream that yields ClaudeOutput messages
322/// Holds a reference to the client to read from
323pub struct ResponseStream<'a> {
324    client: &'a mut AsyncClient,
325    finished: bool,
326}
327
328impl ResponseStream<'_> {
329    /// Convert to a vector by collecting all responses
330    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
331        let mut responses = Vec::new();
332
333        while !self.finished {
334            let output = self.client.receive().await?;
335            let is_result = matches!(&output, ClaudeOutput::Result(_));
336            responses.push(output);
337
338            if is_result {
339                self.finished = true;
340                break;
341            }
342        }
343
344        Ok(responses)
345    }
346
347    /// Get the next response
348    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
349        if self.finished {
350            return None;
351        }
352
353        match self.client.receive().await {
354            Ok(output) => {
355                if matches!(&output, ClaudeOutput::Result(_)) {
356                    self.finished = true;
357                }
358                Some(Ok(output))
359            }
360            Err(e) => {
361                self.finished = true;
362                Some(Err(e))
363            }
364        }
365    }
366}
367
368impl Drop for AsyncClient {
369    fn drop(&mut self) {
370        if self.is_alive() {
371            // Try to kill the process
372            if let Err(e) = self.child.start_kill() {
373                error!("Failed to kill Claude process on drop: {}", e);
374            }
375        }
376    }
377}