1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
use super::{DropMode, ProcessHandle};
use crate::output_stream::OutputStream;
use crate::panic_on_drop::PanicOnDrop;
use crate::terminate_on_drop::TerminateOnDrop;
use std::time::Duration;
/// Last-resort cleanup for a `ProcessHandle` that is dropped while still armed.
///
/// A disarmed handle is dropped without touching the child. An armed handle attempts to kill
/// the child through `drop_kill`; a failure to do so is only logged, never propagated.
impl<Stdout, Stderr> Drop for ProcessHandle<Stdout, Stderr>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    fn drop(&mut self) {
        // Exhaustive on purpose: a new `DropMode` variant must decide its drop behavior here.
        match &self.drop_mode {
            DropMode::Disarmed => return,
            DropMode::Armed { .. } => {}
        }

        // Users are expected to explicitly await or terminate every spawned process. Reaching
        // this point means they did not, so kill the process group as a last-resort cleanup.
        // Targeting the group (rather than only the child's PID) also catches grandchildren
        // the child has fork-execed, matching the invariant the explicit `kill()` path upholds.
        // The panic guard stored in the armed drop mode additionally raises a panic once this
        // method has returned, signalling the misuse loudly.
        let Err(err) = drop_kill(&mut self.child) else {
            return;
        };

