tokio_process_tools/process_handle/
drop_guard.rs1use super::{DropMode, ProcessHandle};
2use crate::output_stream::OutputStream;
3use crate::panic_on_drop::PanicOnDrop;
4use crate::terminate_on_drop::TerminateOnDrop;
5use std::time::Duration;
6
7impl<Stdout, Stderr> Drop for ProcessHandle<Stdout, Stderr>
8where
9 Stdout: OutputStream,
10 Stderr: OutputStream,
11{
12 fn drop(&mut self) {
13 match &self.drop_mode {
14 DropMode::Armed { .. } => {
15 if let Err(err) = drop_kill(&mut self.child) {
22 tracing::warn!(
23 process = %self.name,
24 error = %err,
25 "Failed to kill process while dropping an armed ProcessHandle"
26 );
27 }
28 }
29 DropMode::Disarmed => {}
30 }
31 }
32}
33
34impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
35where
36 Stdout: OutputStream,
37 Stderr: OutputStream,
38{
39 pub(super) fn new_armed_drop_mode() -> DropMode {
40 DropMode::Armed {
41 panic: armed_panic_guard(),
42 }
43 }
44
45 pub fn must_be_terminated(&mut self) {
66 match &mut self.drop_mode {
67 DropMode::Armed { panic } if panic.is_armed() => {
68 }
70 _ => {
71 self.drop_mode = DropMode::Armed {
72 panic: armed_panic_guard(),
73 };
74 }
75 }
76 }
77
78 pub fn must_not_be_terminated(&mut self) {
91 if let DropMode::Armed { panic } = &mut self.drop_mode {
94 panic.defuse();
95 }
96 self.drop_mode = DropMode::Disarmed;
97 }
98
99 #[cfg(test)]
100 pub(crate) fn is_drop_armed(&self) -> bool {
101 matches!(&self.drop_mode, DropMode::Armed { panic } if panic.is_armed())
102 }
103
104 #[cfg(test)]
105 pub(crate) fn is_drop_disarmed(&self) -> bool {
106 matches!(self.drop_mode, DropMode::Disarmed)
107 }
108
109 pub fn terminate_on_drop(
118 self,
119 graceful_termination_timeout: Duration,
120 forceful_termination_timeout: Duration,
121 ) -> TerminateOnDrop<Stdout, Stderr> {
122 TerminateOnDrop {
123 process_handle: self,
124 interrupt_timeout: graceful_termination_timeout,
125 terminate_timeout: forceful_termination_timeout,
126 }
127 }
128}
129
130fn drop_kill(child: &mut tokio::process::Child) -> std::io::Result<()> {
131 #[cfg(unix)]
132 {
133 match child.id() {
134 Some(pid) => crate::signal::send_kill_to_process_group(pid),
135 None => child.start_kill(),
136 }
137 }
138 #[cfg(not(unix))]
139 {
140 child.start_kill()
141 }
142}
143
144fn armed_panic_guard() -> PanicOnDrop {
145 PanicOnDrop::new(
146 "tokio_process_tools::ProcessHandle",
147 "The process was not terminated.",
148 "Successfully call `wait_for_completion`, `terminate`, or `kill`, or call `must_not_be_terminated` before the type is dropped!",
149 )
150}
151
152#[cfg(test)]
153mod tests {
154 use super::*;
155 use crate::test_support::long_running_command;
156 use crate::{
157 BestEffortDelivery, BroadcastOutputStream, DEFAULT_MAX_BUFFERED_CHUNKS,
158 DEFAULT_READ_CHUNK_SIZE, NoReplay,
159 };
160 use assertr::prelude::*;
161
162 fn spawn_long_running_process()
163 -> ProcessHandle<BroadcastOutputStream<BestEffortDelivery, NoReplay>> {
164 crate::Process::new(long_running_command(Duration::from_secs(5)))
165 .name("long-running")
166 .stdout_and_stderr(|stream| {
167 stream
168 .broadcast()
169 .best_effort_delivery()
170 .no_replay()
171 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
172 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
173 })
174 .spawn()
175 .unwrap()
176 }
177
178 #[tokio::test]
179 async fn must_be_terminated_is_idempotent_when_already_armed() {
180 let mut process = spawn_long_running_process();
181
182 process.must_be_terminated();
183 assert_that!(process.is_drop_armed()).is_true();
184
185 process.kill().await.unwrap();
186 }
187
188 #[tokio::test]
189 async fn must_be_terminated_re_arms_safeguards_after_opt_out() {
190 let mut process = spawn_long_running_process();
191
192 process.must_not_be_terminated();
193 assert_that!(process.is_drop_disarmed()).is_true();
194
195 process.must_be_terminated();
196 assert_that!(process.is_drop_armed()).is_true();
197
198 process.kill().await.unwrap();
199 }
200
201 #[cfg(unix)]
202 #[tokio::test]
203 async fn must_not_be_terminated_lets_child_outlive_dropped_handle() {
204 use nix::errno::Errno;
205 use nix::sys::signal::{self, Signal};
206 use nix::sys::wait::waitpid;
207 use nix::unistd::Pid;
208
209 let mut process = spawn_long_running_process();
210 let pid = process.id().unwrap();
211
212 process.must_not_be_terminated();
213 assert_that!(process.is_drop_disarmed()).is_true();
214 drop(process);
215
216 let pid = Pid::from_raw(pid.cast_signed());
217 assert_that!(signal::kill(pid, None).is_ok()).is_true();
218
219 signal::kill(pid, Signal::SIGKILL).unwrap();
220 match waitpid(pid, None) {
221 Ok(_) | Err(Errno::ECHILD) => {}
222 Err(err) => {
223 assert_that!(err).fail(format_args!("waitpid failed: {err}"));
224 }
225 }
226 }
227
228 #[cfg(unix)]
229 #[tokio::test]
230 async fn must_not_be_terminated_still_closes_stdin_on_drop() {
231 use nix::errno::Errno;
232 use nix::sys::wait::waitpid;
233 use nix::unistd::Pid;
234 use std::fs;
235 use tempfile::tempdir;
236
237 let temp_dir = tempdir().unwrap();
238 let output_file = temp_dir.path().join("stdin-result.txt");
239 let output_file = output_file.to_str().unwrap().replace('\'', "'\"'\"'");
240
241 let mut cmd = tokio::process::Command::new("sh");
242 cmd.arg("-c")
243 .arg(format!("cat >/dev/null; printf eof > '{output_file}'"));
244
245 let mut process = crate::Process::new(cmd)
246 .name("sh")
247 .stdout_and_stderr(|stream| {
248 stream
249 .broadcast()
250 .best_effort_delivery()
251 .no_replay()
252 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
253 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
254 })
255 .spawn()
256 .unwrap();
257 let pid = Pid::from_raw(process.id().unwrap().cast_signed());
258
259 process.must_not_be_terminated();
260 drop(process);
261
262 match tokio::time::timeout(
263 Duration::from_secs(2),
264 tokio::task::spawn_blocking(move || waitpid(pid, None)),
265 )
266 .await
267 .unwrap()
268 .unwrap()
269 {
270 Ok(_) | Err(Errno::ECHILD) => {}
271 Err(err) => {
272 assert_that!(err).fail(format_args!("waitpid failed: {err}"));
273 }
274 }
275
276 assert_that!(fs::read_to_string(temp_dir.path().join("stdin-result.txt")).unwrap())
277 .is_equal_to("eof");
278 }
279
280 #[cfg(unix)]
281 #[tokio::test]
282 async fn must_not_be_terminated_still_closes_stdout_pipe_on_drop() {
283 use nix::errno::Errno;
284 use nix::sys::wait::waitpid;
285 use nix::unistd::Pid;
286
287 let mut cmd = tokio::process::Command::new("yes");
288 cmd.arg("tick");
289
290 let mut process = crate::Process::new(cmd)
291 .name("yes")
292 .stdout_and_stderr(|stream| {
293 stream
294 .broadcast()
295 .best_effort_delivery()
296 .no_replay()
297 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
298 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
299 })
300 .spawn()
301 .unwrap();
302 let pid = Pid::from_raw(process.id().unwrap().cast_signed());
303
304 process.must_not_be_terminated();
305 drop(process);
306
307 match tokio::time::timeout(
308 Duration::from_secs(2),
309 tokio::task::spawn_blocking(move || waitpid(pid, None)),
310 )
311 .await
312 .unwrap()
313 .unwrap()
314 {
315 Ok(_) | Err(Errno::ECHILD) => {}
316 Err(err) => {
317 assert_that!(err).fail(format_args!("waitpid failed: {err}"));
318 }
319 }
320 }
321}