1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
//! Cancellation: `Command::cancel_on` — `cancellation`-gated via the `mod`
//! declaration in `main.rs`.
use std::time::{Duration, Instant};
use processkit::{CancellationToken, Command, ProcessGroup};
use crate::common::*;
/// Whether a process with `pid` is still alive, per platform.
fn pid_alive(pid: u32) -> bool {
#[cfg(windows)]
return windows_pid_alive(pid);
#[cfg(unix)]
// SAFETY: signal 0 is a sound liveness probe.
return unsafe { libc::kill(pid as i32, 0) == 0 };
#[cfg(not(any(windows, unix)))]
{
let _ = pid;
false
}
}
#[tokio::test]
#[ignore = "spawns real subprocesses and cancels one mid-run"]
async fn cancel_mid_run_errors_and_kills_only_the_cancelled_child() {
let group = ProcessGroup::new().expect("create group");
let token = CancellationToken::new();
// A sibling in the same shared group: cancellation must not touch it
// (same child-only scope as a timeout on a shared-group handle).
let sibling = group.start(&sleep_secs(30)).await.expect("start sibling");
let sibling_pid = sibling.pid().expect("sibling pid");
// Single-process sleeper, deliberately: the cmd-wrapped `sleeper()` is
// two processes on Windows, and the child-only cancel kill would leave
// the grandchild holding the stdout pipe — stalling teardown for the
// full pump grace instead of ending promptly.
let run = group
.start(&sleep_secs(30).cancel_on(token.clone()))
.await
.expect("start cancellable sleeper");
let pid = run.pid().expect("pid");
let canceller = tokio::spawn({
let token = token.clone();
async move {
tokio::time::sleep(Duration::from_millis(300)).await;
token.cancel();
}
});
let start = Instant::now();
let err = run
.output_string()
.await
.expect_err("a cancelled run must error, not produce a result");
assert!(
matches!(err, processkit::Error::Cancelled { .. }),
"expected Error::Cancelled, got {err:?}"
);
// Promptness: the sleeper runs ~30s if cancellation is broken. Generous
// headroom for full-suite load (cf. the widened timeout-test bounds).
assert!(
start.elapsed() < Duration::from_secs(10),
"cancel was not prompt (took {:?})",
start.elapsed()
);
canceller.await.expect("canceller task");
// The cancelled child is dead AND reaped by the time `output_string`
// returned: the cancel arm's kill_tree start-kills and then awaits the
// child. (No raw post-mortem pid probe here: a dead pid is recycled by
// a parallel-suite neighbour within seconds on Windows, which made an
// earlier probe loop flake.) The prompt Err above is the death proof.
// The shared group's sibling is untouched — probing a process we hold
// a live handle to is reuse-safe.
let _ = pid;
assert!(
pid_alive(sibling_pid),
"cancel must kill the child only, not shared-group siblings"
);
drop(sibling);
}
#[tokio::test]
#[ignore = "exercises the pre-spawn short-circuit (no real subprocess)"]
async fn pre_cancelled_token_short_circuits_before_spawning() {
let token = CancellationToken::new();
token.cancel();
let start = Instant::now();
// A program that doesn't exist: reaching the OS spawn would fail with
// an Io error, so getting Cancelled proves the short-circuit fired
// before any spawn was attempted.
let err = Command::new("processkit-no-such-program-424242")
.cancel_on(token)
.run()
.await
.expect_err("a pre-cancelled run must not start");
assert!(
matches!(err, processkit::Error::Cancelled { .. }),
"expected Error::Cancelled, got {err:?}"
);
assert!(
start.elapsed() < Duration::from_secs(2),
"short-circuit was not immediate (took {:?})",
start.elapsed()
);
}
#[tokio::test]
#[ignore = "spawns a real subprocess and cancels it mid-stream"]
async fn cancel_ends_the_stream_and_finish_streamed_reports_it() {
use tokio_stream::StreamExt;
let token = CancellationToken::new();
// Windows: the Job kill is atomic — the cmd-wrapped banner child is
// fine. Unix: deliberately FORK-FREE (`read` parks the shell itself,
// stdin kept open) — a `sleep 30` forked at cancel time escaped the
// pgroup broadcast on macOS CI (killpg is documented best-effort
// against a forking tree) and held the stdout pipe open past the
// stream bound.
let child = if cfg!(windows) {
banner_then_idle()
} else {
Command::new("sh")
.args(["-c", "echo ready; read line"])
.keep_stdin_open()
};
let mut run = child
.cancel_on(token.clone())
.start()
.await
.expect("start banner child");
let pid = run.pid().expect("pid");
let mut lines = run.stdout_lines();
// Wait for the banner so the cancel provably lands mid-stream.
let first = tokio::time::timeout(Duration::from_secs(15), lines.next())
.await
.expect("banner in time")
.expect("banner line");
assert!(first.contains("ready"), "line: {first:?}");
token.cancel();
// The cancel tears the (handle-owned) tree down, the pipes close, and
// the stream ends — the child would otherwise idle ~30s. On a timeout,
// report whether the direct child is even dead — that separates "the
// kill never landed" from "the pipe stayed open" (seen once on macOS
// CI; the probe makes the next occurrence diagnosable).
let start = Instant::now();
loop {
match tokio::time::timeout(Duration::from_secs(15), lines.next()).await {
Ok(Some(_)) => continue,
Ok(None) => break,
Err(_) => panic!(
"stream did not end within 15s of the cancel \
(direct child still alive: {})",
pid_alive(pid)
),
}
}
assert!(
start.elapsed() < Duration::from_secs(15),
"stream did not end promptly (took {:?})",
start.elapsed()
);
let err = run
.finish_streamed()
.await
.expect_err("finishing a cancelled streamed run must error");
assert!(
matches!(err, processkit::Error::Cancelled { .. }),
"expected Error::Cancelled, got {err:?}"
);
}