#![allow(clippy::needless_doctest_main)]
/// Selects which of the child's output pipes an [`Output`] reads from.
#[derive(Debug, Copy, Clone)]
pub enum ReadSource {
/// Read only from the child's standard output.
Stdout,
/// Read only from the child's standard error.
Stderr,
/// Read from both pipes; standard error is polled before standard output.
Both,
}
/// A spawned child with all three standard streams piped.
///
/// The [`Simplex`] half holds the process and its stdin pipe, the [`Output`]
/// half holds the stdout and stderr pipes.
pub struct Duplex(Simplex, Output);
impl super::Spawner for tokio::process::Command {
    type Output = Duplex;

    /// Spawns the command with stdin, stdout and stderr all piped and wraps the
    /// resulting child in a [`Duplex`] that initially reads from both output
    /// pipes.
    ///
    /// Any stdio configuration previously set on the command is replaced by
    /// pipes.
    ///
    /// # Errors
    /// Returns the error reported by `spawn` when the process cannot be started.
    fn spawn_owned(&mut self) -> std::io::Result<Self::Output> {
        use std::process::Stdio;

        self.stdin(Stdio::piped());
        self.stdout(Stdio::piped());
        self.stderr(Stdio::piped());
        let mut child = self.spawn()?;

        // All three pipes were requested just above, so the handles are present.
        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        let stderr = child.stderr.take().unwrap();

        let input = Simplex(Some(ProcessImpl {
            process: child,
            stdin,
        }));
        let output = Output {
            read_source: ReadSource::Both,
            stdout,
            stderr,
        };
        Ok(Duplex(input, output))
    }
}
// Opts `Duplex` into the parent module's `Process` trait; no trait items are
// overridden here.
impl super::Process for Duplex {}
impl Duplex {
#[must_use]
pub fn id(&self) -> Option<u32> {
self.0.id()
}
pub fn read_from(&mut self, read_source: ReadSource) -> &mut Self {
self.1.read_from(read_source);
self
}
pub async fn wait(
self,
) -> Result<
(
std::process::ExitStatus,
tokio::process::ChildStdout,
tokio::process::ChildStderr,
),
std::io::Error,
> {
let (mut child, _, stdout, stderr) = self.eject();
child.wait().await.map(|status| (status, stdout, stderr))
}
pub fn pipes(
&mut self,
) -> (
&mut tokio::process::ChildStdin,
&mut tokio::process::ChildStdout,
&mut tokio::process::ChildStderr,
) {
(self.0.stdin(), &mut self.1.stdout, &mut self.1.stderr)
}
#[must_use]
pub fn decompose(self) -> (Simplex, Output) {
(self.0, self.1)
}
#[must_use]
pub fn eject(
self,
) -> (
tokio::process::Child,
tokio::process::ChildStdin,
tokio::process::ChildStdout,
tokio::process::ChildStderr,
) {
let (process, stdin) = self.0.eject();
let (stdout, stderr) = self.1.eject();
(process, stdin, stdout, stderr)
}
pub async fn shutdown(self) -> std::io::Result<std::process::ExitStatus> {
self.0.shutdown().await
}
}
impl tokio::io::AsyncWrite for Duplex {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
std::pin::Pin::new(&mut self.0).poll_write(ctx, buf)
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.0).poll_flush(ctx)
}
fn poll_shutdown(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.0).poll_shutdown(ctx)
}
}
impl tokio::io::AsyncRead for Duplex {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.1).poll_read(ctx, buf)
}
}
/// Owns a child process together with its stdin pipe.
///
/// The inner `Option` is emptied by `eject` and `shutdown`, which lets the
/// `Drop` implementation tell whether there is still a process to shut down.
#[allow(clippy::module_name_repetitions)]
pub struct Simplex(Option<ProcessImpl>);
// Opts `Simplex` into the parent module's `Process` trait; no trait items are
// overridden here.
impl super::Process for Simplex {}
impl Simplex {
#[must_use]
pub fn id(&self) -> Option<u32> {
self.0
.as_ref()
.unwrap_or_else(|| unreachable!())
.process
.id()
}
fn stdin(&mut self) -> &mut tokio::process::ChildStdin {
&mut self.0.as_mut().unwrap().stdin
}
pub async fn wait(self) -> Result<std::process::ExitStatus, std::io::Error> {
let (mut child, _) = self.eject();
child.wait().await
}
#[must_use]
pub fn eject(mut self) -> (tokio::process::Child, tokio::process::ChildStdin) {
let process = self.0.take().unwrap_or_else(|| unreachable!());
(process.process, process.stdin)
}
pub async fn shutdown(mut self) -> std::io::Result<std::process::ExitStatus> {
match self
.0
.take()
.unwrap_or_else(|| unreachable!())
.shutdown()
.await
{
Ok(status) => Ok(status),
Err(super::UnixIoError::Io(err)) => Err(err),
Err(super::UnixIoError::Unix(err)) => {
Err(std::io::Error::from_raw_os_error(err as i32))
}
}
}
}
impl std::ops::Drop for Simplex {
    /// Shuts down a child that is still owned when the handle is dropped.
    ///
    /// Inside a tokio runtime the shutdown runs as a detached background task.
    /// `tokio::spawn` panics when no runtime is entered, and panicking in
    /// `drop` can abort the program, so outside a runtime the child is killed
    /// on a best-effort basis instead.
    fn drop(&mut self) {
        if let Some(mut owned) = self.0.take() {
            match tokio::runtime::Handle::try_current() {
                Ok(runtime) => {
                    // Detached on purpose: nobody is left to await the result.
                    drop(runtime.spawn(owned.shutdown()));
                }
                Err(_) => {
                    // There is no way to report a failure from `drop`.
                    let _ = owned.process.start_kill();
                }
            }
        }
    }
}
impl tokio::io::AsyncWrite for Simplex {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
std::pin::Pin::new(&mut self.0.as_mut().unwrap().stdin).poll_write(ctx, buf)
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.0.as_mut().unwrap().stdin).poll_flush(ctx)
}
fn poll_shutdown(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.0.as_mut().unwrap().stdin).poll_shutdown(ctx)
}
}
/// The readable side of a child process: its stdout and stderr pipes plus the
/// selection of which of them reads are served from.
pub struct Output {
/// Which pipe(s) `poll_read` pulls data from.
read_source: ReadSource,
/// The child's standard output pipe.
stdout: tokio::process::ChildStdout,
/// The child's standard error pipe.
stderr: tokio::process::ChildStderr,
}
impl Output {
    /// Chooses which output pipe(s) subsequent reads are served from.
    ///
    /// Returns `self` so calls can be chained.
    pub fn read_from(&mut self, read_source: ReadSource) -> &mut Self {
        let selected = &mut self.read_source;
        *selected = read_source;
        self
    }

