Skip to main content

claude_wrapper/
streaming.rs

1use std::time::Duration;
2
3use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
4use tokio::process::{ChildStderr, Command};
5use tracing::{debug, warn};
6
7use crate::Claude;
8use crate::error::{Error, Result};
9use crate::exec::CommandOutput;
10
11/// A single line from `--output-format stream-json` output.
12///
13/// Each line is an NDJSON object. The structure varies by message type,
14/// so we provide the raw JSON value and convenience accessors.
15#[cfg(feature = "json")]
16#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
17pub struct StreamEvent {
18    /// The raw JSON object for this event.
19    #[serde(flatten)]
20    pub data: serde_json::Value,
21}
22
23#[cfg(feature = "json")]
24impl StreamEvent {
25    /// Get the event type, if present.
26    pub fn event_type(&self) -> Option<&str> {
27        self.data.get("type").and_then(|v| v.as_str())
28    }
29
30    /// Get the message role, if present.
31    pub fn role(&self) -> Option<&str> {
32        self.data.get("role").and_then(|v| v.as_str())
33    }
34
35    /// Check if this is the final result message.
36    pub fn is_result(&self) -> bool {
37        self.event_type() == Some("result")
38    }
39
40    /// Extract the result text from a result event.
41    pub fn result_text(&self) -> Option<&str> {
42        self.data.get("result").and_then(|v| v.as_str())
43    }
44
45    /// Get the session ID if present.
46    pub fn session_id(&self) -> Option<&str> {
47        self.data.get("session_id").and_then(|v| v.as_str())
48    }
49
50    /// Get the cost in USD if present (usually on result events).
51    ///
52    /// Prefers `total_cost_usd` (the CLI's primary key) and falls back
53    /// to the legacy `cost_usd` alias.
54    pub fn cost_usd(&self) -> Option<f64> {
55        self.data
56            .get("total_cost_usd")
57            .or_else(|| self.data.get("cost_usd"))
58            .and_then(|v| v.as_f64())
59    }
60}
61
62/// Execute a command with streaming output, calling a handler for each NDJSON line.
63///
64/// This spawns the claude process and reads stdout line-by-line, parsing each
65/// as a JSON event and passing it to the handler. Useful for progress tracking
66/// and real-time output processing.
67///
68/// # Example
69///
70/// ```no_run
71/// use claude_wrapper::{Claude, QueryCommand, OutputFormat};
72/// use claude_wrapper::streaming::{StreamEvent, stream_query};
73///
74/// # async fn example() -> claude_wrapper::Result<()> {
75/// let claude = Claude::builder().build()?;
76///
77/// let cmd = QueryCommand::new("explain quicksort")
78///     .output_format(OutputFormat::StreamJson);
79///
80/// let output = stream_query(&claude, &cmd, |event: StreamEvent| {
81///     if let Some(t) = event.event_type() {
82///         println!("[{t}] {:?}", event.data);
83///     }
84/// }).await?;
85/// # Ok(())
86/// # }
87/// ```
88#[cfg(feature = "json")]
89pub async fn stream_query<F>(
90    claude: &Claude,
91    cmd: &crate::command::query::QueryCommand,
92    handler: F,
93) -> Result<CommandOutput>
94where
95    F: FnMut(StreamEvent),
96{
97    stream_query_impl(claude, cmd, handler, claude.timeout).await
98}
99
100/// Unified streaming implementation with optional timeout.
101///
102/// Reads stderr concurrently in a background task so a chatty child
103/// cannot deadlock by filling the stderr pipe buffer, and so any
104/// captured stderr is available even on timeout or IO error.
105///
106/// On timeout, the child is killed and reaped (`kill().await` sends
107/// SIGKILL and waits), and whatever stderr was produced is logged at
108/// warn level. The returned `Error::Timeout` does not carry partial
109/// output -- streamed stdout events were already dispatched to the
110/// handler as they arrived.
111#[cfg(feature = "json")]
112async fn stream_query_impl<F>(
113    claude: &Claude,
114    cmd: &crate::command::query::QueryCommand,
115    mut handler: F,
116    timeout: Option<Duration>,
117) -> Result<CommandOutput>
118where
119    F: FnMut(StreamEvent),
120{
121    use crate::command::ClaudeCommand;
122
123    let args = cmd.args();
124
125    let mut command_args = Vec::new();
126    command_args.extend(claude.global_args.clone());
127    command_args.extend(args);
128
129    debug!(
130        binary = %claude.binary.display(),
131        args = ?command_args,
132        timeout = ?timeout,
133        "streaming claude command"
134    );
135
136    let mut cmd = Command::new(&claude.binary);
137    cmd.args(&command_args)
138        .env_remove("CLAUDECODE")
139        .envs(&claude.env)
140        .stdout(std::process::Stdio::piped())
141        .stderr(std::process::Stdio::piped())
142        .stdin(std::process::Stdio::null());
143
144    if let Some(ref dir) = claude.working_dir {
145        cmd.current_dir(dir);
146    }
147
148    let mut child = cmd.spawn().map_err(|e| Error::Io {
149        message: format!("failed to spawn claude: {e}"),
150        source: e,
151        working_dir: claude.working_dir.clone(),
152    })?;
153
154    let stdout = child.stdout.take().expect("stdout was piped");
155    let mut stderr = child.stderr.take().expect("stderr was piped");
156
157    let mut reader = BufReader::new(stdout).lines();
158
159    // Run stdout line reading and stderr draining concurrently so a
160    // chatty child can't deadlock by filling the stderr pipe buffer.
161    // tokio::join! polls both futures on the same task (no tokio::spawn
162    // needed, so we avoid pulling in the `rt` feature).
163    let drain = drain_stderr(&mut stderr);
164    let read_future = read_lines(&mut reader, &mut handler, claude.working_dir.clone());
165    let combined = async {
166        let (line_result, stderr_str) = tokio::join!(read_future, drain);
167        (line_result, stderr_str)
168    };
169
170    let (line_result, stderr_str) = match timeout {
171        Some(d) => match tokio::time::timeout(d, combined).await {
172            Ok(pair) => pair,
173            Err(_) => {
174                // Timeout: kill the child (reaps via start_kill + wait)
175                // and try to drain whatever stderr remains. kill() only
176                // targets the direct child, so a subprocess tree holding
177                // our pipe fds could block the drain -- cap it with a
178                // short deadline.
179                let _ = child.kill().await;
180                let drain_budget = Duration::from_millis(200);
181                let stderr_str = tokio::time::timeout(drain_budget, drain_stderr(&mut stderr))
182                    .await
183                    .unwrap_or_default();
184                if !stderr_str.is_empty() {
185                    warn!(stderr = %stderr_str, "stderr from timed-out streaming process");
186                }
187                return Err(Error::Timeout {
188                    timeout_seconds: d.as_secs(),
189                });
190            }
191        },
192        None => combined.await,
193    };
194
195    // If reading lines failed partway through (IO error, not timeout),
196    // clean up the child before returning.
197    if let Err(e) = line_result {
198        let _ = child.kill().await;
199        return Err(e);
200    }
201
202    let status = child.wait().await.map_err(|e| Error::Io {
203        message: "failed to wait for claude process".to_string(),
204        source: e,
205        working_dir: claude.working_dir.clone(),
206    })?;
207
208    let exit_code = status.code().unwrap_or(-1);
209
210    if !status.success() {
211        return Err(Error::CommandFailed {
212            command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
213            exit_code,
214            stdout: String::new(),
215            stderr: stderr_str,
216            working_dir: claude.working_dir.clone(),
217        });
218    }
219
220    Ok(CommandOutput {
221        stdout: String::new(), // already consumed via streaming
222        stderr: stderr_str,
223        exit_code,
224        success: true,
225    })
226}
227
228#[cfg(feature = "json")]
229async fn drain_stderr(stderr: &mut ChildStderr) -> String {
230    let mut buf = Vec::new();
231    let _ = stderr.read_to_end(&mut buf).await;
232    String::from_utf8_lossy(&buf).into_owned()
233}
234
235#[cfg(feature = "json")]
236async fn read_lines<F>(
237    reader: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
238    handler: &mut F,
239    working_dir: Option<std::path::PathBuf>,
240) -> Result<()>
241where
242    F: FnMut(StreamEvent),
243{
244    while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
245        message: "failed to read stdout line".to_string(),
246        source: e,
247        working_dir: working_dir.clone(),
248    })? {
249        if line.trim().is_empty() {
250            continue;
251        }
252        match serde_json::from_str::<StreamEvent>(&line) {
253            Ok(event) => handler(event),
254            Err(e) => {
255                debug!(line = %line, error = %e, "failed to parse stream event, skipping");
256            }
257        }
258    }
259
260    Ok(())
261}