use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::sync::mpsc;
use crate::trace::trace_lazy;
use crate::{CommandResult, Result};
/// Grace period in milliseconds that `StreamingRunner::new` assigns to
/// `exit_pump_grace_ms` unless the builder overrides it.
const DEFAULT_EXIT_PUMP_GRACE_MS: u64 = 100;
/// Signal name that `StreamingRunner::new` assigns to `kill_signal`; it is also
/// the fallback used when the kill channel closes without carrying a signal name.
const DEFAULT_KILL_SIGNAL: &str = "SIGTERM";
/// One event delivered to the consumer of an `OutputStream`.
#[derive(Debug, Clone)]
pub enum OutputChunk {
/// Raw bytes read from the child's standard output (up to 8192 per chunk).
Stdout(Vec<u8>),
/// Raw bytes read from the child's standard error (up to 8192 per chunk).
Stderr(Vec<u8>),
/// Exit code of the process, sent after the output readers were drained or aborted.
Exit(i32),
}
/// Builder describing a shell command whose output is delivered as a stream of
/// `OutputChunk` values.
pub struct StreamingRunner {
// Command line passed as the final argument to the shell.
command: String,
// Working directory for the child; left untouched when `None`.
cwd: Option<PathBuf>,
// Environment variables set on the child command when present.
env: Option<HashMap<String, String>>,
// Text written to the child's stdin; when `None`, stdin is connected to null.
stdin_content: Option<String>,
// Signal name handed to the resulting `OutputStream` for `kill()` and drop.
kill_signal: String,
// Timeout in milliseconds used both for waiting after a kill signal and for
// draining the output readers once the process has exited.
exit_pump_grace_ms: u64,
}
impl StreamingRunner {
    /// Creates a runner for `command` with no working directory, no extra
    /// environment, no stdin content, and the default kill signal and grace period.
    pub fn new(command: impl Into<String>) -> Self {
        StreamingRunner {
            command: command.into(),
            cwd: None,
            env: None,
            stdin_content: None,
            kill_signal: DEFAULT_KILL_SIGNAL.to_string(),
            exit_pump_grace_ms: DEFAULT_EXIT_PUMP_GRACE_MS,
        }
    }

