claude_codes/
client_sync.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6 ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7 ControlResponseMessage, ParseError,
8};
9use crate::protocol::Protocol;
10use log::{debug, warn};
11use serde::{Deserialize, Serialize};
12use std::io::{BufRead, BufReader, Write};
13use std::process::{Child, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16pub struct SyncClient {
18 child: Child,
19 stdin: ChildStdin,
20 stdout: BufReader<ChildStdout>,
21 session_uuid: Option<Uuid>,
22 tool_approval_enabled: bool,
24}
25
26const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
28
29impl SyncClient {
30 pub fn new(mut child: Child) -> Result<Self> {
32 let stdin = child
33 .stdin
34 .take()
35 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
36 let stdout = child
37 .stdout
38 .take()
39 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
40
41 Ok(Self {
42 child,
43 stdin,
44 stdout: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
45 session_uuid: None,
46 tool_approval_enabled: false,
47 })
48 }
49
50 pub fn with_defaults() -> Result<Self> {
52 crate::version::check_claude_version()?;
58 let child = ClaudeCliBuilder::new().spawn_sync().map_err(Error::Io)?;
59 Self::new(child)
60 }
61
62 pub fn resume_session(session_uuid: Uuid) -> Result<Self> {
65 let child = ClaudeCliBuilder::new()
66 .resume(Some(session_uuid.to_string()))
67 .spawn_sync()
68 .map_err(Error::Io)?;
69
70 debug!("Resuming Claude session with UUID: {}", session_uuid);
71 let mut client = Self::new(child)?;
72 client.session_uuid = Some(session_uuid);
74 Ok(client)
75 }
76
77 pub fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
79 let child = ClaudeCliBuilder::new()
80 .model(model)
81 .resume(Some(session_uuid.to_string()))
82 .spawn_sync()
83 .map_err(Error::Io)?;
84
85 debug!(
86 "Resuming Claude session with UUID: {} and model: {}",
87 session_uuid, model
88 );
89 let mut client = Self::new(child)?;
90 client.session_uuid = Some(session_uuid);
92 Ok(client)
93 }
94
95 pub fn query(&mut self, input: ClaudeInput) -> Result<Vec<ClaudeOutput>> {
97 let mut responses = Vec::new();
98 for response in self.query_stream(input)? {
99 responses.push(response?);
100 }
101 Ok(responses)
102 }
103
104 pub fn query_stream(&mut self, input: ClaudeInput) -> Result<ResponseIterator<'_>> {
106 Protocol::write_sync(&mut self.stdin, &input)?;
108
109 Ok(ResponseIterator {
110 client: self,
111 finished: false,
112 })
113 }
114
115 fn read_next(&mut self) -> Result<Option<ClaudeOutput>> {
117 let mut line = String::new();
118 match self.stdout.read_line(&mut line) {
119 Ok(0) => {
120 debug!("[CLIENT] Stream closed");
121 Ok(None)
122 }
123 Ok(_) => {
124 let trimmed = line.trim();
125 if trimmed.is_empty() {
126 debug!("[CLIENT] Skipping empty line");
127 return self.read_next();
128 }
129
130 debug!("[CLIENT] Received: {}", trimmed);
131 match ClaudeOutput::parse_json_tolerant(trimmed) {
132 Ok(output) => {
133 if self.session_uuid.is_none() {
135 if let ClaudeOutput::Assistant(ref msg) = output {
136 if let Some(ref uuid_str) = msg.uuid {
137 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
138 debug!("[CLIENT] Captured session UUID: {}", uuid);
139 self.session_uuid = Some(uuid);
140 }
141 }
142 } else if let ClaudeOutput::Result(ref msg) = output {
143 if let Some(ref uuid_str) = msg.uuid {
144 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
145 debug!("[CLIENT] Captured session UUID: {}", uuid);
146 self.session_uuid = Some(uuid);
147 }
148 }
149 }
150 }
151
152 if matches!(output, ClaudeOutput::Result(_)) {
154 debug!("[CLIENT] Received result message, stream complete");
155 Ok(Some(output))
156 } else {
157 Ok(Some(output))
158 }
159 }
160 Err(ParseError { error_message, .. }) => {
161 warn!("[CLIENT] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
162 warn!("[CLIENT] Parse error: {}", error_message);
163 warn!("[CLIENT] Raw message: {}", trimmed);
164 Err(Error::Deserialization(format!(
165 "{} (raw: {})",
166 error_message, trimmed
167 )))
168 }
169 }
170 }
171 Err(e) => {
172 debug!("[CLIENT] Error reading from stdout: {}", e);
173 Err(Error::Io(e))
174 }
175 }
176 }
177
178 pub fn shutdown(&mut self) -> Result<()> {
180 debug!("[CLIENT] Shutting down client");
181 self.child.kill().map_err(Error::Io)?;
182 self.child.wait().map_err(Error::Io)?;
183 Ok(())
184 }
185
186 pub fn session_uuid(&self) -> Result<Uuid> {
189 self.session_uuid.ok_or(Error::SessionNotInitialized)
190 }
191
192 pub fn ping(&mut self) -> bool {
195 let ping_input = ClaudeInput::user_message(
197 "ping - respond with just the word 'pong' and nothing else",
198 self.session_uuid.unwrap_or_else(Uuid::new_v4),
199 );
200
201 match self.query(ping_input) {
203 Ok(responses) => {
204 for output in responses {
206 if let ClaudeOutput::Assistant(msg) = &output {
207 for content in &msg.message.content {
208 if let ContentBlock::Text(text) = content {
209 if text.text.to_lowercase().contains("pong") {
210 return true;
211 }
212 }
213 }
214 }
215 }
216 false
217 }
218 Err(e) => {
219 debug!("Ping failed: {}", e);
220 false
221 }
222 }
223 }
224
225 pub fn enable_tool_approval(&mut self) -> Result<()> {
258 if self.tool_approval_enabled {
259 debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
260 return Ok(());
261 }
262
263 let request_id = format!("init-{}", Uuid::new_v4());
264 let init_request = ControlRequestMessage::initialize(&request_id);
265
266 debug!("[TOOL_APPROVAL] Sending initialization handshake");
267 Protocol::write_sync(&mut self.stdin, &init_request)?;
268
269 loop {
271 let mut line = String::new();
272 let bytes_read = self.stdout.read_line(&mut line).map_err(Error::Io)?;
273
274 if bytes_read == 0 {
275 return Err(Error::ConnectionClosed);
276 }
277
278 let trimmed = line.trim();
279 if trimmed.is_empty() {
280 continue;
281 }
282
283 debug!("[TOOL_APPROVAL] Received: {}", trimmed);
284
285 match ClaudeOutput::parse_json_tolerant(trimmed) {
287 Ok(ClaudeOutput::ControlResponse(resp)) => {
288 use crate::io::ControlResponsePayload;
289 match &resp.response {
290 ControlResponsePayload::Success {
291 request_id: rid, ..
292 } if rid == &request_id => {
293 debug!("[TOOL_APPROVAL] Initialization successful");
294 self.tool_approval_enabled = true;
295 return Ok(());
296 }
297 ControlResponsePayload::Error { error, .. } => {
298 return Err(Error::Protocol(format!(
299 "Tool approval initialization failed: {}",
300 error
301 )));
302 }
303 _ => {
304 continue;
306 }
307 }
308 }
309 Ok(_) => {
310 continue;
312 }
313 Err(e) => {
314 return Err(Error::Deserialization(e.to_string()));
315 }
316 }
317 }
318 }
319
320 pub fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
345 let message: ControlResponseMessage = response.into();
346 debug!(
347 "[TOOL_APPROVAL] Sending control response: {:?}",
348 serde_json::to_string(&message)
349 );
350 Protocol::write_sync(&mut self.stdin, &message)?;
351 Ok(())
352 }
353
354 pub fn is_tool_approval_enabled(&self) -> bool {
356 self.tool_approval_enabled
357 }
358}
359
360impl Protocol {
362 pub fn write_sync<W: Write, T: Serialize>(writer: &mut W, message: &T) -> Result<()> {
364 let line = Self::serialize(message)?;
365 debug!("[PROTOCOL] Sending: {}", line.trim());
366 writer.write_all(line.as_bytes())?;
367 writer.flush()?;
368 Ok(())
369 }
370
371 pub fn read_sync<R: BufRead, T: for<'de> Deserialize<'de>>(reader: &mut R) -> Result<T> {
373 let mut line = String::new();
374 let bytes_read = reader.read_line(&mut line)?;
375 if bytes_read == 0 {
376 return Err(Error::ConnectionClosed);
377 }
378 debug!("[PROTOCOL] Received: {}", line.trim());
379 Self::deserialize(&line)
380 }
381}
382
383pub struct ResponseIterator<'a> {
385 client: &'a mut SyncClient,
386 finished: bool,
387}
388
389impl Iterator for ResponseIterator<'_> {
390 type Item = Result<ClaudeOutput>;
391
392 fn next(&mut self) -> Option<Self::Item> {
393 if self.finished {
394 return None;
395 }
396
397 match self.client.read_next() {
398 Ok(Some(output)) => {
399 if matches!(output, ClaudeOutput::Result(_)) {
401 self.finished = true;
402 }
403 Some(Ok(output))
404 }
405 Ok(None) => {
406 self.finished = true;
407 None
408 }
409 Err(e) => {
410 self.finished = true;
411 Some(Err(e))
412 }
413 }
414 }
415}
416
417impl Drop for SyncClient {
418 fn drop(&mut self) {
419 if let Err(e) = self.shutdown() {
420 debug!("[CLIENT] Error during shutdown: {}", e);
421 }
422 }
423}
424
425pub struct StreamProcessor<R> {
427 reader: BufReader<R>,
428}
429
430impl<R: std::io::Read> StreamProcessor<R> {
431 pub fn new(reader: R) -> Self {
433 Self {
434 reader: BufReader::new(reader),
435 }
436 }
437
438 pub fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
440 Protocol::read_sync(&mut self.reader)
441 }
442
443 pub fn process_all<T, F>(&mut self, mut handler: F) -> Result<()>
445 where
446 T: for<'de> Deserialize<'de>,
447 F: FnMut(T) -> Result<()>,
448 {
449 loop {
450 match self.next_message() {
451 Ok(message) => handler(message)?,
452 Err(Error::ConnectionClosed) => break,
453 Err(e) => return Err(e),
454 }
455 }
456 Ok(())
457 }
458}