        tracing::warn!(
            process = %self.name,
            error = %err,
            "Failed to kill process while dropping an armed ProcessHandle"
        );
    }
}
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    /// Builds the `DropMode::Armed` value, carrying a freshly created panic guard.
    pub(super) fn new_armed_drop_mode() -> DropMode {
        let panic = armed_panic_guard();
        DropMode::Armed { panic }
    }

    /// Arms the kill/panic-on-drop safeguard of this `ProcessHandle`.
    ///
    /// The safeguard makes sure the process behind this handle is properly terminated or
    /// awaited before the handle goes away. While it is active, dropping the handle without
    /// having successfully terminated, killed, waited for, or explicitly detached the process
    /// raises an intentional panic, so that such failure states never pass silently and system
    /// resources are handled correctly.
    ///
    /// Every `ProcessHandle` is marked this way by default, so calling this method is rarely
    /// necessary. Use `must_not_be_terminated` to clear the safeguard and explicitly allow
    /// dropping the handle without terminating the process.
    ///
    /// Calling this method on a handle whose safeguard is already active is safe and simply
    /// keeps the handle armed.
    ///
    /// # Panics
    ///
    /// This method itself does not panic. Once it has been called, dropping the
    /// `ProcessHandle` without successfully awaiting or terminating the process panics with a
    /// descriptive message about the incorrect usage.
    pub fn must_be_terminated(&mut self) {
        let already_armed = match &self.drop_mode {
            DropMode::Armed { panic: guard } => guard.is_armed(),
            DropMode::Disarmed => false,
        };

        // Only install a fresh guard when none is currently live; this covers both the
        // disarmed state and an `Armed` state whose guard is no longer armed.
        if !already_armed {
            self.drop_mode = Self::new_armed_drop_mode();
        }
    }

    /// Turns off the kill/panic-on-drop safeguards of this handle.
    ///
    /// After this call, dropping the handle no longer signals, kills, or panics.
    /// Note that this does **not** keep the library-owned stdio pipes alive: a child that still
    /// relies on its stdin, stdout, or stderr being open may still be affected when the handle
    /// is dropped.
    ///
    /// Reach for plain [`tokio::process::Command`] when you need a child process that can
    /// outlive the original handle without depending on captured stdio pipes.
    pub fn must_not_be_terminated(&mut self) {
        // The guard has to be defused *before* the variant is replaced: the assignment below
        // drops the previous `Armed` value, and a still-armed `PanicOnDrop` would fire then.
        match &mut self.drop_mode {
            DropMode::Armed { panic: guard } => {
                guard.defuse();
            }
            DropMode::Disarmed => {}
        }
        self.drop_mode = DropMode::Disarmed;
    }

    /// Test helper: `true` while the handle is in `DropMode::Armed` with an armed panic guard.
    #[cfg(test)]
    pub(crate) fn is_drop_armed(&self) -> bool {
        match &self.drop_mode {
            DropMode::Armed { panic: guard } => guard.is_armed(),
            DropMode::Disarmed => false,
        }
    }

    /// Test helper: `true` while the handle is in `DropMode::Disarmed`.
    #[cfg(test)]
    pub(crate) fn is_drop_disarmed(&self) -> bool {
        match &self.drop_mode {
            DropMode::Disarmed => true,
            DropMode::Armed { .. } => false,
        }
    }

    /// Converts this handle into a `TerminateOnDrop` wrapper that terminates the controlled
    /// process automatically when the wrapper is dropped.
    ///
    /// `graceful_termination_timeout` is stored as the wrapper's `interrupt_timeout` and
    /// `forceful_termination_timeout` as its `terminate_timeout`.
    ///
    /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
    ///
    /// Prefer terminating or awaiting the process manually and relying on the (automatically
    /// configured) `must_be_terminated` logic, which raises a panic when a process was neither
    /// awaited nor terminated before being dropped.
    pub fn terminate_on_drop(
        self,
        graceful_termination_timeout: Duration,
        forceful_termination_timeout: Duration,
    ) -> TerminateOnDrop<Stdout, Stderr> {
        let interrupt_timeout = graceful_termination_timeout;
        let terminate_timeout = forceful_termination_timeout;
        TerminateOnDrop {
            process_handle: self,
            interrupt_timeout,
            terminate_timeout,
        }
    }
}
/// Best-effort kill used when an armed `ProcessHandle` is dropped.
///
/// On Unix, a child that still reports a PID is killed through
/// `crate::signal::send_kill_to_process_group`. When no PID is available, and on every
/// non-Unix target, this falls back to `Child::start_kill`.
///
/// # Errors
///
/// Returns the `std::io::Error` reported by whichever kill call was attempted.
fn drop_kill(child: &mut tokio::process::Child) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        if let Some(pid) = child.id() {
            return crate::signal::send_kill_to_process_group(pid);
        }
    }

    // Reached on non-Unix targets, or on Unix when the child no longer exposes a PID.
    child.start_kill()
}
/// Creates the [`PanicOnDrop`] guard stored in `DropMode::Armed`.
///
/// The guard is built from three messages: the name of the guarded type, a statement of the
/// problem, and a hint on how to use the handle correctly.
fn armed_panic_guard() -> PanicOnDrop {
    let type_name = "tokio_process_tools::ProcessHandle";
    let problem = "The process was not terminated.";
    let remedy = "Successfully call `wait_for_completion`, `terminate`, or `kill`, or call `must_not_be_terminated` before the type is dropped!";

    PanicOnDrop::new(type_name, problem, remedy)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_support::long_running_command;
    use crate::{
        BestEffortDelivery, BroadcastOutputStream, DEFAULT_MAX_BUFFERED_CHUNKS,
        DEFAULT_READ_CHUNK_SIZE, NoReplay,
    };
    use assertr::prelude::*;

    /// Spawns the `long_running_command` test helper (configured with five seconds) with both
    /// output streams set up as best-effort, no-replay broadcast streams.
    fn spawn_long_running_process()
    -> ProcessHandle<BroadcastOutputStream<BestEffortDelivery, NoReplay>> {
        let command = long_running_command(Duration::from_secs(5));
        crate::Process::new(command)
            .name("long-running")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap()
    }

    /// Accepts a `waitpid` outcome that either reaped the child or reported `ECHILD`; any
    /// other error fails the test.
    #[cfg(unix)]
    fn assert_reaped_or_already_gone(outcome: nix::Result<nix::sys::wait::WaitStatus>) {
        match outcome {
            Ok(_) | Err(nix::errno::Errno::ECHILD) => {}
            Err(err) => {
                assert_that!(err).fail(format_args!("waitpid failed: {err}"));
            }
        }
    }

    /// Runs a blocking `waitpid` for `pid` on the blocking pool and returns its outcome.
    ///
    /// Panics when the wait does not finish within `limit` or when the blocking task fails.
    #[cfg(unix)]
    async fn wait_for_exit(
        pid: nix::unistd::Pid,
        limit: Duration,
    ) -> nix::Result<nix::sys::wait::WaitStatus> {
        let reaper = tokio::task::spawn_blocking(move || nix::sys::wait::waitpid(pid, None));
        tokio::time::timeout(limit, reaper).await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn must_be_terminated_is_idempotent_when_already_armed() {
        let mut handle = spawn_long_running_process();

        handle.must_be_terminated();
        assert_that!(handle.is_drop_armed()).is_true();

        // Clean up explicitly so the still-armed handle is not dropped with a live process.
        handle.kill().await.unwrap();
    }

    #[tokio::test]
    async fn must_be_terminated_re_arms_safeguards_after_opt_out() {
        let mut handle = spawn_long_running_process();

        handle.must_not_be_terminated();
        assert_that!(handle.is_drop_disarmed()).is_true();

        handle.must_be_terminated();
        assert_that!(handle.is_drop_armed()).is_true();

        // Clean up explicitly so the re-armed handle is not dropped with a live process.
        handle.kill().await.unwrap();
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn must_not_be_terminated_lets_child_outlive_dropped_handle() {
        use nix::sys::signal::{self, Signal};
        use nix::sys::wait::waitpid;
        use nix::unistd::Pid;

        let mut handle = spawn_long_running_process();
        let child_pid = Pid::from_raw(handle.id().unwrap().cast_signed());

        handle.must_not_be_terminated();
        assert_that!(handle.is_drop_disarmed()).is_true();
        drop(handle);

        // Passing no signal only probes the PID, so success means the child is still around.
        assert_that!(signal::kill(child_pid, None).is_ok()).is_true();

        // The child outlived the handle; clean it up by hand.
        signal::kill(child_pid, Signal::SIGKILL).unwrap();
        assert_reaped_or_already_gone(waitpid(child_pid, None));
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn must_not_be_terminated_still_closes_stdin_on_drop() {
        use nix::unistd::Pid;
        use std::fs;
        use tempfile::tempdir;

        let temp_dir = tempdir().unwrap();
        let output_path = temp_dir.path().join("stdin-result.txt");
        // Escape single quotes so the path stays one single-quoted word in the shell script.
        let output_file = output_path.to_str().unwrap().replace('\'', "'\"'\"'");

        // `cat` only returns once stdin reaches EOF; the marker file is written afterwards.
        let mut shell = tokio::process::Command::new("sh");
        shell
            .arg("-c")
            .arg(format!("cat >/dev/null; printf eof > '{output_file}'"));

        let mut handle = crate::Process::new(shell)
            .name("sh")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();
        let child_pid = Pid::from_raw(handle.id().unwrap().cast_signed());

        handle.must_not_be_terminated();
        drop(handle);

        let outcome = wait_for_exit(child_pid, Duration::from_secs(2)).await;
        assert_reaped_or_already_gone(outcome);

        assert_that!(fs::read_to_string(&output_path).unwrap()).is_equal_to("eof");
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn must_not_be_terminated_still_closes_stdout_pipe_on_drop() {
        use nix::unistd::Pid;

        // `yes tick` writes to stdout endlessly, so it can only finish within the timeout
        // below if dropping the handle affects its stdout pipe.
        let mut yes = tokio::process::Command::new("yes");
        yes.arg("tick");

        let mut handle = crate::Process::new(yes)
            .name("yes")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();
        let child_pid = Pid::from_raw(handle.id().unwrap().cast_signed());

        handle.must_not_be_terminated();
        drop(handle);

        let outcome = wait_for_exit(child_pid, Duration::from_secs(2)).await;
        assert_reaped_or_already_gone(outcome);
    }
}