Skip to main content

claude_wrapper/
streaming.rs

1use std::time::Duration;
2
3use tokio::io::{AsyncBufReadExt, BufReader};
4use tokio::process::Command;
5use tracing::debug;
6
7use crate::Claude;
8use crate::error::{Error, Result};
9use crate::exec::CommandOutput;
10
11/// A single line from `--output-format stream-json` output.
12///
13/// Each line is an NDJSON object. The structure varies by message type,
14/// so we provide the raw JSON value and convenience accessors.
15#[cfg(feature = "json")]
16#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
17pub struct StreamEvent {
18    /// The raw JSON object for this event.
19    #[serde(flatten)]
20    pub data: serde_json::Value,
21}
22
23#[cfg(feature = "json")]
24impl StreamEvent {
25    /// Get the event type, if present.
26    pub fn event_type(&self) -> Option<&str> {
27        self.data.get("type").and_then(|v| v.as_str())
28    }
29
30    /// Get the message role, if present.
31    pub fn role(&self) -> Option<&str> {
32        self.data.get("role").and_then(|v| v.as_str())
33    }
34
35    /// Check if this is the final result message.
36    pub fn is_result(&self) -> bool {
37        self.event_type() == Some("result")
38    }
39
40    /// Extract the result text from a result event.
41    pub fn result_text(&self) -> Option<&str> {
42        self.data.get("result").and_then(|v| v.as_str())
43    }
44
45    /// Get the session ID if present.
46    pub fn session_id(&self) -> Option<&str> {
47        self.data.get("session_id").and_then(|v| v.as_str())
48    }
49
50    /// Get the cost in USD if present (usually on result events).
51    pub fn cost_usd(&self) -> Option<f64> {
52        self.data.get("cost_usd").and_then(|v| v.as_f64())
53    }
54}
55
56/// Execute a command with streaming output, calling a handler for each NDJSON line.
57///
58/// This spawns the claude process and reads stdout line-by-line, parsing each
59/// as a JSON event and passing it to the handler. Useful for progress tracking
60/// and real-time output processing.
61///
62/// # Example
63///
64/// ```no_run
65/// use claude_wrapper::{Claude, QueryCommand, OutputFormat};
66/// use claude_wrapper::streaming::{StreamEvent, stream_query};
67///
68/// # async fn example() -> claude_wrapper::Result<()> {
69/// let claude = Claude::builder().build()?;
70///
71/// let cmd = QueryCommand::new("explain quicksort")
72///     .output_format(OutputFormat::StreamJson);
73///
74/// let output = stream_query(&claude, &cmd, |event: StreamEvent| {
75///     if let Some(t) = event.event_type() {
76///         println!("[{t}] {:?}", event.data);
77///     }
78/// }).await?;
79/// # Ok(())
80/// # }
81/// ```
82#[cfg(feature = "json")]
83pub async fn stream_query<F>(
84    claude: &Claude,
85    cmd: &crate::command::query::QueryCommand,
86    handler: F,
87) -> Result<CommandOutput>
88where
89    F: FnMut(StreamEvent),
90{
91    if let Some(timeout) = claude.timeout {
92        stream_query_with_timeout(claude, cmd, handler, timeout).await
93    } else {
94        stream_query_internal(claude, cmd, handler).await
95    }
96}
97
98#[cfg(feature = "json")]
99async fn stream_query_internal<F>(
100    claude: &Claude,
101    cmd: &crate::command::query::QueryCommand,
102    mut handler: F,
103) -> Result<CommandOutput>
104where
105    F: FnMut(StreamEvent),
106{
107    use crate::command::ClaudeCommand;
108
109    let args = cmd.args();
110
111    let mut command_args = Vec::new();
112    command_args.extend(claude.global_args.clone());
113    command_args.extend(args);
114
115    debug!(binary = %claude.binary.display(), args = ?command_args, "streaming claude command");
116
117    let mut cmd = Command::new(&claude.binary);
118    cmd.args(&command_args)
119        .env_remove("CLAUDECODE")
120        .envs(&claude.env)
121        .stdout(std::process::Stdio::piped())
122        .stderr(std::process::Stdio::piped())
123        .stdin(std::process::Stdio::null());
124
125    if let Some(ref dir) = claude.working_dir {
126        cmd.current_dir(dir);
127    }
128
129    let mut child = cmd.spawn().map_err(|e| Error::Io {
130        message: format!("failed to spawn claude: {e}"),
131        source: e,
132        working_dir: claude.working_dir.clone(),
133    })?;
134
135    let stdout = child.stdout.take().expect("stdout was piped");
136    let mut reader = BufReader::new(stdout).lines();
137
138    while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
139        message: "failed to read stdout line".to_string(),
140        source: e,
141        working_dir: claude.working_dir.clone(),
142    })? {
143        if line.trim().is_empty() {
144            continue;
145        }
146        match serde_json::from_str::<StreamEvent>(&line) {
147            Ok(event) => handler(event),
148            Err(e) => {
149                debug!(line = %line, error = %e, "failed to parse stream event, skipping");
150            }
151        }
152    }
153
154    let output = child.wait_with_output().await.map_err(|e| Error::Io {
155        message: "failed to wait for claude process".to_string(),
156        source: e,
157        working_dir: claude.working_dir.clone(),
158    })?;
159
160    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
161    let exit_code = output.status.code().unwrap_or(-1);
162
163    if !output.status.success() {
164        return Err(Error::CommandFailed {
165            command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
166            exit_code,
167            stdout: String::new(),
168            stderr,
169            working_dir: claude.working_dir.clone(),
170        });
171    }
172
173    Ok(CommandOutput {
174        stdout: String::new(), // already consumed via streaming
175        stderr,
176        exit_code,
177        success: true,
178    })
179}
180
181#[cfg(feature = "json")]
182async fn stream_query_with_timeout<F>(
183    claude: &Claude,
184    cmd: &crate::command::query::QueryCommand,
185    mut handler: F,
186    timeout: Duration,
187) -> Result<CommandOutput>
188where
189    F: FnMut(StreamEvent),
190{
191    use crate::command::ClaudeCommand;
192
193    let args = cmd.args();
194
195    let mut command_args = Vec::new();
196    command_args.extend(claude.global_args.clone());
197    command_args.extend(args);
198
199    debug!(binary = %claude.binary.display(), args = ?command_args, "streaming claude command with timeout");
200
201    let mut cmd = Command::new(&claude.binary);
202    cmd.args(&command_args)
203        .env_remove("CLAUDECODE")
204        .envs(&claude.env)
205        .stdout(std::process::Stdio::piped())
206        .stderr(std::process::Stdio::piped())
207        .stdin(std::process::Stdio::null());
208
209    if let Some(ref dir) = claude.working_dir {
210        cmd.current_dir(dir);
211    }
212
213    let mut child = cmd.spawn().map_err(|e| Error::Io {
214        message: format!("failed to spawn claude: {e}"),
215        source: e,
216        working_dir: claude.working_dir.clone(),
217    })?;
218
219    let stdout = child.stdout.take().expect("stdout was piped");
220    let mut reader = BufReader::new(stdout).lines();
221
222    // Wrap the line-reading in a timeout
223    let result = tokio::time::timeout(
224        timeout,
225        read_lines(&mut reader, &mut handler, claude.working_dir.clone()),
226    )
227    .await;
228
229    match result {
230        Ok(Ok(())) => {
231            // Successfully read all lines; now wait for the process
232            let output = child.wait_with_output().await.map_err(|e| Error::Io {
233                message: "failed to wait for claude process".to_string(),
234                source: e,
235                working_dir: claude.working_dir.clone(),
236            })?;
237
238            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
239            let exit_code = output.status.code().unwrap_or(-1);
240
241            if !output.status.success() {
242                return Err(Error::CommandFailed {
243                    command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
244                    exit_code,
245                    stdout: String::new(),
246                    stderr,
247                    working_dir: claude.working_dir.clone(),
248                });
249            }
250
251            Ok(CommandOutput {
252                stdout: String::new(), // already consumed via streaming
253                stderr,
254                exit_code,
255                success: true,
256            })
257        }
258        Ok(Err(e)) => Err(e),
259        Err(_) => {
260            // Timeout occurred; kill the child process
261            let _ = child.kill().await;
262            Err(Error::Timeout {
263                timeout_seconds: timeout.as_secs(),
264            })
265        }
266    }
267}
268
269#[cfg(feature = "json")]
270async fn read_lines<F>(
271    reader: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
272    handler: &mut F,
273    working_dir: Option<std::path::PathBuf>,
274) -> Result<()>
275where
276    F: FnMut(StreamEvent),
277{
278    while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
279        message: "failed to read stdout line".to_string(),
280        source: e,
281        working_dir: working_dir.clone(),
282    })? {
283        if line.trim().is_empty() {
284            continue;
285        }
286        match serde_json::from_str::<StreamEvent>(&line) {
287            Ok(event) => handler(event),
288            Err(e) => {
289                debug!(line = %line, error = %e, "failed to parse stream event, skipping");
290            }
291        }
292    }
293
294    Ok(())
295}