    /// Borrows the raw stdout and stderr pipes, in that order.
    pub fn pipes(
        &mut self,
    ) -> (
        &mut tokio::process::ChildStdout,
        &mut tokio::process::ChildStderr,
    ) {
        let Self { stdout, stderr, .. } = self;
        (stdout, stderr)
    }

    /// Consumes the wrapper and returns the raw stdout and stderr pipes, in
    /// that order.
    #[must_use]
    pub fn eject(self) -> (tokio::process::ChildStdout, tokio::process::ChildStderr) {
        let Self { stdout, stderr, .. } = self;
        (stdout, stderr)
    }
}
impl tokio::io::AsyncRead for Output {
    /// Reads from the pipe(s) selected by the current [`ReadSource`].
    ///
    /// With `ReadSource::Both`, stderr is polled first and stdout is polled
    /// when stderr is pending *or* has reached end-of-file. A completed read
    /// that adds no bytes to a non-empty buffer signals end-of-file to the
    /// caller, so passing on stderr's EOF would end the combined stream while
    /// stdout may still hold data. The combined stream therefore ends only
    /// once stdout itself reports end-of-file.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        use std::pin::Pin;
        use std::task::Poll;
        use tokio::io::AsyncRead;

        let this = self.get_mut();
        match this.read_source {
            ReadSource::Stdout => AsyncRead::poll_read(Pin::new(&mut this.stdout), cx, buf),
            ReadSource::Stderr => AsyncRead::poll_read(Pin::new(&mut this.stderr), cx, buf),
            ReadSource::Both => {
                let filled_before = buf.filled().len();
                // A zero-capacity buffer can never receive bytes, so an empty
                // read into it says nothing about end-of-file.
                let has_room = buf.remaining() > 0;
                match AsyncRead::poll_read(Pin::new(&mut this.stderr), cx, buf) {
                    // stderr is at EOF: let stdout decide whether the stream ends.
                    Poll::Ready(Ok(())) if has_room && buf.filled().len() == filled_before => {
                        AsyncRead::poll_read(Pin::new(&mut this.stdout), cx, buf)
                    }
                    Poll::Pending => AsyncRead::poll_read(Pin::new(&mut this.stdout), cx, buf),
                    // stderr produced data or an error: report it as is.
                    ready => ready,
                }
            }
        }
    }
}
/// The spawned child bundled with its stdin pipe (taken out of the child at
/// spawn time).
struct ProcessImpl {
/// The tokio child process handle.
process: tokio::process::Child,
/// Write end of the child's standard input.
stdin: tokio::process::ChildStdin,
}
impl ProcessImpl {
    /// Returns the child's process id as a `nix` pid, or `None` when tokio no
    /// longer reports an id for the child.
    // The cast mirrors the platform's own pid representation.
    #[allow(clippy::cast_possible_wrap)]
    #[cfg(unix)]
    pub fn pid(&self) -> Option<nix::unistd::Pid> {
        self.process
            .id()
            .map(|pid| nix::unistd::Pid::from_raw(pid as nix::libc::pid_t))
    }

