claude_codes/
client_sync.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{ClaudeInput, ClaudeOutput, ContentBlock, ParseError};
6use crate::protocol::Protocol;
7use log::debug;
8use serde::{Deserialize, Serialize};
9use std::io::{BufRead, BufReader, Write};
10use std::process::{Child, ChildStdin, ChildStdout};
11use uuid::Uuid;
12
13pub struct SyncClient {
15 child: Child,
16 stdin: ChildStdin,
17 stdout: BufReader<ChildStdout>,
18 session_uuid: Option<Uuid>,
19}
20
21impl SyncClient {
22 pub fn new(mut child: Child) -> Result<Self> {
24 let stdin = child
25 .stdin
26 .take()
27 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
28 let stdout = child
29 .stdout
30 .take()
31 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
32
33 Ok(Self {
34 child,
35 stdin,
36 stdout: BufReader::new(stdout),
37 session_uuid: None,
38 })
39 }
40
41 pub fn with_defaults() -> Result<Self> {
43 crate::version::check_claude_version()?;
49 let child = ClaudeCliBuilder::new().spawn_sync().map_err(Error::Io)?;
50 Self::new(child)
51 }
52
53 pub fn resume_session(session_uuid: Uuid) -> Result<Self> {
56 let child = ClaudeCliBuilder::new()
57 .resume(Some(session_uuid.to_string()))
58 .spawn_sync()
59 .map_err(Error::Io)?;
60
61 debug!("Resuming Claude session with UUID: {}", session_uuid);
62 let mut client = Self::new(child)?;
63 client.session_uuid = Some(session_uuid);
65 Ok(client)
66 }
67
68 pub fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
70 let child = ClaudeCliBuilder::new()
71 .model(model)
72 .resume(Some(session_uuid.to_string()))
73 .spawn_sync()
74 .map_err(Error::Io)?;
75
76 debug!(
77 "Resuming Claude session with UUID: {} and model: {}",
78 session_uuid, model
79 );
80 let mut client = Self::new(child)?;
81 client.session_uuid = Some(session_uuid);
83 Ok(client)
84 }
85
86 pub fn query(&mut self, input: ClaudeInput) -> Result<Vec<ClaudeOutput>> {
88 let mut responses = Vec::new();
89 for response in self.query_stream(input)? {
90 responses.push(response?);
91 }
92 Ok(responses)
93 }
94
95 pub fn query_stream(&mut self, input: ClaudeInput) -> Result<ResponseIterator<'_>> {
97 Protocol::write_sync(&mut self.stdin, &input)?;
99
100 Ok(ResponseIterator {
101 client: self,
102 finished: false,
103 })
104 }
105
106 fn read_next(&mut self) -> Result<Option<ClaudeOutput>> {
108 let mut line = String::new();
109 match self.stdout.read_line(&mut line) {
110 Ok(0) => {
111 debug!("[CLIENT] Stream closed");
112 Ok(None)
113 }
114 Ok(_) => {
115 let trimmed = line.trim();
116 if trimmed.is_empty() {
117 debug!("[CLIENT] Skipping empty line");
118 return self.read_next();
119 }
120
121 debug!("[CLIENT] Received: {}", trimmed);
122 match ClaudeOutput::parse_json_tolerant(trimmed) {
123 Ok(output) => {
124 if self.session_uuid.is_none() {
126 if let ClaudeOutput::Assistant(ref msg) = output {
127 if let Some(ref uuid_str) = msg.uuid {
128 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
129 debug!("[CLIENT] Captured session UUID: {}", uuid);
130 self.session_uuid = Some(uuid);
131 }
132 }
133 } else if let ClaudeOutput::Result(ref msg) = output {
134 if let Some(ref uuid_str) = msg.uuid {
135 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
136 debug!("[CLIENT] Captured session UUID: {}", uuid);
137 self.session_uuid = Some(uuid);
138 }
139 }
140 }
141 }
142
143 if matches!(output, ClaudeOutput::Result(_)) {
145 debug!("[CLIENT] Received result message, stream complete");
146 Ok(Some(output))
147 } else {
148 Ok(Some(output))
149 }
150 }
151 Err(ParseError { error_message, .. }) => {
152 debug!("[CLIENT] Failed to deserialize: {}", error_message);
153 debug!("[CLIENT] Raw JSON that failed: {}", trimmed);
154 Err(Error::Deserialization(format!(
155 "{} (raw: {})",
156 error_message, trimmed
157 )))
158 }
159 }
160 }
161 Err(e) => {
162 debug!("[CLIENT] Error reading from stdout: {}", e);
163 Err(Error::Io(e))
164 }
165 }
166 }
167
168 pub fn shutdown(&mut self) -> Result<()> {
170 debug!("[CLIENT] Shutting down client");
171 self.child.kill().map_err(Error::Io)?;
172 self.child.wait().map_err(Error::Io)?;
173 Ok(())
174 }
175
176 pub fn session_uuid(&self) -> Result<Uuid> {
179 self.session_uuid.ok_or(Error::SessionNotInitialized)
180 }
181
182 pub fn ping(&mut self) -> bool {
185 let ping_input = ClaudeInput::user_message(
187 "ping - respond with just the word 'pong' and nothing else",
188 self.session_uuid.unwrap_or_else(Uuid::new_v4),
189 );
190
191 match self.query(ping_input) {
193 Ok(responses) => {
194 for output in responses {
196 if let ClaudeOutput::Assistant(msg) = &output {
197 for content in &msg.message.content {
198 if let ContentBlock::Text(text) = content {
199 if text.text.to_lowercase().contains("pong") {
200 return true;
201 }
202 }
203 }
204 }
205 }
206 false
207 }
208 Err(e) => {
209 debug!("Ping failed: {}", e);
210 false
211 }
212 }
213 }
214}
215
216impl Protocol {
218 pub fn write_sync<W: Write, T: Serialize>(writer: &mut W, message: &T) -> Result<()> {
220 let line = Self::serialize(message)?;
221 debug!("[PROTOCOL] Sending: {}", line.trim());
222 writer.write_all(line.as_bytes())?;
223 writer.flush()?;
224 Ok(())
225 }
226
227 pub fn read_sync<R: BufRead, T: for<'de> Deserialize<'de>>(reader: &mut R) -> Result<T> {
229 let mut line = String::new();
230 let bytes_read = reader.read_line(&mut line)?;
231 if bytes_read == 0 {
232 return Err(Error::ConnectionClosed);
233 }
234 debug!("[PROTOCOL] Received: {}", line.trim());
235 Self::deserialize(&line)
236 }
237}
238
239pub struct ResponseIterator<'a> {
241 client: &'a mut SyncClient,
242 finished: bool,
243}
244
245impl Iterator for ResponseIterator<'_> {
246 type Item = Result<ClaudeOutput>;
247
248 fn next(&mut self) -> Option<Self::Item> {
249 if self.finished {
250 return None;
251 }
252
253 match self.client.read_next() {
254 Ok(Some(output)) => {
255 if matches!(output, ClaudeOutput::Result(_)) {
257 self.finished = true;
258 }
259 Some(Ok(output))
260 }
261 Ok(None) => {
262 self.finished = true;
263 None
264 }
265 Err(e) => {
266 self.finished = true;
267 Some(Err(e))
268 }
269 }
270 }
271}
272
273impl Drop for SyncClient {
274 fn drop(&mut self) {
275 if let Err(e) = self.shutdown() {
276 debug!("[CLIENT] Error during shutdown: {}", e);
277 }
278 }
279}
280
281pub struct StreamProcessor<R> {
283 reader: BufReader<R>,
284}
285
286impl<R: std::io::Read> StreamProcessor<R> {
287 pub fn new(reader: R) -> Self {
289 Self {
290 reader: BufReader::new(reader),
291 }
292 }
293
294 pub fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
296 Protocol::read_sync(&mut self.reader)
297 }
298
299 pub fn process_all<T, F>(&mut self, mut handler: F) -> Result<()>
301 where
302 T: for<'de> Deserialize<'de>,
303 F: FnMut(T) -> Result<()>,
304 {
305 loop {
306 match self.next_message() {
307 Ok(message) => handler(message)?,
308 Err(Error::ConnectionClosed) => break,
309 Err(e) => return Err(e),
310 }
311 }
312 Ok(())
313 }
314}