tokio_process_tools/
process_handle.rs

1use crate::output::Output;
2use crate::output_stream::broadcast::BroadcastOutputStream;
3use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
4use crate::output_stream::{BackpressureControl, FromStreamOptions};
5use crate::panic_on_drop::PanicOnDrop;
6use crate::terminate_on_drop::TerminateOnDrop;
7use crate::{CollectorError, LineParsingOptions, OutputStream, signal};
8use std::borrow::Cow;
9use std::fmt::Debug;
10use std::io;
11use std::process::{ExitStatus, Stdio};
12use std::time::Duration;
13use thiserror::Error;
14use tokio::process::Child;
15
16/// Errors that can occur when terminating a process.
17#[derive(Debug, Error)]
18pub enum TerminationError {
19    /// Failed to send a signal to the process.
20    #[error("Failed to send '{signal}' signal to process: {source}")]
21    SignallingFailed {
22        /// The underlying IO error.
23        source: io::Error,
24        /// The signal that could not be sent.
25        signal: &'static str,
26    },
27
28    /// Failed to terminate the process after trying all signals (SIGINT, SIGTERM, SIGKILL).
29    #[error(
30        "Failed to terminate process. Graceful SIGINT termination failure: {not_terminated_after_sigint}. Graceful SIGTERM termination failure: {not_terminated_after_sigterm}. Forceful termination failure: {not_terminated_after_sigkill}"
31    )]
32    TerminationFailed {
33        /// Error from waiting after sending SIGINT.
34        not_terminated_after_sigint: io::Error,
35        /// Error from waiting after sending SIGTERM.
36        not_terminated_after_sigterm: io::Error,
37        /// Error from waiting after sending SIGKILL.
38        not_terminated_after_sigkill: io::Error,
39    },
40}
41
42/// Represents the running state of a process.
43#[derive(Debug)]
44pub enum RunningState {
45    /// The process is still running.
46    Running,
47
48    /// The process has terminated with the given exit status.
49    Terminated(ExitStatus),
50
51    /// Failed to determine process state.
52    Uncertain(io::Error),
53}
54
55impl RunningState {
56    /// Returns `true` if the process is running, `false` otherwise.
57    pub fn as_bool(&self) -> bool {
58        match self {
59            RunningState::Running => true,
60            RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
61        }
62    }
63}
64
65impl From<RunningState> for bool {
66    fn from(is_running: RunningState) -> Self {
67        match is_running {
68            RunningState::Running => true,
69            RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
70        }
71    }
72}
73
74/// Errors that can occur when waiting for process output.
75#[derive(Debug, Error)]
76pub enum WaitError {
77    /// A general IO error occurred.
78    #[error("A general io error occurred")]
79    IoError(#[from] io::Error),
80
81    /// Could not terminate the process.
82    #[error("Could not terminate the process")]
83    TerminationError(#[from] TerminationError),
84
85    /// Collector failed to collect output.
86    #[error("Collector failed to collect output")]
87    CollectorFailed(#[from] CollectorError),
88}
89
90/// A handle to a spawned process with captured stdout/stderr streams.
91///
92/// This type provides methods for waiting on process completion, terminating the process,
93/// and accessing its output streams. By default, processes must be explicitly waited on
94/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
95///
96/// If applicable, a process handle can be wrapped in a [`TerminateOnDrop`] to be terminated
97/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
98#[derive(Debug)]
99pub struct ProcessHandle<O: OutputStream> {
100    pub(crate) name: Cow<'static, str>,
101    child: Child,
102    std_out_stream: O,
103    std_err_stream: O,
104    panic_on_drop: Option<PanicOnDrop>,
105}
106
107impl ProcessHandle<BroadcastOutputStream> {
108    /// Spawns a new process with broadcast output streams.
109    ///
110    /// Uses a default channel capacity of 128 for both stdout and stderr.
111    pub fn spawn(
112        name: impl Into<Cow<'static, str>>,
113        cmd: tokio::process::Command,
114    ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
115        Self::spawn_with_capacity(name, cmd, 128, 128)
116    }
117
118    /// Spawns a new process with broadcast output streams and custom channel capacities.
119    pub fn spawn_with_capacity(
120        name: impl Into<Cow<'static, str>>,
121        mut cmd: tokio::process::Command,
122        stdout_channel_capacity: usize,
123        stderr_channel_capacity: usize,
124    ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
125        Self::prepare_command(&mut cmd).spawn().map(|child| {
126            Self::new_from_child_with_piped_io_and_capacity(
127                name,
128                child,
129                stdout_channel_capacity,
130                stderr_channel_capacity,
131            )
132        })
133    }
134
135    fn new_from_child_with_piped_io_and_capacity(
136        name: impl Into<Cow<'static, str>>,
137        mut child: Child,
138        stdout_channel_capacity: usize,
139        stderr_channel_capacity: usize,
140    ) -> ProcessHandle<BroadcastOutputStream> {
141        let stdout = child
142            .stdout
143            .take()
144            .expect("Child process stdout wasn't captured");
145        let stderr = child
146            .stderr
147            .take()
148            .expect("Child process stderr wasn't captured");
149
150        let (child, std_out_stream, std_err_stream) = (
151            child,
152            BroadcastOutputStream::from_stream(
153                stdout,
154                FromStreamOptions {
155                    channel_capacity: stdout_channel_capacity,
156                    ..Default::default()
157                },
158            ),
159            BroadcastOutputStream::from_stream(
160                stderr,
161                FromStreamOptions {
162                    channel_capacity: stderr_channel_capacity,
163                    ..Default::default()
164                },
165            ),
166        );
167
168        let mut this = ProcessHandle {
169            name: name.into(),
170            child,
171            std_out_stream,
172            std_err_stream,
173            panic_on_drop: None,
174        };
175        this.must_be_terminated();
176        this
177    }
178
179    /// Convenience function, waiting for the process to complete using
180    /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
181    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
182    ///
183    /// You may want to destructure this using:
184    /// ```no_run
185    /// # use tokio_process_tools::*;
186    /// # use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
187    /// # tokio_test::block_on(async {
188    /// # let mut proc = ProcessHandle::<SingleSubscriberOutputStream>::spawn("cmd", tokio::process::Command::new("ls")).unwrap();
189    /// let Output {
190    ///     status,
191    ///     stdout,
192    ///     stderr
193    /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
194    /// # });
195    /// ```
196    pub async fn wait_for_completion_with_output(
197        &mut self,
198        timeout: Option<Duration>,
199        options: LineParsingOptions,
200    ) -> Result<Output, WaitError> {
201        let out_collector = self.stdout().collect_lines_into_vec(options);
202        let err_collector = self.stderr().collect_lines_into_vec(options);
203
204        let status = self.wait_for_completion(timeout).await?;
205
206        let stdout = out_collector.wait().await?;
207        let stderr = err_collector.wait().await?;
208
209        Ok(Output {
210            status,
211            stdout,
212            stderr,
213        })
214    }
215
216    /// Convenience function, waiting for the process to complete using
217    /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
218    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
219    pub async fn wait_for_completion_with_output_or_terminate(
220        &mut self,
221        wait_timeout: Duration,
222        interrupt_timeout: Duration,
223        terminate_timeout: Duration,
224        options: LineParsingOptions,
225    ) -> Result<Output, WaitError> {
226        let out_collector = self.stdout().collect_lines_into_vec(options);
227        let err_collector = self.stderr().collect_lines_into_vec(options);
228
229        let status = self
230            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
231            .await?;
232
233        let stdout = out_collector.wait().await?;
234        let stderr = err_collector.wait().await?;
235
236        Ok(Output {
237            status,
238            stdout,
239            stderr,
240        })
241    }
242}
243
244impl ProcessHandle<SingleSubscriberOutputStream> {
245    /// Spawns a new process with single subscriber output streams.
246    ///
247    /// Uses a default channel capacity of 128 for both stdout and stderr.
248    pub fn spawn(
249        name: impl Into<Cow<'static, str>>,
250        cmd: tokio::process::Command,
251    ) -> io::Result<Self> {
252        Self::spawn_with_capacity(name, cmd, 128, 128)
253    }
254
255    /// Spawns a new process with single subscriber output streams and custom channel capacities.
256    pub fn spawn_with_capacity(
257        name: impl Into<Cow<'static, str>>,
258        mut cmd: tokio::process::Command,
259        stdout_channel_capacity: usize,
260        stderr_channel_capacity: usize,
261    ) -> io::Result<Self> {
262        Self::prepare_command(&mut cmd).spawn().map(|child| {
263            Self::new_from_child_with_piped_io_and_capacity(
264                name,
265                child,
266                stdout_channel_capacity,
267                stderr_channel_capacity,
268            )
269        })
270    }
271
272    fn new_from_child_with_piped_io_and_capacity(
273        name: impl Into<Cow<'static, str>>,
274        mut child: Child,
275        stdout_channel_capacity: usize,
276        stderr_channel_capacity: usize,
277    ) -> Self {
278        let stdout = child
279            .stdout
280            .take()
281            .expect("Child process stdout wasn't captured");
282        let stderr = child
283            .stderr
284            .take()
285            .expect("Child process stderr wasn't captured");
286
287        let (child, std_out_stream, std_err_stream) = (
288            child,
289            SingleSubscriberOutputStream::from_stream(
290                stdout,
291                BackpressureControl::DropLatestIncomingIfBufferFull,
292                FromStreamOptions {
293                    channel_capacity: stdout_channel_capacity,
294                    ..Default::default()
295                },
296            ),
297            SingleSubscriberOutputStream::from_stream(
298                stderr,
299                BackpressureControl::DropLatestIncomingIfBufferFull,
300                FromStreamOptions {
301                    channel_capacity: stderr_channel_capacity,
302                    ..Default::default()
303                },
304            ),
305        );
306
307        let mut this = ProcessHandle {
308            name: name.into(),
309            child,
310            std_out_stream,
311            std_err_stream,
312            panic_on_drop: None,
313        };
314        this.must_be_terminated();
315        this
316    }
317
318    /// Convenience function, waiting for the process to complete using
319    /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
320    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
321    ///
322    /// You may want to destructure this using:
323    /// ```no_run
324    /// # use tokio_process_tools::*;
325    /// # use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
326    /// # tokio_test::block_on(async {
327    /// # let mut proc = ProcessHandle::<SingleSubscriberOutputStream>::spawn("cmd", tokio::process::Command::new("ls")).unwrap();
328    /// let Output {
329    ///     status,
330    ///     stdout,
331    ///     stderr
332    /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
333    /// # });
334    /// ```
335    pub async fn wait_for_completion_with_output(
336        &mut self,
337        timeout: Option<Duration>,
338        options: LineParsingOptions,
339    ) -> Result<Output, WaitError> {
340        let out_collector = self.stdout_mut().collect_lines_into_vec(options);
341        let err_collector = self.stderr_mut().collect_lines_into_vec(options);
342
343        let status = self.wait_for_completion(timeout).await?;
344
345        let stdout = out_collector.wait().await?;
346        let stderr = err_collector.wait().await?;
347
348        Ok(Output {
349            status,
350            stdout,
351            stderr,
352        })
353    }
354
355    /// Convenience function, waiting for the process to complete using
356    /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
357    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
358    pub async fn wait_for_completion_with_output_or_terminate(
359        &mut self,
360        wait_timeout: Duration,
361        interrupt_timeout: Duration,
362        terminate_timeout: Duration,
363        options: LineParsingOptions,
364    ) -> Result<Output, WaitError> {
365        let out_collector = self.stdout_mut().collect_lines_into_vec(options);
366        let err_collector = self.stderr_mut().collect_lines_into_vec(options);
367
368        let status = self
369            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
370            .await?;
371
372        let stdout = out_collector.wait().await?;
373        let stderr = err_collector.wait().await?;
374
375        Ok(Output {
376            status,
377            stdout,
378            stderr,
379        })
380    }
381}
382
383impl<O: OutputStream> ProcessHandle<O> {
384    /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
385    /// which works more like `killpg`. Sending to the current process ID will likely trigger
386    /// undefined behavior of sending the event to every process that's attached to the console,
387    /// i.e. sending the event to group ID 0. Therefore, we need to create a new process group
388    /// for the child process we are about to spawn.
389    ///
390    /// See: https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6
391    fn prepare_platform_specifics(
392        command: &mut tokio::process::Command,
393    ) -> &mut tokio::process::Command {
394        #[cfg(windows)]
395        {
396            use windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
397
398            let flag = if self.graceful_exit {
399                CREATE_NEW_PROCESS_GROUP.0
400            } else {
401                0
402            };
403            command.creation_flags(flag)
404        }
405        #[cfg(not(windows))]
406        {
407            command
408        }
409    }
410
411    fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
412        Self::prepare_platform_specifics(command)
413            .stdout(Stdio::piped())
414            .stderr(Stdio::piped())
415            // It is much too easy to leave dangling resources here and there.
416            // This library tries to make it clear and encourage users to terminate spawned
417            // processes appropriately. If not done so anyway, this acts as a "last resort"
418            // type of solution, less graceful as the `terminate_on_drop` effect but at least
419            // capable of cleaning up.
420            .kill_on_drop(true)
421    }
422
423    /// Returns the OS process ID if the process hasn't exited yet.
424    ///
425    /// Once this process has been polled to completion this will return None.
426    pub fn id(&self) -> Option<u32> {
427        self.child.id()
428    }
429
430    /// Checks if the process is currently running.
431    ///
432    /// Returns [`RunningState::Running`] if the process is still running,
433    /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
434    /// if the state could not be determined.
435    //noinspection RsSelfConvention
436    pub fn is_running(&mut self) -> RunningState {
437        match self.child.try_wait() {
438            Ok(None) => RunningState::Running,
439            Ok(Some(exit_status)) => {
440                self.must_not_be_terminated();
441                RunningState::Terminated(exit_status)
442            }
443            Err(err) => RunningState::Uncertain(err),
444        }
445    }
446
447    /// Returns a reference to the stdout stream.
448    pub fn stdout(&self) -> &O {
449        &self.std_out_stream
450    }
451
452    /// Returns a mutable reference to the stdout stream.
453    pub fn stdout_mut(&mut self) -> &mut O {
454        &mut self.std_out_stream
455    }
456
457    /// Returns a reference to the stderr stream.
458    pub fn stderr(&self) -> &O {
459        &self.std_err_stream
460    }
461
462    /// Returns a mutable reference to the stderr stream.
463    pub fn stderr_mut(&mut self) -> &mut O {
464        &mut self.std_err_stream
465    }
466
467    /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
468    ///
469    /// This method enables a safeguard that ensures that the process represented by this
470    /// `ProcessHandle` is properly terminated or awaited before being dropped.
471    /// If `must_be_terminated` is set and the `ProcessHandle` is
472    /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
473    /// prevent silent failure-states, ensuring that system resources are handled correctly.
474    ///
475    /// You typically do not need to call this, as every ProcessHandle is marked by default.
476    /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
477    /// process without terminating it.
478    ///
479    /// # Panic
480    ///
481    /// If the `ProcessHandle` is dropped without being awaited or terminated
482    /// after calling this method, a panic will occur with a descriptive message
483    /// to inform about the incorrect usage.
484    pub fn must_be_terminated(&mut self) {
485        self.panic_on_drop = Some(PanicOnDrop::new(
486            "tokio_process_tools::ProcessHandle",
487            "The process was not terminated.",
488            "Call `wait_for_completion` or `terminate` before the type is dropped!",
489        ));
490    }
491
492    /// Disables the panic-on-drop safeguard, allowing the spawned process to be kept running
493    /// uncontrolled in the background, while this handle can safely be dropped.
494    pub fn must_not_be_terminated(&mut self) {
495        if let Some(mut it) = self.panic_on_drop.take() {
496            it.defuse()
497        }
498    }
499
500    /// Wrap this process handle in a `TerminateOnDrop` instance, terminating the controlled process
501    /// automatically when this handle is dropped.
502    ///
503    /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
504    ///
505    /// Prefer manual termination of the process or awaiting it and relying on the (automatically
506    /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
507    /// nor terminated before being dropped.
508    pub fn terminate_on_drop(
509        self,
510        graceful_termination_timeout: Duration,
511        forceful_termination_timeout: Duration,
512    ) -> TerminateOnDrop<O> {
513        TerminateOnDrop {
514            process_handle: self,
515            interrupt_timeout: graceful_termination_timeout,
516            terminate_timeout: forceful_termination_timeout,
517        }
518    }
519
520    /// Manually sed a `SIGINT` on unix or equivalent on Windows to this process.
521    ///
522    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
523    pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
524        signal::send_interrupt(&self.child)
525    }
526
527    /// Manually sed a `SIGTERM` on unix or equivalent on Windows to this process.
528    ///
529    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
530    pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
531        signal::send_terminate(&self.child)
532    }
533
534    /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
535    /// doesn't run to completion after receiving any of the first two signals.
536    ///
537    /// This handle can be dropped safely after this call returned, no matter the outcome.
538    /// We accept that in extremely rare cases, failed `SIGKILL`, a rogue process may be left over.
539    pub async fn terminate(
540        &mut self,
541        interrupt_timeout: Duration,
542        terminate_timeout: Duration,
543    ) -> Result<ExitStatus, TerminationError> {
544        // Whether or not this function will ultimately succeed, we tried our best to terminate
545        // this process.
546        // Dropping this handle should not create any on-drop panic anymore.
547        // We accept that in extremely rare cases, failed `kill`, a rogue process may be left over.
548        self.must_not_be_terminated();
549
550        self.send_interrupt_signal()
551            .map_err(|err| TerminationError::SignallingFailed {
552                source: err,
553                signal: "SIGINT",
554            })?;
555
556        match self.wait_for_completion(Some(interrupt_timeout)).await {
557            Ok(exit_status) => Ok(exit_status),
558            Err(not_terminated_after_sigint) => {
559                tracing::warn!(
560                    process = %self.name,
561                    error = %not_terminated_after_sigint,
562                    "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
563                );
564
565                self.send_terminate_signal()
566                    .map_err(|err| TerminationError::SignallingFailed {
567                        source: err,
568                        signal: "SIGTERM",
569                    })?;
570
571                match self.wait_for_completion(Some(terminate_timeout)).await {
572                    Ok(exit_status) => Ok(exit_status),
573                    Err(not_terminated_after_sigterm) => {
574                        tracing::warn!(
575                            process = %self.name,
576                            error = %not_terminated_after_sigterm,
577                            "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
578                        );
579
580                        match self.kill().await {
581                            Ok(()) => {
582                                // Note: A SIGKILL should typically (somewhat) immediately lead to
583                                // termination of the process. But there are cases in which even
584                                // a SIGKILL does not / cannot / will not kill a process.
585                                // Something must have gone horribly wrong then...
586                                // But: We do not want to wait indefinitely in case this happens
587                                // and therefore wait (at max) for a fixed three seconds after any
588                                // SIGKILL event.
589                                match self.wait_for_completion(Some(Duration::from_secs(3))).await {
590                                    Ok(exit_status) => Ok(exit_status),
591                                    Err(not_terminated_after_sigkill) => {
592                                        // Unlikely. See the note above.
593                                        tracing::error!(
594                                            "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
595                                            self.name
596                                        );
597                                        Err(TerminationError::TerminationFailed {
598                                            not_terminated_after_sigint,
599                                            not_terminated_after_sigterm,
600                                            not_terminated_after_sigkill,
601                                        })
602                                    }
603                                }
604                            }
605                            Err(not_terminated_after_sigkill) => {
606                                tracing::error!(
607                                    process = %self.name,
608                                    error = %not_terminated_after_sigkill,
609                                    "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
610                                );
611
612                                Err(TerminationError::TerminationFailed {
613                                    not_terminated_after_sigint,
614                                    not_terminated_after_sigterm,
615                                    not_terminated_after_sigkill,
616                                })
617                            }
618                        }
619                    }
620                }
621            }
622        }
623    }
624
625    /// Forces the process to exit. Most users should call [ProcessHandle::terminate] instead.
626    ///
627    /// This is equivalent to sending a SIGKILL on unix platforms followed by wait.
628    pub async fn kill(&mut self) -> io::Result<()> {
629        self.child.kill().await
630    }
631
632    /// Successfully awaiting the completion of the process will unset the
633    /// "must be terminated" setting, as a successfully awaited process is already terminated.
634    /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
635    /// "must be terminated" panic being raised.
636    async fn wait(&mut self) -> io::Result<ExitStatus> {
637        match self.child.wait().await {
638            Ok(status) => {
639                self.must_not_be_terminated();
640                Ok(status)
641            }
642            Err(err) => Err(err),
643        }
644    }
645
646    /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
647    ///
648    /// If the timeout is reached before the process terminated, an error is returned but the
649    /// process remains untouched / keeps running.
650    /// Use [ProcessHandle::wait_for_completion_or_terminate] if you want immediate termination.
651    ///
652    /// This does not provide the processes output. You can take a look at the convenience function
653    /// [ProcessHandle::<BroadcastOutputStream>::wait_for_completion_with_output] to see
654    /// how the [ProcessHandle::stdout] and [ProcessHandle::stderr] streams (also available in
655    /// *_mut variants) can be used to inspect / watch over / capture the processes output.
656    pub async fn wait_for_completion(
657        &mut self,
658        timeout: Option<Duration>,
659    ) -> io::Result<ExitStatus> {
660        match timeout {
661            None => self.wait().await,
662            Some(timeout) => match tokio::time::timeout(timeout, self.wait()).await {
663                Ok(exit_status) => exit_status,
664                Err(err) => Err(err.into()),
665            },
666        }
667    }
668
669    /// Wait for this process to run to completion within `timeout`.
670    ///
671    /// If the timeout is reached before the process terminated normally, external termination of
672    /// the process is forced through [ProcessHandle::terminate].
673    ///
674    /// Note that this function may return `Ok` even though the timeout was reached, carrying the
675    /// exit status received after sending a termination signal!
676    pub async fn wait_for_completion_or_terminate(
677        &mut self,
678        wait_timeout: Duration,
679        interrupt_timeout: Duration,
680        terminate_timeout: Duration,
681    ) -> Result<ExitStatus, TerminationError> {
682        match self.wait_for_completion(Some(wait_timeout)).await {
683            Ok(exit_status) => Ok(exit_status),
684            Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
685        }
686    }
687
688    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
689    /// stdout and stderr output streams.
690    pub fn into_inner(self) -> (Child, O, O) {
691        (self.child, self.std_out_stream, self.std_err_stream)
692    }
693}
694
695#[cfg(test)]
696mod tests {
697    use super::*;
698    use assertr::prelude::*;
699
700    #[tokio::test]
701    async fn test_termination() {
702        let mut cmd = tokio::process::Command::new("sleep");
703        cmd.arg("5");
704
705        let started_at = jiff::Zoned::now();
706        let mut handle = ProcessHandle::<BroadcastOutputStream>::spawn("sleep", cmd).unwrap();
707        tokio::time::sleep(Duration::from_millis(100)).await;
708        let exit_status = handle
709            .terminate(Duration::from_secs(1), Duration::from_secs(1))
710            .await
711            .unwrap();
712        let terminated_at = jiff::Zoned::now();
713
714        // We terminate after roughly 100 ms of waiting.
715        // Let's use a 50 ms grace period on the assertion taken up by performing the termination.
716        // We can increase this if the test should turn out to be flaky.
717        let ran_for = started_at.duration_until(&terminated_at);
718        assert_that(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);
719
720        // When terminated, we do not get an exit code (unix).
721        assert_that(exit_status.code()).is_none();
722    }
723}