claude_wrapper/
streaming.rs1use std::time::Duration;
2
3use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
4use tokio::process::{ChildStderr, Command};
5use tracing::{debug, warn};
6
7use crate::Claude;
8use crate::error::{Error, Result};
9use crate::exec::CommandOutput;
10
11#[cfg(feature = "json")]
16#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
17pub struct StreamEvent {
18 #[serde(flatten)]
20 pub data: serde_json::Value,
21}
22
23#[cfg(feature = "json")]
24impl StreamEvent {
25 pub fn event_type(&self) -> Option<&str> {
27 self.data.get("type").and_then(|v| v.as_str())
28 }
29
30 pub fn role(&self) -> Option<&str> {
32 self.data.get("role").and_then(|v| v.as_str())
33 }
34
35 pub fn is_result(&self) -> bool {
37 self.event_type() == Some("result")
38 }
39
40 pub fn result_text(&self) -> Option<&str> {
42 self.data.get("result").and_then(|v| v.as_str())
43 }
44
45 pub fn session_id(&self) -> Option<&str> {
47 self.data.get("session_id").and_then(|v| v.as_str())
48 }
49
50 pub fn cost_usd(&self) -> Option<f64> {
55 self.data
56 .get("total_cost_usd")
57 .or_else(|| self.data.get("cost_usd"))
58 .and_then(|v| v.as_f64())
59 }
60}
61
62#[cfg(feature = "json")]
89pub async fn stream_query<F>(
90 claude: &Claude,
91 cmd: &crate::command::query::QueryCommand,
92 handler: F,
93) -> Result<CommandOutput>
94where
95 F: FnMut(StreamEvent),
96{
97 stream_query_impl(claude, cmd, handler, claude.timeout).await
98}
99
100#[cfg(feature = "json")]
112async fn stream_query_impl<F>(
113 claude: &Claude,
114 cmd: &crate::command::query::QueryCommand,
115 mut handler: F,
116 timeout: Option<Duration>,
117) -> Result<CommandOutput>
118where
119 F: FnMut(StreamEvent),
120{
121 use crate::command::ClaudeCommand;
122
123 let args = cmd.args();
124
125 let mut command_args = Vec::new();
126 command_args.extend(claude.global_args.clone());
127 command_args.extend(args);
128
129 debug!(
130 binary = %claude.binary.display(),
131 args = ?command_args,
132 timeout = ?timeout,
133 "streaming claude command"
134 );
135
136 let mut cmd = Command::new(&claude.binary);
137 cmd.args(&command_args)
138 .env_remove("CLAUDECODE")
139 .envs(&claude.env)
140 .stdout(std::process::Stdio::piped())
141 .stderr(std::process::Stdio::piped())
142 .stdin(std::process::Stdio::null());
143
144 if let Some(ref dir) = claude.working_dir {
145 cmd.current_dir(dir);
146 }
147
148 let mut child = cmd.spawn().map_err(|e| Error::Io {
149 message: format!("failed to spawn claude: {e}"),
150 source: e,
151 working_dir: claude.working_dir.clone(),
152 })?;
153
154 let stdout = child.stdout.take().expect("stdout was piped");
155 let mut stderr = child.stderr.take().expect("stderr was piped");
156
157 let mut reader = BufReader::new(stdout).lines();
158
159 let drain = drain_stderr(&mut stderr);
164 let read_future = read_lines(&mut reader, &mut handler, claude.working_dir.clone());
165 let combined = async {
166 let (line_result, stderr_str) = tokio::join!(read_future, drain);
167 (line_result, stderr_str)
168 };
169
170 let (line_result, stderr_str) = match timeout {
171 Some(d) => match tokio::time::timeout(d, combined).await {
172 Ok(pair) => pair,
173 Err(_) => {
174 let _ = child.kill().await;
180 let drain_budget = Duration::from_millis(200);
181 let stderr_str = tokio::time::timeout(drain_budget, drain_stderr(&mut stderr))
182 .await
183 .unwrap_or_default();
184 if !stderr_str.is_empty() {
185 warn!(stderr = %stderr_str, "stderr from timed-out streaming process");
186 }
187 return Err(Error::Timeout {
188 timeout_seconds: d.as_secs(),
189 });
190 }
191 },
192 None => combined.await,
193 };
194
195 if let Err(e) = line_result {
198 let _ = child.kill().await;
199 return Err(e);
200 }
201
202 let status = child.wait().await.map_err(|e| Error::Io {
203 message: "failed to wait for claude process".to_string(),
204 source: e,
205 working_dir: claude.working_dir.clone(),
206 })?;
207
208 let exit_code = status.code().unwrap_or(-1);
209
210 if !status.success() {
211 return Err(Error::CommandFailed {
212 command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
213 exit_code,
214 stdout: String::new(),
215 stderr: stderr_str,
216 working_dir: claude.working_dir.clone(),
217 });
218 }
219
220 Ok(CommandOutput {
221 stdout: String::new(), stderr: stderr_str,
223 exit_code,
224 success: true,
225 })
226}
227
228#[cfg(feature = "json")]
229async fn drain_stderr(stderr: &mut ChildStderr) -> String {
230 let mut buf = Vec::new();
231 let _ = stderr.read_to_end(&mut buf).await;
232 String::from_utf8_lossy(&buf).into_owned()
233}
234
235#[cfg(feature = "json")]
236async fn read_lines<F>(
237 reader: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
238 handler: &mut F,
239 working_dir: Option<std::path::PathBuf>,
240) -> Result<()>
241where
242 F: FnMut(StreamEvent),
243{
244 while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
245 message: "failed to read stdout line".to_string(),
246 source: e,
247 working_dir: working_dir.clone(),
248 })? {
249 if line.trim().is_empty() {
250 continue;
251 }
252 match serde_json::from_str::<StreamEvent>(&line) {
253 Ok(event) => handler(event),
254 Err(e) => {
255 debug!(line = %line, error = %e, "failed to parse stream event, skipping");
256 }
257 }
258 }
259
260 Ok(())
261}