tokio_process_tools/process_handle/
drop_guard.rs1use super::{DropMode, ProcessHandle};
2use crate::output_stream::OutputStream;
3use crate::panic_on_drop::PanicOnDrop;
4use crate::terminate_on_drop::TerminateOnDrop;
5use std::time::Duration;
6
7impl<Stdout, Stderr> Drop for ProcessHandle<Stdout, Stderr>
8where
9 Stdout: OutputStream,
10 Stderr: OutputStream,
11{
12 fn drop(&mut self) {
13 match &self.drop_mode {
14 DropMode::Armed { .. } => {
15 if let Err(err) = drop_kill(&mut self.child) {
22 tracing::warn!(
23 process = %self.name,
24 error = %err,
25 "Failed to kill process while dropping an armed ProcessHandle"
26 );
27 }
28 }
29 DropMode::Disarmed => {}
30 }
31 }
32}
33
34impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
35where
36 Stdout: OutputStream,
37 Stderr: OutputStream,
38{
39 pub(super) fn new_armed_drop_mode() -> DropMode {
40 DropMode::Armed {
41 panic: armed_panic_guard(),
42 }
43 }
44
45 pub fn must_be_terminated(&mut self) {
66 match &mut self.drop_mode {
67 DropMode::Armed { panic } if panic.is_armed() => {
68 }
70 _ => {
71 self.drop_mode = DropMode::Armed {
72 panic: armed_panic_guard(),
73 };
74 }
75 }
76 }
77
78 pub fn must_not_be_terminated(&mut self) {
87 if let DropMode::Armed { panic } = &mut self.drop_mode {
90 panic.defuse();
91 }
92 self.drop_mode = DropMode::Disarmed;
93 }
94
95 #[cfg(test)]
96 pub(crate) fn is_drop_armed(&self) -> bool {
97 matches!(&self.drop_mode, DropMode::Armed { panic } if panic.is_armed())
98 }
99
100 #[cfg(test)]
101 pub(crate) fn is_drop_disarmed(&self) -> bool {
102 matches!(self.drop_mode, DropMode::Disarmed)
103 }
104
105 pub fn terminate_on_drop(
114 self,
115 graceful_termination_timeout: Duration,
116 forceful_termination_timeout: Duration,
117 ) -> TerminateOnDrop<Stdout, Stderr> {
118 TerminateOnDrop {
119 process_handle: self,
120 interrupt_timeout: graceful_termination_timeout,
121 terminate_timeout: forceful_termination_timeout,
122 }
123 }
124}
125
126fn drop_kill(child: &mut tokio::process::Child) -> std::io::Result<()> {
127 #[cfg(unix)]
128 {
129 match child.id() {
130 Some(pid) => crate::signal::send_kill_to_process_group(pid),
131 None => child.start_kill(),
132 }
133 }
134 #[cfg(not(unix))]
135 {
136 child.start_kill()
137 }
138}
139
140fn armed_panic_guard() -> PanicOnDrop {
141 PanicOnDrop::new(
142 "tokio_process_tools::ProcessHandle",
143 "The process was not terminated.",
144 "Successfully call `wait_for_completion`, `terminate`, or `kill`, or call `must_not_be_terminated` before the type is dropped!",
145 )
146}
147
148#[cfg(test)]
149mod tests {
150 use super::*;
151 use crate::test_support::long_running_command;
152 use crate::{
153 BestEffortDelivery, BroadcastOutputStream, DEFAULT_MAX_BUFFERED_CHUNKS,
154 DEFAULT_READ_CHUNK_SIZE, NoReplay,
155 };
156 use assertr::prelude::*;
157
158 fn spawn_long_running_process()
159 -> ProcessHandle<BroadcastOutputStream<BestEffortDelivery, NoReplay>> {
160 crate::Process::new(long_running_command(Duration::from_secs(5)))
161 .name("long-running")
162 .stdout_and_stderr(|stream| {
163 stream
164 .broadcast()
165 .best_effort_delivery()
166 .no_replay()
167 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
168 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
169 })
170 .spawn()
171 .unwrap()
172 }
173
174 #[tokio::test]
175 async fn must_be_terminated_is_idempotent_when_already_armed() {
176 let mut process = spawn_long_running_process();
177
178 process.must_be_terminated();
179 assert_that!(process.is_drop_armed()).is_true();
180
181 process.kill().await.unwrap();
182 }
183
184 #[tokio::test]
185 async fn must_be_terminated_re_arms_safeguards_after_opt_out() {
186 let mut process = spawn_long_running_process();
187
188 process.must_not_be_terminated();
189 assert_that!(process.is_drop_disarmed()).is_true();
190
191 process.must_be_terminated();
192 assert_that!(process.is_drop_armed()).is_true();
193
194 process.kill().await.unwrap();
195 }
196
197 #[cfg(unix)]
198 #[tokio::test]
199 async fn must_not_be_terminated_lets_child_outlive_dropped_handle() {
200 use nix::errno::Errno;
201 use nix::sys::signal::{self, Signal};
202 use nix::sys::wait::waitpid;
203 use nix::unistd::Pid;
204
205 let mut process = spawn_long_running_process();
206 let pid = process.id().unwrap();
207
208 process.must_not_be_terminated();
209 assert_that!(process.is_drop_disarmed()).is_true();
210 drop(process);
211
212 let pid = Pid::from_raw(pid.cast_signed());
213 assert_that!(signal::kill(pid, None).is_ok()).is_true();
214
215 signal::kill(pid, Signal::SIGKILL).unwrap();
216 match waitpid(pid, None) {
217 Ok(_) | Err(Errno::ECHILD) => {}
218 Err(err) => {
219 assert_that!(err).fail(format_args!("waitpid failed: {err}"));
220 }
221 }
222 }
223
224 #[cfg(unix)]
225 #[tokio::test]
226 async fn must_not_be_terminated_still_closes_stdin_on_drop() {
227 use nix::errno::Errno;
228 use nix::sys::wait::waitpid;
229 use nix::unistd::Pid;
230 use std::fs;
231 use tempfile::tempdir;
232
233 let temp_dir = tempdir().unwrap();
234 let output_file = temp_dir.path().join("stdin-result.txt");
235 let output_file = output_file.to_str().unwrap().replace('\'', "'\"'\"'");
236
237 let mut cmd = tokio::process::Command::new("sh");
238 cmd.arg("-c")
239 .arg(format!("cat >/dev/null; printf eof > '{output_file}'"));
240
241 let mut process = crate::Process::new(cmd)
242 .name("sh")
243 .stdout_and_stderr(|stream| {
244 stream
245 .broadcast()
246 .best_effort_delivery()
247 .no_replay()
248 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
249 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
250 })
251 .spawn()
252 .unwrap();
253 let pid = Pid::from_raw(process.id().unwrap().cast_signed());
254
255 process.must_not_be_terminated();
256 drop(process);
257
258 match tokio::time::timeout(
259 Duration::from_secs(2),
260 tokio::task::spawn_blocking(move || waitpid(pid, None)),
261 )
262 .await
263 .unwrap()
264 .unwrap()
265 {
266 Ok(_) | Err(Errno::ECHILD) => {}
267 Err(err) => {
268 assert_that!(err).fail(format_args!("waitpid failed: {err}"));
269 }
270 }
271
272 assert_that!(fs::read_to_string(temp_dir.path().join("stdin-result.txt")).unwrap())
273 .is_equal_to("eof");
274 }
275
276 #[cfg(unix)]
277 #[tokio::test]
278 async fn must_not_be_terminated_still_closes_stdout_pipe_on_drop() {
279 use nix::errno::Errno;
280 use nix::sys::wait::waitpid;
281 use nix::unistd::Pid;
282
283 let mut cmd = tokio::process::Command::new("yes");
284 cmd.arg("tick");
285
286 let mut process = crate::Process::new(cmd)
287 .name("yes")
288 .stdout_and_stderr(|stream| {
289 stream
290 .broadcast()
291 .best_effort_delivery()
292 .no_replay()
293 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
294 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
295 })
296 .spawn()
297 .unwrap();
298 let pid = Pid::from_raw(process.id().unwrap().cast_signed());
299
300 process.must_not_be_terminated();
301 drop(process);
302
303 match tokio::time::timeout(
304 Duration::from_secs(2),
305 tokio::task::spawn_blocking(move || waitpid(pid, None)),
306 )
307 .await
308 .unwrap()
309 .unwrap()
310 {
311 Ok(_) | Err(Errno::ECHILD) => {}
312 Err(err) => {
313 assert_that!(err).fail(format_args!("waitpid failed: {err}"));
314 }
315 }
316 }
317}