use std::time::{Duration, Instant};
use processkit::{CancellationToken, Command, ProcessGroup};
use crate::common::*;
fn pid_alive(pid: u32) -> bool {
#[cfg(windows)]
return windows_pid_alive(pid);
#[cfg(unix)]
return unsafe { libc::kill(pid as i32, 0) == 0 };
#[cfg(not(any(windows, unix)))]
{
let _ = pid;
false
}
}
#[tokio::test]
#[ignore = "spawns real subprocesses and cancels one mid-run"]
async fn cancel_mid_run_errors_and_kills_only_the_cancelled_child() {
let group = ProcessGroup::new().expect("create group");
let token = CancellationToken::new();
let sibling = group.start(&sleep_secs(30)).await.expect("start sibling");
let sibling_pid = sibling.pid().expect("sibling pid");
let run = group
.start(&sleep_secs(30).cancel_on(token.clone()))
.await
.expect("start cancellable sleeper");
let pid = run.pid().expect("pid");
let canceller = tokio::spawn({
let token = token.clone();
async move {
tokio::time::sleep(Duration::from_millis(300)).await;
token.cancel();
}
});
let start = Instant::now();
let err = run
.output_string()
.await
.expect_err("a cancelled run must error, not produce a result");
assert!(
matches!(err, processkit::Error::Cancelled { .. }),
"expected Error::Cancelled, got {err:?}"
);
assert!(
start.elapsed() < Duration::from_secs(10),
"cancel was not prompt (took {:?})",
start.elapsed()
);
canceller.await.expect("canceller task");
let _ = pid;
assert!(
pid_alive(sibling_pid),
"cancel must kill the child only, not shared-group siblings"
);
drop(sibling);
}
#[tokio::test]
#[ignore = "spawns a real subprocess through a client-level cancellation default"]
async fn client_default_cancel_on_cancels_a_real_run() {
use processkit::CliClient;
let token = CancellationToken::new();
let sleeper = sleep_secs(30);
let client = CliClient::new(sleeper.program()).default_cancel_on(token.clone());
let cmd = client.command(sleeper.arguments().iter().map(|a| a.to_os_string()));
let canceller = tokio::spawn({
let token = token.clone();
async move {
tokio::time::sleep(Duration::from_millis(300)).await;
token.cancel();
}
});
let start = Instant::now();
let err = client
.output(cmd)
.await
.expect_err("a cancelled run must error, not produce a result");
assert!(
matches!(err, processkit::Error::Cancelled { .. }),
"expected Error::Cancelled, got {err:?}"
);
assert!(
start.elapsed() < Duration::from_secs(10),
"client-default cancel was not prompt (took {:?})",
start.elapsed()
);
canceller.await.expect("canceller task");
}
#[tokio::test]
#[ignore = "exercises the pre-spawn short-circuit (no real subprocess)"]
async fn pre_cancelled_token_short_circuits_before_spawning() {
let token = CancellationToken::new();
token.cancel();
let start = Instant::now();
let err = Command::new("processkit-no-such-program-424242")
.cancel_on(token)
.run()
.await
.expect_err("a pre-cancelled run must not start");
assert!(
matches!(err, processkit::Error::Cancelled { .. }),
"expected Error::Cancelled, got {err:?}"
);
assert!(
start.elapsed() < Duration::from_secs(2),
"short-circuit was not immediate (took {:?})",
start.elapsed()
);
}
#[tokio::test]
#[ignore = "spawns a real subprocess and cancels it mid-stream"]
async fn cancel_ends_the_stream_and_finish_streamed_reports_it() {
use tokio_stream::StreamExt;
let token = CancellationToken::new();
let child = if cfg!(windows) {
banner_then_idle()
} else {
Command::new("sh")
.args(["-c", "echo ready; read line"])
.keep_stdin_open()
};
let mut run = child
.cancel_on(token.clone())
.start()
.await
.expect("start banner child");
let pid = run.pid().expect("pid");
let mut lines = run.stdout_lines();
let first = tokio::time::timeout(Duration::from_secs(15), lines.next())
.await
.expect("banner in time")
.expect("banner line");
assert!(first.contains("ready"), "line: {first:?}");
token.cancel();
let start = Instant::now();
loop {
match tokio::time::timeout(Duration::from_secs(15), lines.next()).await {
Ok(Some(_)) => continue,
Ok(None) => break,
Err(_) => panic!(
"stream did not end within 15s of the cancel \
(direct child still alive: {})",
pid_alive(pid)
),
}
}
assert!(
start.elapsed() < Duration::from_secs(15),
"stream did not end promptly (took {:?})",
start.elapsed()
);
let err = run
.finish_streamed()
.await
.expect_err("finishing a cancelled streamed run must error");
assert!(
matches!(err, processkit::Error::Cancelled { .. }),
"expected Error::Cancelled, got {err:?}"
);
}