1use std::collections::HashMap;
58use std::path::PathBuf;
59use std::process::Stdio;
60use std::time::Duration;
61use tokio::io::BufReader;
62use tokio::process::Command;
63use tokio::sync::mpsc;
64
65use crate::trace::trace_lazy;
66use crate::{CommandResult, Result};
67
/// Default grace period, in milliseconds, stored in `exit_pump_grace_ms` by
/// `StreamingRunner::new`. The same value bounds both the wait for the child
/// after a kill signal and the wait for the output pumps to drain.
const DEFAULT_EXIT_PUMP_GRACE_MS: u64 = 100;

/// Signal name used when the caller does not choose one, and as the fallback
/// when the kill channel closes without delivering a signal name.
const DEFAULT_KILL_SIGNAL: &str = "SIGTERM";
75
/// One event produced while a streamed command runs.
///
/// `PartialEq`/`Eq` are derived so chunks can be compared directly, which
/// keeps consumer code and tests free of hand-written `match` comparisons.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputChunk {
    /// Raw bytes read from the child's stdout.
    Stdout(Vec<u8>),
    /// Raw bytes read from the child's stderr.
    Stderr(Vec<u8>),
    /// Exit code reported once the process has finished or was killed.
    Exit(i32),
}
86
/// Builder describing a shell command whose output is streamed chunk by chunk.
///
/// `Debug` and `Clone` are derived so a configured runner can be logged and
/// reused as a template for several runs.
#[derive(Debug, Clone)]
pub struct StreamingRunner {
    /// Command line handed to the shell as a single argument.
    command: String,
    /// Working directory for the child; the current directory is inherited when `None`.
    cwd: Option<PathBuf>,
    /// Extra environment variables set on the child.
    env: Option<HashMap<String, String>>,
    /// Text written to the child's stdin; stdin is connected to null when `None`.
    stdin_content: Option<String>,
    /// Signal name used by the resulting stream when it is killed or dropped.
    kill_signal: String,
    /// Grace period, in milliseconds, for the child to exit after a signal and
    /// for the output pumps to drain.
    exit_pump_grace_ms: u64,
}
96
97impl StreamingRunner {
98 pub fn new(command: impl Into<String>) -> Self {
100 StreamingRunner {
101 command: command.into(),
102 cwd: None,
103 env: None,
104 stdin_content: None,
105 kill_signal: DEFAULT_KILL_SIGNAL.to_string(),
106 exit_pump_grace_ms: DEFAULT_EXIT_PUMP_GRACE_MS,
107 }
108 }
109
110 pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
112 self.cwd = Some(path.into());
113 self
114 }
115
116 pub fn env(mut self, env: HashMap<String, String>) -> Self {
118 self.env = Some(env);
119 self
120 }
121
122 pub fn stdin(mut self, content: impl Into<String>) -> Self {
124 self.stdin_content = Some(content.into());
125 self
126 }
127
128 pub fn kill_signal(mut self, signal: impl Into<String>) -> Self {
135 self.kill_signal = signal.into();
136 self
137 }
138
139 pub fn exit_pump_grace_ms(mut self, ms: u64) -> Self {
143 self.exit_pump_grace_ms = ms;
144 self
145 }
146
147 pub fn stream(mut self) -> OutputStream {
149 let (tx, rx) = mpsc::channel(1024);
150 let (kill_tx, kill_rx) = mpsc::unbounded_channel::<String>();
152
153 let command = self.command.clone();
155 let cwd = self.cwd.take();
156 let env = self.env.take();
157 let stdin_content = self.stdin_content.take();
158 let grace = self.exit_pump_grace_ms;
159 let kill_signal = self.kill_signal.clone();
160
161 tokio::spawn(async move {
162 if let Err(e) =
163 run_streaming_process(command, cwd, env, stdin_content, grace, tx.clone(), kill_rx)
164 .await
165 {
166 trace_lazy("StreamingRunner", || format!("Error: {}", e));
167 }
168 });
169
170 OutputStream {
171 rx,
172 kill_tx,
173 kill_signal,
174 killed: false,
175 }
176 }
177
178 pub async fn collect(self) -> Result<CommandResult> {
180 let mut stdout = Vec::new();
181 let mut stderr = Vec::new();
182 let mut exit_code = 0;
183
184 let mut stream = self.stream();
185 while let Some(chunk) = stream.rx.recv().await {
186 match chunk {
187 OutputChunk::Stdout(data) => stdout.extend(data),
188 OutputChunk::Stderr(data) => stderr.extend(data),
189 OutputChunk::Exit(code) => exit_code = code,
190 }
191 }
192
193 Ok(CommandResult {
194 stdout: String::from_utf8_lossy(&stdout).to_string(),
195 stderr: String::from_utf8_lossy(&stderr).to_string(),
196 code: exit_code,
197 })
198 }
199}
200
201pub struct OutputStream {
203 rx: mpsc::Receiver<OutputChunk>,
204 kill_tx: mpsc::UnboundedSender<String>,
205 kill_signal: String,
206 killed: bool,
207}
208
209impl OutputStream {
210 pub async fn next(&mut self) -> Option<OutputChunk> {
212 self.rx.recv().await
213 }
214
215 pub fn kill(&mut self) {
221 let signal = self.kill_signal.clone();
222 self.kill_with(&signal);
223 }
224
225 pub fn kill_with(&mut self, signal: &str) {
228 if self.killed {
229 return;
230 }
231 self.killed = true;
232 trace_lazy("OutputStream", || format!("kill | signal={}", signal));
233 let _ = self.kill_tx.send(signal.to_string());
236 }
237
238 pub async fn collect(mut self) -> (Vec<u8>, Vec<u8>, i32) {
240 let mut stdout = Vec::new();
241 let mut stderr = Vec::new();
242 let mut exit_code = 0;
243
244 while let Some(chunk) = self.rx.recv().await {
245 match chunk {
246 OutputChunk::Stdout(data) => stdout.extend(data),
247 OutputChunk::Stderr(data) => stderr.extend(data),
248 OutputChunk::Exit(code) => exit_code = code,
249 }
250 }
251
252 (stdout, stderr, exit_code)
253 }
254
255 pub async fn collect_stdout(mut self) -> Vec<u8> {
257 let mut stdout = Vec::new();
258
259 while let Some(chunk) = self.rx.recv().await {
260 if let OutputChunk::Stdout(data) = chunk {
261 stdout.extend(data);
262 }
263 }
264
265 stdout
266 }
267}
268
269impl Drop for OutputStream {
270 fn drop(&mut self) {
271 if !self.killed {
275 let _ = self.kill_tx.send(self.kill_signal.clone());
276 }
277 }
278}
279
280async fn run_streaming_process(
282 command: String,
283 cwd: Option<PathBuf>,
284 env: Option<HashMap<String, String>>,
285 stdin_content: Option<String>,
286 exit_pump_grace_ms: u64,
287 tx: mpsc::Sender<OutputChunk>,
288 mut kill_rx: mpsc::UnboundedReceiver<String>,
289) -> Result<()> {
290 trace_lazy("StreamingRunner", || format!("Starting: {}", command));
291
292 let shell = find_available_shell();
293 let mut cmd = Command::new(&shell.cmd);
294 for arg in &shell.args {
295 cmd.arg(arg);
296 }
297 cmd.arg(&command);
298
299 if stdin_content.is_some() {
301 cmd.stdin(Stdio::piped());
302 } else {
303 cmd.stdin(Stdio::null());
304 }
305 cmd.stdout(Stdio::piped());
306 cmd.stderr(Stdio::piped());
307
308 #[cfg(unix)]
311 cmd.process_group(0);
312
313 if let Some(ref cwd) = cwd {
315 cmd.current_dir(cwd);
316 }
317
318 if let Some(ref env_vars) = env {
320 for (key, value) in env_vars {
321 cmd.env(key, value);
322 }
323 }
324
325 let mut child = cmd.spawn()?;
327
328 if let Some(content) = stdin_content {
330 if let Some(mut stdin) = child.stdin.take() {
331 use tokio::io::AsyncWriteExt;
332 let _ = stdin.write_all(content.as_bytes()).await;
333 let _ = stdin.shutdown().await;
334 }
335 }
336
337 let stdout = child.stdout.take();
339 let tx_stdout = tx.clone();
340 let stdout_handle = stdout.map(|stdout| {
341 tokio::spawn(async move {
342 let mut reader = BufReader::new(stdout);
343 let mut buf = vec![0u8; 8192];
344 loop {
345 use tokio::io::AsyncReadExt;
346 match reader.read(&mut buf).await {
347 Ok(0) => break,
348 Ok(n) => {
349 if tx_stdout
350 .send(OutputChunk::Stdout(buf[..n].to_vec()))
351 .await
352 .is_err()
353 {
354 break;
355 }
356 }
357 Err(_) => break,
358 }
359 }
360 })
361 });
362
363 let stderr = child.stderr.take();
365 let tx_stderr = tx.clone();
366 let stderr_handle = stderr.map(|stderr| {
367 tokio::spawn(async move {
368 let mut reader = BufReader::new(stderr);
369 let mut buf = vec![0u8; 8192];
370 loop {
371 use tokio::io::AsyncReadExt;
372 match reader.read(&mut buf).await {
373 Ok(0) => break,
374 Ok(n) => {
375 if tx_stderr
376 .send(OutputChunk::Stderr(buf[..n].to_vec()))
377 .await
378 .is_err()
379 {
380 break;
381 }
382 }
383 Err(_) => break,
384 }
385 }
386 })
387 });
388
389 let pid = child.id();
394 let code;
395 tokio::select! {
396 status = child.wait() => {
397 code = status_to_code(status?);
398 }
399 maybe_signal = kill_rx.recv() => {
400 let signal = maybe_signal.unwrap_or_else(|| DEFAULT_KILL_SIGNAL.to_string());
403 trace_lazy("StreamingRunner", || format!("Kill requested | signal={}", signal));
404 if let Some(pid) = pid {
405 send_signal_to_process(pid, &signal);
406 }
407 if tokio::time::timeout(Duration::from_millis(exit_pump_grace_ms), child.wait())
410 .await
411 .is_err()
412 {
413 let _ = child.start_kill();
414 let _ = child.wait().await;
415 }
416 code = 128 + signal_number(&signal);
419 }
420 }
421
422 let stdout_abort = stdout_handle.as_ref().map(|h| h.abort_handle());
426 let stderr_abort = stderr_handle.as_ref().map(|h| h.abort_handle());
427 let drain = async {
428 if let Some(handle) = stdout_handle {
429 let _ = handle.await;
430 }
431 if let Some(handle) = stderr_handle {
432 let _ = handle.await;
433 }
434 };
435 if tokio::time::timeout(Duration::from_millis(exit_pump_grace_ms), drain)
436 .await
437 .is_err()
438 {
439 if let Some(abort) = stdout_abort {
442 abort.abort();
443 }
444 if let Some(abort) = stderr_abort {
445 abort.abort();
446 }
447 }
448
449 let _ = tx.send(OutputChunk::Exit(code)).await;
451
452 trace_lazy("StreamingRunner", || format!("Exited with code: {}", code));
453
454 Ok(())
455}
456
/// Converts an `ExitStatus` into a shell-style exit code.
///
/// Returns the exit code when there is one. On unix, a status that carries a
/// signal instead yields `128 + signal`. Anything else maps to `-1`.
fn status_to_code(status: std::process::ExitStatus) -> i32 {
    #[cfg(unix)]
    let from_signal = {
        use std::os::unix::process::ExitStatusExt;
        status.signal().map(|sig| 128 + sig)
    };
    #[cfg(not(unix))]
    let from_signal: Option<i32> = None;

    // A regular exit code always takes precedence over the signal mapping.
    status.code().or(from_signal).unwrap_or(-1)
}
472
/// Maps a signal name such as `"SIGKILL"` to the number used when computing
/// the reported exit code. Names not in the table map to 15, the value used
/// for `SIGTERM`.
fn signal_number(signal: &str) -> i32 {
    const KNOWN: [(&str, i32); 7] = [
        ("SIGHUP", 1),
        ("SIGINT", 2),
        ("SIGQUIT", 3),
        ("SIGKILL", 9),
        ("SIGUSR1", 10),
        ("SIGUSR2", 12),
        ("SIGTERM", 15),
    ];

    KNOWN
        .iter()
        .find(|(name, _)| *name == signal)
        .map_or(15, |&(_, number)| number)
}
487
488#[cfg(unix)]
490fn send_signal_to_process(pid: u32, signal: &str) {
491 use nix::sys::signal::{kill, Signal};
492 use nix::unistd::Pid;
493
494 let sig = match signal {
495 "SIGHUP" => Signal::SIGHUP,
496 "SIGINT" => Signal::SIGINT,
497 "SIGQUIT" => Signal::SIGQUIT,
498 "SIGKILL" => Signal::SIGKILL,
499 "SIGUSR1" => Signal::SIGUSR1,
500 "SIGUSR2" => Signal::SIGUSR2,
501 "SIGTERM" => Signal::SIGTERM,
502 _ => Signal::SIGTERM,
503 };
504
505 let _ = kill(Pid::from_raw(pid as i32), sig);
507 let _ = kill(Pid::from_raw(-(pid as i32)), sig);
509}
510
511#[cfg(not(unix))]
514fn send_signal_to_process(_pid: u32, _signal: &str) {}
515
/// Shell executable together with the arguments that must precede the
/// command string.
#[derive(Debug, Clone)]
struct ShellConfig {
    /// Program to execute, e.g. `/bin/sh` or `cmd.exe`.
    cmd: String,
    /// Arguments passed before the command string, e.g. `-c` or `/c`.
    args: Vec<String>,
}
522
523fn find_available_shell() -> ShellConfig {
525 let is_windows = cfg!(windows);
526
527 if is_windows {
528 ShellConfig {
529 cmd: "cmd.exe".to_string(),
530 args: vec!["/c".to_string()],
531 }
532 } else {
533 let shells = [
534 ("/bin/sh", "-c"),
535 ("/usr/bin/sh", "-c"),
536 ("/bin/bash", "-c"),
537 ];
538
539 for (cmd, arg) in shells {
540 if std::path::Path::new(cmd).exists() {
541 return ShellConfig {
542 cmd: cmd.to_string(),
543 args: vec![arg.to_string()],
544 };
545 }
546 }
547
548 ShellConfig {
549 cmd: "/bin/sh".to_string(),
550 args: vec!["-c".to_string()],
551 }
552 }
553}
554
/// Minimal interface for pulling items asynchronously, one at a time.
#[async_trait::async_trait]
pub trait AsyncIterator {
    /// Type of the values produced.
    type Item;

    /// Waits for and returns the next item.
    // NOTE(review): presumably `None` means the source is exhausted, as in
    // the `OutputStream` implementation in this file — confirm for other
    // implementors.
    async fn next(&mut self) -> Option<Self::Item>;
}
563
564#[async_trait::async_trait]
565impl AsyncIterator for OutputStream {
566 type Item = OutputChunk;
567
568 async fn next(&mut self) -> Option<Self::Item> {
569 self.rx.recv().await
570 }
571}
572
/// Conversion of a value into an `OutputStream`.
pub trait IntoStream {
    /// Consumes `self` and returns the resulting stream.
    fn into_stream(self) -> OutputStream;
}
578
579impl IntoStream for crate::ProcessRunner {
580 fn into_stream(self) -> OutputStream {
581 let streaming = StreamingRunner::new(self.command().to_string());
582 streaming.stream()
583 }
584}