1use std::collections::HashMap;
42use std::path::PathBuf;
43use std::process::Stdio;
44use tokio::io::BufReader;
45use tokio::process::Command;
46use tokio::sync::mpsc;
47
48use crate::trace::trace_lazy;
49use crate::{CommandResult, Result};
50
/// One event emitted while a command's output is being streamed.
///
/// Implements `PartialEq`/`Eq` so that consumers can compare chunks directly
/// (for example in assertions or when filtering a stream).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputChunk {
    /// Raw bytes read from the child's standard output.
    Stdout(Vec<u8>),
    /// Raw bytes read from the child's standard error.
    Stderr(Vec<u8>),
    /// Exit code reported once the child has finished.
    Exit(i32),
}
61
/// Builder describing a shell command whose output is streamed chunk by chunk.
///
/// Derives `Debug` for diagnostics and `Clone` so one configuration can be
/// reused for several runs.
#[derive(Debug, Clone)]
pub struct StreamingRunner {
    /// Command line handed to the shell.
    command: String,
    /// Working directory for the child; the parent's is inherited when `None`.
    cwd: Option<PathBuf>,
    /// Extra environment variables set on the child.
    env: Option<HashMap<String, String>>,
    /// Text written to the child's stdin; stdin is null when `None`.
    stdin_content: Option<String>,
}
69
70impl StreamingRunner {
71 pub fn new(command: impl Into<String>) -> Self {
73 StreamingRunner {
74 command: command.into(),
75 cwd: None,
76 env: None,
77 stdin_content: None,
78 }
79 }
80
81 pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
83 self.cwd = Some(path.into());
84 self
85 }
86
87 pub fn env(mut self, env: HashMap<String, String>) -> Self {
89 self.env = Some(env);
90 self
91 }
92
93 pub fn stdin(mut self, content: impl Into<String>) -> Self {
95 self.stdin_content = Some(content.into());
96 self
97 }
98
99 pub fn stream(mut self) -> OutputStream {
101 let (tx, rx) = mpsc::channel(1024);
102
103 let command = self.command.clone();
105 let cwd = self.cwd.take();
106 let env = self.env.take();
107 let stdin_content = self.stdin_content.take();
108
109 tokio::spawn(async move {
110 if let Err(e) =
111 run_streaming_process(command, cwd, env, stdin_content, tx.clone()).await
112 {
113 trace_lazy("StreamingRunner", || format!("Error: {}", e));
114 }
115 });
116
117 OutputStream { rx }
118 }
119
120 pub async fn collect(self) -> Result<CommandResult> {
122 let mut stdout = Vec::new();
123 let mut stderr = Vec::new();
124 let mut exit_code = 0;
125
126 let mut stream = self.stream();
127 while let Some(chunk) = stream.rx.recv().await {
128 match chunk {
129 OutputChunk::Stdout(data) => stdout.extend(data),
130 OutputChunk::Stderr(data) => stderr.extend(data),
131 OutputChunk::Exit(code) => exit_code = code,
132 }
133 }
134
135 Ok(CommandResult {
136 stdout: String::from_utf8_lossy(&stdout).to_string(),
137 stderr: String::from_utf8_lossy(&stderr).to_string(),
138 code: exit_code,
139 })
140 }
141}
142
143pub struct OutputStream {
145 rx: mpsc::Receiver<OutputChunk>,
146}
147
148impl OutputStream {
149 pub async fn next(&mut self) -> Option<OutputChunk> {
151 self.rx.recv().await
152 }
153
154 pub async fn collect(mut self) -> (Vec<u8>, Vec<u8>, i32) {
156 let mut stdout = Vec::new();
157 let mut stderr = Vec::new();
158 let mut exit_code = 0;
159
160 while let Some(chunk) = self.rx.recv().await {
161 match chunk {
162 OutputChunk::Stdout(data) => stdout.extend(data),
163 OutputChunk::Stderr(data) => stderr.extend(data),
164 OutputChunk::Exit(code) => exit_code = code,
165 }
166 }
167
168 (stdout, stderr, exit_code)
169 }
170
171 pub async fn collect_stdout(mut self) -> Vec<u8> {
173 let mut stdout = Vec::new();
174
175 while let Some(chunk) = self.rx.recv().await {
176 if let OutputChunk::Stdout(data) = chunk {
177 stdout.extend(data);
178 }
179 }
180
181 stdout
182 }
183}
184
185async fn run_streaming_process(
187 command: String,
188 cwd: Option<PathBuf>,
189 env: Option<HashMap<String, String>>,
190 stdin_content: Option<String>,
191 tx: mpsc::Sender<OutputChunk>,
192) -> Result<()> {
193 trace_lazy("StreamingRunner", || format!("Starting: {}", command));
194
195 let shell = find_available_shell();
196 let mut cmd = Command::new(&shell.cmd);
197 for arg in &shell.args {
198 cmd.arg(arg);
199 }
200 cmd.arg(&command);
201
202 if stdin_content.is_some() {
204 cmd.stdin(Stdio::piped());
205 } else {
206 cmd.stdin(Stdio::null());
207 }
208 cmd.stdout(Stdio::piped());
209 cmd.stderr(Stdio::piped());
210
211 if let Some(ref cwd) = cwd {
213 cmd.current_dir(cwd);
214 }
215
216 if let Some(ref env_vars) = env {
218 for (key, value) in env_vars {
219 cmd.env(key, value);
220 }
221 }
222
223 let mut child = cmd.spawn()?;
225
226 if let Some(content) = stdin_content {
228 if let Some(mut stdin) = child.stdin.take() {
229 use tokio::io::AsyncWriteExt;
230 let _ = stdin.write_all(content.as_bytes()).await;
231 let _ = stdin.shutdown().await;
232 }
233 }
234
235 let stdout = child.stdout.take();
237 let tx_stdout = tx.clone();
238 let stdout_handle = if let Some(stdout) = stdout {
239 Some(tokio::spawn(async move {
240 let mut reader = BufReader::new(stdout);
241 let mut buf = vec![0u8; 8192];
242 loop {
243 use tokio::io::AsyncReadExt;
244 match reader.read(&mut buf).await {
245 Ok(0) => break,
246 Ok(n) => {
247 if tx_stdout
248 .send(OutputChunk::Stdout(buf[..n].to_vec()))
249 .await
250 .is_err()
251 {
252 break;
253 }
254 }
255 Err(_) => break,
256 }
257 }
258 }))
259 } else {
260 None
261 };
262
263 let stderr = child.stderr.take();
265 let tx_stderr = tx.clone();
266 let stderr_handle = if let Some(stderr) = stderr {
267 Some(tokio::spawn(async move {
268 let mut reader = BufReader::new(stderr);
269 let mut buf = vec![0u8; 8192];
270 loop {
271 use tokio::io::AsyncReadExt;
272 match reader.read(&mut buf).await {
273 Ok(0) => break,
274 Ok(n) => {
275 if tx_stderr
276 .send(OutputChunk::Stderr(buf[..n].to_vec()))
277 .await
278 .is_err()
279 {
280 break;
281 }
282 }
283 Err(_) => break,
284 }
285 }
286 }))
287 } else {
288 None
289 };
290
291 if let Some(handle) = stdout_handle {
293 let _ = handle.await;
294 }
295 if let Some(handle) = stderr_handle {
296 let _ = handle.await;
297 }
298
299 let status = child.wait().await?;
301 let code = status.code().unwrap_or(-1);
302
303 let _ = tx.send(OutputChunk::Exit(code)).await;
305
306 trace_lazy("StreamingRunner", || format!("Exited with code: {}", code));
307
308 Ok(())
309}
310
/// Shell executable plus the leading arguments that make it run a command
/// string (the command itself is appended after `args`).
#[derive(Clone, Debug)]
struct ShellConfig {
    /// Program name or path of the shell.
    cmd: String,
    /// Arguments placed before the command string.
    args: Vec<String>,
}
317
318fn find_available_shell() -> ShellConfig {
320 let is_windows = cfg!(windows);
321
322 if is_windows {
323 ShellConfig {
324 cmd: "cmd.exe".to_string(),
325 args: vec!["/c".to_string()],
326 }
327 } else {
328 let shells = [
329 ("/bin/sh", "-c"),
330 ("/usr/bin/sh", "-c"),
331 ("/bin/bash", "-c"),
332 ];
333
334 for (cmd, arg) in shells {
335 if std::path::Path::new(cmd).exists() {
336 return ShellConfig {
337 cmd: cmd.to_string(),
338 args: vec![arg.to_string()],
339 };
340 }
341 }
342
343 ShellConfig {
344 cmd: "/bin/sh".to_string(),
345 args: vec!["-c".to_string()],
346 }
347 }
348}
349
350#[async_trait::async_trait]
352pub trait AsyncIterator {
353 type Item;
354
355 async fn next(&mut self) -> Option<Self::Item>;
357}
358
359#[async_trait::async_trait]
360impl AsyncIterator for OutputStream {
361 type Item = OutputChunk;
362
363 async fn next(&mut self) -> Option<Self::Item> {
364 self.rx.recv().await
365 }
366}
367
368pub trait IntoStream {
370 fn into_stream(self) -> OutputStream;
372}
373
374impl IntoStream for crate::ProcessRunner {
375 fn into_stream(self) -> OutputStream {
376 let streaming = StreamingRunner::new(self.command().to_string());
377 streaming.stream()
378 }
379}