claude_codes/
client_sync.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6 ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7 ControlResponseMessage,
8};
9use crate::protocol::Protocol;
10use log::{debug, warn};
11use serde::{Deserialize, Serialize};
12use std::io::{BufRead, BufReader, Write};
13use std::process::{Child, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16pub struct SyncClient {
18 child: Child,
19 stdin: ChildStdin,
20 stdout: BufReader<ChildStdout>,
21 session_uuid: Option<Uuid>,
22 tool_approval_enabled: bool,
24}
25
/// Capacity, in bytes, of the buffered reader placed over the child's stdout (10 MiB).
const STDOUT_BUFFER_SIZE: usize = {
    const MIB: usize = 1 << 20;
    10 * MIB
};
28
29impl SyncClient {
30 pub fn new(mut child: Child) -> Result<Self> {
32 let stdin = child
33 .stdin
34 .take()
35 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
36 let stdout = child
37 .stdout
38 .take()
39 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
40
41 Ok(Self {
42 child,
43 stdin,
44 stdout: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
45 session_uuid: None,
46 tool_approval_enabled: false,
47 })
48 }
49
50 pub fn with_defaults() -> Result<Self> {
52 crate::version::check_claude_version()?;
58 let child = ClaudeCliBuilder::new().spawn_sync()?;
59 Self::new(child)
60 }
61
62 pub fn resume_session(session_uuid: Uuid) -> Result<Self> {
65 let child = ClaudeCliBuilder::new()
66 .resume(Some(session_uuid.to_string()))
67 .spawn_sync()?;
68
69 debug!("Resuming Claude session with UUID: {}", session_uuid);
70 let mut client = Self::new(child)?;
71 client.session_uuid = Some(session_uuid);
73 Ok(client)
74 }
75
76 pub fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
78 let child = ClaudeCliBuilder::new()
79 .model(model)
80 .resume(Some(session_uuid.to_string()))
81 .spawn_sync()?;
82
83 debug!(
84 "Resuming Claude session with UUID: {} and model: {}",
85 session_uuid, model
86 );
87 let mut client = Self::new(child)?;
88 client.session_uuid = Some(session_uuid);
90 Ok(client)
91 }
92
93 pub fn query(&mut self, input: ClaudeInput) -> Result<Vec<ClaudeOutput>> {
95 let mut responses = Vec::new();
96 for response in self.query_stream(input)? {
97 responses.push(response?);
98 }
99 Ok(responses)
100 }
101
102 pub fn query_stream(&mut self, input: ClaudeInput) -> Result<ResponseIterator<'_>> {
104 Protocol::write_sync(&mut self.stdin, &input)?;
106
107 Ok(ResponseIterator {
108 client: self,
109 finished: false,
110 })
111 }
112
113 fn read_next(&mut self) -> Result<Option<ClaudeOutput>> {
115 let mut line = String::new();
116 match self.stdout.read_line(&mut line) {
117 Ok(0) => {
118 debug!("[CLIENT] Stream closed");
119 Ok(None)
120 }
121 Ok(_) => {
122 let trimmed = line.trim();
123 if trimmed.is_empty() {
124 debug!("[CLIENT] Skipping empty line");
125 return self.read_next();
126 }
127
128 debug!("[CLIENT] Received: {}", trimmed);
129 match ClaudeOutput::parse_json_tolerant(trimmed) {
130 Ok(output) => {
131 if self.session_uuid.is_none() {
133 if let ClaudeOutput::Assistant(ref msg) = output {
134 if let Some(ref uuid_str) = msg.uuid {
135 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
136 debug!("[CLIENT] Captured session UUID: {}", uuid);
137 self.session_uuid = Some(uuid);
138 }
139 }
140 } else if let ClaudeOutput::Result(ref msg) = output {
141 if let Some(ref uuid_str) = msg.uuid {
142 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
143 debug!("[CLIENT] Captured session UUID: {}", uuid);
144 self.session_uuid = Some(uuid);
145 }
146 }
147 }
148 }
149
150 if matches!(output, ClaudeOutput::Result(_)) {
152 debug!("[CLIENT] Received result message, stream complete");
153 Ok(Some(output))
154 } else {
155 Ok(Some(output))
156 }
157 }
158 Err(parse_error) => {
159 warn!("[CLIENT] Failed to deserialize message from Claude CLI. Please report this at https://github.com/meawoppl/rust-claude-codes/issues with the raw message below.");
160 warn!("[CLIENT] Parse error: {}", parse_error.error_message);
161 warn!("[CLIENT] Raw message: {}", trimmed);
162 Err(parse_error.into())
163 }
164 }
165 }
166 Err(e) => {
167 debug!("[CLIENT] Error reading from stdout: {}", e);
168 Err(Error::Io(e))
169 }
170 }
171 }
172
173 pub fn shutdown(&mut self) -> Result<()> {
175 debug!("[CLIENT] Shutting down client");
176 self.child.kill().map_err(Error::Io)?;
177 self.child.wait().map_err(Error::Io)?;
178 Ok(())
179 }
180
181 pub fn session_uuid(&self) -> Result<Uuid> {
184 self.session_uuid.ok_or(Error::SessionNotInitialized)
185 }
186
187 pub fn ping(&mut self) -> bool {
190 let ping_input = ClaudeInput::user_message(
192 "ping - respond with just the word 'pong' and nothing else",
193 self.session_uuid.unwrap_or_else(Uuid::new_v4),
194 );
195
196 match self.query(ping_input) {
198 Ok(responses) => {
199 for output in responses {
201 if let ClaudeOutput::Assistant(msg) = &output {
202 for content in &msg.message.content {
203 if let ContentBlock::Text(text) = content {
204 if text.text.to_lowercase().contains("pong") {
205 return true;
206 }
207 }
208 }
209 }
210 }
211 false
212 }
213 Err(e) => {
214 debug!("Ping failed: {}", e);
215 false
216 }
217 }
218 }
219
220 pub fn enable_tool_approval(&mut self) -> Result<()> {
253 if self.tool_approval_enabled {
254 debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
255 return Ok(());
256 }
257
258 let request_id = format!("init-{}", Uuid::new_v4());
259 let init_request = ControlRequestMessage::initialize(&request_id);
260
261 debug!("[TOOL_APPROVAL] Sending initialization handshake");
262 Protocol::write_sync(&mut self.stdin, &init_request)?;
263
264 loop {
266 let mut line = String::new();
267 let bytes_read = self.stdout.read_line(&mut line).map_err(Error::Io)?;
268
269 if bytes_read == 0 {
270 return Err(Error::ConnectionClosed);
271 }
272
273 let trimmed = line.trim();
274 if trimmed.is_empty() {
275 continue;
276 }
277
278 debug!("[TOOL_APPROVAL] Received: {}", trimmed);
279
280 match ClaudeOutput::parse_json_tolerant(trimmed) {
282 Ok(ClaudeOutput::ControlResponse(resp)) => {
283 use crate::io::ControlResponsePayload;
284 match &resp.response {
285 ControlResponsePayload::Success {
286 request_id: rid, ..
287 } if rid == &request_id => {
288 debug!("[TOOL_APPROVAL] Initialization successful");
289 self.tool_approval_enabled = true;
290 return Ok(());
291 }
292 ControlResponsePayload::Error { error, .. } => {
293 return Err(Error::Protocol(format!(
294 "Tool approval initialization failed: {}",
295 error
296 )));
297 }
298 _ => {
299 continue;
301 }
302 }
303 }
304 Ok(_) => {
305 continue;
307 }
308 Err(e) => {
309 return Err(e.into());
310 }
311 }
312 }
313 }
314
315 pub fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
340 let message: ControlResponseMessage = response.into();
341 debug!(
342 "[TOOL_APPROVAL] Sending control response: {:?}",
343 serde_json::to_string(&message)
344 );
345 Protocol::write_sync(&mut self.stdin, &message)?;
346 Ok(())
347 }
348
349 pub fn interrupt(&mut self) -> Result<()> {
354 let input = ClaudeInput::interrupt();
355 Protocol::write_sync(&mut self.stdin, &input)?;
356 Ok(())
357 }
358
359 pub fn is_tool_approval_enabled(&self) -> bool {
361 self.tool_approval_enabled
362 }
363}
364
365impl Protocol {
367 pub fn write_sync<W: Write, T: Serialize>(writer: &mut W, message: &T) -> Result<()> {
369 let line = Self::serialize(message)?;
370 debug!("[PROTOCOL] Sending: {}", line.trim());
371 writer.write_all(line.as_bytes())?;
372 writer.flush()?;
373 Ok(())
374 }
375
376 pub fn read_sync<R: BufRead, T: for<'de> Deserialize<'de>>(reader: &mut R) -> Result<T> {
378 let mut line = String::new();
379 let bytes_read = reader.read_line(&mut line)?;
380 if bytes_read == 0 {
381 return Err(Error::ConnectionClosed);
382 }
383 debug!("[PROTOCOL] Received: {}", line.trim());
384 Self::deserialize(&line)
385 }
386}
387
388pub struct ResponseIterator<'a> {
390 client: &'a mut SyncClient,
391 finished: bool,
392}
393
394impl Iterator for ResponseIterator<'_> {
395 type Item = Result<ClaudeOutput>;
396
397 fn next(&mut self) -> Option<Self::Item> {
398 if self.finished {
399 return None;
400 }
401
402 match self.client.read_next() {
403 Ok(Some(output)) => {
404 if matches!(output, ClaudeOutput::Result(_)) {
406 self.finished = true;
407 }
408 Some(Ok(output))
409 }
410 Ok(None) => {
411 self.finished = true;
412 None
413 }
414 Err(e) => {
415 self.finished = true;
416 Some(Err(e))
417 }
418 }
419 }
420}
421
422impl Drop for SyncClient {
423 fn drop(&mut self) {
424 if let Err(e) = self.shutdown() {
425 debug!("[CLIENT] Error during shutdown: {}", e);
426 }
427 }
428}
429
430pub struct StreamProcessor<R> {
432 reader: BufReader<R>,
433}
434
435impl<R: std::io::Read> StreamProcessor<R> {
436 pub fn new(reader: R) -> Self {
438 Self {
439 reader: BufReader::new(reader),
440 }
441 }
442
443 pub fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
445 Protocol::read_sync(&mut self.reader)
446 }
447
448 pub fn process_all<T, F>(&mut self, mut handler: F) -> Result<()>
450 where
451 T: for<'de> Deserialize<'de>,
452 F: FnMut(T) -> Result<()>,
453 {
454 loop {
455 match self.next_message() {
456 Ok(message) => handler(message)?,
457 Err(Error::ConnectionClosed) => break,
458 Err(e) => return Err(e),
459 }
460 }
461 Ok(())
462 }
463}