mod drop_guard;
pub(crate) mod output_collection;
mod replay;
mod spawn;
mod termination;
mod wait;
use crate::output_stream::OutputStream;
use crate::panic_on_drop::PanicOnDrop;
use std::borrow::Cow;
use std::io;
use std::mem::ManuallyDrop;
use std::process::ExitStatus;
use std::time::Duration;
use tokio::process::Child;
use tokio::process::ChildStdin;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WaitForCompletionOrTerminateOptions {
pub wait_timeout: Duration,
pub interrupt_timeout: Duration,
pub terminate_timeout: Duration,
}
#[derive(Debug)]
pub enum Stdin {
Open(ChildStdin),
Closed,
}
impl Stdin {
#[must_use]
pub fn is_open(&self) -> bool {
matches!(self, Stdin::Open(_))
}
pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
match self {
Stdin::Open(stdin) => Some(stdin),
Stdin::Closed => None,
}
}
pub fn close(&mut self) {
*self = Stdin::Closed;
}
}
#[derive(Debug)]
pub enum RunningState {
Running,
Terminated(ExitStatus),
Uncertain(io::Error),
}
impl RunningState {
#[must_use]
pub fn is_definitely_running(&self) -> bool {
matches!(self, RunningState::Running)
}
}
#[derive(Debug)]
pub(super) enum DropMode {
Armed { panic: PanicOnDrop },
Disarmed,
}
#[derive(Debug)]
pub struct ProcessHandle<Stdout, Stderr = Stdout>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
pub(crate) name: Cow<'static, str>,
child: Child,
std_in: Stdin,
std_out_stream: Stdout,
std_err_stream: Stderr,
pub(super) drop_mode: DropMode,
}
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
pub fn stdin(&mut self) -> &mut Stdin {
&mut self.std_in
}
pub fn stdout(&self) -> &Stdout {
&self.std_out_stream
}
pub fn stderr(&self) -> &Stderr {
&self.std_err_stream
}
}
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
pub fn id(&self) -> Option<u32> {
self.child.id()
}
pub(super) fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
self.child.try_wait()
}
pub fn is_running(&mut self) -> RunningState {
match self.try_reap_exit_status() {
Ok(None) => RunningState::Running,
Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
Err(err) => RunningState::Uncertain(err),
}
}
}
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
pub fn into_inner(mut self) -> (Child, Stdin, Stdout, Stderr) {
self.must_not_be_terminated();
let mut this = ManuallyDrop::new(self);
unsafe {
let child = std::ptr::read(&raw const this.child);
let stdin = std::ptr::read(&raw const this.std_in);
let stdout = std::ptr::read(&raw const this.std_out_stream);
let stderr = std::ptr::read(&raw const this.std_err_stream);
std::ptr::drop_in_place(&raw mut this.name);
std::ptr::drop_in_place(&raw mut this.drop_mode);
(child, stdin, stdout, stderr)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
AutoName, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process, RunningState,
test_support::long_running_command,
};
use assertr::prelude::*;
use std::process::Stdio;
use std::time::Duration;
mod stdin {
use super::*;
#[tokio::test]
async fn open_stdin_reports_open_until_closed() {
let mut child = long_running_command(Duration::from_secs(5))
.stdin(Stdio::piped())
.spawn()
.unwrap();
let child_stdin = child.stdin.take().unwrap();
let mut stdin = Stdin::Open(child_stdin);
assert_that!(stdin.is_open()).is_true();
assert_that!(stdin.as_mut().is_some()).is_true();
stdin.close();
assert_that!(stdin.is_open()).is_false();
assert_that!(stdin.as_mut()).is_none();
child.kill().await.unwrap();
}
#[test]
fn closed_stdin_reports_closed() {
let mut stdin = Stdin::Closed;
assert_that!(stdin.is_open()).is_false();
assert_that!(stdin.as_mut()).is_none();
stdin.close();
assert_that!(stdin.is_open()).is_false();
assert_that!(stdin.as_mut()).is_none();
}
}
mod is_running {
use super::*;
#[tokio::test]
async fn does_not_disarm_drop_guards_when_process_has_exited() {
let mut process = Process::new(long_running_command(Duration::from_millis(50)))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| {
stream
.broadcast()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
let _state = process.is_running();
assert_that!(process.is_drop_armed()).is_true();
process.must_not_be_terminated();
}
#[tokio::test]
async fn reports_running_before_wait_and_terminated_after_wait() {
let mut process = Process::new(long_running_command(Duration::from_secs(1)))
.name(AutoName::program_only())
.stdout_and_stderr(|stream| {
stream
.broadcast()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap();
match process.is_running() {
RunningState::Running => {}
RunningState::Terminated(exit_status) => {
assert_that!(exit_status).fail("process should still be running");
}
RunningState::Uncertain(_) => {
assert_that!(&process).fail("process state should not be uncertain");
}
}
process
.wait_for_completion(Duration::from_secs(2))
.await
.unwrap();
match process.is_running() {
RunningState::Running => {
assert_that!(process).fail("process should not be running anymore");
}
RunningState::Terminated(exit_status) => {
assert_that!(exit_status.code()).is_some().is_equal_to(0);
assert_that!(exit_status.success()).is_true();
}
RunningState::Uncertain(_) => {
assert_that!(process).fail("process state should not be uncertain");
}
}
}
}
#[cfg(test)]
mod into_inner {
use super::*;
use crate::LineParsingOptions;
use crate::test_support::line_collection_options;
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn returns_stdin_with_pipe_still_open() {
let cmd = tokio::process::Command::new("cat");
let process = Process::new(cmd)
.name("cat")
.stdout_and_stderr(|stream| {
stream
.broadcast()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap();
let (mut child, mut stdin, stdout, _stderr) = process.into_inner();
assert_that!(child.stdin.is_none()).is_true();
tokio::time::sleep(Duration::from_millis(100)).await;
assert_that!(child.try_wait().unwrap().is_none()).is_true();
let collector = stdout
.collect_lines_into_vec(LineParsingOptions::default(), line_collection_options());
let Some(stdin_handle) = stdin.as_mut() else {
assert_that!(stdin.is_open()).fail("stdin should be returned open");
return;
};
stdin_handle
.write_all(b"stdin stayed open\n")
.await
.unwrap();
stdin_handle.flush().await.unwrap();
stdin.close();
let status = tokio::time::timeout(Duration::from_secs(2), child.wait())
.await
.unwrap()
.unwrap();
assert_that!(status.success()).is_true();
let collected = collector.wait().await.unwrap();
assert_that!(collected.lines().len()).is_equal_to(1);
assert_that!(collected[0].as_str()).is_equal_to("stdin stayed open");
}
#[tokio::test]
async fn defuses_panic_guard() {
let process = Process::new(long_running_command(Duration::from_secs(5)))
.name("long-running")
.stdout_and_stderr(|stream| {
stream
.broadcast()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap();
let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
child.kill().await.unwrap();
let _status = child.wait().await.unwrap();
}
#[tokio::test]
async fn supports_handles_built_with_owned_name() {
let process = Process::new(long_running_command(Duration::from_secs(5)))
.name(format!("sleeper-{}", 7))
.stdout_and_stderr(|stream| {
stream
.broadcast()
.best_effort_delivery()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap();
let (mut child, _stdin, _stdout, _stderr) = process.into_inner();
child.kill().await.unwrap();
let _status = child.wait().await.unwrap();
}
}
}