use std::process::Stdio;
use tokio::process::Command;
use crate::error::{ScriptError, ScriptResult};
use crate::output::Output;
/// Builder for a chain of external commands that `execute` runs in order.
///
/// A pipeline is started with `new`, extended with `pipe`, and configured
/// through the consuming builder methods (`arg`, `args`, `env`, ...).
#[derive(Debug)]
pub struct Pipeline {
/// Commands in execution order; `new` seeds the first entry and `pipe` appends one.
commands: Vec<Command>,
}
impl Pipeline {
pub fn new<S: AsRef<std::ffi::OsStr>>(program: S) -> Self {
let mut cmd = Command::new(program);
let _ = cmd.stdin(Stdio::piped());
Self {
commands: vec![cmd],
}
}
pub fn arg<S: AsRef<std::ffi::OsStr>>(mut self, arg: S) -> Self {
if let Some(cmd) = self.commands.last_mut() {
let _ = cmd.arg(arg);
}
self
}
pub fn args<I, S>(mut self, args: I) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<std::ffi::OsStr>,
{
if let Some(cmd) = self.commands.last_mut() {
let _ = cmd.args(args);
}
self
}
pub fn pipe<S: AsRef<std::ffi::OsStr>>(mut self, program: S) -> Self {
if let Some(prev_cmd) = self.commands.last_mut() {
let _ = prev_cmd.stdout(Stdio::piped());
}
let mut cmd = Command::new(program);
let _ = cmd.stdin(Stdio::piped());
self.commands.push(cmd);
self
}
pub fn env<K, V>(mut self, key: K, value: V) -> Self
where
K: AsRef<std::ffi::OsStr>,
V: AsRef<std::ffi::OsStr>,
{
for cmd in &mut self.commands {
let _ = cmd.env(key.as_ref(), value.as_ref());
}
self
}
pub fn current_dir<D>(mut self, dir: D) -> Self
where
D: AsRef<std::path::Path>,
{
for cmd in &mut self.commands {
let _ = cmd.current_dir(dir.as_ref());
}
self
}
pub fn capture_output(mut self) -> Self {
if let Some(cmd) = self.commands.last_mut() {
let _ = cmd.stdout(Stdio::piped());
let _ = cmd.stderr(Stdio::piped());
}
self
}
pub async fn execute(&mut self) -> ScriptResult<Output> {
if self.commands.is_empty() {
return Err(ScriptError::PipelineEmpty);
}
let num_commands = self.commands.len();
let mut previous_output: Option<Vec<u8>> = None;
for i in 0..num_commands {
let cmd = &mut self.commands[i];
if previous_output.is_some() {
let _ = cmd.stdin(Stdio::piped());
}
if i == num_commands - 1 {
if let Some(ref prev_stdout) = previous_output {
let mut child = cmd.spawn().map_err(ScriptError::IoError)?;
if let Some(ref mut stdin) = child.stdin {
use tokio::io::AsyncWriteExt;
let _unused = stdin.write_all(prev_stdout).await;
drop(_unused);
}
let out = child
.wait_with_output()
.await
.map_err(ScriptError::IoError)?;
return Ok(Output::new(out.stdout, out.stderr, out.status));
} else {
let output = cmd.output().await.map_err(ScriptError::IoError)?;
return Ok(Output::new(output.stdout, output.stderr, output.status));
}
} else {
let output = cmd.output().await.map_err(ScriptError::IoError)?;
previous_output = Some(output.stdout);
}
}
Err(ScriptError::PipelineError(
"Pipeline execution failed".into(),
))
}
}
impl Default for Pipeline {
fn default() -> Self {
Self::new("")
}
}