use std::ffi::OsString;
use std::fmt;
use std::fs::File;
use std::io::{self, Read, Write};
use std::ops::BitOr;
use std::path::Path;
use std::sync::Arc;
// Unix flavour of the platform-specific pipeline options.
#[cfg(unix)]
mod os {
/// Platform-specific settings carried by a `Pipeline` on Unix.
#[derive(Clone, Default)]
pub struct PipelineOsOptions {
/// When `true`, `Pipeline::start` calls `set_pgid_value(0)` on the first
/// command and `set_pgid_value(<pid of the first command>)` on every later
/// one. Presumably this places the whole pipeline in one process group led
/// by the first command — confirm against `Exec::set_pgid_value`.
pub setpgid: bool,
}
}
// Windows flavour of the platform-specific pipeline options.
#[cfg(windows)]
mod os {
/// Platform-specific settings carried by a `Pipeline` on Windows.
///
/// This is a unit struct: no Windows-specific options are defined here. It
/// exists so that `Pipeline` has the same field layout on every platform.
#[derive(Clone, Default)]
pub struct PipelineOsOptions;
}
use crate::communicate::Communicator;
use crate::exec::Redirection;
use crate::process::ExitStatus;
use crate::process::Process;
use crate::exec::{
Capture, Exec, FromSink, FromSource, InputData, InputRedirection, ReadAdapter, ReadErrAdapter,
WriteAdapter,
};
use crate::job::Job;
/// A builder for a chain of commands in which the standard output of each
/// command is connected to the standard input of the next one.
///
/// Nothing is spawned until `start` (or one of the convenience methods built
/// on it, such as `join`, `capture` or `communicate`) is called, hence the
/// `#[must_use]`.
#[must_use]
pub struct Pipeline {
/// The commands, in the order in which they are chained.
execs: Vec<Exec>,
/// Redirection installed as the stdin of the first command on `start`.
stdin_redirect: Arc<Redirection>,
/// Redirection installed as the stdout of the last command on `start`.
stdout: Arc<Redirection>,
/// Redirection shared by the stderr of every command; `Redirection::None`
/// leaves each command's own stderr setting untouched.
stderr: Arc<Redirection>,
/// Input data supplied through `stdin()`, handed over to the `Job` on `start`.
stdin_data: Option<InputData>,
/// Copied verbatim into the `Job` created by `start`.
check_success: bool,
/// When `true`, `start` calls `detached()` on every command before spawning.
detached: bool,
/// When set, `start` calls `cwd()` with this directory on every command.
cwd: Option<OsString>,
// The only reads of this field are behind `cfg(unix)`, so on other platforms
// it is never read and would otherwise trigger the dead-code lint.
#[allow(dead_code)]
os_options: os::PipelineOsOptions,
}
impl Default for Pipeline {
fn default() -> Pipeline {
Pipeline::new()
}
}
impl Pipeline {
pub fn new() -> Pipeline {
Pipeline {
execs: vec![],
stdin_redirect: Arc::new(Redirection::None),
stdout: Arc::new(Redirection::None),
stderr: Arc::new(Redirection::None),
stdin_data: None,
check_success: false,
detached: false,
cwd: None,
os_options: Default::default(),
}
}
pub fn pipe(mut self, cmd: Exec) -> Pipeline {
self.execs.push(cmd);
self
}
pub fn stdin<T>(mut self, stdin: T) -> Pipeline
where
InputRedirection: FromSource<T>,
{
match InputRedirection::from_source(stdin) {
InputRedirection::Redirection(r) => {
self.stdin_redirect = Arc::new(r);
self.stdin_data = None;
}
InputRedirection::Data(data) => {
self.stdin_redirect = Arc::new(Redirection::Pipe);
self.stdin_data = Some(data);
}
};
self
}
pub fn stdout<T>(mut self, stdout: T) -> Pipeline
where
Redirection: FromSink<T>,
{
self.stdout = Arc::new(Redirection::from_sink(stdout));
self
}
pub fn stderr_all<T>(mut self, stderr: T) -> Pipeline
where
Redirection: FromSink<T>,
{
self.stderr = Arc::new(Redirection::from_sink(stderr));
self
}
pub fn checked(mut self) -> Pipeline {
self.check_success = true;
self
}
pub fn cwd(mut self, dir: impl AsRef<Path>) -> Pipeline {
self.cwd = Some(dir.as_ref().as_os_str().to_owned());
self
}
pub fn detached(mut self) -> Pipeline {
self.detached = true;
self
}
#[cfg(unix)]
pub(crate) fn set_setpgid(&mut self, value: bool) {
self.os_options.setpgid = value;
}
fn check_no_stdin_data(&self, meth: &str) {
if self.stdin_data.is_some() {
panic!("{} called with input data specified", meth);
}
}
fn setup_stderr(&mut self) -> io::Result<Option<File>> {
let stderr_arc = std::mem::replace(&mut self.stderr, Arc::new(Redirection::None));
if matches!(*stderr_arc, Redirection::None) {
return Ok(None);
}
let (shared, stderr_read) = if matches!(*stderr_arc, Redirection::Pipe) {
let (stderr_read, stderr_write) = crate::spawn::make_pipe()?;
(Arc::new(Redirection::File(stderr_write)), Some(stderr_read))
} else {
(stderr_arc, None)
};
for exec in &mut self.execs {
exec.stderr_redirect = Arc::clone(&shared);
}
Ok(stderr_read)
}
pub fn start(mut self) -> io::Result<Job> {
if self.execs.is_empty() {
return Ok(Job {
stdin: None,
stdout: None,
stderr: None,
stdin_data: InputData::default(),
check_success: self.check_success,
processes: vec![],
});
}
if self.execs.first().unwrap().stdin_is_set() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"stdin of the first command is already redirected; \
use Pipeline::stdin() to redirect pipeline input",
));
}
if self.execs.last().unwrap().stdout_is_set() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"stdout of the last command is already redirected; \
use Pipeline::stdout() to redirect pipeline output",
));
}
#[cfg(unix)]
if self.execs.iter().any(|e| e.setpgid_is_set()) {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"setpgid on individual commands in a pipeline is not \
supported; use Pipeline::setpgid() to put the pipeline \
in a process group",
));
}
let stderr = self.setup_stderr()?;
if let Some(dir) = &self.cwd {
self.execs = self.execs.into_iter().map(|cmd| cmd.cwd(dir)).collect();
}
if self.detached {
self.execs = self.execs.into_iter().map(|cmd| cmd.detached()).collect();
}
self.execs.first_mut().unwrap().stdin_redirect = self.stdin_redirect;
self.execs.last_mut().unwrap().stdout_redirect = self.stdout;
let cnt = self.execs.len();
let mut processes = Vec::<Process>::new();
let mut pipeline_stdin = None;
let mut pipeline_stdout = None;
let mut prev_stdout: Option<File> = None;
#[cfg(unix)]
let mut first_pid: u32 = 0;
for (idx, mut exec) in self.execs.into_iter().enumerate() {
if let Some(prev_out) = prev_stdout.take() {
exec = exec.stdin(prev_out);
}
if idx != cnt - 1 {
exec = exec.stdout(Redirection::Pipe);
}
#[cfg(unix)]
if self.os_options.setpgid {
if idx == 0 {
exec.set_pgid_value(0);
} else {
exec.set_pgid_value(first_pid);
}
}
let result = exec.spawn()?;
if idx == 0 {
pipeline_stdin = result.stdin;
#[cfg(unix)]
if self.os_options.setpgid {
first_pid = result.process.pid();
}
}
if idx == cnt - 1 {
pipeline_stdout = result.stdout;
} else {
prev_stdout = result.stdout;
}
processes.push(result.process);
}
Ok(Job {
stdin: pipeline_stdin,
stdout: pipeline_stdout,
stderr,
stdin_data: self.stdin_data.unwrap_or_default(),
check_success: self.check_success,
processes,
})
}
pub fn join(self) -> io::Result<ExitStatus> {
self.start()?.join()
}
pub fn stream_stdout(self) -> io::Result<impl Read> {
self.check_no_stdin_data("stream_stdout");
let handle = self.stdout(Redirection::Pipe).start()?;
Ok(ReadAdapter(handle))
}
pub fn stream_stderr_all(self) -> io::Result<impl Read> {
self.check_no_stdin_data("stream_stderr_all");
let handle = self.stderr_all(Redirection::Pipe).start()?;
Ok(ReadErrAdapter(handle))
}
pub fn stream_stdin(self) -> io::Result<impl Write> {
self.check_no_stdin_data("stream_stdin");
let handle = self.stdin(Redirection::Pipe).start()?;
Ok(WriteAdapter(handle))
}
pub fn communicate(mut self) -> io::Result<Communicator> {
self = self.detached();
if matches!(*self.stdout, Redirection::None) {
self = self.stdout(Redirection::Pipe);
}
if matches!(*self.stderr, Redirection::None) {
self = self.stderr_all(Redirection::Pipe);
}
self.start()?.communicate()
}
pub fn capture(mut self) -> io::Result<Capture> {
if matches!(*self.stdout, Redirection::None) {
self = self.stdout(Redirection::Pipe);
}
if matches!(*self.stderr, Redirection::None) {
self = self.stderr_all(Redirection::Pipe);
}
self.start()?.capture()
}
}
impl BitOr<Exec> for Pipeline {
    type Output = Pipeline;

