/// Selects which of the child's output pipes is used when reading.
///
/// The variants are plain tags, so the type is cheap to copy and can be
/// compared for equality.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ReadSource {
    /// Read from the child's standard output.
    Stdout,
    /// Read from the child's standard error.
    Stderr,
}
/// Handle to a spawned child with all three standard streams piped.
///
/// Field `0` is the [`Simplex`] that owns the process and its stdin pipe;
/// field `1` is the [`Output`] that holds the stdout and stderr pipes.
pub struct Duplex(Simplex, Output);
impl super::Spawner for std::process::Command {
    type Output = Duplex;

    /// Spawns the command with stdin, stdout and stderr all piped and wraps
    /// the child in a [`Duplex`] that initially reads from stdout.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `spawn` when the child cannot be started.
    ///
    /// # Panics
    ///
    /// Panics if any of the three pipe handles is missing after spawning.
    fn spawn_owned(&mut self) -> std::io::Result<Self::Output> {
        use std::process::Stdio;

        // Force every standard stream to be a pipe before spawning.
        self.stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        let mut child = self.spawn()?;

        // Move the pipe handles out of the child so they can be owned separately.
        let writer = child.stdin.take().unwrap();
        let output = Output {
            read_source: ReadSource::Stdout,
            stdout: child.stdout.take().unwrap(),
            stderr: child.stderr.take().unwrap(),
        };
        let input = Simplex(Some(ProcessImpl {
            process: child,
            stdin: writer,
        }));

        Ok(Duplex(input, output))
    }
}
// Marks `Duplex` as a `Process`; the impl body is empty, so nothing is overridden here.
impl super::Process for Duplex {}
impl Duplex {
#[must_use]
pub fn id(&self) -> u32 {
self.0.id()
}
pub fn read_from(&mut self, read_source: ReadSource) -> &mut Self {
self.1.read_from(read_source);
self
}
pub fn wait(
self,
) -> Result<
(
std::process::ExitStatus,
std::process::ChildStdout,
std::process::ChildStderr,
),
std::io::Error,
> {
let (mut child, _, stdout, stderr) = self.eject();
child.wait().map(|status| (status, stdout, stderr))
}
pub fn pipes(
&mut self,
) -> (
&mut std::process::ChildStdin,
&mut std::process::ChildStdout,
&mut std::process::ChildStderr,
) {
(self.0.stdin(), &mut self.1.stdout, &mut self.1.stderr)
}
#[must_use]
pub fn decompose(self) -> (Simplex, Output) {
(self.0, self.1)
}
#[must_use]
pub fn eject(
self,
) -> (
std::process::Child,
std::process::ChildStdin,
std::process::ChildStdout,
std::process::ChildStderr,
) {
let (process, stdin) = self.0.eject();
let (stdout, stderr) = self.1.eject();
(process, stdin, stdout, stderr)
}
}
/// Writing to a `Duplex` forwards to its `Simplex` half (field `0`).
impl std::io::Write for Duplex {
    /// Forwards `buf` to the writing half and returns how many bytes it accepted.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let writer = &mut self.0;
        std::io::Write::write(writer, buf)
    }

    /// Flushes the writing half.
    fn flush(&mut self) -> std::io::Result<()> {
        let writer = &mut self.0;
        std::io::Write::flush(writer)
    }
}
/// Reading from a `Duplex` forwards to its `Output` half (field `1`).
impl std::io::Read for Duplex {
    /// Fills `buf` from the reading half and returns the number of bytes read.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let reader = &mut self.1;
        std::io::Read::read(reader, buf)
    }
}
/// Owns a spawned child process together with its stdin pipe.
///
/// The inner `Option` is emptied by `eject` and by the `Drop` impl; while it
/// still holds a value, dropping the `Simplex` shuts the child down.
#[allow(clippy::module_name_repetitions)]
pub struct Simplex(Option<ProcessImpl>);
// Marks `Simplex` as a `Process`; the impl body is empty, so nothing is overridden here.
impl super::Process for Simplex {}
impl Simplex {
#[must_use]
pub fn id(&self) -> u32 {
self.0
.as_ref()
.unwrap_or_else(|| unreachable!())
.process
.id()
}
fn stdin(&mut self) -> &mut std::process::ChildStdin {
&mut self.0.as_mut().unwrap().stdin
}
pub fn wait(self) -> Result<std::process::ExitStatus, std::io::Error> {
let (mut child, _) = self.eject();
child.wait()
}
#[must_use]
pub fn eject(mut self) -> (std::process::Child, std::process::ChildStdin) {
let process = self.0.take().unwrap_or_else(|| unreachable!());
(process.process, process.stdin)
}
}
/// Dropping a `Simplex` that still owns its process shuts that process down.
impl std::ops::Drop for Simplex {
    fn drop(&mut self) {
        // Nothing to do when the process was already handed out via `eject`.
        if let Some(inner) = self.0.take() {
            // Best effort: a destructor has no way to report a shutdown failure.
            let outcome = inner.shutdown();
            std::mem::drop(outcome);
        }
    }
}
/// Writing to a `Simplex` writes to the child's stdin pipe.
///
/// Both methods panic if the inner process has already been taken.
impl std::io::Write for Simplex {
    /// Writes `buf` to the child's stdin and returns how many bytes were accepted.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let inner = self.0.as_mut().unwrap();
        std::io::Write::write(&mut inner.stdin, buf)
    }

    /// Flushes the child's stdin pipe.
    fn flush(&mut self) -> std::io::Result<()> {
        let inner = self.0.as_mut().unwrap();
        std::io::Write::flush(&mut inner.stdin)
    }
}
/// The reading half of a child: its stdout and stderr pipes plus the
/// selector that decides which of the two `Read` calls are served from.
pub struct Output {
// Pipe currently selected for `Read::read`.
read_source: ReadSource,
// The child's standard output pipe.
stdout: std::process::ChildStdout,
// The child's standard error pipe.
stderr: std::process::ChildStderr,
}
impl Output {
    /// Chooses which pipe subsequent reads are served from.
    ///
    /// Returns `self` so calls can be chained.
    pub fn read_from(&mut self, read_source: ReadSource) -> &mut Self {
        // `ReadSource` is `Copy`, so the previous selection is simply discarded.
        let _previous = std::mem::replace(&mut self.read_source, read_source);
        self
    }

