Skip to main content

claude_wrapper/
streaming.rs

1#[cfg(feature = "json")]
2use std::time::Duration;
3
4#[cfg(all(feature = "json", feature = "async"))]
5use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
6#[cfg(all(feature = "json", feature = "async"))]
7use tokio::process::{ChildStderr, Command};
8#[cfg(feature = "json")]
9use tracing::{debug, warn};
10
11#[cfg(feature = "json")]
12use crate::Claude;
13#[cfg(feature = "json")]
14use crate::error::{Error, Result};
15#[cfg(feature = "json")]
16use crate::exec::CommandOutput;
17
/// One parsed line of `--output-format stream-json` output.
///
/// The stream is NDJSON: every line is a standalone JSON object whose
/// fields differ by message type. Instead of modelling each variant,
/// the whole object is kept as a raw [`serde_json::Value`] and read
/// through the accessor methods on this type.
#[cfg(feature = "json")]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct StreamEvent {
    /// The complete JSON object for this event. `flatten` maps the
    /// event's own top-level keys straight onto this value.
    #[serde(flatten)]
    pub data: serde_json::Value,
}
29
#[cfg(feature = "json")]
impl StreamEvent {
    /// Look up a top-level key and return it as a string slice.
    ///
    /// Returns `None` when the key is missing or its value is not a
    /// JSON string.
    fn str_field(&self, key: &str) -> Option<&str> {
        self.data.get(key).and_then(serde_json::Value::as_str)
    }

    /// Get the event type (the top-level `type` key), if present.
    pub fn event_type(&self) -> Option<&str> {
        self.str_field("type")
    }

    /// Get the message role, if present.
    ///
    /// A top-level `role` key wins. When it is absent, the nested
    /// `message.role` is used instead.
    ///
    /// NOTE(review): the nested lookup assumes assistant/user events
    /// wrap the API message (and its `role`) in a `message` object --
    /// confirm against the CLI's stream-json schema.
    pub fn role(&self) -> Option<&str> {
        self.str_field("role")
            .or_else(|| self.data.get("message")?.get("role")?.as_str())
    }

    /// Check if this is the final result message (`type == "result"`).
    pub fn is_result(&self) -> bool {
        self.event_type() == Some("result")
    }

    /// Extract the result text (the top-level `result` key) from a
    /// result event.
    pub fn result_text(&self) -> Option<&str> {
        self.str_field("result")
    }

    /// Get the session ID (the top-level `session_id` key) if present.
    pub fn session_id(&self) -> Option<&str> {
        self.str_field("session_id")
    }