    /// Sets the working directory the command is started in.
    pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
        self.cwd = Some(path.into());
        self
    }

    /// Sets the environment variables applied to the command.
    pub fn env(mut self, env: HashMap<String, String>) -> Self {
        self.env = Some(env);
        self
    }

    /// Sets the text written to the command's standard input.
    pub fn stdin(mut self, content: impl Into<String>) -> Self {
        self.stdin_content = Some(content.into());
        self
    }

    /// Sets the signal name used by [`OutputStream::kill`] and on drop.
    pub fn kill_signal(mut self, signal: impl Into<String>) -> Self {
        self.kill_signal = signal.into();
        self
    }

    /// Sets the grace period, in milliseconds, used while waiting for the
    /// process after a kill request and while draining its output on exit.
    pub fn exit_pump_grace_ms(mut self, ms: u64) -> Self {
        self.exit_pump_grace_ms = ms;
        self
    }

    /// Starts the command on a background task and returns the stream of its output.
    ///
    /// If the process cannot be started or waited on, the error is traced and an
    /// `OutputChunk::Exit(-1)` is emitted, so consumers never mistake a failed
    /// launch for a silent, successful end of stream.
    ///
    /// # Panics
    ///
    /// Panics when called outside of a Tokio runtime, because the work is
    /// scheduled with `tokio::spawn`.
    pub fn stream(mut self) -> OutputStream {
        let (tx, rx) = mpsc::channel(1024);
        let (kill_tx, kill_rx) = mpsc::unbounded_channel::<String>();

        // Move the owned fields out instead of cloning them; `self` is consumed anyway.
        let command = std::mem::take(&mut self.command);
        let kill_signal = std::mem::take(&mut self.kill_signal);
        let cwd = self.cwd.take();
        let env = self.env.take();
        let stdin_content = self.stdin_content.take();
        let grace = self.exit_pump_grace_ms;

        tokio::spawn(async move {
            // The error value stays inside this `if let`, so it is never held
            // across the `.await` below.
            let failed = if let Err(e) =
                run_streaming_process(command, cwd, env, stdin_content, grace, tx.clone(), kill_rx)
                    .await
            {
                trace_lazy("StreamingRunner", || format!("Error: {}", e));
                true
            } else {
                false
            };
            if failed {
                // `run_streaming_process` only sends `Exit` on its success path,
                // so this can never produce a second exit chunk.
                let _ = tx.send(OutputChunk::Exit(-1)).await;
            }
        });

        OutputStream {
            rx,
            kill_tx,
            kill_signal,
            killed: false,
        }
    }

    /// Runs the command to completion and gathers everything it printed.
    ///
    /// Output bytes are converted to text lossily (invalid UTF-8 is replaced).
    /// The reported code is that of the last `Exit` chunk received, or `0` when
    /// the stream ended without one.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`; failures surface through the
    /// exit code.
    pub async fn collect(self) -> Result<CommandResult> {
        let mut stdout = Vec::new();
        let mut stderr = Vec::new();
        let mut exit_code = 0;
        let mut stream = self.stream();
        while let Some(chunk) = stream.rx.recv().await {
            match chunk {
                OutputChunk::Stdout(data) => stdout.extend(data),
                OutputChunk::Stderr(data) => stderr.extend(data),
                OutputChunk::Exit(code) => exit_code = code,
            }
        }
        Ok(CommandResult {
            stdout: String::from_utf8_lossy(&stdout).into_owned(),
            stderr: String::from_utf8_lossy(&stderr).into_owned(),
            code: exit_code,
        })
    }
}
/// Consumer side of a running command: yields `OutputChunk` values and can
/// request termination of the process.
pub struct OutputStream {
// Receives output and exit chunks from the background task.
rx: mpsc::Receiver<OutputChunk>,
// Carries the name of the signal to deliver when a kill is requested.
kill_tx: mpsc::UnboundedSender<String>,
// Signal name used by `kill()` and by `Drop`.
kill_signal: String,
// Set once a kill has been requested through `kill`/`kill_with`, so it is sent at most once.
killed: bool,
}
impl OutputStream {
    /// Waits for the next chunk; yields `None` once every sender has gone away.
    pub async fn next(&mut self) -> Option<OutputChunk> {
        self.rx.recv().await
    }

    /// Requests termination using the signal configured for this stream.
    pub fn kill(&mut self) {
        let configured = self.kill_signal.to_owned();
        self.kill_with(configured.as_str());
    }

    /// Requests termination with an explicit signal name.
    ///
    /// Only the first request is forwarded; later calls are ignored. A failure
    /// to deliver the request (the receiving task is gone) is ignored as well.
    pub fn kill_with(&mut self, signal: &str) {
        if !self.killed {
            self.killed = true;
            trace_lazy("OutputStream", || format!("kill | signal={}", signal));
            let _ = self.kill_tx.send(signal.to_owned());
        }
    }

    /// Drains the stream and returns `(stdout, stderr, exit_code)`.
    ///
    /// The exit code is the last `Exit` chunk seen, or `0` if none arrived.
    pub async fn collect(mut self) -> (Vec<u8>, Vec<u8>, i32) {
        let mut out_bytes: Vec<u8> = Vec::new();
        let mut err_bytes: Vec<u8> = Vec::new();
        let mut status: i32 = 0;
        loop {
            match self.rx.recv().await {
                Some(OutputChunk::Stdout(bytes)) => out_bytes.extend(bytes),
                Some(OutputChunk::Stderr(bytes)) => err_bytes.extend(bytes),
                Some(OutputChunk::Exit(code)) => status = code,
                None => break,
            }
        }
        (out_bytes, err_bytes, status)
    }

