claude_codes/client/
async.rs

1//! Asynchronous client for Claude communication
2
3use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput};
6use crate::protocol::Protocol;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};
9use tracing::{debug, error, info};
10
11/// Asynchronous client for communicating with Claude
12pub struct AsyncClient {
13    child: Child,
14    stdin: ChildStdin,
15    stdout: BufReader<ChildStdout>,
16    stderr: Option<BufReader<ChildStderr>>,
17}
18
19impl AsyncClient {
20    /// Create a new async client from a tokio Child process
21    pub fn new(mut child: Child) -> Result<Self> {
22        let stdin = child
23            .stdin
24            .take()
25            .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdin handle")))?;
26
27        let stdout = BufReader::new(
28            child
29                .stdout
30                .take()
31                .ok_or_else(|| Error::Io(std::io::Error::other("Failed to get stdout handle")))?,
32        );
33
34        let stderr = child.stderr.take().map(BufReader::new);
35
36        Ok(Self {
37            child,
38            stdin,
39            stdout,
40            stderr,
41        })
42    }
43
44    /// Create a client with default settings (using logic from start_claude)
45    pub async fn default() -> Result<Self> {
46        Self::with_model("sonnet").await
47    }
48
49    /// Create a client with a specific model
50    pub async fn with_model(model: &str) -> Result<Self> {
51        let child = ClaudeCliBuilder::new().model(model).spawn().await?;
52
53        info!("Started Claude process with model: {}", model);
54        Self::new(child)
55    }
56
57    /// Create a client from a custom builder
58    pub async fn from_builder(builder: ClaudeCliBuilder) -> Result<Self> {
59        let child = builder.spawn().await?;
60        info!("Started Claude process from custom builder");
61        Self::new(child)
62    }
63
64    /// Send a query and collect all responses until Result message
65    /// This is the simplified version that collects all responses
66    pub async fn query(&mut self, text: &str) -> Result<Vec<ClaudeOutput>> {
67        self.query_with_session(text, "default").await
68    }
69
70    /// Send a query with a custom session ID and collect all responses
71    pub async fn query_with_session(
72        &mut self,
73        text: &str,
74        session_id: &str,
75    ) -> Result<Vec<ClaudeOutput>> {
76        // Send the query
77        let input = ClaudeInput::user_message(text, session_id);
78        self.send(&input).await?;
79
80        // Collect responses until we get a Result message
81        let mut responses = Vec::new();
82
83        loop {
84            let output = self.receive().await?;
85            let is_result = matches!(&output, ClaudeOutput::Result(_));
86            responses.push(output);
87
88            if is_result {
89                break;
90            }
91        }
92
93        Ok(responses)
94    }
95
96    /// Send a query and return an async iterator over responses
97    /// Returns a stream that yields ClaudeOutput until Result message is received
98    pub async fn query_stream(&mut self, text: &str) -> Result<ResponseStream<'_>> {
99        self.query_stream_with_session(text, "default").await
100    }
101
102    /// Send a query with session ID and return an async iterator over responses
103    pub async fn query_stream_with_session(
104        &mut self,
105        text: &str,
106        session_id: &str,
107    ) -> Result<ResponseStream<'_>> {
108        // Send the query first
109        let input = ClaudeInput::user_message(text, session_id);
110        self.send(&input).await?;
111
112        // Return a stream that will read responses
113        Ok(ResponseStream {
114            client: self,
115            finished: false,
116        })
117    }
118
119    /// Send a ClaudeInput directly
120    pub async fn send(&mut self, input: &ClaudeInput) -> Result<()> {
121        let json_line = Protocol::serialize(input)?;
122        debug!("[OUTGOING] Sending JSON to Claude: {}", json_line.trim());
123
124        self.stdin
125            .write_all(json_line.as_bytes())
126            .await
127            .map_err(Error::Io)?;
128
129        self.stdin.flush().await.map_err(Error::Io)?;
130        Ok(())
131    }
132
133    /// Try to receive a single response
134    pub async fn receive(&mut self) -> Result<ClaudeOutput> {
135        let mut line = String::new();
136
137        loop {
138            line.clear();
139            let bytes_read = self.stdout.read_line(&mut line).await.map_err(Error::Io)?;
140
141            if bytes_read == 0 {
142                return Err(Error::ConnectionClosed);
143            }
144
145            let trimmed = line.trim();
146            if trimmed.is_empty() {
147                continue;
148            }
149
150            debug!("[INCOMING] Received JSON from Claude: {}", trimmed);
151
152            // Use the parse_json method which returns ParseError
153            match ClaudeOutput::parse_json(trimmed) {
154                Ok(output) => {
155                    debug!("[INCOMING] Parsed output type: {}", output.message_type());
156                    return Ok(output);
157                }
158                Err(parse_error) => {
159                    error!("[INCOMING] Failed to deserialize: {}", parse_error);
160                    // Convert ParseError to our Error type
161                    return Err(Error::Deserialization(parse_error.error_message));
162                }
163            }
164        }
165    }
166
167    /// Check if the Claude process is still running
168    pub fn is_alive(&mut self) -> bool {
169        self.child.try_wait().ok().flatten().is_none()
170    }
171
172    /// Gracefully shutdown the client
173    pub async fn shutdown(mut self) -> Result<()> {
174        info!("Shutting down Claude process...");
175        self.child.kill().await.map_err(Error::Io)?;
176        Ok(())
177    }
178
179    /// Get the process ID
180    pub fn pid(&self) -> Option<u32> {
181        self.child.id()
182    }
183
184    /// Take the stderr reader (can only be called once)
185    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
186        self.stderr.take()
187    }
188}
189
190/// A response stream that yields ClaudeOutput messages
191/// Holds a reference to the client to read from
192pub struct ResponseStream<'a> {
193    client: &'a mut AsyncClient,
194    finished: bool,
195}
196
197impl ResponseStream<'_> {
198    /// Convert to a vector by collecting all responses
199    pub async fn collect(mut self) -> Result<Vec<ClaudeOutput>> {
200        let mut responses = Vec::new();
201
202        while !self.finished {
203            let output = self.client.receive().await?;
204            let is_result = matches!(&output, ClaudeOutput::Result(_));
205            responses.push(output);
206
207            if is_result {
208                self.finished = true;
209                break;
210            }
211        }
212
213        Ok(responses)
214    }
215
216    /// Get the next response
217    pub async fn next(&mut self) -> Option<Result<ClaudeOutput>> {
218        if self.finished {
219            return None;
220        }
221
222        match self.client.receive().await {
223            Ok(output) => {
224                if matches!(&output, ClaudeOutput::Result(_)) {
225                    self.finished = true;
226                }
227                Some(Ok(output))
228            }
229            Err(e) => {
230                self.finished = true;
231                Some(Err(e))
232            }
233        }
234    }
235}
236
237impl Drop for AsyncClient {
238    fn drop(&mut self) {
239        if self.is_alive() {
240            // Try to kill the process
241            if let Err(e) = self.child.start_kill() {
242                error!("Failed to kill Claude process on drop: {}", e);
243            }
244        }
245    }
246}