use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::sync::mpsc;
use crate::trace::trace_lazy;
use crate::{CommandResult, Result};
#[derive(Debug, Clone)]
pub enum OutputChunk {
Stdout(Vec<u8>),
Stderr(Vec<u8>),
Exit(i32),
}
pub struct StreamingRunner {
command: String,
cwd: Option<PathBuf>,
env: Option<HashMap<String, String>>,
stdin_content: Option<String>,
}
impl StreamingRunner {
pub fn new(command: impl Into<String>) -> Self {
StreamingRunner {
command: command.into(),
cwd: None,
env: None,
stdin_content: None,
}
}
pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
self.cwd = Some(path.into());
self
}
pub fn env(mut self, env: HashMap<String, String>) -> Self {
self.env = Some(env);
self
}
pub fn stdin(mut self, content: impl Into<String>) -> Self {
self.stdin_content = Some(content.into());
self
}
pub fn stream(mut self) -> OutputStream {
let (tx, rx) = mpsc::channel(1024);
let command = self.command.clone();
let cwd = self.cwd.take();
let env = self.env.take();
let stdin_content = self.stdin_content.take();
tokio::spawn(async move {
if let Err(e) =
run_streaming_process(command, cwd, env, stdin_content, tx.clone()).await
{
trace_lazy("StreamingRunner", || format!("Error: {}", e));
}
});
OutputStream { rx }
}
pub async fn collect(self) -> Result<CommandResult> {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let mut exit_code = 0;
let mut stream = self.stream();
while let Some(chunk) = stream.rx.recv().await {
match chunk {
OutputChunk::Stdout(data) => stdout.extend(data),
OutputChunk::Stderr(data) => stderr.extend(data),
OutputChunk::Exit(code) => exit_code = code,
}
}
Ok(CommandResult {
stdout: String::from_utf8_lossy(&stdout).to_string(),
stderr: String::from_utf8_lossy(&stderr).to_string(),
code: exit_code,
})
}
}
pub struct OutputStream {
rx: mpsc::Receiver<OutputChunk>,
}
impl OutputStream {
pub async fn next(&mut self) -> Option<OutputChunk> {
self.rx.recv().await
}
pub async fn collect(mut self) -> (Vec<u8>, Vec<u8>, i32) {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let mut exit_code = 0;
while let Some(chunk) = self.rx.recv().await {
match chunk {
OutputChunk::Stdout(data) => stdout.extend(data),
OutputChunk::Stderr(data) => stderr.extend(data),
OutputChunk::Exit(code) => exit_code = code,
}
}
(stdout, stderr, exit_code)
}
pub async fn collect_stdout(mut self) -> Vec<u8> {
let mut stdout = Vec::new();
while let Some(chunk) = self.rx.recv().await {
if let OutputChunk::Stdout(data) = chunk {
stdout.extend(data);
}
}
stdout
}
}
async fn run_streaming_process(
command: String,
cwd: Option<PathBuf>,
env: Option<HashMap<String, String>>,
stdin_content: Option<String>,
tx: mpsc::Sender<OutputChunk>,
) -> Result<()> {
trace_lazy("StreamingRunner", || format!("Starting: {}", command));
let shell = find_available_shell();
let mut cmd = Command::new(&shell.cmd);
for arg in &shell.args {
cmd.arg(arg);
}
cmd.arg(&command);
if stdin_content.is_some() {
cmd.stdin(Stdio::piped());
} else {
cmd.stdin(Stdio::null());
}
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
if let Some(ref cwd) = cwd {
cmd.current_dir(cwd);
}
if let Some(ref env_vars) = env {
for (key, value) in env_vars {
cmd.env(key, value);
}
}
let mut child = cmd.spawn()?;
if let Some(content) = stdin_content {
if let Some(mut stdin) = child.stdin.take() {
use tokio::io::AsyncWriteExt;
let _ = stdin.write_all(content.as_bytes()).await;
let _ = stdin.shutdown().await;
}
}
let stdout = child.stdout.take();
let tx_stdout = tx.clone();
let stdout_handle = if let Some(stdout) = stdout {
Some(tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut buf = vec![0u8; 8192];
loop {
use tokio::io::AsyncReadExt;
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
if tx_stdout
.send(OutputChunk::Stdout(buf[..n].to_vec()))
.await
.is_err()
{
break;
}
}
Err(_) => break,
}
}
}))
} else {
None
};
let stderr = child.stderr.take();
let tx_stderr = tx.clone();
let stderr_handle = if let Some(stderr) = stderr {
Some(tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut buf = vec![0u8; 8192];
loop {
use tokio::io::AsyncReadExt;
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
if tx_stderr
.send(OutputChunk::Stderr(buf[..n].to_vec()))
.await
.is_err()
{
break;
}
}
Err(_) => break,
}
}
}))
} else {
None
};
if let Some(handle) = stdout_handle {
let _ = handle.await;
}
if let Some(handle) = stderr_handle {
let _ = handle.await;
}
let status = child.wait().await?;
let code = status.code().unwrap_or(-1);
let _ = tx.send(OutputChunk::Exit(code)).await;
trace_lazy("StreamingRunner", || format!("Exited with code: {}", code));
Ok(())
}
#[derive(Debug, Clone)]
struct ShellConfig {
cmd: String,
args: Vec<String>,
}
fn find_available_shell() -> ShellConfig {
let is_windows = cfg!(windows);
if is_windows {
ShellConfig {
cmd: "cmd.exe".to_string(),
args: vec!["/c".to_string()],
}
} else {
let shells = [
("/bin/sh", "-c"),
("/usr/bin/sh", "-c"),
("/bin/bash", "-c"),
];
for (cmd, arg) in shells {
if std::path::Path::new(cmd).exists() {
return ShellConfig {
cmd: cmd.to_string(),
args: vec![arg.to_string()],
};
}
}
ShellConfig {
cmd: "/bin/sh".to_string(),
args: vec!["-c".to_string()],
}
}
}
#[async_trait::async_trait]
pub trait AsyncIterator {
type Item;
async fn next(&mut self) -> Option<Self::Item>;
}
#[async_trait::async_trait]
impl AsyncIterator for OutputStream {
type Item = OutputChunk;
async fn next(&mut self) -> Option<Self::Item> {
self.rx.recv().await
}
}
pub trait IntoStream {
fn into_stream(self) -> OutputStream;
}
impl IntoStream for crate::ProcessRunner {
fn into_stream(self) -> OutputStream {
let streaming = StreamingRunner::new(self.command().to_string());
streaming.stream()
}
}