Skip to main content

claude_wrapper/
streaming.rs

1use tokio::io::{AsyncBufReadExt, BufReader};
2use tokio::process::Command;
3use tracing::debug;
4
5use crate::Claude;
6use crate::error::{Error, Result};
7use crate::exec::CommandOutput;
8
/// One event decoded from a single line of `--output-format stream-json` output.
///
/// The stream is NDJSON: one JSON object per line. Event shapes differ by
/// message type, so the object is kept untyped in [`StreamEvent::data`] and
/// read through the accessor methods on this type.
#[cfg(feature = "json")]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct StreamEvent {
    /// Untyped JSON payload of the event. The field is flattened, so the
    /// event's own keys are the top-level keys of this value.
    #[serde(flatten)]
    pub data: serde_json::Value,
}
20
#[cfg(feature = "json")]
impl StreamEvent {
    /// Look up `key` on the top-level event object and return it if it is a string.
    fn str_field(&self, key: &str) -> Option<&str> {
        self.data.get(key).and_then(serde_json::Value::as_str)
    }

    /// Get the event type (the top-level `"type"` field), if present.
    pub fn event_type(&self) -> Option<&str> {
        self.str_field("type")
    }

    /// Get the message role, if present.
    ///
    /// A top-level `"role"` string is returned when it exists. Otherwise the
    /// role nested under `"message"` is used, which is where assistant and
    /// user events carry it in the stream-json format.
    pub fn role(&self) -> Option<&str> {
        self.str_field("role")
            .or_else(|| self.data.get("message")?.get("role")?.as_str())
    }

    /// Check if this is the final result message (`"type": "result"`).
    pub fn is_result(&self) -> bool {
        self.event_type() == Some("result")
    }

    /// Extract the result text (the top-level `"result"` string), if present.
    pub fn result_text(&self) -> Option<&str> {
        self.str_field("result")
    }

    /// Get the session ID (the top-level `"session_id"` string), if present.
    pub fn session_id(&self) -> Option<&str> {
        self.str_field("session_id")
    }

    /// Get the cost in USD if present (usually on result events).
    ///
    /// Reads `"cost_usd"` first and falls back to `"total_cost_usd"`, the key
    /// used by result events in the documented stream-json schema.
    pub fn cost_usd(&self) -> Option<f64> {
        ["cost_usd", "total_cost_usd"]
            .iter()
            .find_map(|&key| self.data.get(key).and_then(serde_json::Value::as_f64))
    }
}
53
/// Execute a command with streaming output, calling a handler for each NDJSON line.
///
/// This spawns the claude process and reads stdout line-by-line, parsing each
/// non-blank line as a JSON [`StreamEvent`] and passing it to `handler`. Lines
/// that fail to parse are logged at debug level and skipped. Useful for
/// progress tracking and real-time output processing.
///
/// stderr is drained concurrently while stdout is streamed, so a child that
/// writes a lot of diagnostics cannot stall on a full stderr pipe.
///
/// The returned [`CommandOutput`] has an empty `stdout` (it was consumed by
/// the handler); `stderr` holds the child's captured stderr.
///
/// # Errors
///
/// Returns [`Error::Io`] if the process cannot be spawned, if reading stdout
/// or stderr fails, or if waiting for the process fails. Returns
/// [`Error::CommandFailed`] if the process exits unsuccessfully; the exit
/// code is reported as `-1` when the platform provides none.
///
/// # Example
///
/// ```no_run
/// use claude_wrapper::{Claude, QueryCommand, OutputFormat};
/// use claude_wrapper::streaming::{StreamEvent, stream_query};
///
/// # async fn example() -> claude_wrapper::Result<()> {
/// let claude = Claude::builder().build()?;
///
/// let cmd = QueryCommand::new("explain quicksort")
///     .output_format(OutputFormat::StreamJson);
///
/// let output = stream_query(&claude, &cmd, |event: StreamEvent| {
///     if let Some(t) = event.event_type() {
///         println!("[{t}] {:?}", event.data);
///     }
/// }).await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "json")]
pub async fn stream_query<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    mut handler: F,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    use crate::command::ClaudeCommand;
    use tokio::io::AsyncReadExt;

    // Global arguments come first, followed by the query's own arguments.
    let mut command_args = Vec::new();
    command_args.extend(claude.global_args.clone());
    command_args.extend(cmd.args());

    debug!(binary = %claude.binary.display(), args = ?command_args, "streaming claude command");

    // Single place to build the I/O error variant used throughout this function.
    let io_err = |message: &str, source: std::io::Error| Error::Io {
        message: message.to_string(),
        source,
        working_dir: claude.working_dir.clone(),
    };

    let mut process = Command::new(&claude.binary);
    process
        .args(&command_args)
        // CLAUDECODE is cleared first; configured env vars are applied after it.
        .env_remove("CLAUDECODE")
        .envs(&claude.env)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .stdin(std::process::Stdio::null());

    if let Some(ref dir) = claude.working_dir {
        process.current_dir(dir);
    }

    let mut child = process.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: claude.working_dir.clone(),
    })?;

    let stdout = child.stdout.take().expect("stdout was piped");
    let mut stderr_pipe = child.stderr.take().expect("stderr was piped");

    // Drain stderr in the background. If it were only read after stdout hit
    // EOF, a child blocked on a full stderr pipe would never close stdout and
    // both sides would wait on each other forever.
    let stderr_task = tokio::spawn(async move {
        let mut captured = Vec::new();
        stderr_pipe.read_to_end(&mut captured).await.map(|_| captured)
    });

    let mut lines = BufReader::new(stdout).lines();
    while let Some(line) = lines
        .next_line()
        .await
        .map_err(|e| io_err("failed to read stdout line", e))?
    {
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<StreamEvent>(&line) {
            Ok(event) => handler(event),
            Err(e) => {
                debug!(line = %line, error = %e, "failed to parse stream event, skipping");
            }
        }
    }

    let status = child
        .wait()
        .await
        .map_err(|e| io_err("failed to wait for claude process", e))?;

    // A panicked/cancelled drain task and a failed read both surface as I/O errors.
    let stderr_bytes = stderr_task
        .await
        .map_err(std::io::Error::from)
        .and_then(|read| read)
        .map_err(|e| io_err("failed to read claude stderr", e))?;

    let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
    let exit_code = status.code().unwrap_or(-1);

    if !status.success() {
        return Err(Error::CommandFailed {
            command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
            exit_code,
            stdout: String::new(),
            stderr,
            working_dir: claude.working_dir.clone(),
        });
    }

    Ok(CommandOutput {
        stdout: String::new(), // already consumed via streaming
        stderr,
        exit_code,
        success: true,
    })
}
161}