#![allow(missing_docs)]
mod common;
use assertr::prelude::*;
use common::*;
use std::time::Duration;
use unwrap_infallible::UnwrapInfallible;
#[tokio::test(flavor = "multi_thread")]
async fn kill_future_can_be_spawned_on_tokio_task() {
let mut process = spawn_long_running_process();
let result = tokio::spawn(async move { process.kill().await })
.await
.unwrap();
assert_that!(result).is_ok();
}
#[tokio::test]
async fn kill_disarms_cleanup_and_panic_guards() {
use tokio_process_tools::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, Process};
let mut process = Process::new(long_running_command(Duration::from_secs(5)))
.name("long-running")
.stdout_and_stderr(|stream| {
stream
.broadcast()
.lossy_without_backpressure()
.no_replay()
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap();
assert_that!(process.stdin().is_open()).is_true();
assert_that!(process.is_drop_armed()).is_true();
process.kill().await.unwrap();
assert_that!(process.stdin().is_open()).is_false();
assert_that!(process.is_drop_disarmed()).is_true();
drop(process);
}
#[tokio::test(flavor = "multi_thread")]
async fn kill_propagates_to_grandchildren() {
use std::sync::Mutex;
use std::time::Instant;
use tokio::process::Command;
use tokio::sync::oneshot;
use tokio_process_tools::{
Consumable, DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, LineParsingOptions, Next,
NumBytesExt, ParseLines, Process,
};
#[cfg(windows)]
let cmd = {
let mut cmd = Command::new("powershell.exe");
cmd.arg("-NoProfile").arg("-Command").arg(
"$p = Start-Process -FilePath 'timeout.exe' -ArgumentList '/T','30','/NOBREAK' \
-PassThru -WindowStyle Hidden; \
Write-Host $p.Id; \
Start-Sleep -Seconds 30",
);
cmd
};
#[cfg(unix)]
let cmd = {
let mut cmd = Command::new("sh");
cmd.arg("-c").arg("sleep 30 & echo $!; wait");
cmd
};
let mut process = Process::new(cmd)
.name("group-kill-test")
.stdout_and_stderr(|stream| {
stream
.broadcast()
.reliable_with_backpressure()
.replay_last_bytes(64.bytes())
.read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
.max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
})
.spawn()
.unwrap();
let (tx, rx) = oneshot::channel::<u32>();
let tx = Mutex::new(Some(tx));
let _consumer = process
.stdout()
.consume(ParseLines::inspect(
LineParsingOptions::default(),
move |line| {
if let Ok(pid) = line.trim().parse::<u32>()
&& let Some(tx) = tx.lock().unwrap().take()
{
let _ = tx.send(pid);
}
Next::Continue
},
))
.unwrap_infallible();
let grandchild_pid = tokio::time::timeout(Duration::from_secs(10), rx)
.await
.expect("grandchild should print its PID")
.expect("oneshot should resolve once the line is observed");
process.kill().await.unwrap();
let deadline = Instant::now() + Duration::from_secs(10);
loop {
if grandchild_is_gone(grandchild_pid) {
return;
}
assert_that!(Instant::now() <= deadline)
.with_detail_message(format!(
"grandchild PID {grandchild_pid} still alive 10 s after kill"
))
.is_true();
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
#[cfg(windows)]
fn grandchild_is_gone(pid: u32) -> bool {
use windows_sys::Win32::Foundation::{CloseHandle, WAIT_OBJECT_0};
use windows_sys::Win32::System::Threading::{
OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, PROCESS_SYNCHRONIZE, WaitForSingleObject,
};
let handle = unsafe {
OpenProcess(
PROCESS_SYNCHRONIZE | PROCESS_QUERY_LIMITED_INFORMATION,
0,
pid,
)
};
if handle.is_null() {
return true;
}
let wait = unsafe { WaitForSingleObject(handle, 0) };
unsafe {
CloseHandle(handle);
}
wait == WAIT_OBJECT_0
}
#[cfg(unix)]
fn grandchild_is_gone(pid: u32) -> bool {
nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid.cast_signed()), None).is_err()
}