1#[cfg(feature = "json")]
2use std::time::Duration;
3
4#[cfg(all(feature = "json", feature = "async"))]
5use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
6#[cfg(all(feature = "json", feature = "async"))]
7use tokio::process::{ChildStderr, Command};
8#[cfg(feature = "json")]
9use tracing::{debug, warn};
10
11#[cfg(feature = "json")]
12use crate::Claude;
13#[cfg(feature = "json")]
14use crate::error::{Error, Result};
15#[cfg(feature = "json")]
16use crate::exec::CommandOutput;
17
18#[cfg(feature = "json")]
23#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
24pub struct StreamEvent {
25 #[serde(flatten)]
27 pub data: serde_json::Value,
28}
29
30#[cfg(feature = "json")]
31impl StreamEvent {
32 pub fn event_type(&self) -> Option<&str> {
34 self.data.get("type").and_then(|v| v.as_str())
35 }
36
37 pub fn role(&self) -> Option<&str> {
39 self.data.get("role").and_then(|v| v.as_str())
40 }
41
42 pub fn is_result(&self) -> bool {
44 self.event_type() == Some("result")
45 }
46
47 pub fn result_text(&self) -> Option<&str> {
49 self.data.get("result").and_then(|v| v.as_str())
50 }
51
52 pub fn session_id(&self) -> Option<&str> {
54 self.data.get("session_id").and_then(|v| v.as_str())
55 }
56
57 pub fn cost_usd(&self) -> Option<f64> {
62 self.data
63 .get("total_cost_usd")
64 .or_else(|| self.data.get("cost_usd"))
65 .and_then(|v| v.as_f64())
66 }
67}
68
69#[cfg(all(feature = "json", feature = "async"))]
96pub async fn stream_query<F>(
97 claude: &Claude,
98 cmd: &crate::command::query::QueryCommand,
99 handler: F,
100) -> Result<CommandOutput>
101where
102 F: FnMut(StreamEvent),
103{
104 stream_query_impl(claude, cmd, handler, claude.timeout).await
105}
106
107#[cfg(all(feature = "json", feature = "async"))]
119async fn stream_query_impl<F>(
120 claude: &Claude,
121 cmd: &crate::command::query::QueryCommand,
122 mut handler: F,
123 timeout: Option<Duration>,
124) -> Result<CommandOutput>
125where
126 F: FnMut(StreamEvent),
127{
128 use crate::command::ClaudeCommand;
129
130 let args = cmd.args();
131
132 let mut command_args = Vec::new();
133 command_args.extend(claude.global_args.clone());
134 command_args.extend(args);
135
136 debug!(
137 binary = %claude.binary.display(),
138 args = ?command_args,
139 timeout = ?timeout,
140 "streaming claude command"
141 );
142
143 let mut cmd = Command::new(&claude.binary);
144 cmd.args(&command_args)
145 .env_remove("CLAUDECODE")
146 .envs(&claude.env)
147 .stdout(std::process::Stdio::piped())
148 .stderr(std::process::Stdio::piped())
149 .stdin(std::process::Stdio::null());
150
151 if let Some(ref dir) = claude.working_dir {
152 cmd.current_dir(dir);
153 }
154
155 let mut child = cmd.spawn().map_err(|e| Error::Io {
156 message: format!("failed to spawn claude: {e}"),
157 source: e,
158 working_dir: claude.working_dir.clone(),
159 })?;
160
161 let stdout = child.stdout.take().expect("stdout was piped");
162 let mut stderr = child.stderr.take().expect("stderr was piped");
163
164 let mut reader = BufReader::new(stdout).lines();
165
166 let drain = drain_stderr(&mut stderr);
171 let read_future = read_lines(&mut reader, &mut handler, claude.working_dir.clone());
172 let combined = async {
173 let (line_result, stderr_str) = tokio::join!(read_future, drain);
174 (line_result, stderr_str)
175 };
176
177 let (line_result, stderr_str) = match timeout {
178 Some(d) => match tokio::time::timeout(d, combined).await {
179 Ok(pair) => pair,
180 Err(_) => {
181 let _ = child.kill().await;
187 let drain_budget = Duration::from_millis(200);
188 let stderr_str = tokio::time::timeout(drain_budget, drain_stderr(&mut stderr))
189 .await
190 .unwrap_or_default();
191 if !stderr_str.is_empty() {
192 warn!(stderr = %stderr_str, "stderr from timed-out streaming process");
193 }
194 return Err(Error::Timeout {
195 timeout_seconds: d.as_secs(),
196 });
197 }
198 },
199 None => combined.await,
200 };
201
202 if let Err(e) = line_result {
205 let _ = child.kill().await;
206 return Err(e);
207 }
208
209 let status = child.wait().await.map_err(|e| Error::Io {
210 message: "failed to wait for claude process".to_string(),
211 source: e,
212 working_dir: claude.working_dir.clone(),
213 })?;
214
215 let exit_code = status.code().unwrap_or(-1);
216
217 if !status.success() {
218 return Err(Error::CommandFailed {
219 command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
220 exit_code,
221 stdout: String::new(),
222 stderr: stderr_str,
223 working_dir: claude.working_dir.clone(),
224 });
225 }
226
227 Ok(CommandOutput {
228 stdout: String::new(), stderr: stderr_str,
230 exit_code,
231 success: true,
232 })
233}
234
235#[cfg(all(feature = "json", feature = "async"))]
236async fn drain_stderr(stderr: &mut ChildStderr) -> String {
237 let mut buf = Vec::new();
238 let _ = stderr.read_to_end(&mut buf).await;
239 String::from_utf8_lossy(&buf).into_owned()
240}
241
242#[cfg(all(feature = "json", feature = "async"))]
243async fn read_lines<F>(
244 reader: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
245 handler: &mut F,
246 working_dir: Option<std::path::PathBuf>,
247) -> Result<()>
248where
249 F: FnMut(StreamEvent),
250{
251 while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
252 message: "failed to read stdout line".to_string(),
253 source: e,
254 working_dir: working_dir.clone(),
255 })? {
256 if line.trim().is_empty() {
257 continue;
258 }
259 match serde_json::from_str::<StreamEvent>(&line) {
260 Ok(event) => handler(event),
261 Err(e) => {
262 debug!(line = %line, error = %e, "failed to parse stream event, skipping");
263 }
264 }
265 }
266
267 Ok(())
268}
269
270#[cfg(all(feature = "sync", feature = "json"))]
308pub fn stream_query_sync<F>(
309 claude: &Claude,
310 cmd: &crate::command::query::QueryCommand,
311 mut handler: F,
312) -> Result<CommandOutput>
313where
314 F: FnMut(StreamEvent),
315{
316 use std::io::{BufRead as _, Read as _};
317 use std::process::{Command as StdCommand, Stdio};
318 use std::sync::mpsc;
319 use std::thread;
320 use std::time::Instant;
321
322 use crate::command::ClaudeCommand;
323
324 let args = cmd.args();
325 let mut command_args = Vec::new();
326 command_args.extend(claude.global_args.clone());
327 command_args.extend(args);
328
329 debug!(
330 binary = %claude.binary.display(),
331 args = ?command_args,
332 timeout = ?claude.timeout,
333 "streaming claude command (sync)"
334 );
335
336 let mut cmd_builder = StdCommand::new(&claude.binary);
337 cmd_builder
338 .args(&command_args)
339 .env_remove("CLAUDECODE")
340 .env_remove("CLAUDE_CODE_ENTRYPOINT")
341 .envs(&claude.env)
342 .stdin(Stdio::null())
343 .stdout(Stdio::piped())
344 .stderr(Stdio::piped());
345
346 if let Some(ref dir) = claude.working_dir {
347 cmd_builder.current_dir(dir);
348 }
349
350 let mut child = cmd_builder.spawn().map_err(|e| Error::Io {
351 message: format!("failed to spawn claude: {e}"),
352 source: e,
353 working_dir: claude.working_dir.clone(),
354 })?;
355
356 let stdout = child.stdout.take().expect("stdout was piped");
357 let stderr = child.stderr.take().expect("stderr was piped");
358
359 let (tx, rx) = mpsc::channel::<StreamEvent>();
363 let reader_wd = claude.working_dir.clone();
364 let reader_thread = thread::spawn(move || -> Result<()> {
365 let reader = std::io::BufReader::new(stdout);
366 for line_res in reader.lines() {
367 let line = line_res.map_err(|e| Error::Io {
368 message: "failed to read stdout line".to_string(),
369 source: e,
370 working_dir: reader_wd.clone(),
371 })?;
372 if line.trim().is_empty() {
373 continue;
374 }
375 match serde_json::from_str::<StreamEvent>(&line) {
376 Ok(event) => {
377 if tx.send(event).is_err() {
378 return Ok(());
380 }
381 }
382 Err(e) => {
383 debug!(line = %line, error = %e, "failed to parse stream event, skipping");
384 }
385 }
386 }
387 Ok(())
388 });
389
390 let stderr_thread = thread::spawn(move || -> String {
391 let mut buf = Vec::new();
392 let mut stderr = stderr;
393 let _ = stderr.read_to_end(&mut buf);
394 String::from_utf8_lossy(&buf).into_owned()
395 });
396
397 let deadline = claude.timeout.map(|d| Instant::now() + d);
400 let mut timed_out = false;
401
402 loop {
403 let recv_result = match deadline {
404 Some(d) => {
405 let now = Instant::now();
406 if now >= d {
407 timed_out = true;
408 break;
409 }
410 rx.recv_timeout(d - now)
411 }
412 None => rx.recv().map_err(|_| mpsc::RecvTimeoutError::Disconnected),
413 };
414
415 match recv_result {
416 Ok(event) => handler(event),
417 Err(mpsc::RecvTimeoutError::Timeout) => {
418 timed_out = true;
419 break;
420 }
421 Err(mpsc::RecvTimeoutError::Disconnected) => break,
422 }
423 }
424
425 if timed_out {
426 let _ = child.kill();
427 let _ = child.wait();
428 let budget = Duration::from_millis(200);
435 let stderr_str = join_with_budget(stderr_thread, budget).unwrap_or_default();
436 let _ = join_with_budget(reader_thread, budget);
437 if !stderr_str.is_empty() {
438 warn!(stderr = %stderr_str, "stderr from timed-out streaming process");
439 }
440 return Err(Error::Timeout {
441 timeout_seconds: claude.timeout.map(|d| d.as_secs()).unwrap_or_default(),
442 });
443 }
444
445 let reader_result = reader_thread.join().unwrap_or(Ok(()));
447 if let Err(e) = reader_result {
448 let _ = child.kill();
449 let _ = child.wait();
450 let _ = stderr_thread.join();
451 return Err(e);
452 }
453
454 let status = child.wait().map_err(|e| Error::Io {
455 message: "failed to wait for claude process".to_string(),
456 source: e,
457 working_dir: claude.working_dir.clone(),
458 })?;
459 let stderr_str = stderr_thread.join().unwrap_or_default();
460 let exit_code = status.code().unwrap_or(-1);
461
462 if !status.success() {
463 return Err(Error::CommandFailed {
464 command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
465 exit_code,
466 stdout: String::new(),
467 stderr: stderr_str,
468 working_dir: claude.working_dir.clone(),
469 });
470 }
471
472 Ok(CommandOutput {
473 stdout: String::new(),
474 stderr: stderr_str,
475 exit_code,
476 success: true,
477 })
478}
479
480#[cfg(all(feature = "sync", feature = "json"))]
485fn join_with_budget<T: Send + 'static>(
486 handle: std::thread::JoinHandle<T>,
487 budget: Duration,
488) -> Option<T> {
489 use std::sync::mpsc;
490 use std::thread;
491
492 let (tx, rx) = mpsc::channel::<T>();
493 thread::spawn(move || {
494 if let Ok(v) = handle.join() {
495 let _ = tx.send(v);
496 }
497 });
498 rx.recv_timeout(budget).ok()
499}