    /// Kills the child (unless it has already exited) and returns its exit
    /// status.
    ///
    /// # Errors
    /// Returns the I/O error produced while polling, killing or waiting on the
    /// child.
    #[cfg(not(unix))]
    async fn shutdown(mut self) -> std::io::Result<std::process::ExitStatus> {
        if let Some(status) = self.process.try_wait()? {
            return Ok(status);
        }
        // `kill` is asynchronous: the future has to be awaited to take effect.
        self.process.kill().await?;
        self.process.wait().await
    }

    /// Stops the child with increasing force and returns its exit status.
    ///
    /// Sequence, each step skipped once the child has exited:
    /// 1. close stdin and allow two seconds for a voluntary exit,
    /// 2. send `SIGINT` and allow two seconds,
    /// 3. send `SIGTERM` and allow two seconds,
    /// 4. kill the child.
    ///
    /// # Errors
    /// Returns an error when polling, signalling, killing or waiting on the
    /// child fails.
    #[cfg(unix)]
    async fn shutdown(mut self) -> Result<std::process::ExitStatus, super::UnixIoError> {
        use nix::sys::signal;
        use std::time::Duration;
        use tokio::time::timeout;

        let pid = match self.process.try_wait() {
            Ok(None) => self
                .pid()
                .expect("a child that has not exited yet still has a pid"),
            Ok(Some(status)) => return Ok(status),
            Err(err) => return Err(super::UnixIoError::from(err)),
        };

        let Self { mut process, stdin } = self;
        // Close the write end first: a child blocked reading its stdin can only
        // finish on its own once it sees end-of-file.
        drop(stdin);

        let grace = Duration::from_secs(2);
        // `wait` keeps returning the exit status once the child has exited, so
        // later rounds complete immediately and send nothing.
        for &sig in &[signal::SIGINT, signal::SIGTERM] {
            if timeout(grace, process.wait()).await.is_err() {
                signal::kill(pid, sig)?;
            }
        }
        if timeout(grace, process.wait()).await.is_err() {
            process.kill().await?;
        }
        process.wait().await.map_err(super::UnixIoError::from)
    }
}
#[cfg(all(test, unix))]
mod test {
    use crate::Spawner;

    /// Output written by the child can be read back through the handle.
    #[tokio::test]
    async fn read() {
        use tokio::io::AsyncReadExt;

        let mut command = tokio::process::Command::new("sh");
        command.arg("-c").arg("echo hello");
        let mut child = command.spawn_owned().unwrap();

        let mut output = String::new();
        let outcome = child.read_to_string(&mut output).await;
        assert!(outcome.is_ok());
        assert_eq!("hello\n", output);
    }

    /// Bytes written to the handle reach the child and are echoed back by `cat`.
    #[tokio::test]
    async fn write() {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};

        let mut command = tokio::process::Command::new("cat");
        let mut child = command.spawn_owned().unwrap();

        let written = child.write_all(b"hello\n").await;
        assert!(written.is_ok());

        let mut buffer = [0_u8; 10];
        let bytes = child.read(&mut buffer).await.unwrap();
        let echoed = std::str::from_utf8(&buffer[..bytes]).unwrap();
        assert_eq!("hello\n", echoed);
    }

    /// Dropping a handle whose process was already taken out must not panic.
    #[tokio::test]
    async fn test_drop_does_not_panic() {
        let mut command = tokio::process::Command::new("ls");
        let duplex = command.spawn_owned().unwrap();
        let mut simplex = duplex.0;
        let process = simplex.0.take().unwrap();
        assert!(process.shutdown().await.is_ok());
    }
}