#[cfg(feature = "json")]
use std::time::Duration;
#[cfg(all(feature = "json", feature = "async"))]
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
#[cfg(all(feature = "json", feature = "async"))]
use tokio::process::{ChildStderr, Command};
#[cfg(feature = "json")]
use tracing::{debug, warn};
#[cfg(feature = "json")]
use crate::Claude;
#[cfg(feature = "json")]
use crate::error::{Error, Result};
#[cfg(feature = "json")]
use crate::exec::CommandOutput;
#[cfg(feature = "json")]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct StreamEvent {
#[serde(flatten)]
pub data: serde_json::Value,
}
#[cfg(feature = "json")]
impl StreamEvent {
pub fn event_type(&self) -> Option<&str> {
self.data.get("type").and_then(|v| v.as_str())
}
pub fn role(&self) -> Option<&str> {
self.data.get("role").and_then(|v| v.as_str())
}
pub fn is_result(&self) -> bool {
self.event_type() == Some("result")
}
pub fn result_text(&self) -> Option<&str> {
self.data.get("result").and_then(|v| v.as_str())
}
pub fn session_id(&self) -> Option<&str> {
self.data.get("session_id").and_then(|v| v.as_str())
}
pub fn cost_usd(&self) -> Option<f64> {
self.data
.get("total_cost_usd")
.or_else(|| self.data.get("cost_usd"))
.and_then(|v| v.as_f64())
}
pub fn partial_message(&self) -> Option<PartialMessageEvent> {
let event = if self.event_type() == Some("stream_event") {
self.data.get("event")?
} else {
&self.data
};
let inner_type = event.get("type")?.as_str()?;
let index = event.get("index").and_then(serde_json::Value::as_u64)?;
let index = u32::try_from(index).ok()?;
match inner_type {
"content_block_start" => {
let block_type = parse_block_type(event.get("content_block")?);
Some(PartialMessageEvent::BlockStart { index, block_type })
}
"content_block_delta" => {
let delta = parse_block_delta(event.get("delta")?);
Some(PartialMessageEvent::BlockDelta { index, delta })
}
"content_block_stop" => Some(PartialMessageEvent::BlockStop { index }),
_ => None,
}
}
}
#[cfg(feature = "json")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PartialMessageEvent {
BlockStart {
index: u32,
block_type: BlockType,
},
BlockDelta {
index: u32,
delta: BlockDelta,
},
BlockStop {
index: u32,
},
}
#[cfg(feature = "json")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockType {
Text,
Thinking,
ToolUse {
id: String,
name: String,
},
Other(String),
}
#[cfg(feature = "json")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockDelta {
Text(String),
Thinking(String),
InputJson(String),
Other,
}
#[cfg(feature = "json")]
fn parse_block_type(content_block: &serde_json::Value) -> BlockType {
let Some(ty) = content_block
.get("type")
.and_then(serde_json::Value::as_str)
else {
return BlockType::Other(String::new());
};
match ty {
"text" => BlockType::Text,
"thinking" => BlockType::Thinking,
"tool_use" => {
let id = content_block
.get("id")
.and_then(serde_json::Value::as_str)
.unwrap_or("")
.to_string();
let name = content_block
.get("name")
.and_then(serde_json::Value::as_str)
.unwrap_or("")
.to_string();
BlockType::ToolUse { id, name }
}
other => BlockType::Other(other.to_string()),
}
}
#[cfg(feature = "json")]
fn parse_block_delta(delta: &serde_json::Value) -> BlockDelta {
let Some(ty) = delta.get("type").and_then(serde_json::Value::as_str) else {
return BlockDelta::Other;
};
match ty {
"text_delta" => delta
.get("text")
.and_then(serde_json::Value::as_str)
.map(|s| BlockDelta::Text(s.to_string()))
.unwrap_or(BlockDelta::Other),
"thinking_delta" => delta
.get("thinking")
.and_then(serde_json::Value::as_str)
.map(|s| BlockDelta::Thinking(s.to_string()))
.unwrap_or(BlockDelta::Other),
"input_json_delta" => delta
.get("partial_json")
.and_then(serde_json::Value::as_str)
.map(|s| BlockDelta::InputJson(s.to_string()))
.unwrap_or(BlockDelta::Other),
_ => BlockDelta::Other,
}
}
#[cfg(all(feature = "json", feature = "async"))]
pub async fn stream_query<F>(
claude: &Claude,
cmd: &crate::command::query::QueryCommand,
handler: F,
) -> Result<CommandOutput>
where
F: FnMut(StreamEvent),
{
stream_query_impl(claude, cmd, handler, claude.timeout).await
}
#[cfg(all(feature = "json", feature = "async"))]
async fn stream_query_impl<F>(
claude: &Claude,
cmd: &crate::command::query::QueryCommand,
mut handler: F,
timeout: Option<Duration>,
) -> Result<CommandOutput>
where
F: FnMut(StreamEvent),
{
use crate::command::ClaudeCommand;
let args = cmd.args();
let mut command_args = Vec::new();
command_args.extend(claude.global_args.clone());
command_args.extend(args);
debug!(
binary = %claude.binary.display(),
args = ?command_args,
timeout = ?timeout,
"streaming claude command"
);
let mut cmd = Command::new(&claude.binary);
cmd.args(&command_args)
.env_remove("CLAUDECODE")
.envs(&claude.env)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.stdin(std::process::Stdio::null());
if let Some(ref dir) = claude.working_dir {
cmd.current_dir(dir);
}
let mut child = cmd.spawn().map_err(|e| Error::Io {
message: format!("failed to spawn claude: {e}"),
source: e,
working_dir: claude.working_dir.clone(),
})?;
let stdout = child.stdout.take().expect("stdout was piped");
let mut stderr = child.stderr.take().expect("stderr was piped");
let mut reader = BufReader::new(stdout).lines();
let drain = drain_stderr(&mut stderr);
let read_future = read_lines(&mut reader, &mut handler, claude.working_dir.clone());
let combined = async {
let (line_result, stderr_str) = tokio::join!(read_future, drain);
(line_result, stderr_str)
};
let (line_result, stderr_str) = match timeout {
Some(d) => match tokio::time::timeout(d, combined).await {
Ok(pair) => pair,
Err(_) => {
let _ = child.kill().await;
let drain_budget = Duration::from_millis(200);
let stderr_str = tokio::time::timeout(drain_budget, drain_stderr(&mut stderr))
.await
.unwrap_or_default();
if !stderr_str.is_empty() {
warn!(stderr = %stderr_str, "stderr from timed-out streaming process");
}
return Err(Error::Timeout {
timeout_seconds: d.as_secs(),
});
}
},
None => combined.await,
};
if let Err(e) = line_result {
let _ = child.kill().await;
return Err(e);
}
let status = child.wait().await.map_err(|e| Error::Io {
message: "failed to wait for claude process".to_string(),
source: e,
working_dir: claude.working_dir.clone(),
})?;
let exit_code = status.code().unwrap_or(-1);
if !status.success() {
return Err(Error::CommandFailed {
command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
exit_code,
stdout: String::new(),
stderr: stderr_str,
working_dir: claude.working_dir.clone(),
});
}
Ok(CommandOutput {
stdout: String::new(), stderr: stderr_str,
exit_code,
success: true,
})
}
#[cfg(all(feature = "json", feature = "async"))]
async fn drain_stderr(stderr: &mut ChildStderr) -> String {
let mut buf = Vec::new();
let _ = stderr.read_to_end(&mut buf).await;
String::from_utf8_lossy(&buf).into_owned()
}
#[cfg(all(feature = "json", feature = "async"))]
async fn read_lines<F>(
reader: &mut tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
handler: &mut F,
working_dir: Option<std::path::PathBuf>,
) -> Result<()>
where
F: FnMut(StreamEvent),
{
while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
message: "failed to read stdout line".to_string(),
source: e,
working_dir: working_dir.clone(),
})? {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<StreamEvent>(&line) {
Ok(event) => handler(event),
Err(e) => {
debug!(line = %line, error = %e, "failed to parse stream event, skipping");
}
}
}
Ok(())
}
#[cfg(all(feature = "sync", feature = "json"))]
pub fn stream_query_sync<F>(
claude: &Claude,
cmd: &crate::command::query::QueryCommand,
mut handler: F,
) -> Result<CommandOutput>
where
F: FnMut(StreamEvent),
{
use std::io::{BufRead as _, Read as _};
use std::process::{Command as StdCommand, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::Instant;
use crate::command::ClaudeCommand;
let args = cmd.args();
let mut command_args = Vec::new();
command_args.extend(claude.global_args.clone());
command_args.extend(args);
debug!(
binary = %claude.binary.display(),
args = ?command_args,
timeout = ?claude.timeout,
"streaming claude command (sync)"
);
let mut cmd_builder = StdCommand::new(&claude.binary);
cmd_builder
.args(&command_args)
.env_remove("CLAUDECODE")
.env_remove("CLAUDE_CODE_ENTRYPOINT")
.envs(&claude.env)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
if let Some(ref dir) = claude.working_dir {
cmd_builder.current_dir(dir);
}
let mut child = cmd_builder.spawn().map_err(|e| Error::Io {
message: format!("failed to spawn claude: {e}"),
source: e,
working_dir: claude.working_dir.clone(),
})?;
let stdout = child.stdout.take().expect("stdout was piped");
let stderr = child.stderr.take().expect("stderr was piped");
let (tx, rx) = mpsc::channel::<StreamEvent>();
let reader_wd = claude.working_dir.clone();
let reader_thread = thread::spawn(move || -> Result<()> {
let reader = std::io::BufReader::new(stdout);
for line_res in reader.lines() {
let line = line_res.map_err(|e| Error::Io {
message: "failed to read stdout line".to_string(),
source: e,
working_dir: reader_wd.clone(),
})?;
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<StreamEvent>(&line) {
Ok(event) => {
if tx.send(event).is_err() {
return Ok(());
}
}
Err(e) => {
debug!(line = %line, error = %e, "failed to parse stream event, skipping");
}
}
}
Ok(())
});
let stderr_thread = thread::spawn(move || -> String {
let mut buf = Vec::new();
let mut stderr = stderr;
let _ = stderr.read_to_end(&mut buf);
String::from_utf8_lossy(&buf).into_owned()
});
let deadline = claude.timeout.map(|d| Instant::now() + d);
let mut timed_out = false;
loop {
let recv_result = match deadline {
Some(d) => {
let now = Instant::now();
if now >= d {
timed_out = true;
break;
}
rx.recv_timeout(d - now)
}
None => rx.recv().map_err(|_| mpsc::RecvTimeoutError::Disconnected),
};
match recv_result {
Ok(event) => handler(event),
Err(mpsc::RecvTimeoutError::Timeout) => {
timed_out = true;
break;
}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
if timed_out {
let _ = child.kill();
let _ = child.wait();
let budget = Duration::from_millis(200);
let stderr_str = join_with_budget(stderr_thread, budget).unwrap_or_default();
let _ = join_with_budget(reader_thread, budget);
if !stderr_str.is_empty() {
warn!(stderr = %stderr_str, "stderr from timed-out streaming process");
}
return Err(Error::Timeout {
timeout_seconds: claude.timeout.map(|d| d.as_secs()).unwrap_or_default(),
});
}
let reader_result = reader_thread.join().unwrap_or(Ok(()));
if let Err(e) = reader_result {
let _ = child.kill();
let _ = child.wait();
let _ = stderr_thread.join();
return Err(e);
}
let status = child.wait().map_err(|e| Error::Io {
message: "failed to wait for claude process".to_string(),
source: e,
working_dir: claude.working_dir.clone(),
})?;
let stderr_str = stderr_thread.join().unwrap_or_default();
let exit_code = status.code().unwrap_or(-1);
if !status.success() {
return Err(Error::CommandFailed {
command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
exit_code,
stdout: String::new(),
stderr: stderr_str,
working_dir: claude.working_dir.clone(),
});
}
Ok(CommandOutput {
stdout: String::new(),
stderr: stderr_str,
exit_code,
success: true,
})
}
#[cfg(all(feature = "sync", feature = "json"))]
fn join_with_budget<T: Send + 'static>(
handle: std::thread::JoinHandle<T>,
budget: Duration,
) -> Option<T> {
use std::sync::mpsc;
use std::thread;
let (tx, rx) = mpsc::channel::<T>();
thread::spawn(move || {
if let Ok(v) = handle.join() {
let _ = tx.send(v);
}
});
rx.recv_timeout(budget).ok()
}
#[cfg(all(test, feature = "json"))]
mod tests {
use super::*;
use serde_json::json;
fn parse(v: serde_json::Value) -> StreamEvent {
serde_json::from_value(v).expect("valid StreamEvent")
}
fn wrap(inner: serde_json::Value) -> StreamEvent {
parse(json!({
"type": "stream_event",
"event": inner,
"session_id": "sess-1",
"parent_tool_use_id": null,
"uuid": "11111111-1111-1111-1111-111111111111"
}))
}
#[test]
fn partial_message_text_block_lifecycle() {
let start = wrap(json!({
"type": "content_block_start",
"index": 0,
"content_block": { "type": "text", "text": "" }
}));
assert_eq!(
start.partial_message(),
Some(PartialMessageEvent::BlockStart {
index: 0,
block_type: BlockType::Text,
})
);
let delta = wrap(json!({
"type": "content_block_delta",
"index": 0,
"delta": { "type": "text_delta", "text": "Hello" }
}));
assert_eq!(
delta.partial_message(),
Some(PartialMessageEvent::BlockDelta {
index: 0,
delta: BlockDelta::Text("Hello".into()),
})
);
let stop = wrap(json!({ "type": "content_block_stop", "index": 0 }));
assert_eq!(
stop.partial_message(),
Some(PartialMessageEvent::BlockStop { index: 0 })
);
}
#[test]
fn partial_message_thinking_block_lifecycle() {
let start = wrap(json!({
"type": "content_block_start",
"index": 1,
"content_block": { "type": "thinking", "thinking": "", "signature": "" }
}));
assert_eq!(
start.partial_message(),
Some(PartialMessageEvent::BlockStart {
index: 1,
block_type: BlockType::Thinking,
})
);
let delta = wrap(json!({
"type": "content_block_delta",
"index": 1,
"delta": { "type": "thinking_delta", "thinking": "weighing options" }
}));
assert_eq!(
delta.partial_message(),
Some(PartialMessageEvent::BlockDelta {
index: 1,
delta: BlockDelta::Thinking("weighing options".into()),
})
);
let stop = wrap(json!({ "type": "content_block_stop", "index": 1 }));
assert_eq!(
stop.partial_message(),
Some(PartialMessageEvent::BlockStop { index: 1 })
);
}
#[test]
fn partial_message_tool_use_block_carries_id_and_name() {
let start = wrap(json!({
"type": "content_block_start",
"index": 2,
"content_block": {
"type": "tool_use",
"id": "toolu_abc",
"name": "Bash",
"input": {}
}
}));
assert_eq!(
start.partial_message(),
Some(PartialMessageEvent::BlockStart {
index: 2,
block_type: BlockType::ToolUse {
id: "toolu_abc".into(),
name: "Bash".into(),
},
})
);
let delta = wrap(json!({
"type": "content_block_delta",
"index": 2,
"delta": { "type": "input_json_delta", "partial_json": "{\"cmd\":" }
}));
assert_eq!(
delta.partial_message(),
Some(PartialMessageEvent::BlockDelta {
index: 2,
delta: BlockDelta::InputJson("{\"cmd\":".into()),
})
);
}
#[test]
fn partial_message_unknown_kinds_fall_through_to_other() {
let unknown_block = wrap(json!({
"type": "content_block_start",
"index": 3,
"content_block": { "type": "redacted_thinking", "data": "..." }
}));
assert_eq!(
unknown_block.partial_message(),
Some(PartialMessageEvent::BlockStart {
index: 3,
block_type: BlockType::Other("redacted_thinking".into()),
})
);
let unknown_delta = wrap(json!({
"type": "content_block_delta",
"index": 3,
"delta": { "type": "signature_delta", "signature": "sig" }
}));
assert_eq!(
unknown_delta.partial_message(),
Some(PartialMessageEvent::BlockDelta {
index: 3,
delta: BlockDelta::Other,
})
);
}
#[test]
fn partial_message_returns_none_for_non_partial_events() {
let result = parse(json!({
"type": "result",
"result": "done",
"session_id": "sess-1",
"total_cost_usd": 0.01
}));
assert!(result.partial_message().is_none());
let assistant = parse(json!({
"type": "assistant",
"message": { "role": "assistant", "content": [] },
"session_id": "sess-1"
}));
assert!(assistant.partial_message().is_none());
let message_start = wrap(json!({
"type": "message_start",
"message": { "id": "msg_1", "role": "assistant", "content": [] }
}));
assert!(message_start.partial_message().is_none());
}
#[test]
fn partial_message_accepts_unwrapped_event() {
let raw = parse(json!({
"type": "content_block_delta",
"index": 0,
"delta": { "type": "text_delta", "text": "hi" }
}));
assert_eq!(
raw.partial_message(),
Some(PartialMessageEvent::BlockDelta {
index: 0,
delta: BlockDelta::Text("hi".into()),
})
);
}
}