    /// Borrows the stdout and stderr pipes, in that order.
    pub fn pipes(
        &mut self,
    ) -> (
        &mut std::process::ChildStdout,
        &mut std::process::ChildStderr,
    ) {
        let Self { stdout, stderr, .. } = self;
        (stdout, stderr)
    }

    /// Gives up ownership of the stdout and stderr pipes, in that order.
    #[must_use]
    pub fn eject(self) -> (std::process::ChildStdout, std::process::ChildStderr) {
        let Self { stdout, stderr, .. } = self;
        (stdout, stderr)
    }
}
/// Reading from an `Output` reads from whichever pipe is currently selected.
impl std::io::Read for Output {
    /// Fills `buf` from the selected pipe and returns the number of bytes read.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Pick the pipe first, then issue a single read against it.
        let selected: &mut dyn std::io::Read = match self.read_source {
            ReadSource::Stdout => &mut self.stdout,
            ReadSource::Stderr => &mut self.stderr,
        };
        std::io::Read::read(selected, buf)
    }
}
/// A child process bundled with the stdin pipe that was taken out of it.
struct ProcessImpl {
// Handle to the spawned child.
process: std::process::Child,
// The child's stdin pipe, held separately from `process`.
stdin: std::process::ChildStdin,
}
impl ProcessImpl {
    /// How long the child is given to exit after each termination signal.
    #[cfg(unix)]
    const GRACE_PERIOD: std::time::Duration = std::time::Duration::from_secs(2);

