tokio_process_tools/
process_handle.rs

1use crate::error::{SpawnError, TerminationError, WaitError};
2use crate::output::Output;
3use crate::output_stream::broadcast::BroadcastOutputStream;
4use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::{BackpressureControl, FromStreamOptions};
6use crate::panic_on_drop::PanicOnDrop;
7use crate::terminate_on_drop::TerminateOnDrop;
8use crate::{LineParsingOptions, NumBytes, OutputStream, signal};
9use std::borrow::Cow;
10use std::fmt::Debug;
11use std::io;
12use std::process::{ExitStatus, Stdio};
13use std::time::Duration;
14use tokio::process::Child;
15
/// Stream name handed to the output stream that wraps the child's standard output.
const STDOUT_STREAM_NAME: &str = "stdout";
/// Stream name handed to the output stream that wraps the child's standard error.
const STDERR_STREAM_NAME: &str = "stderr";

/// Maximum time to wait for process termination after sending SIGKILL.
///
/// This is a safety timeout since SIGKILL should terminate processes immediately,
/// but there are rare cases where even SIGKILL may not work.
const SIGKILL_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
24
/// Outcome of probing whether a spawned process is still alive.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; carries the exit status it reported.
    Terminated(ExitStatus),

    /// The state could not be determined; carries the I/O error that prevented the check.
    Uncertain(io::Error),
}

impl RunningState {
    /// Collapses the state into a boolean.
    ///
    /// Only [`RunningState::Running`] yields `true`. Both a terminated process and an
    /// undeterminable state yield `false`.
    pub fn as_bool(&self) -> bool {
        matches!(self, Self::Running)
    }
}

