use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use crate::trace::trace_lazy;
use crate::{CommandResult, Result, RunOptions, StdinOption};
/// Builder for a sequence of commands that run one after another, where each
/// command's captured stdout becomes the stdin of the next command.
#[derive(Debug, Clone)]
pub struct Pipeline {
/// Command strings, in the order they are executed.
commands: Vec<String>,
/// Optional text supplied as stdin to the first command.
stdin: Option<String>,
/// Optional working directory used for the commands.
cwd: Option<PathBuf>,
/// Optional environment variables set for the commands.
env: Option<HashMap<String, String>>,
/// When true, `run` prints the output of the last command if it was executed by the shell.
mirror: bool,
/// Stored by `capture_output`. NOTE(review): `run` does not read this flag.
capture: bool,
}
impl Default for Pipeline {
fn default() -> Self {
Self::new()
}
}
impl Pipeline {
pub fn new() -> Self {
Pipeline {
commands: Vec::new(),
stdin: None,
cwd: None,
env: None,
mirror: true,
capture: true,
}
}
pub fn add(mut self, command: impl Into<String>) -> Self {
self.commands.push(command.into());
self
}
pub fn stdin(mut self, content: impl Into<String>) -> Self {
self.stdin = Some(content.into());
self
}
pub fn cwd(mut self, path: impl Into<PathBuf>) -> Self {
self.cwd = Some(path.into());
self
}
pub fn env(mut self, env: HashMap<String, String>) -> Self {
self.env = Some(env);
self
}
pub fn mirror_output(mut self, mirror: bool) -> Self {
self.mirror = mirror;
self
}
pub fn capture_output(mut self, capture: bool) -> Self {
self.capture = capture;
self
}
pub async fn run(self) -> Result<CommandResult> {
if self.commands.is_empty() {
return Ok(CommandResult {
stdout: String::new(),
stderr: "No commands in pipeline".to_string(),
code: 1,
});
}
trace_lazy("Pipeline", || {
format!("Running pipeline with {} commands", self.commands.len())
});
let mut current_stdin = self.stdin.clone();
let mut last_result = CommandResult {
stdout: String::new(),
stderr: String::new(),
code: 0,
};
let mut accumulated_stderr = String::new();
for (i, cmd_str) in self.commands.iter().enumerate() {
let is_last = i == self.commands.len() - 1;
trace_lazy("Pipeline", || {
format!(
"Executing command {}/{}: {}",
i + 1,
self.commands.len(),
cmd_str
)
});
let first_word = cmd_str.split_whitespace().next().unwrap_or("");
if crate::commands::are_virtual_commands_enabled() {
if let Some(result) = self
.try_virtual_command(first_word, cmd_str, ¤t_stdin)
.await
{
if result.code != 0 {
return Ok(CommandResult {
stdout: result.stdout,
stderr: accumulated_stderr + &result.stderr,
code: result.code,
});
}
current_stdin = Some(result.stdout.clone());
accumulated_stderr.push_str(&result.stderr);
last_result = result;
continue;
}
}
let shell = find_available_shell();
let mut cmd = Command::new(&shell.cmd);
for arg in &shell.args {
cmd.arg(arg);
}
cmd.arg(cmd_str);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
if let Some(ref cwd) = self.cwd {
cmd.current_dir(cwd);
}
if let Some(ref env_vars) = self.env {
for (key, value) in env_vars {
cmd.env(key, value);
}
}
let mut child = cmd.spawn()?;
if let Some(ref stdin_content) = current_stdin {
if let Some(mut stdin) = child.stdin.take() {
let content = stdin_content.clone();
tokio::spawn(async move {
let _ = stdin.write_all(content.as_bytes()).await;
let _ = stdin.shutdown().await;
});
}
}
let mut stdout_content = String::new();
if let Some(mut stdout) = child.stdout.take() {
stdout.read_to_string(&mut stdout_content).await?;
}
let mut stderr_content = String::new();
if let Some(mut stderr) = child.stderr.take() {
stderr.read_to_string(&mut stderr_content).await?;
}
if is_last && self.mirror {
if !stdout_content.is_empty() {
print!("{}", stdout_content);
}
if !stderr_content.is_empty() {
eprint!("{}", stderr_content);
}
}
let status = child.wait().await?;
let code = status.code().unwrap_or(-1);
accumulated_stderr.push_str(&stderr_content);
if code != 0 {
return Ok(CommandResult {
stdout: stdout_content,
stderr: accumulated_stderr,
code,
});
}
current_stdin = Some(stdout_content.clone());
last_result = CommandResult {
stdout: stdout_content,
stderr: String::new(),
code,
};
}
Ok(CommandResult {
stdout: last_result.stdout,
stderr: accumulated_stderr,
code: last_result.code,
})
}
async fn try_virtual_command(
&self,
cmd_name: &str,
full_cmd: &str,
stdin: &Option<String>,
) -> Option<CommandResult> {
let parts: Vec<&str> = full_cmd.split_whitespace().collect();
let args: Vec<String> = parts.iter().skip(1).map(|s| s.to_string()).collect();
let ctx = crate::commands::CommandContext {
args,
stdin: stdin.clone(),
cwd: self.cwd.clone(),
env: self.env.clone(),
output_tx: None,
is_cancelled: None,
};
match cmd_name {
"echo" => Some(crate::commands::echo(ctx).await),
"pwd" => Some(crate::commands::pwd(ctx).await),
"cd" => Some(crate::commands::cd(ctx).await),
"true" => Some(crate::commands::r#true(ctx).await),
"false" => Some(crate::commands::r#false(ctx).await),
"sleep" => Some(crate::commands::sleep(ctx).await),
"cat" => Some(crate::commands::cat(ctx).await),
"ls" => Some(crate::commands::ls(ctx).await),
"mkdir" => Some(crate::commands::mkdir(ctx).await),
"rm" => Some(crate::commands::rm(ctx).await),
"touch" => Some(crate::commands::touch(ctx).await),
"cp" => Some(crate::commands::cp(ctx).await),
"mv" => Some(crate::commands::mv(ctx).await),
"basename" => Some(crate::commands::basename(ctx).await),
"dirname" => Some(crate::commands::dirname(ctx).await),
"env" => Some(crate::commands::env(ctx).await),
"exit" => Some(crate::commands::exit(ctx).await),
"which" => Some(crate::commands::which(ctx).await),
"yes" => Some(crate::commands::yes(ctx).await),
"seq" => Some(crate::commands::seq(ctx).await),
"test" => Some(crate::commands::test(ctx).await),
_ => None,
}
}
}
/// Program and leading arguments used to execute a command string through a shell.
#[derive(Debug, Clone)]
struct ShellConfig {
/// Shell executable (a path such as `/bin/sh`, or `cmd.exe`).
cmd: String,
/// Arguments placed before the command string (for example `-c` or `/c`).
args: Vec<String>,
}
/// Chooses the shell used to execute command strings.
///
/// On Windows this is always `cmd.exe /c`. Elsewhere it is the first of
/// `/bin/sh`, `/usr/bin/sh`, `/bin/bash` that exists on disk, invoked with
/// `-c`; if none exists, `/bin/sh -c` is returned anyway.
fn find_available_shell() -> ShellConfig {
    const UNIX_CANDIDATES: [&str; 3] = ["/bin/sh", "/usr/bin/sh", "/bin/bash"];
    const UNIX_FALLBACK: &str = "/bin/sh";

    let (program, flag) = if cfg!(windows) {
        ("cmd.exe", "/c")
    } else {
        let located = UNIX_CANDIDATES
            .iter()
            .copied()
            .find(|candidate| std::path::Path::new(candidate).exists())
            .unwrap_or(UNIX_FALLBACK);
        (located, "-c")
    };

    ShellConfig {
        cmd: program.to_string(),
        args: vec![flag.to_string()],
    }
}
/// Extension trait that lets a value start a pipeline by piping into another command.
///
/// This file implements it for `crate::ProcessRunner`.
pub trait PipelineExt {
/// Consumes `self` and returns a [`PipelineBuilder`] for `command`.
fn pipe(self, command: impl Into<String>) -> PipelineBuilder;
}
// Lets a `ProcessRunner` become the first stage of a `PipelineBuilder`.
impl PipelineExt for crate::ProcessRunner {
    /// Starts a builder with this runner as the first stage and `command` as
    /// the only queued follow-up stage.
    fn pipe(self, command: impl Into<String>) -> PipelineBuilder {
        let additional = vec![command.into()];
        PipelineBuilder {
            first: self,
            additional,
        }
    }
}
/// A pipeline that starts from an existing `ProcessRunner` and continues with
/// further command strings.
pub struct PipelineBuilder {
/// Runner executed first; its stdout feeds the first entry of `additional`.
first: crate::ProcessRunner,
/// Command strings run after `first`, in order.
additional: Vec<String>,
}
impl PipelineBuilder {
pub fn pipe(mut self, command: impl Into<String>) -> Self {
self.additional.push(command.into());
self
}
/// Runs the first runner, then each queued command in order, handing every
/// stage's stdout to the next stage as stdin.
///
/// Execution stops at the first stage that exits with a non-zero code.
///
/// # Returns
///
/// * The first runner fails: its result, unchanged.
/// * A later stage fails: that stage's `stdout` and `code`, with `stderr`
///   holding the stderr of every stage run so far.
/// * Otherwise: the last stage's `stdout`, the concatenated `stderr` of all
///   stages, and `code` 0.
///
/// Queued stages run with `mirror: false`, `capture: true`, and defaults for
/// every other option.
/// NOTE(review): options set on the first runner (for example a working
/// directory or environment) are therefore not passed on to later stages;
/// confirm this is intended.
///
/// # Errors
///
/// Propagates any error returned by `ProcessRunner::run`.
pub async fn run(mut self) -> Result<CommandResult> {
    let first_result = self.first.run().await?;
    if first_result.code != 0 {
        return Ok(first_result);
    }

    // Output of the most recent successful stage; it is both the next stage's
    // stdin and, after the loop, the pipeline's final stdout. Tracking it
    // directly avoids cloning each stage's output and keeps the first command's
    // stdout as the result even if no further stages were queued.
    let mut piped_output = first_result.stdout;
    let mut accumulated_stderr = first_result.stderr;

    for cmd_str in &self.additional {
        let mut runner = crate::ProcessRunner::new(
            cmd_str.clone(),
            RunOptions {
                stdin: StdinOption::Content(std::mem::take(&mut piped_output)),
                mirror: false,
                capture: true,
                ..Default::default()
            },
        );
        let result = runner.run().await?;
        accumulated_stderr.push_str(&result.stderr);
        if result.code != 0 {
            return Ok(CommandResult {
                stdout: result.stdout,
                stderr: accumulated_stderr,
                code: result.code,
            });
        }
        piped_output = result.stdout;
    }

    // Reaching this point means every stage exited with code 0.
    Ok(CommandResult {
        stdout: piped_output,
        stderr: accumulated_stderr,
        code: 0,
    })
}
}