use std::time::{Duration, Instant};
use processkit::{Command, Mechanism, ProcessGroup, Signal};
use crate::common::*;
/// A `SIGHUP` broadcast through the group must reach a child that traps it.
///
/// The shell child installs a `HUP` trap, prints `ready`, and then loops
/// forever. After the broadcast the trap handler is expected to print
/// `got-hup` on the child's stdout.
#[cfg(unix)]
#[tokio::test]
#[ignore = "spawns a real subprocess and signals it"]
async fn unix_signal_reaches_the_tree() {
    use tokio_stream::StreamExt;

    // Upper bound on how long either expected line may take to arrive.
    let patience = Duration::from_secs(10);
    let script = "trap 'echo got-hup' HUP; echo ready; while :; do sleep 0.1; done";

    let group = ProcessGroup::new().expect("create group");
    let trap_cmd = Command::new("sh").args(["-c", script]);
    let mut child = group.start(&trap_cmd).await.expect("start trap child");
    let mut output = child.stdout_lines();

    // `ready` is echoed after the trap is set, so seeing it means the
    // handler is in place before the signal is sent.
    let first = tokio::time::timeout(patience, output.next()).await;
    let ready = first
        .expect("readiness line in time")
        .expect("readiness line");
    assert!(ready.contains("ready"), "line: {ready:?}");

    group.signal(Signal::Hup).expect("broadcast SIGHUP");

    let second = tokio::time::timeout(patience, output.next()).await;
    let got = second.expect("trap line in time").expect("trap line");
    assert!(got.contains("got-hup"), "line: {got:?}");
}
/// Suspending the group must stop a ticking child; resuming must restart it.
///
/// The child prints an incrementing counter every 50 ms. After `suspend()`
/// no new line may arrive within 400 ms, and after `resume()` a new line must
/// arrive within 10 s.
#[cfg(unix)]
#[tokio::test]
#[ignore = "spawns a real subprocess and freezes it"]
async fn unix_suspend_freezes_progress() {
    use tokio_stream::StreamExt;

    let long_wait = Duration::from_secs(10);
    let settle = Duration::from_millis(200);
    let drain_gap = Duration::from_millis(100);
    let stall_window = Duration::from_millis(400);

    let group = ProcessGroup::new().expect("create group");
    let ticker_cmd = Command::new("sh").args([
        "-c",
        "i=0; while :; do i=$((i+1)); echo $i; sleep 0.05; done",
    ]);
    let mut ticker = group.start(&ticker_cmd).await.expect("start ticker");
    let mut ticks = ticker.stdout_lines();

    // Make sure the ticker is actually producing output before freezing it.
    let first = tokio::time::timeout(long_wait, ticks.next()).await;
    first.expect("first tick in time").expect("first tick");

    group.suspend().expect("suspend");
    tokio::time::sleep(settle).await;

    // Swallow lines that were emitted before the suspend took effect; the
    // drain ends on the first 100 ms gap (or end of stream).
    while matches!(
        tokio::time::timeout(drain_gap, ticks.next()).await,
        Ok(Some(_))
    ) {}

    let stalled = tokio::time::timeout(stall_window, ticks.next()).await;
    assert!(stalled.is_err(), "frozen tree kept producing output");

    group.resume().expect("resume");
    let resumed = tokio::time::timeout(long_wait, ticks.next()).await;
    assert!(matches!(resumed, Ok(Some(_))), "tree did not resume ticking");
}
/// On Unix, `signal`, `suspend` and `resume` must all succeed on a group that
/// has no members.
#[cfg(unix)]
#[test]
#[ignore = "creates an OS job/cgroup"]
fn signal_on_empty_group_is_ok() {
    let empty = ProcessGroup::new().expect("create group");

    let signalled = empty.signal(Signal::Term);
    signalled.expect("signal on empty group");

    let suspended = empty.suspend();
    suspended.expect("suspend on empty group");

    let resumed = empty.resume();
    resumed.expect("resume on empty group");
}
/// On Windows, every signal in the list below must be rejected with
/// `Error::Unsupported`.
#[cfg(windows)]
#[test]
#[ignore = "creates an OS job"]
fn windows_signal_non_kill_is_unsupported() {
    let group = ProcessGroup::new().expect("create group");

    // NOTE(review): `Other(9)` is treated here as a non-Kill signal — confirm
    // that the raw number 9 is not meant to alias `Signal::Kill`.
    let rejected = [Signal::Term, Signal::Hup, Signal::Other(9)];

    for sig in rejected {
        let outcome = group.signal(sig);
        let err = outcome.expect_err("a non-Kill signal must be rejected on Windows");
        if !matches!(err, processkit::Error::Unsupported { .. }) {
            panic!("expected Error::Unsupported for {sig:?}, got {err:?}");
        }
    }
}
/// On Windows, `Signal::Kill` must be accepted and must end the child quickly.
///
/// The wait is bounded at 10 s; the test additionally requires the child to be
/// reaped in under 5 s, measured from just after the signal call returns.
#[cfg(windows)]
#[tokio::test]
#[ignore = "spawns a real subprocess and kills it via Signal::Kill"]
async fn windows_signal_kill_kills_tree() {
    let reap_limit = Duration::from_secs(10);
    let prompt_limit = Duration::from_secs(5);

    let group = ProcessGroup::new().expect("create group");
    let process = group.start(&sleeper()).await.expect("start sleeper");
    assert!(process.pid().is_some());

    let killed = group.signal(Signal::Kill);
    killed.expect("Signal::Kill maps to job terminate");

    let start = Instant::now();
    let reaped = tokio::time::timeout(reap_limit, process.wait()).await;
    let _ = reaped
        .expect("killed tree should be reaped promptly")
        .expect("wait");

    assert!(
        start.elapsed() < prompt_limit,
        "Signal::Kill was not prompt (took {:?})",
        start.elapsed()
    );
}
/// On Windows, suspending the group must stall `ping` output and resuming
/// must let it continue.
///
/// After `suspend()` no new line may arrive within 2 s; after `resume()` a new
/// line must arrive within 10 s.
#[cfg(windows)]
#[tokio::test]
#[ignore = "spawns a real subprocess and suspends/resumes its threads"]
async fn windows_suspend_resume_stalls_output() {
    use tokio_stream::StreamExt;

    let long_wait = Duration::from_secs(10);

    let group = ProcessGroup::new().expect("create group");
    let ping_cmd = Command::new("ping").args(["-n", "30", "127.0.0.1"]);
    let mut ping = group.start(&ping_cmd).await.expect("start ping");
    let mut output = ping.stdout_lines();

    // Confirm the child is producing output before suspending it.
    let first = tokio::time::timeout(long_wait, output.next()).await;
    first
        .expect("first ping line in time")
        .expect("first ping line");

    group.suspend().expect("suspend");
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Discard lines that still arrive right after the suspend; stop at the
    // first 100 ms gap (or end of stream).
    loop {
        let drained = tokio::time::timeout(Duration::from_millis(100), output.next()).await;
        if !matches!(drained, Ok(Some(_))) {
            break;
        }
    }

    let stalled = tokio::time::timeout(Duration::from_secs(2), output.next()).await;
    assert!(stalled.is_err(), "suspended tree kept producing output");

    group.resume().expect("resume");
    let resumed = tokio::time::timeout(long_wait, output.next()).await;
    assert!(matches!(resumed, Ok(Some(_))), "tree did not resume output");
}
/// A child spawned outside the group must be terminated by the group once it
/// has been adopted.
///
/// The child (`ping` on Windows, `sleep 30` elsewhere) is started directly via
/// `tokio::process::Command`, adopted, and then `terminate_all()` is called.
/// The child must be reaped within the 10 s timeout and in under 5 s overall.
#[tokio::test]
#[ignore = "spawns a real subprocess outside the group and adopts it"]
async fn adopt_brings_an_external_child_under_containment() {
    let mut cmd = tokio::process::Command::new(if cfg!(windows) { "ping" } else { "sleep" });
    if cfg!(windows) {
        cmd.args(["-n", "30", "127.0.0.1"]);
    } else {
        cmd.arg("30");
    }
    // `kill_on_drop` keeps a failing assertion below from leaving the
    // long-running child behind: the group never owned it if `adopt` failed,
    // so nothing else would clean it up.
    cmd.stdout(std::process::Stdio::null()).kill_on_drop(true);
    let mut child = cmd.spawn().expect("spawn external child");

    let group = ProcessGroup::new().expect("create group");
    group.adopt(&child).expect("adopt external child");
    group.terminate_all().expect("terminate the adopted tree");

    let start = Instant::now();
    let _ = tokio::time::timeout(Duration::from_secs(10), child.wait())
        .await
        .expect("adopted child reaped in time");

    // Sample once so the failure message reports the value that was tested.
    let took = start.elapsed();
    assert!(
        took < Duration::from_secs(5),
        "adopted child was not contained (took {:?})",
        took
    );
}
/// With two sleepers started in the group, `members()` must report at least
/// two entries.
///
/// Skipped (with a note on stderr) when the group reports `Mechanism::None`.
#[tokio::test]
#[ignore = "spawns real subprocesses and lists the group's members"]
async fn members_lists_live_children() {
    let group = ProcessGroup::new().expect("create group");
    if let Mechanism::None = group.mechanism() {
        eprintln!("skipping: no containment on this target");
        return;
    }

    // Keep both handles bound so the children stay alive while listing.
    let _first = group.start(&sleeper()).await.expect("start first sleeper");
    let _second = group.start(&sleeper()).await.expect("start second sleeper");

    let members = group.members().expect("members");
    let count = members.len();
    assert!(count >= 2, "members: {members:?}");
}
/// Killing one of two children must make the member count drop.
///
/// After the victim is killed and reaped, `members()` is polled every 50 ms
/// for up to 5 s until its length falls below the count taken beforehand.
/// Skipped when the group reports `Mechanism::None`.
#[tokio::test]
#[ignore = "spawns real subprocesses and watches the member list shrink"]
async fn members_shrinks_when_a_child_dies() {
    let group = ProcessGroup::new().expect("create group");
    if let Mechanism::None = group.mechanism() {
        eprintln!("skipping: no containment on this target");
        return;
    }

    let _keep = group.start(&sleep_secs(30)).await.expect("start survivor");
    let mut victim = group.start(&sleep_secs(30)).await.expect("start victim");

    let before = group.members().expect("members").len();
    assert!(before >= 2, "expected at least two members, got {before}");

    victim.start_kill().expect("kill victim");
    let reaped = tokio::time::timeout(Duration::from_secs(10), victim.wait()).await;
    let _ = reaped.expect("victim reaped in time");

    // The membership view may lag behind the reap, so poll until it shrinks
    // or the deadline passes.
    let deadline = Instant::now() + Duration::from_secs(5);
    let poll_every = Duration::from_millis(50);
    while group.members().expect("members").len() >= before {
        assert!(
            Instant::now() < deadline,
            "member count never dropped below {before}"
        );
        tokio::time::sleep(poll_every).await;
    }
}
/// A freshly created group must report no members.
#[tokio::test]
#[ignore = "creates an OS job/cgroup"]
async fn members_on_empty_group_is_empty() {
    let fresh = ProcessGroup::new().expect("create group");
    let listing = fresh.members();
    let members = listing.expect("members");
    if !members.is_empty() {
        panic!("fresh group has members: {members:?}");
    }
}
#[tokio::test]
#[ignore = "spawns a short subprocess and adopts it after reaping"]
async fn adopt_of_a_reaped_child_errors_instead_of_tracking_nothing() {
let group = ProcessGroup::new().expect("create group");
if matches!(group.mechanism(), Mechanism::None) {
eprintln!("skipping: no containment on this target");
return;
}
let mut cmd = if cfg!(windows) {
let mut c = tokio::process::Command::new("cmd");
c.args(["/c", "exit", "0"]);
c
} else {
let mut c = tokio::process::Command::new("sh");
c.args(["-c", "exit 0"]);
c
};
let mut child = cmd.spawn().expect("spawn short child");
let _ = tokio::time::timeout(Duration::from_secs(10), child.wait())
.await
.expect("short child exits");
let err = group
.adopt(&child)
.expect_err("adopting a reaped child must error");
assert!(
matches!(err, processkit::Error::Io(_)),
"expected the no-pid Io error, got {err:?}"
);
}
#[tokio::test]
#[ignore = "creates an OS job/cgroup"]
async fn empty_group_accepts_lifecycle_calls() {
let group = ProcessGroup::new().expect("create group");
if matches!(group.mechanism(), Mechanism::None) {
eprintln!("skipping: no containment on this target");
return;
}
group.signal(Signal::Kill).expect("Kill on an empty group");
if cfg!(windows) {
let err = group
.signal(Signal::Term)
.expect_err("only Kill is deliverable on Windows");
assert!(
matches!(err, processkit::Error::Unsupported { .. }),
"expected Unsupported, got {err:?}"
);
} else {
group.signal(Signal::Term).expect("Term on an empty group");
}
group.suspend().expect("suspend an empty group");
group.resume().expect("resume an empty group");
#[cfg(feature = "stats")]
{
let stats = group.stats().expect("stats on an empty group");
assert_eq!(stats.active_process_count, 0);
}
}
/// On Windows, two `suspend()` calls must need two `resume()` calls before
/// the child produces output again.
///
/// After two suspends and one resume the child's output must stay stalled
/// for 3 s; after the second resume a line must arrive within 15 s.
#[cfg(windows)]
#[tokio::test]
#[ignore = "spawns a real subprocess and nests suspend/resume"]
async fn windows_nested_suspend_needs_matching_resumes() {
    use tokio_stream::StreamExt;

    let long_wait = Duration::from_secs(15);

    let group = ProcessGroup::new().expect("create group");
    let ping_cmd = Command::new("ping").args(["-n", "31", "127.0.0.1"]);
    let mut run = group.start(&ping_cmd).await.expect("start ticker");
    let mut output = run.stdout_lines();

    let first = tokio::time::timeout(long_wait, output.next()).await;
    first.expect("ticker prints").expect("first line");

    group.suspend().expect("suspend #1");
    group.suspend().expect("suspend #2");
    group.resume().expect("resume #1 of 2");

    // Consume lines that still arrive; a 2 s silence ends the drain. The
    // stream ending here would mean the child exited, which is a failure.
    while let Ok(next) = tokio::time::timeout(Duration::from_secs(2), output.next()).await {
        assert!(next.is_some(), "ticker exited while suspended");
    }

    let still_frozen = tokio::time::timeout(Duration::from_secs(3), output.next()).await;
    assert!(
        still_frozen.is_err(),
        "one resume must not thaw two suspends"
    );

    group.resume().expect("resume #2 of 2");
    let thawed = tokio::time::timeout(long_wait, output.next()).await;
    let line = thawed.expect("a balanced resume thaws the tree");
    assert!(line.is_some(), "ticker resumed output");
}
/// On Linux with the cgroup v2 mechanism, a child adopted into an already
/// suspended group must stop producing output until the group is resumed.
///
/// The ticker prints `tick` every 250 ms. It is spawned outside the group,
/// adopted after `suspend()`, and must then stay silent for 2 s. After
/// `resume()` the next line must be `tick` and arrive within 10 s.
/// Skipped unless the group reports `Mechanism::CgroupV2`.
#[cfg(target_os = "linux")]
#[tokio::test]
#[ignore = "adopts a real subprocess into a suspended cgroup"]
async fn linux_cgroup_adopt_into_suspended_group_freezes_the_child() {
    use tokio::io::AsyncBufReadExt;

    let long_wait = Duration::from_secs(10);

    let group = ProcessGroup::new().expect("create group");
    if !matches!(group.mechanism(), Mechanism::CgroupV2) {
        eprintln!("skipping: needs the cgroup mechanism");
        return;
    }

    let mut ticker_cmd = tokio::process::Command::new("sh");
    ticker_cmd
        .args(["-c", "while :; do echo tick; sleep 0.25; done"])
        .stdout(std::process::Stdio::piped())
        .kill_on_drop(true);
    let mut ticker = ticker_cmd.spawn().expect("spawn ticker");

    let pipe = ticker.stdout.take().expect("ticker stdout");
    let mut lines = tokio::io::BufReader::new(pipe).lines();

    // The ticker must be demonstrably running before it is adopted.
    let first = tokio::time::timeout(long_wait, lines.next_line()).await;
    first
        .expect("ticker prints")
        .expect("read line")
        .expect("a tick before adoption");

    group.suspend().expect("suspend the empty group");
    group
        .adopt(&ticker)
        .expect("adopt the ticker into the frozen cgroup");

    // Consume ticks that still arrive; a 1 s silence ends the drain. A read
    // error or end of stream here is treated as the ticker having exited.
    while let Ok(read) = tokio::time::timeout(Duration::from_secs(1), lines.next_line()).await {
        assert!(matches!(read, Ok(Some(_))), "ticker exited while frozen");
    }

    let silent = tokio::time::timeout(Duration::from_secs(2), lines.next_line()).await;
    assert!(
        silent.is_err(),
        "a child adopted into a suspended cgroup must freeze on attach"
    );

    group.resume().expect("resume");
    let thawed = tokio::time::timeout(long_wait, lines.next_line()).await;
    let line = thawed
        .expect("thawed ticker resumes output")
        .expect("read line");
    assert_eq!(line.as_deref(), Some("tick"));

    let _ = ticker.kill().await;
}