claude_wrapper/
streaming.rs1use tokio::io::{AsyncBufReadExt, BufReader};
2use tokio::process::Command;
3use tracing::debug;
4
5use crate::Claude;
6use crate::error::{Error, Result};
7use crate::exec::CommandOutput;
8
/// One event decoded from a single JSON line of streamed output.
///
/// The event is kept schema-less: the only field is flattened, so the keys of
/// the incoming JSON object end up directly inside [`StreamEvent::data`] and
/// serializing the event writes them back out at the top level. The accessor
/// methods on this type look up individual keys in that value.
#[cfg(feature = "json")]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct StreamEvent {
    /// The raw JSON payload of the event.
    #[serde(flatten)]
    pub data: serde_json::Value,
}
20
/// Convenience accessors over the raw JSON payload of a [`StreamEvent`].
///
/// Every accessor inspects top-level keys of `data` only and returns `None`
/// when the key is missing or holds a value of a different JSON type.
#[cfg(feature = "json")]
impl StreamEvent {
    /// Returns the `"type"` field of the event, if it is a string.
    pub fn event_type(&self) -> Option<&str> {
        self.data.get("type").and_then(serde_json::Value::as_str)
    }

    /// Returns the top-level `"role"` field of the event, if it is a string.
    pub fn role(&self) -> Option<&str> {
        self.data.get("role").and_then(serde_json::Value::as_str)
    }

    /// Returns `true` when the event's `"type"` is exactly `"result"`.
    pub fn is_result(&self) -> bool {
        self.event_type() == Some("result")
    }

    /// Returns the `"result"` field of the event, if it is a string.
    pub fn result_text(&self) -> Option<&str> {
        self.data.get("result").and_then(serde_json::Value::as_str)
    }

    /// Returns the `"session_id"` field of the event, if it is a string.
    pub fn session_id(&self) -> Option<&str> {
        self.data
            .get("session_id")
            .and_then(serde_json::Value::as_str)
    }

    /// Returns the reported cost in USD, if the event carries one.
    ///
    /// The `"cost_usd"` key is consulted first; when it is absent or not a
    /// number, `"total_cost_usd"` is used instead, because the documented
    /// result message reports the cost under that name.
    pub fn cost_usd(&self) -> Option<f64> {
        ["cost_usd", "total_cost_usd"]
            .iter()
            .find_map(|key| self.data.get(*key).and_then(serde_json::Value::as_f64))
    }
}
53
/// Runs a query command and feeds every streamed JSON event to `handler`.
///
/// The binary at `claude.binary` is spawned with `claude.global_args` followed
/// by `cmd.args()`. Its stdout is read line by line: blank lines are ignored,
/// lines that parse as a [`StreamEvent`] are passed to `handler` in order, and
/// lines that fail to parse are logged at debug level and skipped.
///
/// Stderr is collected in the background while stdout is being consumed, so a
/// child that writes a lot of diagnostics cannot stall on a full stderr pipe.
/// The child is killed if this future is dropped or returns early.
///
/// On success the returned [`CommandOutput`] has an empty `stdout` (the stream
/// was already delivered to `handler`), the captured `stderr`, and the exit
/// code.
///
/// # Errors
///
/// * [`Error::Io`] if the process cannot be spawned, a stdout line cannot be
///   read, stderr cannot be collected, or waiting for the process fails.
/// * [`Error::CommandFailed`] if the process exits unsuccessfully; `stderr`
///   holds the captured diagnostics and `stdout` is empty.
///
/// # Panics
///
/// Panics only if the spawned child lacks the stdout/stderr handles that were
/// explicitly requested as pipes, which would be an internal invariant
/// violation.
#[cfg(feature = "json")]
pub async fn stream_query<F>(
    claude: &Claude,
    cmd: &crate::command::query::QueryCommand,
    mut handler: F,
) -> Result<CommandOutput>
where
    F: FnMut(StreamEvent),
{
    use crate::command::ClaudeCommand;
    use tokio::io::AsyncReadExt;

    let args = cmd.args();

    // Global arguments come first, then the command-specific ones.
    let mut command_args = Vec::new();
    command_args.extend(claude.global_args.clone());
    command_args.extend(args);

    debug!(binary = %claude.binary.display(), args = ?command_args, "streaming claude command");

    let mut process = Command::new(&claude.binary);
    process
        .args(&command_args)
        .env_remove("CLAUDECODE")
        .envs(&claude.env)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .stdin(std::process::Stdio::null())
        // Do not leave the child running if we bail out early (e.g. on a read
        // error) or if the caller drops this future.
        .kill_on_drop(true);

    if let Some(ref dir) = claude.working_dir {
        process.current_dir(dir);
    }

    let mut child = process.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: claude.working_dir.clone(),
    })?;

    let stdout = child.stdout.take().expect("stdout was piped");
    let mut stderr_pipe = child.stderr.take().expect("stderr was piped");

    // Drain stderr concurrently. If it were only read after stdout hit EOF, a
    // child that fills the stderr pipe buffer would block forever while we
    // are still waiting for more stdout.
    let stderr_task = tokio::spawn(async move {
        let mut collected = Vec::new();
        stderr_pipe
            .read_to_end(&mut collected)
            .await
            .map(|_| collected)
    });

    let mut reader = BufReader::new(stdout).lines();

    while let Some(line) = reader.next_line().await.map_err(|e| Error::Io {
        message: "failed to read stdout line".to_string(),
        source: e,
        working_dir: claude.working_dir.clone(),
    })? {
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<StreamEvent>(&line) {
            Ok(event) => handler(event),
            Err(e) => {
                debug!(line = %line, error = %e, "failed to parse stream event, skipping");
            }
        }
    }

    let status = child.wait().await.map_err(|e| Error::Io {
        message: "failed to wait for claude process".to_string(),
        source: e,
        working_dir: claude.working_dir.clone(),
    })?;

    // A failed join (task panicked or was cancelled) is folded into an I/O
    // error so both failure modes surface through the same variant.
    let stderr_bytes = stderr_task
        .await
        .map_err(std::io::Error::from)
        .and_then(|read| read)
        .map_err(|e| Error::Io {
            message: "failed to read claude stderr".to_string(),
            source: e,
            working_dir: claude.working_dir.clone(),
        })?;

    let stderr = String::from_utf8_lossy(&stderr_bytes).to_string();
    // No exit code is available when the process was terminated by a signal.
    let exit_code = status.code().unwrap_or(-1);

    if !status.success() {
        return Err(Error::CommandFailed {
            command: format!("{} {}", claude.binary.display(), command_args.join(" ")),
            exit_code,
            stdout: String::new(),
            stderr,
            working_dir: claude.working_dir.clone(),
        });
    }

    Ok(CommandOutput {
        stdout: String::new(),
        stderr,
        exit_code,
        success: true,
    })
}