use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tracing::debug;
use crate::Claude;
use crate::error::{Error, Result};
use crate::exec::CommandOutput;
/// One event decoded from a single line of streamed JSON output.
///
/// The event is kept untyped: every key of the decoded JSON object is
/// flattened into [`StreamEvent::data`], and the accessor methods on this
/// type read individual keys from it.
#[cfg(feature = "json")]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct StreamEvent {
    /// The raw JSON object for this event, with all keys flattened in.
    #[serde(flatten)]
    pub data: serde_json::Value,
}
#[cfg(feature = "json")]
impl StreamEvent {
    /// Returns the top-level value stored under `key` when it is a JSON string.
    fn str_field(&self, key: &str) -> Option<&str> {
        self.data.get(key).and_then(|v| v.as_str())
    }

    /// Returns the event's `type` field, if present and a string.
    pub fn event_type(&self) -> Option<&str> {
        self.str_field("type")
    }

    /// Returns the role associated with this event, if any.
    ///
    /// A top-level `role` key takes precedence. When it is absent the nested
    /// `message.role` key is consulted, because assistant/user events in the
    /// CLI's stream output carry the role inside their `message` object.
    pub fn role(&self) -> Option<&str> {
        self.str_field("role").or_else(|| {
            self.data
                .get("message")
                .and_then(|message| message.get("role"))
                .and_then(|v| v.as_str())
        })
    }

    /// Returns `true` when the event's `type` is exactly `"result"`.
    pub fn is_result(&self) -> bool {
        self.event_type() == Some("result")
    }

    /// Returns the top-level `result` field, if present and a string.
    pub fn result_text(&self) -> Option<&str> {
        self.str_field("result")
    }

    /// Returns the top-level `session_id` field, if present and a string.
    pub fn session_id(&self) -> Option<&str> {
        self.str_field("session_id")
    }

    /// Returns the reported cost in US dollars, if the event carries one.
    ///
    /// `cost_usd` is checked first so existing behavior is preserved;
    /// `total_cost_usd` is accepted as a fallback because result events from
    /// current CLI releases report the cost under that key.
    pub fn cost_usd(&self) -> Option<f64> {
        ["cost_usd", "total_cost_usd"]
            .iter()
            .find_map(|key| self.data.get(*key).and_then(|v| v.as_f64()))
    }
}
/// Runs a query in streaming mode, passing each parsed event to `handler`.
///
/// Dispatches on `claude.timeout`: when a timeout is configured the
/// deadline-enforcing variant is used, otherwise the unbounded one.
///
/// # Errors
///
/// Propagates whatever error the selected streaming variant returns.
#[cfg(feature = "json")]
pub async fn stream_query<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    handler: F,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    match claude.timeout {
        None => stream_query_internal(claude, cmd, handler).await,
        Some(limit) => stream_query_with_timeout(claude, cmd, handler, limit).await,
    }
}
/// Streams a query with no deadline, passing each parsed event to `handler`.
///
/// # Errors
///
/// See [`run_stream_query`].
#[cfg(feature = "json")]
async fn stream_query_internal<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    handler: F,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    run_stream_query(claude, cmd, handler, None).await
}

/// Streams a query, passing each parsed event to `handler`, and gives up
/// after `timeout`.
///
/// The deadline covers the whole run: reading stdout, draining stderr and
/// waiting for the process to exit. When it elapses the child is killed and
/// [`Error::Timeout`] is returned.
///
/// # Errors
///
/// See [`run_stream_query`].
#[cfg(feature = "json")]
async fn stream_query_with_timeout<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    handler: F,
    timeout: Duration,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    run_stream_query(claude, cmd, handler, Some(timeout)).await
}