    /// Drains the stream and returns only the bytes written to stdout.
    pub async fn collect_stdout(mut self) -> Vec<u8> {
        let mut out_bytes: Vec<u8> = Vec::new();
        while let Some(chunk) = self.rx.recv().await {
            match chunk {
                OutputChunk::Stdout(bytes) => out_bytes.extend(bytes),
                OutputChunk::Stderr(_) | OutputChunk::Exit(_) => {}
            }
        }
        out_bytes
    }
}
impl Drop for OutputStream {
    /// Requests termination with the configured signal when the stream is
    /// dropped, unless a kill was already requested. Delivery failures are ignored.
    fn drop(&mut self) {
        if self.killed {
            return;
        }
        let signal = self.kill_signal.clone();
        let _ = self.kill_tx.send(signal);
    }
}
/// Runs `command` through the platform shell and forwards its output to `tx`.
///
/// * `command` - command line handed to the shell as its last argument.
/// * `cwd` / `env` - optional working directory and environment variables.
/// * `stdin_content` - text fed to the child's stdin; when `None`, stdin is null.
/// * `exit_pump_grace_ms` - timeout used while waiting for the child after a
///   kill request and while draining the output readers after exit.
/// * `tx` - receives `Stdout`/`Stderr` chunks and finally one `Exit` chunk.
/// * `kill_rx` - a received signal name (or the channel closing) triggers termination.
///
/// Stdin is written from its own task while stdout/stderr are already being
/// pumped. Writing it inline before the readers exist can deadlock: the child
/// blocks on a full output pipe while this side blocks on a full stdin pipe.
///
/// # Errors
///
/// Returns an error if the process cannot be spawned or waited on; in that case
/// this function sends no `Exit` chunk.
async fn run_streaming_process(
    command: String,
    cwd: Option<PathBuf>,
    env: Option<HashMap<String, String>>,
    stdin_content: Option<String>,
    exit_pump_grace_ms: u64,
    tx: mpsc::Sender<OutputChunk>,
    mut kill_rx: mpsc::UnboundedReceiver<String>,
) -> Result<()> {
    trace_lazy("StreamingRunner", || format!("Starting: {}", command));
    let grace = Duration::from_millis(exit_pump_grace_ms);

    let shell = find_available_shell();
    let mut cmd = Command::new(&shell.cmd);
    cmd.args(&shell.args).arg(&command);
    cmd.stdin(if stdin_content.is_some() {
        Stdio::piped()
    } else {
        Stdio::null()
    });
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());
    // Own process group, so a signal can be sent to the child and its descendants.
    #[cfg(unix)]
    cmd.process_group(0);
    if let Some(dir) = &cwd {
        cmd.current_dir(dir);
    }
    if let Some(vars) = &env {
        cmd.envs(vars);
    }
    let mut child = cmd.spawn()?;

    // Start draining output before any stdin is written (see the doc comment).
    let stdout_handle = child
        .stdout
        .take()
        .map(|pipe| spawn_output_pump(pipe, tx.clone(), OutputChunk::Stdout));
    let stderr_handle = child
        .stderr
        .take()
        .map(|pipe| spawn_output_pump(pipe, tx.clone(), OutputChunk::Stderr));

    // Feed stdin concurrently; write errors (e.g. the child closed stdin early)
    // are ignored on purpose, exactly as a best-effort write.
    let stdin_handle = match (stdin_content, child.stdin.take()) {
        (Some(content), Some(mut stdin)) => Some(tokio::spawn(async move {
            use tokio::io::AsyncWriteExt;
            let _ = stdin.write_all(content.as_bytes()).await;
            let _ = stdin.shutdown().await;
        })),
        _ => None,
    };

    let pid = child.id();
    let code;
    tokio::select! {
        status = child.wait() => {
            code = status_to_code(status?);
        }
        maybe_signal = kill_rx.recv() => {
            // A closed kill channel is treated like a request with the default signal.
            let signal = maybe_signal.unwrap_or_else(|| DEFAULT_KILL_SIGNAL.to_string());
            trace_lazy("StreamingRunner", || format!("Kill requested | signal={}", signal));
            // No point in feeding a process that is being terminated.
            if let Some(writer) = &stdin_handle {
                writer.abort();
            }
            if let Some(pid) = pid {
                send_signal_to_process(pid, &signal);
            }
            // Give the child the grace period to exit, then kill it forcibly.
            if tokio::time::timeout(grace, child.wait()).await.is_err() {
                let _ = child.start_kill();
                let _ = child.wait().await;
            }
            // The reported code reflects the requested signal.
            code = 128 + signal_number(&signal);
        }
    }

    // Let the readers finish flushing buffered output, but do not wait forever:
    // something else may still hold the pipes open.
    let stdout_abort = stdout_handle.as_ref().map(|h| h.abort_handle());
    let stderr_abort = stderr_handle.as_ref().map(|h| h.abort_handle());
    let drain = async {
        if let Some(handle) = stdout_handle {
            let _ = handle.await;
        }
        if let Some(handle) = stderr_handle {
            let _ = handle.await;
        }
    };
    if tokio::time::timeout(grace, drain).await.is_err() {
        if let Some(abort) = stdout_abort {
            abort.abort();
        }
        if let Some(abort) = stderr_abort {
            abort.abort();
        }
    }

    let _ = tx.send(OutputChunk::Exit(code)).await;
    trace_lazy("StreamingRunner", || format!("Exited with code: {}", code));
    Ok(())
}

