1use std::time::Duration;
2
3use tokio::io::{AsyncBufReadExt, BufReader};
4use tokio::process::Command;
5use tracing::debug;
6
7use crate::Claude;
8use crate::error::{Error, Result};
9use crate::exec::CommandOutput;
10
11#[cfg(feature = "json")]
16#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
17pub struct StreamEvent {
18 #[serde(flatten)]
20 pub data: serde_json::Value,
21}
22
23#[cfg(feature = "json")]
24impl StreamEvent {
25 pub fn event_type(&self) -> Option<&str> {
27 self.data.get("type").and_then(|v| v.as_str())
28 }
29
30 pub fn role(&self) -> Option<&str> {
32 self.data.get("role").and_then(|v| v.as_str())
33 }
34
35 pub fn is_result(&self) -> bool {
37 self.event_type() == Some("result")
38 }
39
40 pub fn result_text(&self) -> Option<&str> {
42 self.data.get("result").and_then(|v| v.as_str())
43 }
44
45 pub fn session_id(&self) -> Option<&str> {
47 self.data.get("session_id").and_then(|v| v.as_str())
48 }
49
50 pub fn cost_usd(&self) -> Option<f64> {
52 self.data.get("cost_usd").and_then(|v| v.as_f64())
53 }
54}
55
56#[cfg(feature = "json")]
83pub async fn stream_query<F>(
84 claude: &Claude,
85 cmd: &crate::command::query::QueryCommand,
86 handler: F,
87) -> Result<CommandOutput>
88where
89 F: FnMut(StreamEvent),
90{
91 if let Some(timeout) = claude.timeout {
92 stream_query_with_timeout(claude, cmd, handler, timeout).await
93 } else {
94 stream_query_internal(claude, cmd, handler).await
95 }
96}
97
98#[cfg(feature = "json")]
99async fn stream_query_internal<F>(
100 claude: &Claude,
101 cmd: &crate::command::query::QueryCommand,
102 mut handler: F,
103) -> Result<CommandOutput>
104where
105 F: FnMut(StreamEvent),
106{
107 use crate::command::ClaudeCommand;
108
109 let args = cmd.args();
110
111 let mut command_args = Vec::new();
112 command_args.extend(claude.global_args.clone());
113 command_args.extend(args);
114
115 debug!(binary = %claude.binary.display(), args = ?command_args, "streaming claude command");
116
117 let mut cmd = Command::new(&claude.binary);
118 cmd.args(&command_args)
119 .env_remove("CLAUDECODE")
120 .envs(&claude.env)
121 .stdout(std::process::Stdio::piped())
122 .stderr(std::process::Stdio::piped())
123 .stdin(std::process::Stdio::null());
124
125 if let Some(ref dir) = claude.working_dir {
126 cmd.current_dir(dir);
127 }
128
129 let mut child = cmd.spawn().map_err(|e| Error::Io {
130 message: format!("failed to spawn claude: {e}"),
131 source: e,
132 working_dir: claude.working_dir.clone(),
133 })?;
134
135 let stdout = child.stdout.take().expect("stdout was piped");
136 let mut reader = BufReader::new(stdout).lines();
137
138 while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
139 message: "failed to read stdout line".to_string(),
140 source: e,
141 working_dir: claude.working_dir.clone(),
142 })? {
143 if line.trim().is_empty() {
144 continue;
145 }
146 match serde_json::from_str::<StreamEvent>(&line) {
147 Ok(event) => handler(event),
148 Err(e) => {
149 debug!(line = %line, error = %e, "failed to parse stream event, skipping");
150 }
151 }
152 }
153
154 let output = child.wait_with_output().await.map_err(|e| Error::Io {
155 message: "failed to wait for claude process".to_string(),
156 source: e,
157 working_dir: claude.working_dir.clone(),
158 })?;
159
160 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
161 let exit_code = output.status.code().unwrap_or(-1);
162
163 if !output.status.success() {
164 return Err(Error::CommandFailed {
165 command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
166 exit_code,
167 stdout: String::new(),
168 stderr,
169 working_dir: claude.working_dir.clone(),
170 });
171 }
172
173 Ok(CommandOutput {
174 stdout: String::new(), stderr,
176 exit_code,
177 success: true,
178 })
179}
180
181#[cfg(feature = "json")]
182async fn stream_query_with_timeout<F>(
183 claude: &Claude,
184 cmd: &crate::command::query::QueryCommand,
185 mut handler: F,
186 timeout: Duration,
187) -> Result<CommandOutput>
188where
189 F: FnMut(StreamEvent),
190{
191 use crate::command::ClaudeCommand;
192
193 let args = cmd.args();
194
195 let mut command_args = Vec::new();
196 command_args.extend(claude.global_args.clone());
197 command_args.extend(args);
198
199 debug!(binary = %claude.binary.display(), args = ?command_args, "streaming claude command with timeout");
200
201 let mut cmd = Command::new(&claude.binary);
202 cmd.args(&command_args)
203 .env_remove("CLAUDECODE")
204 .envs(&claude.env)
205 .stdout(std::process::Stdio::piped())
206 .stderr(std::process::Stdio::piped())
207 .stdin(std::process::Stdio::null());
208
209 if let Some(ref dir) = claude.working_dir {
210 cmd.current_dir(dir);
211 }
212
213 let mut child = cmd.spawn().map_err(|e| Error::Io {
214 message: format!("failed to spawn claude: {e}"),
215 source: e,
216 working_dir: claude.working_dir.clone(),
217 })?;
218
219 let stdout = child.stdout.take().expect("stdout was piped");
220 let mut reader = BufReader::new(stdout).lines();
221
222 let result = tokio::time::timeout(
224 timeout,
225 read_lines(&mut reader, &mut handler, claude.working_dir.clone()),
226 )
227 .await;
228
229 match result {
230 Ok(Ok(())) => {
231 let output = child.wait_with_output().await.map_err(|e| Error::Io {
233 message: "failed to wait for claude process".to_string(),
234 source: e,
235 working_dir: claude.working_dir.clone(),
236 })?;
237
238 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
239 let exit_code = output.status.code().unwrap_or(-1);
240
241 if !output.status.success() {
242 return Err(Error::CommandFailed {
243 command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
244 exit_code,
245 stdout: String::new(),
246 stderr,
247 working_dir: claude.working_dir.clone(),
248 });
249 }
250
251 Ok(CommandOutput {
252 stdout: String::new(), stderr,
254 exit_code,
255 success: true,
256 })
257 }
258 Ok(Err(e)) => Err(e),
259 Err(_) => {
260 let _ = child.kill().await;
262 Err(Error::Timeout {
263 timeout_seconds: timeout.as_secs(),
264 })
265 }
266 }
267}
268
269#[cfg(feature = "json")]
270async fn read_lines<F>(
271 reader: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
272 handler: &mut F,
273 working_dir: Option<std::path::PathBuf>,
274) -> Result<()>
275where
276 F: FnMut(StreamEvent),
277{
278 while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
279 message: "failed to read stdout line".to_string(),
280 source: e,
281 working_dir: working_dir.clone(),
282 })? {
283 if line.trim().is_empty() {
284 continue;
285 }
286 match serde_json::from_str::<StreamEvent>(&line) {
287 Ok(event) => handler(event),
288 Err(e) => {
289 debug!(line = %line, error = %e, "failed to parse stream event, skipping");
290 }
291 }
292 }
293
294 Ok(())
295}