/// Shared driver behind both streaming entry points.
///
/// Spawns the configured binary with the global arguments followed by the
/// query's arguments, feeds every stdout line through [`read_lines`], and
/// collects stderr concurrently. The returned [`CommandOutput`] always has an
/// empty `stdout`, because stdout was consumed by `handler`.
///
/// # Errors
///
/// * [`Error::Io`] if spawning, reading either pipe, or waiting fails.
/// * [`Error::Timeout`] if `timeout` is `Some` and elapses first.
/// * [`Error::CommandFailed`] if the process exits unsuccessfully.
///
/// On every I/O or timeout error the child is killed before returning.
#[cfg(feature = "json")]
async fn run_stream_query<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    mut handler: F,
    timeout: Option<Duration>,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    use crate::command::ClaudeCommand;

    let mut command_args = Vec::new();
    command_args.extend(claude.global_args.clone());
    command_args.extend(cmd.args());

    if timeout.is_some() {
        debug!(binary = %claude.binary.display(), args = ?command_args, "streaming claude command with timeout");
    } else {
        debug!(binary = %claude.binary.display(), args = ?command_args, "streaming claude command");
    }

    let mut command = Command::new(&claude.binary);
    command
        .args(&command_args)
        .env_remove("CLAUDECODE")
        .envs(&claude.env)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .stdin(std::process::Stdio::null())
        // Safety net: if the caller drops this future mid-stream, the child
        // must not be left running.
        .kill_on_drop(true);
    if let Some(ref dir) = claude.working_dir {
        command.current_dir(dir);
    }

    let mut child = command.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: claude.working_dir.clone(),
    })?;

    let stdout = child.stdout.take().expect("stdout was piped");
    let mut stderr_pipe = child.stderr.take().expect("stderr was piped");
    let mut reader = BufReader::new(stdout).lines();

    let work = async {
        let lines = read_lines(&mut reader, &mut handler, claude.working_dir.clone());
        // stderr has to be drained while stdout is being read. Otherwise a
        // child that fills the stderr pipe blocks on its write and never
        // closes stdout, and both sides wait on each other forever.
        let drain = async {
            let mut collected = Vec::new();
            let read =
                tokio::io::AsyncReadExt::read_to_end(&mut stderr_pipe, &mut collected).await;
            match read {
                Ok(_) => Ok(collected),
                Err(e) => Err(Error::Io {
                    message: "failed to read stderr".to_string(),
                    source: e,
                    working_dir: claude.working_dir.clone(),
                }),
            }
        };
        let ((), stderr_bytes) = try_join_pair(lines, drain).await?;
        let status = child.wait().await.map_err(|e| Error::Io {
            message: "failed to wait for claude process".to_string(),
            source: e,
            working_dir: claude.working_dir.clone(),
        })?;
        Ok::<_, Error>((status, stderr_bytes))
    };

    let outcome = match timeout {
        Some(limit) => match tokio::time::timeout(limit, work).await {
            Ok(finished) => finished,
            Err(_) => Err(Error::Timeout {
                timeout_seconds: limit.as_secs(),
            }),
        },
        None => work.await,
    };

    let (status, stderr_bytes) = match outcome {
        Ok(finished) => finished,
        Err(e) => {
            // Best effort: the process may already be gone, in which case the
            // kill error carries no useful information.
            let _ = child.kill().await;
            return Err(e);
        }
    };

    let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
    let exit_code = status.code().unwrap_or(-1);
    if !status.success() {
        return Err(Error::CommandFailed {
            command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
            exit_code,
            stdout: String::new(),
            stderr,
            working_dir: claude.working_dir.clone(),
        });
    }
    Ok(CommandOutput {
        stdout: String::new(),
        stderr,
        exit_code,
        success: true,
    })
}

/// Polls two fallible futures concurrently on the current task.
///
/// Resolves to both successes once both futures have finished, or to the
/// first error observed, without waiting for the other future.
///
/// Written against `std` only so that it does not depend on optional tokio
/// features being enabled.
#[cfg(feature = "json")]
async fn try_join_pair<A, B, T, U, E>(first: A, second: B) -> std::result::Result<(T, U), E>
where
    A: std::future::Future<Output = std::result::Result<T, E>>,
    B: std::future::Future<Output = std::result::Result<U, E>>,
{
    use std::future::Future;
    use std::task::Poll;

    let mut first = Box::pin(first);
    let mut second = Box::pin(second);
    let mut first_out = None;
    let mut second_out = None;

    std::future::poll_fn(|cx| {
        // A finished future must never be polled again, hence the guards.
        if first_out.is_none() {
            match first.as_mut().poll(cx) {
                Poll::Ready(Ok(value)) => first_out = Some(value),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => {}
            }
        }
        if second_out.is_none() {
            match second.as_mut().poll(cx) {
                Poll::Ready(Ok(value)) => second_out = Some(value),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => {}
            }
        }
        match (first_out.take(), second_out.take()) {
            (Some(a), Some(b)) => Poll::Ready(Ok((a, b))),
            (a, b) => {
                // Not done yet: put back whatever has completed so far.
                first_out = a;
                second_out = b;
                Poll::Pending
            }
        }
    })
    .await
}
/// Reads `reader` line by line until end of input, handing every line that
/// parses as a [`StreamEvent`] to `handler`.
///
/// Lines that are empty after trimming are ignored. Lines that fail to parse
/// are logged at debug level and skipped; they do not abort the stream.
///
/// # Errors
///
/// Returns [`Error::Io`] (tagged with `working_dir`) if reading a line fails.
#[cfg(feature = "json")]
async fn read_lines<F>(
    reader: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
    handler: &mut F,
    working_dir: Option<std::path::PathBuf>,
) -> Result<()>
where
    F: FnMut(StreamEvent),
{
    loop {
        let line = match reader.next_line().await {
            Ok(Some(text)) => text,
            // End of stream: the writer closed its side of the pipe.
            Ok(None) => return Ok(()),
            Err(source) => {
                return Err(Error::Io {
                    message: "failed to read stdout line".to_string(),
                    source,
                    working_dir,
                });
            }
        };

        if line.trim().is_empty() {
            continue;
        }

        match serde_json::from_str::<StreamEvent>(&line) {
            Err(e) => {
                debug!(line = %line, error = %e, "failed to parse stream event, skipping");
            }
            Ok(event) => handler(event),
        }
    }
}