impl From<RunningState> for bool {
    /// Consuming counterpart of [`RunningState::as_bool`].
    fn from(is_running: RunningState) -> Self {
        is_running.as_bool()
    }
}
56
/// A handle to a spawned process with captured stdout/stderr streams.
///
/// This type provides methods for waiting on process completion, terminating the process,
/// and accessing its output streams. By default, processes must be explicitly waited on
/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
///
/// If applicable, a process handle can be wrapped in a [`TerminateOnDrop`] to be terminated
/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
#[derive(Debug)]
pub struct ProcessHandle<O: OutputStream> {
    /// Name of the process as given at spawn time; included in errors and log events.
    pub(crate) name: Cow<'static, str>,
    /// The underlying tokio child process.
    child: Child,
    /// Output stream built from the child's piped stdout.
    std_out_stream: O,
    /// Output stream built from the child's piped stderr.
    std_err_stream: O,
    /// Drop guard set by `must_be_terminated` and taken and defused by
    /// `must_not_be_terminated`; `None` when no guard is active.
    panic_on_drop: Option<PanicOnDrop>,
}
73
74impl ProcessHandle<BroadcastOutputStream> {
75    /// Spawns a new process with broadcast output streams and custom channel capacities.
76    ///
77    /// This method is intended for internal use by the `Process` builder.
78    /// Users should use `Process::new(cmd).spawn_broadcast()` instead.
79    pub(crate) fn spawn_with_capacity(
80        name: impl Into<Cow<'static, str>>,
81        mut cmd: tokio::process::Command,
82        stdout_chunk_size: NumBytes,
83        stderr_chunk_size: NumBytes,
84        stdout_channel_capacity: usize,
85        stderr_channel_capacity: usize,
86    ) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
87        let process_name = name.into();
88        Self::prepare_command(&mut cmd)
89            .spawn()
90            .map(|child| {
91                Self::new_from_child_with_piped_io_and_capacity(
92                    process_name.clone(),
93                    child,
94                    stdout_chunk_size,
95                    stderr_chunk_size,
96                    stdout_channel_capacity,
97                    stderr_channel_capacity,
98                )
99            })
100            .map_err(|source| SpawnError::SpawnFailed {
101                process_name,
102                source,
103            })
104    }
105
106    fn new_from_child_with_piped_io_and_capacity(
107        name: impl Into<Cow<'static, str>>,
108        mut child: Child,
109        stdout_chunk_size: NumBytes,
110        stderr_chunk_size: NumBytes,
111        stdout_channel_capacity: usize,
112        stderr_channel_capacity: usize,
113    ) -> ProcessHandle<BroadcastOutputStream> {
114        let stdout = child
115            .stdout
116            .take()
117            .expect("Child process stdout wasn't captured");
118        let stderr = child
119            .stderr
120            .take()
121            .expect("Child process stderr wasn't captured");
122
123        let (child, std_out_stream, std_err_stream) = (
124            child,
125            BroadcastOutputStream::from_stream(
126                stdout,
127                "stdout",
128                FromStreamOptions {
129                    chunk_size: stdout_chunk_size,
130                    channel_capacity: stdout_channel_capacity,
131                },
132            ),
133            BroadcastOutputStream::from_stream(
134                stderr,
135                STDERR_STREAM_NAME,
136                FromStreamOptions {
137                    chunk_size: stderr_chunk_size,
138                    channel_capacity: stderr_channel_capacity,
139                },
140            ),
141        );
142
143        let mut this = ProcessHandle {
144            name: name.into(),
145            child,
146            std_out_stream,
147            std_err_stream,
148            panic_on_drop: None,
149        };
150        this.must_be_terminated();
151        this
152    }
153
154    /// Convenience function, waiting for the process to complete using
155    /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
156    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
157    ///
158    /// You may want to destructure this using:
159    /// ```no_run
160    /// # use tokio::process::Command;
161    /// # use tokio_process_tools::*;
162    /// # tokio_test::block_on(async {
163    /// # let mut proc = Process::new(Command::new("ls")).spawn_broadcast().unwrap();
164    /// let Output {
165    ///     status,
166    ///     stdout,
167    ///     stderr
168    /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
169    /// # });
170    /// ```
171    pub async fn wait_for_completion_with_output(
172        &mut self,
173        timeout: Option<Duration>,
174        options: LineParsingOptions,
175    ) -> Result<Output, WaitError> {
176        let out_collector = self.stdout().collect_lines_into_vec(options);
177        let err_collector = self.stderr().collect_lines_into_vec(options);
178
179        let status = self.wait_for_completion(timeout).await?;
180
181        let stdout = out_collector.wait().await?;
182        let stderr = err_collector.wait().await?;
183
184        Ok(Output {
185            status,
186            stdout,
187            stderr,
188        })
189    }
190
191    /// Convenience function, waiting for the process to complete using
192    /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
193    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
194    pub async fn wait_for_completion_with_output_or_terminate(
195        &mut self,
196        wait_timeout: Duration,
197        interrupt_timeout: Duration,
198        terminate_timeout: Duration,
199        options: LineParsingOptions,
200    ) -> Result<Output, WaitError> {
201        let out_collector = self.stdout().collect_lines_into_vec(options);
202        let err_collector = self.stderr().collect_lines_into_vec(options);
203
204        let status = self
205            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
206            .await?;
207
208        let stdout = out_collector.wait().await?;
209        let stderr = err_collector.wait().await?;
210
211        Ok(Output {
212            status,
213            stdout,
214            stderr,
215        })
216    }
217}
218
219impl ProcessHandle<SingleSubscriberOutputStream> {
220    /// Spawns a new process with single subscriber output streams and custom channel capacities.
221    ///
222    /// This method is intended for internal use by the `Process` builder.
223    /// Users should use `Process::new(cmd).spawn_single_subscriber()` instead.
224    pub(crate) fn spawn_with_capacity(
225        name: impl Into<Cow<'static, str>>,
226        mut cmd: tokio::process::Command,
227        stdout_chunk_size: NumBytes,
228        stderr_chunk_size: NumBytes,
229        stdout_channel_capacity: usize,
230        stderr_channel_capacity: usize,
231    ) -> Result<Self, SpawnError> {
232        let process_name = name.into();
233        Self::prepare_command(&mut cmd)
234            .spawn()
235            .map(|child| {
236                Self::new_from_child_with_piped_io_and_capacity(
237                    process_name.clone(),
238                    child,
239                    stdout_chunk_size,
240                    stderr_chunk_size,
241                    stdout_channel_capacity,
242                    stderr_channel_capacity,
243                )
244            })
245            .map_err(|source| SpawnError::SpawnFailed {
246                process_name,
247                source,
248            })
249    }
250
251    fn new_from_child_with_piped_io_and_capacity(
252        name: impl Into<Cow<'static, str>>,
253        mut child: Child,
254        stdout_chunk_size: NumBytes,
255        stderr_chunk_size: NumBytes,
256        stdout_channel_capacity: usize,
257        stderr_channel_capacity: usize,
258    ) -> Self {
259        let stdout = child
260            .stdout
261            .take()
262            .expect("Child process stdout wasn't captured");
263        let stderr = child
264            .stderr
265            .take()
266            .expect("Child process stderr wasn't captured");
267
268        let (child, std_out_stream, std_err_stream) = (
269            child,
270            SingleSubscriberOutputStream::from_stream(
271                stdout,
272                STDOUT_STREAM_NAME,
273                BackpressureControl::DropLatestIncomingIfBufferFull,
274                FromStreamOptions {
275                    chunk_size: stdout_chunk_size,
276                    channel_capacity: stdout_channel_capacity,
277                },
278            ),
279            SingleSubscriberOutputStream::from_stream(
280                stderr,
281                STDERR_STREAM_NAME,
282                BackpressureControl::DropLatestIncomingIfBufferFull,
283                FromStreamOptions {
284                    chunk_size: stderr_chunk_size,
285                    channel_capacity: stderr_channel_capacity,
286                },
287            ),
288        );
289
290        let mut this = ProcessHandle {
291            name: name.into(),
292            child,
293            std_out_stream,
294            std_err_stream,
295            panic_on_drop: None,
296        };
297        this.must_be_terminated();
298        this
299    }
300
301    /// Convenience function, waiting for the process to complete using
302    /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
303    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
304    ///
305    /// You may want to destructure this using:
306    /// ```no_run
307    /// # use tokio_process_tools::*;
308    /// # tokio_test::block_on(async {
309    /// # let mut proc = Process::new(tokio::process::Command::new("ls")).spawn_broadcast().unwrap();
310    /// let Output {
311    ///     status,
312    ///     stdout,
313    ///     stderr
314    /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
315    /// # });
316    /// ```
317    pub async fn wait_for_completion_with_output(
318        &mut self,
319        timeout: Option<Duration>,
320        options: LineParsingOptions,
321    ) -> Result<Output, WaitError> {
322        let out_collector = self.stdout().collect_lines_into_vec(options);
323        let err_collector = self.stderr().collect_lines_into_vec(options);
324
325        let status = self.wait_for_completion(timeout).await?;
326
327        let stdout = out_collector.wait().await?;
328        let stderr = err_collector.wait().await?;
329
330        Ok(Output {
331            status,
332            stdout,
333            stderr,
334        })
335    }
336
337    /// Convenience function, waiting for the process to complete using
338    /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
339    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
340    pub async fn wait_for_completion_with_output_or_terminate(
341        &mut self,
342        wait_timeout: Duration,
343        interrupt_timeout: Duration,
344        terminate_timeout: Duration,
345        options: LineParsingOptions,
346    ) -> Result<Output, WaitError> {
347        let out_collector = self.stdout().collect_lines_into_vec(options);
348        let err_collector = self.stderr().collect_lines_into_vec(options);
349
350        let status = self
351            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
352            .await?;
353
354        let stdout = out_collector.wait().await?;
355        let stderr = err_collector.wait().await?;
356
357        Ok(Output {
358            status,
359            stdout,
360            stderr,
361        })
362    }
363}
364
365impl<O: OutputStream> ProcessHandle<O> {
366    /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
367    /// which works more like `killpg`. Sending to the current process ID will likely trigger
368    /// undefined behavior of sending the event to every process that's attached to the console,
369    /// i.e. sending the event to group ID 0. Therefore, we create a new process group
370    /// for the child process we are about to spawn.
371    ///
372    /// See: https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6
373    fn prepare_platform_specifics(
374        command: &mut tokio::process::Command,
375    ) -> &mut tokio::process::Command {
376        #[cfg(windows)]
377        {
378            use windows_sys::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
379            command.creation_flags(CREATE_NEW_PROCESS_GROUP)
380        }
381        #[cfg(not(windows))]
382        {
383            command
384        }
385    }
386
387    fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
388        Self::prepare_platform_specifics(command)
389            .stdout(Stdio::piped())
390            .stderr(Stdio::piped())
391            // It is much too easy to leave dangling resources here and there.
392            // This library tries to make it clear and encourage users to terminate spawned
393            // processes appropriately. If not done so anyway, this acts as a "last resort"
394            // type of solution, less graceful as the `terminate_on_drop` effect but at least
395            // capable of cleaning up.
396            .kill_on_drop(true)
397    }
398
399    /// Returns the OS process ID if the process hasn't exited yet.
400    ///
401    /// Once this process has been polled to completion this will return None.
402    pub fn id(&self) -> Option<u32> {
403        self.child.id()
404    }
405
406    /// Checks if the process is currently running.
407    ///
408    /// Returns [`RunningState::Running`] if the process is still running,
409    /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
410    /// if the state could not be determined.
411    //noinspection RsSelfConvention
412    pub fn is_running(&mut self) -> RunningState {
413        match self.child.try_wait() {
414            Ok(None) => RunningState::Running,
415            Ok(Some(exit_status)) => {
416                self.must_not_be_terminated();
417                RunningState::Terminated(exit_status)
418            }
419            Err(err) => RunningState::Uncertain(err),
420        }
421    }
422
423    /// Returns a reference to the stdout stream.
424    ///
425    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
426    /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
427    /// attempts will panic with a helpful error message).
428    pub fn stdout(&self) -> &O {
429        &self.std_out_stream
430    }
431
432    /// Returns a reference to the stderr stream.
433    ///
434    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
435    /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
436    /// attempts will panic with a helpful error message).
437    pub fn stderr(&self) -> &O {
438        &self.std_err_stream
439    }
440
441    /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
442    ///
443    /// This method enables a safeguard that ensures that the process represented by this
444    /// `ProcessHandle` is properly terminated or awaited before being dropped.
445    /// If `must_be_terminated` is set and the `ProcessHandle` is
446    /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
447    /// prevent silent failure-states, ensuring that system resources are handled correctly.
448    ///
449    /// You typically do not need to call this, as every ProcessHandle is marked by default.
450    /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
451    /// process without terminating it.
452    ///
453    /// # Panic
454    ///
455    /// If the `ProcessHandle` is dropped without being awaited or terminated
456    /// after calling this method, a panic will occur with a descriptive message
457    /// to inform about the incorrect usage.
458    pub fn must_be_terminated(&mut self) {
459        self.panic_on_drop = Some(PanicOnDrop::new(
460            "tokio_process_tools::ProcessHandle",
461            "The process was not terminated.",
462            "Call `wait_for_completion` or `terminate` before the type is dropped!",
463        ));
464    }
465
466    /// Disables the panic-on-drop safeguard, allowing the spawned process to be kept running
467    /// uncontrolled in the background, while this handle can safely be dropped.
468    pub fn must_not_be_terminated(&mut self) {
469        if let Some(mut it) = self.panic_on_drop.take() {
470            it.defuse()
471        }
472    }
473
474    /// Wrap this process handle in a `TerminateOnDrop` instance, terminating the controlled process
475    /// automatically when this handle is dropped.
476    ///
477    /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
478    ///
479    /// Prefer manual termination of the process or awaiting it and relying on the (automatically
480    /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
481    /// nor terminated before being dropped.
482    pub fn terminate_on_drop(
483        self,
484        graceful_termination_timeout: Duration,
485        forceful_termination_timeout: Duration,
486    ) -> TerminateOnDrop<O> {
487        TerminateOnDrop {
488            process_handle: self,
489            interrupt_timeout: graceful_termination_timeout,
490            terminate_timeout: forceful_termination_timeout,
491        }
492    }
493
494    /// Manually send a `SIGINT` on unix or equivalent on Windows to this process.
495    ///
496    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
497    pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
498        signal::send_interrupt(&self.child)
499    }
500
501    /// Manually send a `SIGTERM` on unix or equivalent on Windows to this process.
502    ///
503    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
504    pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
505        signal::send_terminate(&self.child)
506    }
507
508    /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
509    /// doesn't run to completion after receiving any of the first two signals.
510    ///
511    /// This handle can be dropped safely after this call returned, no matter the outcome.
512    /// We accept that in extremely rare cases, failed `SIGKILL`, a rogue process may be left over.
513    pub async fn terminate(
514        &mut self,
515        interrupt_timeout: Duration,
516        terminate_timeout: Duration,
517    ) -> Result<ExitStatus, TerminationError> {
518        // Whether or not this function will ultimately succeed, we tried our best to terminate
519        // this process.
520        // Dropping this handle should not create any on-drop panic anymore.
521        // We accept that in extremely rare cases, failed `kill`, a rogue process may be left over.
522        self.must_not_be_terminated();
523
524        self.send_interrupt_signal()
525            .map_err(|err| TerminationError::SignallingFailed {
526                process_name: self.name.clone(),
527                source: err,
528                signal: "SIGINT",
529            })?;
530
531        match self.wait_for_completion(Some(interrupt_timeout)).await {
532            Ok(exit_status) => Ok(exit_status),
533            Err(not_terminated_after_sigint) => {
534                tracing::warn!(
535                    process = %self.name,
536                    error = %not_terminated_after_sigint,
537                    "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
538                );
539
540                self.send_terminate_signal()
541                    .map_err(|err| TerminationError::SignallingFailed {
542                        process_name: self.name.clone(),
543                        source: err,
544                        signal: "SIGTERM",
545                    })?;
546
547                match self.wait_for_completion(Some(terminate_timeout)).await {
548                    Ok(exit_status) => Ok(exit_status),
549                    Err(not_terminated_after_sigterm) => {
550                        tracing::warn!(
551                            process = %self.name,
552                            error = %not_terminated_after_sigterm,
553                            "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
554                        );
555
556                        match self.kill().await {
557                            Ok(()) => {
558                                // Note: A SIGKILL should typically (somewhat) immediately lead to
559                                // termination of the process. But there are cases in which even
560                                // a SIGKILL does not / cannot / will not kill a process.
561                                // Something must have gone horribly wrong then...
562                                // But: We do not want to wait indefinitely in case this happens
563                                // and therefore wait (at max) for a fixed duration after any
564                                // SIGKILL event.
565                                match self.wait_for_completion(Some(SIGKILL_WAIT_TIMEOUT)).await {
566                                    Ok(exit_status) => Ok(exit_status),
567                                    Err(not_terminated_after_sigkill) => {
568                                        // Unlikely. See the note above.
569                                        tracing::error!(
570                                            "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
571                                            self.name
572                                        );
573                                        Err(TerminationError::TerminationFailed {
574                                            process_name: self.name.clone(),
575                                            sigint_error: not_terminated_after_sigint.to_string(),
576                                            sigterm_error: not_terminated_after_sigterm.to_string(),
577                                            sigkill_error: io::Error::new(
578                                                io::ErrorKind::TimedOut,
579                                                not_terminated_after_sigkill.to_string(),
580                                            ),
581                                        })
582                                    }
583                                }
584                            }
585                            Err(kill_error) => {
586                                tracing::error!(
587                                    process = %self.name,
588                                    error = %kill_error,
589                                    "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
590                                );
591
592                                Err(TerminationError::TerminationFailed {
593                                    process_name: self.name.clone(),
594                                    sigint_error: not_terminated_after_sigint.to_string(),
595                                    sigterm_error: not_terminated_after_sigterm.to_string(),
596                                    sigkill_error: kill_error,
597                                })
598                            }
599                        }
600                    }
601                }
602            }
603        }
604    }
605
606    /// Forces the process to exit. Most users should call [ProcessHandle::terminate] instead.
607    ///
608    /// This is equivalent to sending a SIGKILL on unix platforms followed by wait.
609    pub async fn kill(&mut self) -> io::Result<()> {
610        self.child.kill().await
611    }
612
613    /// Successfully awaiting the completion of the process will unset the
614    /// "must be terminated" setting, as a successfully awaited process is already terminated.
615    /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
616    /// "must be terminated" panic being raised.
617    async fn wait(&mut self) -> io::Result<ExitStatus> {
618        match self.child.wait().await {
619            Ok(status) => {
620                self.must_not_be_terminated();
621                Ok(status)
622            }
623            Err(err) => Err(err),
624        }
625    }
626
627    /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
628    ///
629    /// If the timeout is reached before the process terminated, an error is returned but the
630    /// process remains untouched / keeps running.
631    /// Use [ProcessHandle::wait_for_completion_or_terminate] if you want immediate termination.
632    ///
633    /// This does not provide the processes output. You can take a look at the convenience function
634    /// [ProcessHandle::<BroadcastOutputStream>::wait_for_completion_with_output] to see
635    /// how the [ProcessHandle::stdout] and [ProcessHandle::stderr] streams (also available in
636    /// *_mut variants) can be used to inspect / watch over / capture the processes output.
637    pub async fn wait_for_completion(
638        &mut self,
639        timeout: Option<Duration>,
640    ) -> Result<ExitStatus, WaitError> {
641        match timeout {
642            None => self.wait().await.map_err(|source| WaitError::IoError {
643                process_name: self.name.clone(),
644                source,
645            }),
646            Some(timeout_duration) => {
647                match tokio::time::timeout(timeout_duration, self.wait()).await {
648                    Ok(Ok(exit_status)) => Ok(exit_status),
649                    Ok(Err(source)) => Err(WaitError::IoError {
650                        process_name: self.name.clone(),
651                        source,
652                    }),
653                    Err(_elapsed) => Err(WaitError::Timeout {
654                        process_name: self.name.clone(),
655                        timeout: timeout_duration,
656                    }),
657                }
658            }
659        }
660    }
661
662    /// Wait for this process to run to completion within `timeout`.
663    ///
664    /// If the timeout is reached before the process terminated normally, external termination of
665    /// the process is forced through [ProcessHandle::terminate].
666    ///
667    /// Note that this function may return `Ok` even though the timeout was reached, carrying the
668    /// exit status received after sending a termination signal!
669    pub async fn wait_for_completion_or_terminate(
670        &mut self,
671        wait_timeout: Duration,
672        interrupt_timeout: Duration,
673        terminate_timeout: Duration,
674    ) -> Result<ExitStatus, TerminationError> {
675        match self.wait_for_completion(Some(wait_timeout)).await {
676            Ok(exit_status) => Ok(exit_status),
677            Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
678        }
679    }
680
681    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
682    /// stdout and stderr output streams.
683    pub fn into_inner(self) -> (Child, O, O) {
684        (self.child, self.std_out_stream, self.std_err_stream)
685    }
686}
687
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;

    /// Spawns `sleep 5`, terminates it after about 100 ms and checks that termination
    /// returned promptly and without an exit code.
    #[tokio::test]
    async fn test_termination() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let started_at = jiff::Zoned::now();
        let mut handle = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let exit_status = handle
            .terminate(Duration::from_secs(1), Duration::from_secs(1))
            .await
            .unwrap();
        let terminated_at = jiff::Zoned::now();

        // We terminate after roughly 100 ms of waiting.
        // The assertion compares the elapsed seconds against 0.1 using 0.5 as the second
        // `is_close_to` argument (presumably the allowed deviation in seconds, i.e. 500 ms),
        // which leaves room for the time taken up by performing the termination.
        // We can increase this if the test should turn out to be flaky.
        let ran_for = started_at.duration_until(&terminated_at);
        assert_that(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);

        // When terminated, we do not get an exit code (unix).
        assert_that(exit_status.code()).is_none();
    }
}