    /// Interval between exit checks while a grace period is running.
    #[cfg(unix)]
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);

    /// Returns the child's process id as a `nix` [`Pid`](nix::unistd::Pid).
    // `Child::id` yields a `u32` while `pid_t` is signed; the cast is kept
    // as-is and the corresponding lint silenced.
    #[allow(clippy::cast_possible_wrap)]
    #[cfg(unix)]
    pub fn pid(&self) -> nix::unistd::Pid {
        nix::unistd::Pid::from_raw(self.process.id() as nix::libc::pid_t)
    }

    /// Polls `child` until it has exited or `timeout` has elapsed.
    ///
    /// Returns `true` once the child has exited. The check goes through
    /// `Child::try_wait`, so the exit status stays recorded in `child` and a
    /// later `Child::wait` still succeeds. A failing `try_wait` is treated as
    /// "not exited yet" so the caller escalates.
    #[cfg(unix)]
    fn exited_within(child: &mut std::process::Child, timeout: std::time::Duration) -> bool {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            if let Ok(Some(_)) = child.try_wait() {
                return true;
            }
            let now = std::time::Instant::now();
            if now >= deadline {
                return false;
            }
            // Never sleep past the deadline.
            std::thread::sleep(Self::POLL_INTERVAL.min(deadline - now));
        }
    }

    /// Kills the child and waits for it to exit.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported while waiting on the child.
    #[cfg(not(unix))]
    fn shutdown(mut self) -> std::io::Result<std::process::ExitStatus> {
        // A failed kill is deliberately ignored: the child may already be
        // gone, and the `wait` below reports the outcome either way.
        drop(self.process.kill());
        self.process.wait()
    }

    /// Shuts the child down, escalating from polite to forceful.
    ///
    /// The stdin pipe is flushed and closed first. If the child is still
    /// running it receives `SIGINT`, then `SIGTERM`, then a forced kill, with
    /// a grace period after each of the first two signals. Finally the child
    /// is waited on and its exit status returned.
    ///
    /// The child is only ever reaped through `std::process::Child`. Reaping
    /// it with a raw `waitpid` would make the final `wait` fail with `ECHILD`
    /// and would allow later signals to hit a PID that is no longer ours.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing stdin, sending a signal, the forced kill
    /// or the final wait fails.
    #[cfg(unix)]
    fn shutdown(mut self) -> Result<std::process::ExitStatus, super::UnixIoError> {
        use nix::sys::signal;
        use std::io::Write;

        // Must be computed before `self.stdin` is moved out below.
        let pid = self.pid();
        self.stdin.flush()?;
        // Closing stdin lets children that read until EOF finish on their own.
        std::mem::drop(self.stdin);

        // Only escalate when the child is known to be still running; a failed
        // `try_wait` falls through to the final `wait`, which reports it.
        if let Ok(None) = self.process.try_wait() {
            signal::kill(pid, signal::SIGINT)?;
            if !Self::exited_within(&mut self.process, Self::GRACE_PERIOD) {
                signal::kill(pid, signal::SIGTERM)?;
                if !Self::exited_within(&mut self.process, Self::GRACE_PERIOD) {
                    self.process.kill()?;
                }
            }
        }
        self.process.wait().map_err(super::UnixIoError::from)
    }
}

#[cfg(all(test, unix))]
mod test {
    use crate::Spawner;

    /// Output written by the child can be read through the `Duplex`.
    #[test]
    fn read() {
        use std::io::BufRead;
        let child = std::process::Command::new("sh")
            .arg("-c")
            .arg("echo hello")
            .spawn_owned()
            .unwrap();
        let mut output = String::new();
        let mut reader = std::io::BufReader::new(child);
        assert!(reader.read_line(&mut output).is_ok());
        assert_eq!("hello\n", output);
    }

    /// Data written to the `Duplex` reaches the child and is echoed back.
    #[test]
    fn write() {
        use std::io::{BufRead, Write};
        let mut child = std::process::Command::new("cat").spawn_owned().unwrap();
        assert!(child.write_all(b"hello\n").is_ok());
        let mut output = String::new();
        let mut reader = std::io::BufReader::new(child);
        assert!(reader.read_line(&mut output).is_ok());
        assert_eq!("hello\n", output);
    }

    /// The two halves keep working after the `Duplex` is split.
    #[test]
    fn decompose() {
        use std::io::{BufRead, Write};
        let child = std::process::Command::new("cat").spawn_owned().unwrap();
        let (mut child, output) = child.decompose();
        assert!(child.write_all(b"hello\n").is_ok());
        let mut buffer = String::new();
        let mut reader = std::io::BufReader::new(output);
        assert!(reader.read_line(&mut buffer).is_ok());
        assert_eq!("hello\n", buffer);
    }

    /// Shutting a child down reaps it exactly once and yields its exit status.
    #[test]
    fn drop() {
        let child = std::process::Command::new("ls").spawn_owned().unwrap();
        let mut simplex = child.0;
        assert!(simplex.0.take().unwrap().shutdown().is_ok());
    }
}