/// Spawns a task that reads `source` in chunks of up to 8192 bytes and sends each
/// chunk, wrapped by `wrap`, to `tx` until end of input, a read error, or the
/// receiver going away.
fn spawn_output_pump<R>(
    source: R,
    tx: mpsc::Sender<OutputChunk>,
    wrap: fn(Vec<u8>) -> OutputChunk,
) -> tokio::task::JoinHandle<()>
where
    R: tokio::io::AsyncRead + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        use tokio::io::AsyncReadExt;
        let mut reader = BufReader::new(source);
        let mut buf = vec![0u8; 8192];
        loop {
            match reader.read(&mut buf).await {
                Ok(0) | Err(_) => break,
                Ok(n) => {
                    if tx.send(wrap(buf[..n].to_vec())).await.is_err() {
                        break;
                    }
                }
            }
        }
    })
}
/// Converts an `ExitStatus` into a shell-style integer code.
///
/// Returns the exit code when there is one; on Unix, a process terminated by a
/// signal yields `128 + signal`; anything else yields `-1`.
fn status_to_code(status: std::process::ExitStatus) -> i32 {
    #[cfg(unix)]
    let from_signal: Option<i32> = {
        use std::os::unix::process::ExitStatusExt;
        status.signal().map(|sig| 128 + sig)
    };
    #[cfg(not(unix))]
    let from_signal: Option<i32> = None;

    // A regular exit code takes precedence over the signal-derived value.
    status.code().or(from_signal).unwrap_or(-1)
}
/// Maps a signal name such as `"SIGTERM"` to its numeric value.
///
/// Unrecognised names map to 15 (`SIGTERM`), the same fallback used when a
/// signal is actually delivered.
fn signal_number(signal: &str) -> i32 {
    // SIGUSR1/SIGUSR2 are not numbered the same everywhere: macOS and the BSDs
    // use 30/31, while Linux on common architectures uses 10/12. The other
    // signals handled here share their numbers across these platforms.
    let bsd_numbering = cfg!(any(
        target_os = "macos",
        target_os = "ios",
        target_os = "freebsd",
        target_os = "openbsd",
        target_os = "netbsd",
        target_os = "dragonfly"
    ));
    match signal {
        "SIGHUP" => 1,
        "SIGINT" => 2,
        "SIGQUIT" => 3,
        "SIGKILL" => 9,
        "SIGUSR1" => {
            if bsd_numbering {
                30
            } else {
                10
            }
        }
        "SIGUSR2" => {
            if bsd_numbering {
                31
            } else {
                12
            }
        }
        "SIGTERM" => 15,
        _ => 15,
    }
}
/// Sends `signal` to the process `pid` and to the process group of the same id.
///
/// Unrecognised signal names fall back to `SIGTERM`. Delivery errors (for
/// example the process having already exited) are ignored: this is best-effort.
#[cfg(unix)]
fn send_signal_to_process(pid: u32, signal: &str) {
    use nix::sys::signal::{kill, Signal};
    use nix::unistd::Pid;

    // Refuse ids that would turn into a dangerous `kill` target: 0 addresses the
    // caller's own process group, negating 1 gives -1 which broadcasts to every
    // process the caller may signal, and values above `i32::MAX` would wrap in
    // the cast below.
    if pid <= 1 || pid > i32::MAX as u32 {
        return;
    }
    let raw_pid = pid as i32;

    let sig = match signal {
        "SIGHUP" => Signal::SIGHUP,
        "SIGINT" => Signal::SIGINT,
        "SIGQUIT" => Signal::SIGQUIT,
        "SIGKILL" => Signal::SIGKILL,
        "SIGUSR1" => Signal::SIGUSR1,
        "SIGUSR2" => Signal::SIGUSR2,
        "SIGTERM" => Signal::SIGTERM,
        _ => Signal::SIGTERM,
    };
    // The process itself first, then its whole group (negative id).
    let _ = kill(Pid::from_raw(raw_pid), sig);
    let _ = kill(Pid::from_raw(-raw_pid), sig);
}

