mod common;
use std::time::Duration;
use processkit::{Command, JobRunner, ProcessGroup, RunningProcess, output_all, wait_all};
use crate::common::*;
#[tokio::test]
async fn concurrent_spawn_into_one_group() {
if skip_unless_enabled("concurrent_spawn_into_one_group") {
return;
}
for n in [50usize, 100] {
let group = ProcessGroup::new().expect("create group");
let cmds: Vec<Command> = (0..n).map(|_| quick_exit()).collect();
let results = tokio::time::timeout(Duration::from_secs(60), output_all(cmds, n, &group))
.await
.expect("the spawn burst finished in time");
assert_eq!(results.len(), n);
assert!(
results.iter().all(|r| matches!(r, Ok(o) if o.is_success())),
"all {n} children spawn and exit 0"
);
let extra = group
.start(&quick_exit())
.await
.expect("group still usable after the burst");
let _ = extra.wait().await.expect("reap the extra child");
}
}
#[tokio::test]
async fn many_private_groups_at_once() {
if skip_unless_enabled("many_private_groups_at_once") {
return;
}
let n = 100usize;
let cmds: Vec<Command> = (0..n).map(|_| quick_exit()).collect();
let results = tokio::time::timeout(Duration::from_secs(60), output_all(cmds, n, &JobRunner))
.await
.expect("the batch finished in time");
assert_eq!(results.len(), n);
assert!(
results.iter().all(|r| matches!(r, Ok(o) if o.is_success())),
"all {n} private-group runs succeed"
);
}
#[tokio::test]
async fn sustained_spawn_reap_churn_does_not_leak() {
if skip_unless_enabled("sustained_spawn_reap_churn") {
return;
}
#[cfg(target_os = "linux")]
let before = open_fd_count();
for _ in 0..100 {
let result = quick_exit().output_string().await.expect("run a child");
assert!(result.is_success());
}
#[cfg(target_os = "linux")]
{
let after = open_fd_count();
assert!(
after <= before + 8,
"fd count grew across 100 spawn/reap cycles: {before} -> {after}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn large_group_teardown_kills_the_whole_tree() {
if skip_unless_enabled("large_group_teardown") {
return;
}
const GRACE: Duration = Duration::from_secs(45);
{
let group = ProcessGroup::new().expect("create group");
let mut handles = Vec::new();
for _ in 0..50 {
handles.push(group.start(&long_sleeper()).await.expect("start child"));
}
let waiters: Vec<_> = handles
.into_iter()
.map(|h| tokio::spawn(h.wait()))
.collect();
drop(group);
assert_all_reaped_within(waiters, GRACE, "drop").await;
}
{
let group = ProcessGroup::new().expect("create group");
let mut handles = Vec::new();
for _ in 0..50 {
handles.push(group.start(&long_sleeper()).await.expect("start child"));
}
let waiters: Vec<_> = handles
.into_iter()
.map(|h| tokio::spawn(h.wait()))
.collect();
tokio::time::timeout(Duration::from_secs(30), group.shutdown())
.await
.expect("shutdown stays bounded")
.expect("shutdown ok");
assert_all_reaped_within(waiters, GRACE, "shutdown").await;
}
}
async fn assert_all_reaped_within(
waiters: Vec<tokio::task::JoinHandle<processkit::Result<Option<i32>>>>,
grace: Duration,
how: &str,
) {
let reaped = tokio::time::timeout(grace, async {
for waiter in waiters {
let _ = waiter.await.expect("a wait() task panicked");
}
})
.await;
assert!(
reaped.is_ok(),
"{how} must reap all 50 children within {grace:?} (their natural exit is ~90s)"
);
}
#[tokio::test]
async fn concurrent_kill_reaps_every_handle() {
if skip_unless_enabled("concurrent_kill") {
return;
}
let group = ProcessGroup::new().expect("create group");
let mut handles = Vec::new();
for _ in 0..20 {
handles.push(group.start(&long_sleeper()).await.expect("start child"));
}
for handle in &mut handles {
handle.start_kill().expect("start_kill");
}
let mut refs: Vec<&mut RunningProcess> = handles.iter_mut().collect();
let codes = tokio::time::timeout(Duration::from_secs(15), wait_all(&mut refs))
.await
.expect("all kills reaped in time")
.expect("join");
assert_eq!(codes.len(), 20, "every killed handle is collected");
}
#[cfg(feature = "cancellation")]
#[tokio::test]
async fn cancellation_storm_resolves_every_call() {
use processkit::{CancellationToken, Error};
if skip_unless_enabled("cancellation_storm") {
return;
}
let n = 50usize;
let tokens: Vec<CancellationToken> = (0..n).map(|_| CancellationToken::new()).collect();
let tasks: Vec<_> = tokens
.iter()
.map(|token| {
let cmd = long_sleeper().cancel_on(token.clone());
tokio::spawn(async move { cmd.run().await })
})
.collect();
tokio::time::sleep(Duration::from_millis(200)).await;
for token in &tokens {
token.cancel();
}
for task in tasks {
let result = tokio::time::timeout(Duration::from_secs(15), task)
.await
.expect("cancelled run resolves in time")
.expect("task did not panic");
assert!(
matches!(result, Err(Error::Cancelled { .. })),
"every cancelled run resolves to Error::Cancelled, got {result:?}"
);
}
}
#[tokio::test]
async fn buffer_saturation_drains_without_blocking_the_child() {
use processkit::OutputBufferPolicy;
if skip_unless_enabled("buffer_saturation") {
return;
}
let result = tokio::time::timeout(
Duration::from_secs(60),
line_emitter(100_000)
.output_buffer(OutputBufferPolicy::bounded(1000))
.output_string(),
)
.await
.expect("the emitter ran to completion")
.expect("output captured");
assert!(
result.is_success(),
"the child drains and exits cleanly (never blocked on a full pipe): {:?}",
result.code()
);
let retained = result.stdout().lines().count();
assert!(
(1..=1000).contains(&retained),
"the bounded policy caps retained lines at 1000, got {retained}"
);
}
#[tokio::test]
async fn supervisor_storm_guard_trips_under_real_restarts() {
use processkit::{RestartPolicy, Supervisor};
if skip_unless_enabled("supervisor_storm") {
return;
}
let outcome = tokio::time::timeout(
Duration::from_secs(30),
Supervisor::new(always_fail())
.restart(RestartPolicy::Always)
.max_restarts(6)
.backoff(Duration::from_millis(1), 1.0)
.storm_pause(Duration::from_millis(50))
.failure_threshold(1.5)
.failure_decay(Duration::from_secs(1000))
.run(),
)
.await
.expect("supervision stayed bounded")
.expect("supervision ran");
assert!(
outcome.storm_pauses > 0,
"the storm guard must trip under a real restart storm, got {}",
outcome.storm_pauses
);
}