claude_codes/
client_sync.rs1use crate::cli::ClaudeCliBuilder;
4use crate::error::{Error, Result};
5use crate::io::{
6 ClaudeInput, ClaudeOutput, ContentBlock, ControlRequestMessage, ControlResponse,
7 ControlResponseMessage, ParseError,
8};
9use crate::protocol::Protocol;
10use log::debug;
11use serde::{Deserialize, Serialize};
12use std::io::{BufRead, BufReader, Write};
13use std::process::{Child, ChildStdin, ChildStdout};
14use uuid::Uuid;
15
16pub struct SyncClient {
18 child: Child,
19 stdin: ChildStdin,
20 stdout: BufReader<ChildStdout>,
21 session_uuid: Option<Uuid>,
22 tool_approval_enabled: bool,
24}
25
26impl SyncClient {
27 pub fn new(mut child: Child) -> Result<Self> {
29 let stdin = child
30 .stdin
31 .take()
32 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
33 let stdout = child
34 .stdout
35 .take()
36 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
37
38 Ok(Self {
39 child,
40 stdin,
41 stdout: BufReader::new(stdout),
42 session_uuid: None,
43 tool_approval_enabled: false,
44 })
45 }
46
47 pub fn with_defaults() -> Result<Self> {
49 crate::version::check_claude_version()?;
55 let child = ClaudeCliBuilder::new().spawn_sync().map_err(Error::Io)?;
56 Self::new(child)
57 }
58
59 pub fn resume_session(session_uuid: Uuid) -> Result<Self> {
62 let child = ClaudeCliBuilder::new()
63 .resume(Some(session_uuid.to_string()))
64 .spawn_sync()
65 .map_err(Error::Io)?;
66
67 debug!("Resuming Claude session with UUID: {}", session_uuid);
68 let mut client = Self::new(child)?;
69 client.session_uuid = Some(session_uuid);
71 Ok(client)
72 }
73
74 pub fn resume_session_with_model(session_uuid: Uuid, model: &str) -> Result<Self> {
76 let child = ClaudeCliBuilder::new()
77 .model(model)
78 .resume(Some(session_uuid.to_string()))
79 .spawn_sync()
80 .map_err(Error::Io)?;
81
82 debug!(
83 "Resuming Claude session with UUID: {} and model: {}",
84 session_uuid, model
85 );
86 let mut client = Self::new(child)?;
87 client.session_uuid = Some(session_uuid);
89 Ok(client)
90 }
91
92 pub fn query(&mut self, input: ClaudeInput) -> Result<Vec<ClaudeOutput>> {
94 let mut responses = Vec::new();
95 for response in self.query_stream(input)? {
96 responses.push(response?);
97 }
98 Ok(responses)
99 }
100
101 pub fn query_stream(&mut self, input: ClaudeInput) -> Result<ResponseIterator<'_>> {
103 Protocol::write_sync(&mut self.stdin, &input)?;
105
106 Ok(ResponseIterator {
107 client: self,
108 finished: false,
109 })
110 }
111
112 fn read_next(&mut self) -> Result<Option<ClaudeOutput>> {
114 let mut line = String::new();
115 match self.stdout.read_line(&mut line) {
116 Ok(0) => {
117 debug!("[CLIENT] Stream closed");
118 Ok(None)
119 }
120 Ok(_) => {
121 let trimmed = line.trim();
122 if trimmed.is_empty() {
123 debug!("[CLIENT] Skipping empty line");
124 return self.read_next();
125 }
126
127 debug!("[CLIENT] Received: {}", trimmed);
128 match ClaudeOutput::parse_json_tolerant(trimmed) {
129 Ok(output) => {
130 if self.session_uuid.is_none() {
132 if let ClaudeOutput::Assistant(ref msg) = output {
133 if let Some(ref uuid_str) = msg.uuid {
134 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
135 debug!("[CLIENT] Captured session UUID: {}", uuid);
136 self.session_uuid = Some(uuid);
137 }
138 }
139 } else if let ClaudeOutput::Result(ref msg) = output {
140 if let Some(ref uuid_str) = msg.uuid {
141 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
142 debug!("[CLIENT] Captured session UUID: {}", uuid);
143 self.session_uuid = Some(uuid);
144 }
145 }
146 }
147 }
148
149 if matches!(output, ClaudeOutput::Result(_)) {
151 debug!("[CLIENT] Received result message, stream complete");
152 Ok(Some(output))
153 } else {
154 Ok(Some(output))
155 }
156 }
157 Err(ParseError { error_message, .. }) => {
158 debug!("[CLIENT] Failed to deserialize: {}", error_message);
159 debug!("[CLIENT] Raw JSON that failed: {}", trimmed);
160 Err(Error::Deserialization(format!(
161 "{} (raw: {})",
162 error_message, trimmed
163 )))
164 }
165 }
166 }
167 Err(e) => {
168 debug!("[CLIENT] Error reading from stdout: {}", e);
169 Err(Error::Io(e))
170 }
171 }
172 }
173
174 pub fn shutdown(&mut self) -> Result<()> {
176 debug!("[CLIENT] Shutting down client");
177 self.child.kill().map_err(Error::Io)?;
178 self.child.wait().map_err(Error::Io)?;
179 Ok(())
180 }
181
182 pub fn session_uuid(&self) -> Result<Uuid> {
185 self.session_uuid.ok_or(Error::SessionNotInitialized)
186 }
187
188 pub fn ping(&mut self) -> bool {
191 let ping_input = ClaudeInput::user_message(
193 "ping - respond with just the word 'pong' and nothing else",
194 self.session_uuid.unwrap_or_else(Uuid::new_v4),
195 );
196
197 match self.query(ping_input) {
199 Ok(responses) => {
200 for output in responses {
202 if let ClaudeOutput::Assistant(msg) = &output {
203 for content in &msg.message.content {
204 if let ContentBlock::Text(text) = content {
205 if text.text.to_lowercase().contains("pong") {
206 return true;
207 }
208 }
209 }
210 }
211 }
212 false
213 }
214 Err(e) => {
215 debug!("Ping failed: {}", e);
216 false
217 }
218 }
219 }
220
221 pub fn enable_tool_approval(&mut self) -> Result<()> {
254 if self.tool_approval_enabled {
255 debug!("[TOOL_APPROVAL] Already enabled, skipping initialization");
256 return Ok(());
257 }
258
259 let request_id = format!("init-{}", Uuid::new_v4());
260 let init_request = ControlRequestMessage::initialize(&request_id);
261
262 debug!("[TOOL_APPROVAL] Sending initialization handshake");
263 Protocol::write_sync(&mut self.stdin, &init_request)?;
264
265 loop {
267 let mut line = String::new();
268 let bytes_read = self.stdout.read_line(&mut line).map_err(Error::Io)?;
269
270 if bytes_read == 0 {
271 return Err(Error::ConnectionClosed);
272 }
273
274 let trimmed = line.trim();
275 if trimmed.is_empty() {
276 continue;
277 }
278
279 debug!("[TOOL_APPROVAL] Received: {}", trimmed);
280
281 match ClaudeOutput::parse_json_tolerant(trimmed) {
283 Ok(ClaudeOutput::ControlResponse(resp)) => {
284 use crate::io::ControlResponsePayload;
285 match &resp.response {
286 ControlResponsePayload::Success {
287 request_id: rid, ..
288 } if rid == &request_id => {
289 debug!("[TOOL_APPROVAL] Initialization successful");
290 self.tool_approval_enabled = true;
291 return Ok(());
292 }
293 ControlResponsePayload::Error { error, .. } => {
294 return Err(Error::Protocol(format!(
295 "Tool approval initialization failed: {}",
296 error
297 )));
298 }
299 _ => {
300 continue;
302 }
303 }
304 }
305 Ok(_) => {
306 continue;
308 }
309 Err(e) => {
310 return Err(Error::Deserialization(e.to_string()));
311 }
312 }
313 }
314 }
315
316 pub fn send_control_response(&mut self, response: ControlResponse) -> Result<()> {
341 let message: ControlResponseMessage = response.into();
342 debug!(
343 "[TOOL_APPROVAL] Sending control response: {:?}",
344 serde_json::to_string(&message)
345 );
346 Protocol::write_sync(&mut self.stdin, &message)?;
347 Ok(())
348 }
349
350 pub fn is_tool_approval_enabled(&self) -> bool {
352 self.tool_approval_enabled
353 }
354}
355
356impl Protocol {
358 pub fn write_sync<W: Write, T: Serialize>(writer: &mut W, message: &T) -> Result<()> {
360 let line = Self::serialize(message)?;
361 debug!("[PROTOCOL] Sending: {}", line.trim());
362 writer.write_all(line.as_bytes())?;
363 writer.flush()?;
364 Ok(())
365 }
366
367 pub fn read_sync<R: BufRead, T: for<'de> Deserialize<'de>>(reader: &mut R) -> Result<T> {
369 let mut line = String::new();
370 let bytes_read = reader.read_line(&mut line)?;
371 if bytes_read == 0 {
372 return Err(Error::ConnectionClosed);
373 }
374 debug!("[PROTOCOL] Received: {}", line.trim());
375 Self::deserialize(&line)
376 }
377}
378
379pub struct ResponseIterator<'a> {
381 client: &'a mut SyncClient,
382 finished: bool,
383}
384
385impl Iterator for ResponseIterator<'_> {
386 type Item = Result<ClaudeOutput>;
387
388 fn next(&mut self) -> Option<Self::Item> {
389 if self.finished {
390 return None;
391 }
392
393 match self.client.read_next() {
394 Ok(Some(output)) => {
395 if matches!(output, ClaudeOutput::Result(_)) {
397 self.finished = true;
398 }
399 Some(Ok(output))
400 }
401 Ok(None) => {
402 self.finished = true;
403 None
404 }
405 Err(e) => {
406 self.finished = true;
407 Some(Err(e))
408 }
409 }
410 }
411}
412
413impl Drop for SyncClient {
414 fn drop(&mut self) {
415 if let Err(e) = self.shutdown() {
416 debug!("[CLIENT] Error during shutdown: {}", e);
417 }
418 }
419}
420
421pub struct StreamProcessor<R> {
423 reader: BufReader<R>,
424}
425
426impl<R: std::io::Read> StreamProcessor<R> {
427 pub fn new(reader: R) -> Self {
429 Self {
430 reader: BufReader::new(reader),
431 }
432 }
433
434 pub fn next_message<T: for<'de> Deserialize<'de>>(&mut self) -> Result<T> {
436 Protocol::read_sync(&mut self.reader)
437 }
438
439 pub fn process_all<T, F>(&mut self, mut handler: F) -> Result<()>
441 where
442 T: for<'de> Deserialize<'de>,
443 F: FnMut(T) -> Result<()>,
444 {
445 loop {
446 match self.next_message() {
447 Ok(message) => handler(message)?,
448 Err(Error::ConnectionClosed) => break,
449 Err(e) => return Err(e),
450 }
451 }
452 Ok(())
453 }
454}