    /// Get the cost in USD if present (usually on result events).
    ///
    /// Prefers `total_cost_usd` (the CLI's primary key) and falls back
    /// to the legacy `cost_usd` alias. The fallback also applies when
    /// `total_cost_usd` exists but is not a number (for example
    /// `null`), so a usable legacy value is never hidden by an
    /// unusable primary one.
    pub fn cost_usd(&self) -> Option<f64> {
        ["total_cost_usd", "cost_usd"]
            .iter()
            .find_map(|key| self.data.get(*key).and_then(serde_json::Value::as_f64))
    }
}
68
/// Run a query with streaming output, invoking `handler` once per NDJSON event.
///
/// The claude process is spawned and its stdout is read line by line.
/// Every line that parses as JSON is wrapped in a [`StreamEvent`] and
/// handed to `handler` as soon as it arrives, which makes this suitable
/// for progress reporting and other real-time processing. The timeout
/// configured on `claude`, if any, applies to the whole run.
///
/// # Errors
///
/// Returns an error when the process cannot be spawned, read, or
/// waited on, when the configured timeout elapses, or when the process
/// exits with a non-success status.
///
/// # Example
///
/// ```no_run
/// use claude_wrapper::{Claude, QueryCommand, OutputFormat};
/// use claude_wrapper::streaming::{StreamEvent, stream_query};
///
/// # async fn example() -> claude_wrapper::Result<()> {
/// let claude = Claude::builder().build()?;
///
/// let cmd = QueryCommand::new("explain quicksort")
///     .output_format(OutputFormat::StreamJson);
///
/// let output = stream_query(&claude, &cmd, |event: StreamEvent| {
///     if let Some(t) = event.event_type() {
///         println!("[{t}] {:?}", event.data);
///     }
/// }).await?;
/// # Ok(())
/// # }
/// ```
#[cfg(all(feature = "json", feature = "async"))]
pub async fn stream_query<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    handler: F,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    // The client's own timeout setting governs the streaming run.
    let timeout = claude.timeout;
    stream_query_impl(claude, cmd, handler, timeout).await
}
106
/// Unified async streaming implementation with an optional timeout.
///
/// Stdout is consumed line by line while stderr is drained at the same
/// time. Both futures are polled on the current task via
/// `tokio::join!` (nothing is spawned), so a chatty child cannot
/// deadlock by filling the stderr pipe buffer.
///
/// Stderr bytes are accumulated in a buffer owned by this function
/// rather than by the drain future. Output captured before a timeout
/// therefore survives the timed-out future being dropped.
///
/// On timeout, the child is killed and reaped (`kill().await`), any
/// stderr still readable within a short grace period is appended, and
/// the combined text is logged at warn level. The returned
/// `Error::Timeout` does not carry partial output -- streamed stdout
/// events were already dispatched to the handler as they arrived.
///
/// The child environment has `CLAUDECODE` and `CLAUDE_CODE_ENTRYPOINT`
/// removed before `claude.env` is applied, the same scrubbing the
/// blocking streaming path performs.
///
/// # Errors
///
/// * `Error::Io` -- the process could not be spawned, a stdout line
///   could not be read, or waiting for the process failed.
/// * `Error::Timeout` -- `timeout` elapsed before both pipes closed.
/// * `Error::CommandFailed` -- the process exited unsuccessfully; the
///   captured stderr is attached.
#[cfg(all(feature = "json", feature = "async"))]
async fn stream_query_impl<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    mut handler: F,
    timeout: Option<Duration>,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    use crate::command::ClaudeCommand;

    let args = cmd.args();

    let mut command_args = Vec::new();
    command_args.extend(claude.global_args.clone());
    command_args.extend(args);

    debug!(
        binary = %claude.binary.display(),
        args = ?command_args,
        timeout = ?timeout,
        "streaming claude command"
    );

    let mut command = Command::new(&claude.binary);
    command
        .args(&command_args)
        // Scrub both markers, exactly like the blocking path; removing
        // only one left the two code paths behaving differently.
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT")
        .envs(&claude.env)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .stdin(std::process::Stdio::null());

    if let Some(ref dir) = claude.working_dir {
        command.current_dir(dir);
    }

    let mut child = command.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: claude.working_dir.clone(),
    })?;

    let stdout = child.stdout.take().expect("stdout was piped");
    let mut stderr = child.stderr.take().expect("stderr was piped");

    let mut reader = BufReader::new(stdout).lines();

    // Owned by this frame, not by the drain future: `read_to_end`
    // appends as bytes arrive, so whatever was read stays here even if
    // the future below is dropped by the timeout.
    let mut stderr_buf: Vec<u8> = Vec::new();

    // Run stdout line reading and stderr draining concurrently so a
    // chatty child can't deadlock by filling the stderr pipe buffer.
    // tokio::join! polls both futures on the same task (no tokio::spawn
    // needed, so we avoid pulling in the `rt` feature).
    let drain = async {
        // Best-effort: a read error simply ends the capture early.
        let _ = stderr.read_to_end(&mut stderr_buf).await;
    };
    let read_future = read_lines(&mut reader, &mut handler, claude.working_dir.clone());
    let combined = async {
        let (line_result, ()) = tokio::join!(read_future, drain);
        line_result
    };

    let line_result = match timeout {
        Some(d) => match tokio::time::timeout(d, combined).await {
            Ok(line_result) => line_result,
            Err(_) => {
                // Timeout: kill the child (reaps via start_kill + wait)
                // and try to drain whatever stderr remains. kill() only
                // targets the direct child, so a subprocess tree holding
                // our pipe fds could block the drain -- cap it with a
                // short deadline.
                let _ = child.kill().await;
                let drain_budget = Duration::from_millis(200);
                let tail = tokio::time::timeout(drain_budget, drain_stderr(&mut stderr))
                    .await
                    .unwrap_or_default();
                // Pre-timeout capture first, then the post-kill tail.
                let mut stderr_str = String::from_utf8_lossy(&stderr_buf).into_owned();
                stderr_str.push_str(&tail);
                if !stderr_str.is_empty() {
                    warn!(stderr = %stderr_str, "stderr from timed-out streaming process");
                }
                return Err(Error::Timeout {
                    timeout_seconds: d.as_secs(),
                });
            }
        },
        None => combined.await,
    };

    let stderr_str = String::from_utf8_lossy(&stderr_buf).into_owned();

    // If reading lines failed partway through (IO error, not timeout),
    // clean up the child before returning.
    if let Err(e) = line_result {
        let _ = child.kill().await;
        return Err(e);
    }

    let status = child.wait().await.map_err(|e| Error::Io {
        message: "failed to wait for claude process".to_string(),
        source: e,
        working_dir: claude.working_dir.clone(),
    })?;

    // A missing code (e.g. killed by a signal) is reported as -1.
    let exit_code = status.code().unwrap_or(-1);

    if !status.success() {
        return Err(Error::CommandFailed {
            command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
            exit_code,
            stdout: String::new(),
            stderr: stderr_str,
            working_dir: claude.working_dir.clone(),
        });
    }

    Ok(CommandOutput {
        stdout: String::new(), // already consumed via streaming
        stderr: stderr_str,
        exit_code,
        success: true,
    })
}
234
/// Read `stderr` to EOF and return what was collected as text.
///
/// A read error is deliberately ignored: whatever bytes were gathered
/// before it are still returned. Bytes that are not valid UTF-8 are
/// replaced lossily rather than rejected.
#[cfg(all(feature = "json", feature = "async"))]
async fn drain_stderr(stderr: &mut ChildStderr) -> String {
    let mut collected: Vec<u8> = Vec::new();
    let _read_outcome = stderr.read_to_end(&mut collected).await;
    match String::from_utf8(collected) {
        Ok(text) => text,
        // Fall back to lossy decoding of the same bytes.
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    }
}
241
/// Pump stdout lines through `handler` until the stream ends.
///
/// Blank lines are skipped. A line that fails to parse as a
/// [`StreamEvent`] is logged at debug level and skipped, so a single
/// malformed line does not abort the stream. A failure to read from
/// the pipe is returned as `Error::Io`, tagged with `working_dir`.
#[cfg(all(feature = "json", feature = "async"))]
async fn read_lines<F>(
    reader: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
    handler: &mut F,
    working_dir: Option<std::path::PathBuf>,
) -> Result<()>
where
    F: FnMut(StreamEvent),
{
    loop {
        let next = reader.next_line().await.map_err(|source| Error::Io {
            message: "failed to read stdout line".to_string(),
            source,
            working_dir: working_dir.clone(),
        })?;

        // `None` means the child closed stdout: the stream is complete.
        let raw = match next {
            Some(raw) => raw,
            None => return Ok(()),
        };

        if raw.trim().is_empty() {
            continue;
        }

        match serde_json::from_str::<StreamEvent>(&raw) {
            Ok(event) => handler(event),
            Err(parse_err) => {
                debug!(line = %raw, error = %parse_err, "failed to parse stream event, skipping");
            }
        }
    }
}
269
270// ---------- sync streaming ----------
271
/// Blocking counterpart of [`stream_query`].
///
/// Three threads cooperate:
///
/// * a reader thread parses NDJSON lines from the child's stdout and
///   forwards each event over a channel;
/// * a second worker thread drains stderr so the child cannot stall
///   on a full pipe;
/// * the caller's thread receives events and runs `handler`.
///
/// Because `handler` only ever runs on the caller's thread it needs no
/// `Send` bound and may capture non-`Send` state.
///
/// Requires both the `sync` and `json` features.
///
/// When the [`Claude`] client has a timeout configured and the
/// deadline passes, the child is SIGKILLed and reaped and
/// `Error::Timeout` is returned. Events already delivered to the
/// handler are not rolled back.
///
/// # Example
///
/// ```no_run
/// # #[cfg(all(feature = "sync", feature = "json"))]
/// # {
/// use claude_wrapper::{Claude, OutputFormat, QueryCommand};
/// use claude_wrapper::streaming::{StreamEvent, stream_query_sync};
///
/// # fn example() -> claude_wrapper::Result<()> {
/// let claude = Claude::builder().build()?;
/// let cmd = QueryCommand::new("explain quicksort")
///     .output_format(OutputFormat::StreamJson);
///
/// stream_query_sync(&claude, &cmd, |event: StreamEvent| {
///     if let Some(t) = event.event_type() {
///         println!("[{t}] {:?}", event.data);
///     }
/// })?;
/// # Ok(())
/// # }
/// # }
/// ```
#[cfg(all(feature = "sync", feature = "json"))]
pub fn stream_query_sync<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    mut handler: F,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    use std::io::{BufRead as _, Read as _};
    use std::process::{Command as StdCommand, Stdio};
    use std::sync::mpsc;
    use std::thread;
    use std::time::Instant;

    use crate::command::ClaudeCommand;

    let args = cmd.args();
    let mut command_args = Vec::new();
    command_args.extend(claude.global_args.clone());
    command_args.extend(args);

    debug!(
        binary = %claude.binary.display(),
        args = ?command_args,
        timeout = ?claude.timeout,
        "streaming claude command (sync)"
    );

    let mut process = StdCommand::new(&claude.binary);
    process
        .args(&command_args)
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT")
        .envs(&claude.env)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    if let Some(dir) = claude.working_dir.as_ref() {
        process.current_dir(dir);
    }

    let mut child = process.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: claude.working_dir.clone(),
    })?;

    let stdout = child.stdout.take().expect("stdout was piped");
    let stderr = child.stderr.take().expect("stderr was piped");

    // Reader thread: turns stdout lines into events and sends them to
    // the caller's thread. An IO error is reported through the thread's
    // return value; unparseable lines are logged and skipped.
    let (event_tx, event_rx) = mpsc::channel::<StreamEvent>();
    let reader_wd = claude.working_dir.clone();
    let reader_thread = thread::spawn(move || -> Result<()> {
        for next in std::io::BufReader::new(stdout).lines() {
            let raw = next.map_err(|source| Error::Io {
                message: "failed to read stdout line".to_string(),
                source,
                working_dir: reader_wd.clone(),
            })?;
            if raw.trim().is_empty() {
                continue;
            }
            match serde_json::from_str::<StreamEvent>(&raw) {
                Ok(event) => {
                    // A failed send means the receiving side is gone
                    // (the caller bailed out), so stop quietly.
                    if event_tx.send(event).is_err() {
                        return Ok(());
                    }
                }
                Err(parse_err) => {
                    debug!(line = %raw, error = %parse_err, "failed to parse stream event, skipping");
                }
            }
        }
        Ok(())
    });

    // Stderr thread: collect everything the child writes to stderr.
    let stderr_thread = thread::spawn(move || -> String {
        let mut pipe = stderr;
        let mut bytes = Vec::new();
        let _ = pipe.read_to_end(&mut bytes);
        String::from_utf8_lossy(&bytes).into_owned()
    });

    // Dispatch loop on the caller's thread. Evaluates to `true` when
    // the deadline passed and `false` when the reader hung up.
    let deadline = claude.timeout.map(|limit| Instant::now() + limit);
    let timed_out = loop {
        let received = if let Some(cutoff) = deadline {
            let now = Instant::now();
            if now >= cutoff {
                break true;
            }
            event_rx.recv_timeout(cutoff - now)
        } else {
            // No deadline: block until an event or disconnect, mapped
            // into the same error type the timed branch yields.
            event_rx
                .recv()
                .map_err(|_| mpsc::RecvTimeoutError::Disconnected)
        };

        match received {
            Ok(event) => handler(event),
            Err(mpsc::RecvTimeoutError::Timeout) => break true,
            Err(mpsc::RecvTimeoutError::Disconnected) => break false,
        }
    };

    if timed_out {
        let _ = child.kill();
        let _ = child.wait();
        // Either worker may stay blocked forever if an orphaned
        // grandchild inherited the pipe fds and keeps the write end
        // open (e.g. a `bash` script whose `sleep` subprocess outlives
        // the SIGKILLed shell). Bound both joins so the timeout error
        // is returned promptly; a thread that misses the budget leaks
        // its JoinHandle, which is acceptable for this edge case.
        let budget = Duration::from_millis(200);
        let stderr_str = join_with_budget(stderr_thread, budget).unwrap_or_default();
        let _ = join_with_budget(reader_thread, budget);
        if !stderr_str.is_empty() {
            warn!(stderr = %stderr_str, "stderr from timed-out streaming process");
        }
        return Err(Error::Timeout {
            timeout_seconds: claude.timeout.map_or(0, |limit| limit.as_secs()),
        });
    }

    // Normal completion: the reader's result may carry an IO error. A
    // panicked reader thread is treated the same as a clean finish.
    let reader_outcome = match reader_thread.join() {
        Ok(outcome) => outcome,
        Err(_) => Ok(()),
    };
    if let Err(read_err) = reader_outcome {
        let _ = child.kill();
        let _ = child.wait();
        let _ = stderr_thread.join();
        return Err(read_err);
    }

    let status = child.wait().map_err(|e| Error::Io {
        message: "failed to wait for claude process".to_string(),
        source: e,
        working_dir: claude.working_dir.clone(),
    })?;
    let stderr_str = stderr_thread.join().unwrap_or_default();
    // A missing code (e.g. killed by a signal) is reported as -1.
    let exit_code = status.code().unwrap_or(-1);

    if status.success() {
        Ok(CommandOutput {
            stdout: String::new(),
            stderr: stderr_str,
            exit_code,
            success: true,
        })
    } else {
        Err(Error::CommandFailed {
            command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
            exit_code,
            stdout: String::new(),
            stderr: stderr_str,
            working_dir: claude.working_dir.clone(),
        })
    }
}
479
480/// Join a worker thread with a time budget. Returns `Some(value)` if
481/// the thread finished in time, `None` if the deadline passed first.
482/// A missed deadline leaks the `JoinHandle`; the thread completes
483/// eventually and its value is dropped.
484#[cfg(all(feature = "sync", feature = "json"))]
fn join_with_budget<T: Send + 'static>(
    handle: std::thread::JoinHandle<T>,
    budget: Duration,
) -> Option<T> {
    // `JoinHandle::join` has no timeout, so a detached helper thread
    // performs the blocking join and forwards the value over a channel
    // that this thread waits on for at most `budget`.
    let (result_tx, result_rx) = std::sync::mpsc::channel::<T>();
    std::thread::spawn(move || match handle.join() {
        // Sending fails only if the caller already gave up waiting.
        Ok(value) => drop(result_tx.send(value)),
        // The worker panicked: the sender is dropped without sending,
        // which the receiver below sees as a disconnect.
        Err(_) => {}
    });

    // Timeout and disconnect both collapse to `None`.
    match result_rx.recv_timeout(budget) {
        Ok(value) => Some(value),
        Err(_) => None,
    }
}