/// No-op on non-Unix platforms.
#[cfg(not(unix))]
fn send_signal_to_process(_pid: u32, _signal: &str) {}
/// Shell executable plus the arguments that must precede the command string.
#[derive(Debug, Clone)]
struct ShellConfig {
    cmd: String,
    args: Vec<String>,
}

/// Chooses the shell used to run commands.
///
/// On Windows this is `cmd.exe /c`. Elsewhere the first existing path among
/// `/bin/sh`, `/usr/bin/sh` and `/bin/bash` is used with `-c`; if none exists,
/// `/bin/sh` is returned anyway.
fn find_available_shell() -> ShellConfig {
    if cfg!(windows) {
        return ShellConfig {
            cmd: String::from("cmd.exe"),
            args: vec![String::from("/c")],
        };
    }

    // Every candidate takes the command through the same `-c` flag.
    const CANDIDATES: [&str; 3] = ["/bin/sh", "/usr/bin/sh", "/bin/bash"];
    let chosen = CANDIDATES
        .iter()
        .copied()
        .find(|candidate| std::path::Path::new(candidate).exists())
        .unwrap_or("/bin/sh");

    ShellConfig {
        cmd: String::from(chosen),
        args: vec![String::from("-c")],
    }
}
/// Minimal asynchronous iteration interface: items are pulled one at a time.
#[async_trait::async_trait]
pub trait AsyncIterator {
/// Type of the values yielded by `next`.
type Item;
/// Waits for the next item; the implementation in this file returns `None`
/// once its channel has closed.
async fn next(&mut self) -> Option<Self::Item>;
}
/// Exposes `OutputStream` through `AsyncIterator`, yielding `OutputChunk` values.
#[async_trait::async_trait]
impl AsyncIterator for OutputStream {
type Item = OutputChunk;
/// Receives the next chunk from the underlying channel, exactly like the
/// inherent `OutputStream::next`.
async fn next(&mut self) -> Option<Self::Item> {
self.rx.recv().await
}
}
/// Conversion of a value into a running `OutputStream`.
pub trait IntoStream {
/// Consumes `self` and returns the stream of its output.
fn into_stream(self) -> OutputStream;
}
impl IntoStream for crate::ProcessRunner {
    /// Starts streaming the runner's command.
    ///
    /// Only the command string is carried over; the resulting `StreamingRunner`
    /// otherwise uses its defaults.
    fn into_stream(self) -> OutputStream {
        let command_line = self.command().to_string();
        StreamingRunner::new(command_line).stream()
    }
}