tokio_process_tools/process_handle/
drop_guard.rs1#[cfg(any(unix, windows))]
2use super::termination::GracefulTimeouts;
3use super::{DropMode, ProcessHandle};
4use crate::output_stream::OutputStream;
5use crate::panic_on_drop::PanicOnDrop;
6#[cfg(any(unix, windows))]
7use crate::terminate_on_drop::TerminateOnDrop;
8
9impl<Stdout, Stderr> Drop for ProcessHandle<Stdout, Stderr>
10where
11 Stdout: OutputStream,
12 Stderr: OutputStream,
13{
14 fn drop(&mut self) {
15 match &self.drop_mode {
16 DropMode::Armed { .. } => {
17 if let Err(err) = drop_kill(&mut self.child) {
24 tracing::warn!(
25 process = %self.name,
26 error = %err,
27 "Failed to kill process while dropping an armed ProcessHandle"
28 );
29 }
30 }
31 DropMode::Disarmed => {}
32 }
33 }
34}
35
36impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
37where
38 Stdout: OutputStream,
39 Stderr: OutputStream,
40{
41 pub(super) fn new_armed_drop_mode() -> DropMode {
42 DropMode::Armed {
43 panic: armed_panic_guard(),
44 }
45 }
46
47 pub fn must_be_terminated(&mut self) {
68 match &mut self.drop_mode {
69 DropMode::Armed { panic } if panic.is_armed() => {
70 }
72 _ => {
73 self.drop_mode = DropMode::Armed {
74 panic: armed_panic_guard(),
75 };
76 }
77 }
78 }
79
80 pub fn must_not_be_terminated(&mut self) {
93 if let DropMode::Armed { panic } = &mut self.drop_mode {
96 panic.defuse();
97 }
98 self.drop_mode = DropMode::Disarmed;
99 }
100
101 #[cfg(test)]
102 pub(crate) fn is_drop_armed(&self) -> bool {
103 matches!(&self.drop_mode, DropMode::Armed { panic } if panic.is_armed())
104 }
105
106 #[cfg(test)]
107 pub(crate) fn is_drop_disarmed(&self) -> bool {
108 matches!(self.drop_mode, DropMode::Disarmed)
109 }
110
111 #[cfg(any(unix, windows))]
123 pub fn terminate_on_drop(self, timeouts: GracefulTimeouts) -> TerminateOnDrop<Stdout, Stderr> {
124 TerminateOnDrop {
125 process_handle: self,
126 timeouts,
127 }
128 }
129}
130
131fn drop_kill(child: &mut tokio::process::Child) -> std::io::Result<()> {
132 #[cfg(unix)]
133 {
134 match child.id() {
135 Some(pid) => crate::signal::send_kill_to_process_group(pid),
136 None => child.start_kill(),
137 }
138 }
139 #[cfg(not(unix))]
140 {
141 child.start_kill()
145 }
146}
147
148fn armed_panic_guard() -> PanicOnDrop {
149 PanicOnDrop::new(
150 "tokio_process_tools::ProcessHandle",
151 "The process was not terminated.",
152 "Successfully call `wait_for_completion`, `terminate`, or `kill`, or call `must_not_be_terminated` before the type is dropped!",
153 )
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_support::long_running_command;
    use crate::{
        BestEffortDelivery, BroadcastOutputStream, DEFAULT_MAX_BUFFERED_CHUNKS,
        DEFAULT_READ_CHUNK_SIZE, NoReplay,
    };
    use assertr::prelude::*;
    use std::time::Duration;

    /// Spawns the shared fixture: a child built from `long_running_command` with a
    /// five-second duration, named "long-running", with broadcast output streams.
    fn spawn_long_running_process()
    -> ProcessHandle<BroadcastOutputStream<BestEffortDelivery, NoReplay>> {
        let command = long_running_command(Duration::from_secs(5));

        crate::Process::new(command)
            .name("long-running")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap()
    }

    /// Accepts a `waitpid` outcome that means "the child is gone": either it was reaped
    /// by this call, or `ECHILD` reports that there was no child left to wait for.
    /// Any other error fails the test.
    #[cfg(unix)]
    fn assert_reaped_or_already_gone(outcome: nix::Result<nix::sys::wait::WaitStatus>) {
        use nix::errno::Errno;

        match outcome {
            Ok(_) | Err(Errno::ECHILD) => {}
            Err(err) => {
                assert_that!(err).fail(format_args!("waitpid failed: {err}"));
            }
        }
    }

    /// Runs a blocking `waitpid` for `pid` on the blocking pool and returns its result.
    ///
    /// Panics if the wait does not finish within `limit` or if the blocking task fails.
    #[cfg(unix)]
    async fn wait_for_exit(
        pid: nix::unistd::Pid,
        limit: Duration,
    ) -> nix::Result<nix::sys::wait::WaitStatus> {
        let reaper = tokio::task::spawn_blocking(move || nix::sys::wait::waitpid(pid, None));

        tokio::time::timeout(limit, reaper).await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn must_be_terminated_is_idempotent_when_already_armed() {
        let mut handle = spawn_long_running_process();

        // Calling this on the freshly spawned handle must leave it armed.
        handle.must_be_terminated();
        assert_that!(handle.is_drop_armed()).is_true();

        handle.kill().await.unwrap();
    }

    #[tokio::test]
    async fn must_be_terminated_re_arms_safeguards_after_opt_out() {
        let mut handle = spawn_long_running_process();

        handle.must_not_be_terminated();
        assert_that!(handle.is_drop_disarmed()).is_true();

        handle.must_be_terminated();
        assert_that!(handle.is_drop_armed()).is_true();

        handle.kill().await.unwrap();
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn must_not_be_terminated_lets_child_outlive_dropped_handle() {
        use nix::sys::signal::{self, Signal};
        use nix::sys::wait::waitpid;
        use nix::unistd::Pid;

        let mut handle = spawn_long_running_process();
        let raw_pid = handle.id().unwrap();

        handle.must_not_be_terminated();
        assert_that!(handle.is_drop_disarmed()).is_true();
        drop(handle);

        // Signal "none" only checks that the process can still be signalled.
        let child = Pid::from_raw(raw_pid.cast_signed());
        let still_alive = signal::kill(child, None).is_ok();
        assert_that!(still_alive).is_true();

        // Clean up the survivor so it does not outlive the test.
        signal::kill(child, Signal::SIGKILL).unwrap();
        assert_reaped_or_already_gone(waitpid(child, None));
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn must_not_be_terminated_still_closes_stdin_on_drop() {
        use nix::unistd::Pid;
        use std::fs;
        use tempfile::tempdir;

        let scratch = tempdir().unwrap();
        let result_path = scratch.path().join("stdin-result.txt");
        // Escape single quotes so the path can sit inside a single-quoted shell word.
        let output_file = result_path.to_str().unwrap().replace('\'', "'\"'\"'");

        // `cat` only returns once its stdin reaches end-of-file; the marker file is
        // written afterwards, so its presence shows that stdin was closed.
        let script = format!("cat >/dev/null; printf eof > '{output_file}'");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c").arg(script);

        let mut handle = crate::Process::new(cmd)
            .name("sh")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();
        let child = Pid::from_raw(handle.id().unwrap().cast_signed());

        handle.must_not_be_terminated();
        drop(handle);

        let outcome = wait_for_exit(child, Duration::from_secs(2)).await;
        assert_reaped_or_already_gone(outcome);

        assert_that!(fs::read_to_string(&result_path).unwrap()).is_equal_to("eof");
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn must_not_be_terminated_still_closes_stdout_pipe_on_drop() {
        use nix::unistd::Pid;

        // `yes` writes forever; it is expected to exit only once its stdout pipe is closed.
        let mut cmd = tokio::process::Command::new("yes");
        cmd.arg("tick");

        let mut handle = crate::Process::new(cmd)
            .name("yes")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();
        let child = Pid::from_raw(handle.id().unwrap().cast_signed());

        handle.must_not_be_terminated();
        drop(handle);

        let outcome = wait_for_exit(child, Duration::from_secs(2)).await;
        assert_reaped_or_already_gone(outcome);
    }
}