    /// `pipeline | cmd` appends `cmd` as the new last command of `pipeline`.
    fn bitor(mut self, rhs: Exec) -> Self::Output {
        self.execs.push(rhs);
        self
    }
}
impl BitOr for Pipeline {
    type Output = Pipeline;

    /// `left | right` appends the commands of `right`, in order, to `left`.
    ///
    /// Only the commands of `right` are taken over; its redirections, input
    /// data and other settings are dropped, and those of `left` stay in force.
    fn bitor(mut self, rhs: Pipeline) -> Self::Output {
        self.execs.extend(rhs.execs);
        self
    }
}
impl FromIterator<Exec> for Pipeline {
fn from_iter<I: IntoIterator<Item = Exec>>(iter: I) -> Self {
Pipeline {
execs: iter.into_iter().collect(),
stdin_redirect: Arc::new(Redirection::None),
stdout: Arc::new(Redirection::None),
stderr: Arc::new(Redirection::None),
stdin_data: None,
check_success: false,
detached: false,
cwd: None,
os_options: Default::default(),
}
}
}
impl fmt::Debug for Pipeline {
    /// Writes `Pipeline { a | b | c }`, where each element is the result of
    /// `to_cmdline_lossy()` on the corresponding command.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered: Vec<_> = self
            .execs
            .iter()
            .map(|cmd| cmd.to_cmdline_lossy())
            .collect();
        write!(f, "Pipeline {{ {} }}", rendered.join(" | "))
    }
}