tokio_process_tools/
process_handle.rs

1use crate::error::{SpawnError, TerminationError, WaitError};
2use crate::output::Output;
3use crate::output_stream::broadcast::BroadcastOutputStream;
4use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::{BackpressureControl, FromStreamOptions};
6use crate::panic_on_drop::PanicOnDrop;
7use crate::terminate_on_drop::TerminateOnDrop;
8use crate::{LineParsingOptions, NumBytes, OutputStream, signal};
9use std::borrow::Cow;
10use std::fmt::Debug;
11use std::io;
12use std::process::{ExitStatus, Stdio};
13use std::time::Duration;
14use tokio::process::{Child, ChildStdin};
15
16const STDOUT_STREAM_NAME: &str = "stdout";
17const STDERR_STREAM_NAME: &str = "stderr";
18
19/// Maximum time to wait for process termination after sending SIGKILL.
20///
21/// This is a safety timeout since SIGKILL should terminate processes immediately,
22/// but there are rare cases where even SIGKILL may not work.
23const SIGKILL_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
24
25/// Represents the stdin stream of a child process.
26///
27/// stdin is always configured as piped, so it starts as `Open` with a [`ChildStdin`] handle
28/// that can be used to write data to the process. It can be explicitly closed by calling
29/// [`Stdin::close`], after which it transitions to the `Closed` state.
30#[derive(Debug)]
31pub enum Stdin {
32    /// stdin is open and available for writing.
33    Open(ChildStdin),
34    /// stdin has been closed.
35    Closed,
36}
37
38impl Stdin {
39    /// Returns `true` if stdin is open and available for writing.
40    pub fn is_open(&self) -> bool {
41        matches!(self, Stdin::Open(_))
42    }
43
44    /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
45    pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
46        match self {
47            Stdin::Open(stdin) => Some(stdin),
48            Stdin::Closed => None,
49        }
50    }
51
52    /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
53    ///
54    /// This sends `EOF` to the child process. After calling this method, this stdin
55    /// will be in the `Closed` state and no further writes will be possible.
56    pub fn close(&mut self) {
57        *self = Stdin::Closed;
58    }
59}
60
61/// Represents the running state of a process.
62#[derive(Debug)]
63pub enum RunningState {
64    /// The process is still running.
65    Running,
66
67    /// The process has terminated with the given exit status.
68    Terminated(ExitStatus),
69
70    /// Failed to determine process state.
71    Uncertain(io::Error),
72}
73
74impl RunningState {
75    /// Returns `true` if the process is running, `false` otherwise.
76    pub fn as_bool(&self) -> bool {
77        match self {
78            RunningState::Running => true,
79            RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
80        }
81    }
82}
83
84impl From<RunningState> for bool {
85    fn from(is_running: RunningState) -> Self {
86        match is_running {
87            RunningState::Running => true,
88            RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
89        }
90    }
91}
92
93/// A handle to a spawned process with captured stdout/stderr streams.
94///
95/// This type provides methods for waiting on process completion, terminating the process,
96/// and accessing its output streams. By default, processes must be explicitly waited on
97/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
98///
99/// If applicable, a process handle can be wrapped in a [`TerminateOnDrop`] to be terminated
100/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
101#[derive(Debug)]
102pub struct ProcessHandle<O: OutputStream> {
103    pub(crate) name: Cow<'static, str>,
104    child: Child,
105    std_in: Stdin,
106    std_out_stream: O,
107    std_err_stream: O,
108    panic_on_drop: Option<PanicOnDrop>,
109}
110
111impl ProcessHandle<BroadcastOutputStream> {
112    /// Spawns a new process with broadcast output streams and custom channel capacities.
113    ///
114    /// This method is intended for internal use by the `Process` builder.
115    /// Users should use `Process::new(cmd).spawn_broadcast()` instead.
116    pub(crate) fn spawn_with_capacity(
117        name: impl Into<Cow<'static, str>>,
118        mut cmd: tokio::process::Command,
119        stdout_chunk_size: NumBytes,
120        stderr_chunk_size: NumBytes,
121        stdout_channel_capacity: usize,
122        stderr_channel_capacity: usize,
123    ) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
124        let process_name = name.into();
125        Self::prepare_command(&mut cmd)
126            .spawn()
127            .map(|child| {
128                Self::new_from_child_with_piped_io_and_capacity(
129                    process_name.clone(),
130                    child,
131                    stdout_chunk_size,
132                    stderr_chunk_size,
133                    stdout_channel_capacity,
134                    stderr_channel_capacity,
135                )
136            })
137            .map_err(|source| SpawnError::SpawnFailed {
138                process_name,
139                source,
140            })
141    }
142
143    fn new_from_child_with_piped_io_and_capacity(
144        name: impl Into<Cow<'static, str>>,
145        mut child: Child,
146        stdout_chunk_size: NumBytes,
147        stderr_chunk_size: NumBytes,
148        stdout_channel_capacity: usize,
149        stderr_channel_capacity: usize,
150    ) -> ProcessHandle<BroadcastOutputStream> {
151        let std_in = match child.stdin.take() {
152            Some(stdin) => Stdin::Open(stdin),
153            None => Stdin::Closed,
154        };
155        let stdout = child
156            .stdout
157            .take()
158            .expect("Child process stdout wasn't captured");
159        let stderr = child
160            .stderr
161            .take()
162            .expect("Child process stderr wasn't captured");
163
164        let std_out_stream = BroadcastOutputStream::from_stream(
165            stdout,
166            STDOUT_STREAM_NAME,
167            FromStreamOptions {
168                chunk_size: stdout_chunk_size,
169                channel_capacity: stdout_channel_capacity,
170            },
171        );
172        let std_err_stream = BroadcastOutputStream::from_stream(
173            stderr,
174            STDERR_STREAM_NAME,
175            FromStreamOptions {
176                chunk_size: stderr_chunk_size,
177                channel_capacity: stderr_channel_capacity,
178            },
179        );
180
181        let mut this = ProcessHandle {
182            name: name.into(),
183            child,
184            std_in,
185            std_out_stream,
186            std_err_stream,
187            panic_on_drop: None,
188        };
189        this.must_be_terminated();
190        this
191    }
192
193    /// Convenience function, waiting for the process to complete using
194    /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
195    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
196    ///
197    /// You may want to destructure this using:
198    /// ```no_run
199    /// # use tokio::process::Command;
200    /// # use tokio_process_tools::*;
201    /// # tokio_test::block_on(async {
202    /// # let mut proc = Process::new(Command::new("ls")).spawn_broadcast().unwrap();
203    /// let Output {
204    ///     status,
205    ///     stdout,
206    ///     stderr
207    /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
208    /// # });
209    /// ```
210    pub async fn wait_for_completion_with_output(
211        &mut self,
212        timeout: Option<Duration>,
213        options: LineParsingOptions,
214    ) -> Result<Output, WaitError> {
215        let out_collector = self.stdout().collect_lines_into_vec(options);
216        let err_collector = self.stderr().collect_lines_into_vec(options);
217
218        let status = self.wait_for_completion(timeout).await?;
219
220        let stdout = out_collector.wait().await?;
221        let stderr = err_collector.wait().await?;
222
223        Ok(Output {
224            status,
225            stdout,
226            stderr,
227        })
228    }
229
230    /// Convenience function, waiting for the process to complete using
231    /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
232    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
233    pub async fn wait_for_completion_with_output_or_terminate(
234        &mut self,
235        wait_timeout: Duration,
236        interrupt_timeout: Duration,
237        terminate_timeout: Duration,
238        options: LineParsingOptions,
239    ) -> Result<Output, WaitError> {
240        let out_collector = self.stdout().collect_lines_into_vec(options);
241        let err_collector = self.stderr().collect_lines_into_vec(options);
242
243        let status = self
244            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
245            .await?;
246
247        let stdout = out_collector.wait().await?;
248        let stderr = err_collector.wait().await?;
249
250        Ok(Output {
251            status,
252            stdout,
253            stderr,
254        })
255    }
256}
257
258impl ProcessHandle<SingleSubscriberOutputStream> {
259    /// Spawns a new process with single subscriber output streams and custom channel capacities.
260    ///
261    /// This method is intended for internal use by the `Process` builder.
262    /// Users should use `Process::new(cmd).spawn_single_subscriber()` instead.
263    pub(crate) fn spawn_with_capacity(
264        name: impl Into<Cow<'static, str>>,
265        mut cmd: tokio::process::Command,
266        stdout_chunk_size: NumBytes,
267        stderr_chunk_size: NumBytes,
268        stdout_channel_capacity: usize,
269        stderr_channel_capacity: usize,
270    ) -> Result<Self, SpawnError> {
271        let process_name = name.into();
272        Self::prepare_command(&mut cmd)
273            .spawn()
274            .map(|child| {
275                Self::new_from_child_with_piped_io_and_capacity(
276                    process_name.clone(),
277                    child,
278                    stdout_chunk_size,
279                    stderr_chunk_size,
280                    stdout_channel_capacity,
281                    stderr_channel_capacity,
282                )
283            })
284            .map_err(|source| SpawnError::SpawnFailed {
285                process_name,
286                source,
287            })
288    }
289
290    fn new_from_child_with_piped_io_and_capacity(
291        name: impl Into<Cow<'static, str>>,
292        mut child: Child,
293        stdout_chunk_size: NumBytes,
294        stderr_chunk_size: NumBytes,
295        stdout_channel_capacity: usize,
296        stderr_channel_capacity: usize,
297    ) -> Self {
298        let std_in = match child.stdin.take() {
299            Some(stdin) => Stdin::Open(stdin),
300            None => Stdin::Closed,
301        };
302        let stdout = child
303            .stdout
304            .take()
305            .expect("Child process stdout wasn't captured");
306        let stderr = child
307            .stderr
308            .take()
309            .expect("Child process stderr wasn't captured");
310
311        let std_out_stream = SingleSubscriberOutputStream::from_stream(
312            stdout,
313            STDOUT_STREAM_NAME,
314            BackpressureControl::DropLatestIncomingIfBufferFull,
315            FromStreamOptions {
316                chunk_size: stdout_chunk_size,
317                channel_capacity: stdout_channel_capacity,
318            },
319        );
320        let std_err_stream = SingleSubscriberOutputStream::from_stream(
321            stderr,
322            STDERR_STREAM_NAME,
323            BackpressureControl::DropLatestIncomingIfBufferFull,
324            FromStreamOptions {
325                chunk_size: stderr_chunk_size,
326                channel_capacity: stderr_channel_capacity,
327            },
328        );
329
330        let mut this = ProcessHandle {
331            name: name.into(),
332            child,
333            std_in,
334            std_out_stream,
335            std_err_stream,
336            panic_on_drop: None,
337        };
338        this.must_be_terminated();
339        this
340    }
341
342    /// Convenience function, waiting for the process to complete using
343    /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
344    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
345    ///
346    /// You may want to destructure this using:
347    /// ```no_run
348    /// # use tokio_process_tools::*;
349    /// # tokio_test::block_on(async {
350    /// # let mut proc = Process::new(tokio::process::Command::new("ls")).spawn_broadcast().unwrap();
351    /// let Output {
352    ///     status,
353    ///     stdout,
354    ///     stderr
355    /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
356    /// # });
357    /// ```
358    pub async fn wait_for_completion_with_output(
359        &mut self,
360        timeout: Option<Duration>,
361        options: LineParsingOptions,
362    ) -> Result<Output, WaitError> {
363        let out_collector = self.stdout().collect_lines_into_vec(options);
364        let err_collector = self.stderr().collect_lines_into_vec(options);
365
366        let status = self.wait_for_completion(timeout).await?;
367
368        let stdout = out_collector.wait().await?;
369        let stderr = err_collector.wait().await?;
370
371        Ok(Output {
372            status,
373            stdout,
374            stderr,
375        })
376    }
377
378    /// Convenience function, waiting for the process to complete using
379    /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
380    /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
381    pub async fn wait_for_completion_with_output_or_terminate(
382        &mut self,
383        wait_timeout: Duration,
384        interrupt_timeout: Duration,
385        terminate_timeout: Duration,
386        options: LineParsingOptions,
387    ) -> Result<Output, WaitError> {
388        let out_collector = self.stdout().collect_lines_into_vec(options);
389        let err_collector = self.stderr().collect_lines_into_vec(options);
390
391        let status = self
392            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
393            .await?;
394
395        let stdout = out_collector.wait().await?;
396        let stderr = err_collector.wait().await?;
397
398        Ok(Output {
399            status,
400            stdout,
401            stderr,
402        })
403    }
404}
405
406impl<O: OutputStream> ProcessHandle<O> {
407    /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
408    /// which works more like `killpg`. Sending to the current process ID will likely trigger
409    /// undefined behavior of sending the event to every process that's attached to the console,
410    /// i.e. sending the event to group ID 0. Therefore, we create a new process group
411    /// for the child process we are about to spawn.
412    ///
413    /// See: https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6
414    fn prepare_platform_specifics(
415        command: &mut tokio::process::Command,
416    ) -> &mut tokio::process::Command {
417        #[cfg(windows)]
418        {
419            use windows_sys::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
420            command.creation_flags(CREATE_NEW_PROCESS_GROUP)
421        }
422        #[cfg(not(windows))]
423        {
424            command
425        }
426    }
427
428    fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
429        Self::prepare_platform_specifics(command)
430            .stdin(Stdio::piped())
431            .stdout(Stdio::piped())
432            .stderr(Stdio::piped())
433            // It is much too easy to leave dangling resources here and there.
434            // This library tries to make it clear and encourage users to terminate spawned
435            // processes appropriately. If not done so anyway, this acts as a "last resort"
436            // type of solution, less graceful as the `terminate_on_drop` effect but at least
437            // capable of cleaning up.
438            .kill_on_drop(true)
439    }
440
441    /// Returns the OS process ID if the process hasn't exited yet.
442    ///
443    /// Once this process has been polled to completion this will return None.
444    pub fn id(&self) -> Option<u32> {
445        self.child.id()
446    }
447
448    /// Checks if the process is currently running.
449    ///
450    /// Returns [`RunningState::Running`] if the process is still running,
451    /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
452    /// if the state could not be determined.
453    //noinspection RsSelfConvention
454    pub fn is_running(&mut self) -> RunningState {
455        match self.child.try_wait() {
456            Ok(None) => RunningState::Running,
457            Ok(Some(exit_status)) => {
458                self.must_not_be_terminated();
459                RunningState::Terminated(exit_status)
460            }
461            Err(err) => RunningState::Uncertain(err),
462        }
463    }
464
465    /// Returns a mutable reference to the (potentially already closed) stdin stream.
466    ///
467    /// Use this to write data to the child process's stdin. The stdin stream implements
468    /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
469    ///
470    /// # Example
471    ///
472    /// ```no_run
473    /// # use tokio::process::Command;
474    /// # use tokio_process_tools::*;
475    /// # use tokio::io::AsyncWriteExt;
476    /// # tokio_test::block_on(async {
477    /// // Whether we `spawn_broadcast` or `spawn_single_subscriber` does not make a difference here.
478    /// let mut process = Process::new(Command::new("cat"))
479    ///     .spawn_broadcast()
480    ///     .unwrap();
481    ///
482    /// // Write to stdin.
483    /// if let Some(stdin) = process.stdin().as_mut() {
484    ///     stdin.write_all(b"Hello, process!\n").await.unwrap();
485    ///     stdin.flush().await.unwrap();
486    /// }
487    ///
488    /// // Close stdin to signal EOF.
489    /// process.stdin().close();
490    /// # });
491    /// ```
492    pub fn stdin(&mut self) -> &mut Stdin {
493        &mut self.std_in
494    }
495
496    /// Returns a reference to the stdout stream.
497    ///
498    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
499    /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
500    /// attempts will panic with a helpful error message).
501    pub fn stdout(&self) -> &O {
502        &self.std_out_stream
503    }
504
505    /// Returns a reference to the stderr stream.
506    ///
507    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
508    /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
509    /// attempts will panic with a helpful error message).
510    pub fn stderr(&self) -> &O {
511        &self.std_err_stream
512    }
513
514    /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
515    ///
516    /// This method enables a safeguard that ensures that the process represented by this
517    /// `ProcessHandle` is properly terminated or awaited before being dropped.
518    /// If `must_be_terminated` is set and the `ProcessHandle` is
519    /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
520    /// prevent silent failure-states, ensuring that system resources are handled correctly.
521    ///
522    /// You typically do not need to call this, as every ProcessHandle is marked by default.
523    /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
524    /// process without terminating it.
525    ///
526    /// # Panic
527    ///
528    /// If the `ProcessHandle` is dropped without being awaited or terminated
529    /// after calling this method, a panic will occur with a descriptive message
530    /// to inform about the incorrect usage.
531    pub fn must_be_terminated(&mut self) {
532        self.panic_on_drop = Some(PanicOnDrop::new(
533            "tokio_process_tools::ProcessHandle",
534            "The process was not terminated.",
535            "Call `wait_for_completion` or `terminate` before the type is dropped!",
536        ));
537    }
538
539    /// Disables the panic-on-drop safeguard, allowing the spawned process to be kept running
540    /// uncontrolled in the background, while this handle can safely be dropped.
541    pub fn must_not_be_terminated(&mut self) {
542        if let Some(mut it) = self.panic_on_drop.take() {
543            it.defuse()
544        }
545    }
546
547    /// Wrap this process handle in a `TerminateOnDrop` instance, terminating the controlled process
548    /// automatically when this handle is dropped.
549    ///
550    /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
551    ///
552    /// Prefer manual termination of the process or awaiting it and relying on the (automatically
553    /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
554    /// nor terminated before being dropped.
555    pub fn terminate_on_drop(
556        self,
557        graceful_termination_timeout: Duration,
558        forceful_termination_timeout: Duration,
559    ) -> TerminateOnDrop<O> {
560        TerminateOnDrop {
561            process_handle: self,
562            interrupt_timeout: graceful_termination_timeout,
563            terminate_timeout: forceful_termination_timeout,
564        }
565    }
566
567    /// Manually send a `SIGINT` on unix or equivalent on Windows to this process.
568    ///
569    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
570    pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
571        signal::send_interrupt(&self.child)
572    }
573
574    /// Manually send a `SIGTERM` on unix or equivalent on Windows to this process.
575    ///
576    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
577    pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
578        signal::send_terminate(&self.child)
579    }
580
581    /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
582    /// doesn't run to completion after receiving any of the first two signals.
583    ///
584    /// This handle can be dropped safely after this call returned, no matter the outcome.
585    /// We accept that in extremely rare cases, failed `SIGKILL`, a rogue process may be left over.
586    pub async fn terminate(
587        &mut self,
588        interrupt_timeout: Duration,
589        terminate_timeout: Duration,
590    ) -> Result<ExitStatus, TerminationError> {
591        // Whether or not this function will ultimately succeed, we tried our best to terminate
592        // this process.
593        // Dropping this handle should not create any on-drop panic anymore.
594        // We accept that in extremely rare cases, failed `kill`, a rogue process may be left over.
595        self.must_not_be_terminated();
596
597        self.send_interrupt_signal()
598            .map_err(|err| TerminationError::SignallingFailed {
599                process_name: self.name.clone(),
600                source: err,
601                signal: "SIGINT",
602            })?;
603
604        match self.wait_for_completion(Some(interrupt_timeout)).await {
605            Ok(exit_status) => Ok(exit_status),
606            Err(not_terminated_after_sigint) => {
607                tracing::warn!(
608                    process = %self.name,
609                    error = %not_terminated_after_sigint,
610                    "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
611                );
612
613                self.send_terminate_signal()
614                    .map_err(|err| TerminationError::SignallingFailed {
615                        process_name: self.name.clone(),
616                        source: err,
617                        signal: "SIGTERM",
618                    })?;
619
620                match self.wait_for_completion(Some(terminate_timeout)).await {
621                    Ok(exit_status) => Ok(exit_status),
622                    Err(not_terminated_after_sigterm) => {
623                        tracing::warn!(
624                            process = %self.name,
625                            error = %not_terminated_after_sigterm,
626                            "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
627                        );
628
629                        match self.kill().await {
630                            Ok(()) => {
631                                // Note: A SIGKILL should typically (somewhat) immediately lead to
632                                // termination of the process. But there are cases in which even
633                                // a SIGKILL does not / cannot / will not kill a process.
634                                // Something must have gone horribly wrong then...
635                                // But: We do not want to wait indefinitely in case this happens
636                                // and therefore wait (at max) for a fixed duration after any
637                                // SIGKILL event.
638                                match self.wait_for_completion(Some(SIGKILL_WAIT_TIMEOUT)).await {
639                                    Ok(exit_status) => Ok(exit_status),
640                                    Err(not_terminated_after_sigkill) => {
641                                        // Unlikely. See the note above.
642                                        tracing::error!(
643                                            "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
644                                            self.name
645                                        );
646                                        Err(TerminationError::TerminationFailed {
647                                            process_name: self.name.clone(),
648                                            sigint_error: not_terminated_after_sigint.to_string(),
649                                            sigterm_error: not_terminated_after_sigterm.to_string(),
650                                            sigkill_error: io::Error::new(
651                                                io::ErrorKind::TimedOut,
652                                                not_terminated_after_sigkill.to_string(),
653                                            ),
654                                        })
655                                    }
656                                }
657                            }
658                            Err(kill_error) => {
659                                tracing::error!(
660                                    process = %self.name,
661                                    error = %kill_error,
662                                    "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
663                                );
664
665                                Err(TerminationError::TerminationFailed {
666                                    process_name: self.name.clone(),
667                                    sigint_error: not_terminated_after_sigint.to_string(),
668                                    sigterm_error: not_terminated_after_sigterm.to_string(),
669                                    sigkill_error: kill_error,
670                                })
671                            }
672                        }
673                    }
674                }
675            }
676        }
677    }
678
679    /// Forces the process to exit. Most users should call [ProcessHandle::terminate] instead.
680    ///
681    /// This is equivalent to sending a SIGKILL on unix platforms followed by wait.
682    pub async fn kill(&mut self) -> io::Result<()> {
683        self.child.kill().await
684    }
685
686    /// Successfully awaiting the completion of the process will unset the
687    /// "must be terminated" setting, as a successfully awaited process is already terminated.
688    /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
689    /// "must be terminated" panic being raised.
690    async fn wait(&mut self) -> io::Result<ExitStatus> {
691        match self.child.wait().await {
692            Ok(status) => {
693                self.must_not_be_terminated();
694                Ok(status)
695            }
696            Err(err) => Err(err),
697        }
698    }
699
700    /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
701    ///
702    /// If the timeout is reached before the process terminated, an error is returned but the
703    /// process remains untouched / keeps running.
704    /// Use [ProcessHandle::wait_for_completion_or_terminate] if you want immediate termination.
705    ///
706    /// This does not provide the processes output. You can take a look at the convenience function
707    /// [ProcessHandle::<BroadcastOutputStream>::wait_for_completion_with_output] to see
708    /// how the [ProcessHandle::stdout] and [ProcessHandle::stderr] streams (also available in
709    /// *_mut variants) can be used to inspect / watch over / capture the processes output.
710    pub async fn wait_for_completion(
711        &mut self,
712        timeout: Option<Duration>,
713    ) -> Result<ExitStatus, WaitError> {
714        match timeout {
715            None => self.wait().await.map_err(|source| WaitError::IoError {
716                process_name: self.name.clone(),
717                source,
718            }),
719            Some(timeout_duration) => {
720                match tokio::time::timeout(timeout_duration, self.wait()).await {
721                    Ok(Ok(exit_status)) => Ok(exit_status),
722                    Ok(Err(source)) => Err(WaitError::IoError {
723                        process_name: self.name.clone(),
724                        source,
725                    }),
726                    Err(_elapsed) => Err(WaitError::Timeout {
727                        process_name: self.name.clone(),
728                        timeout: timeout_duration,
729                    }),
730                }
731            }
732        }
733    }
734
735    /// Wait for this process to run to completion within `timeout`.
736    ///
737    /// If the timeout is reached before the process terminated normally, external termination of
738    /// the process is forced through [ProcessHandle::terminate].
739    ///
740    /// Note that this function may return `Ok` even though the timeout was reached, carrying the
741    /// exit status received after sending a termination signal!
742    pub async fn wait_for_completion_or_terminate(
743        &mut self,
744        wait_timeout: Duration,
745        interrupt_timeout: Duration,
746        terminate_timeout: Duration,
747    ) -> Result<ExitStatus, TerminationError> {
748        match self.wait_for_completion(Some(wait_timeout)).await {
749            Ok(exit_status) => Ok(exit_status),
750            Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
751        }
752    }
753
754    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
755    /// stdout and stderr output streams.
756    pub fn into_inner(self) -> (Child, O, O) {
757        (self.child, self.std_out_stream, self.std_err_stream)
758    }
759}
760
761#[cfg(test)]
762mod tests {
763    use super::*;
764    use assertr::prelude::*;
765    use tokio::io::AsyncWriteExt;
766
767    #[tokio::test]
768    async fn test_termination() {
769        let mut cmd = tokio::process::Command::new("sleep");
770        cmd.arg("5");
771
772        let started_at = jiff::Zoned::now();
773        let mut handle = crate::Process::new(cmd)
774            .name("sleep")
775            .spawn_broadcast()
776            .unwrap();
777        tokio::time::sleep(Duration::from_millis(100)).await;
778        let exit_status = handle
779            .terminate(Duration::from_secs(1), Duration::from_secs(1))
780            .await
781            .unwrap();
782        let terminated_at = jiff::Zoned::now();
783
784        // We terminate after roughly 100 ms of waiting.
785        // Let's use a 50 ms grace period on the assertion taken up by performing the termination.
786        // We can increase this if the test should turn out to be flaky.
787        let ran_for = started_at.duration_until(&terminated_at);
788        assert_that(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);
789
790        // When terminated, we do not get an exit code (unix).
791        assert_that(exit_status.code()).is_none();
792    }
793
794    #[tokio::test]
795    async fn test_stdin_write_and_read() {
796        let cmd = tokio::process::Command::new("cat");
797        let mut process = crate::Process::new(cmd)
798            .name("cat")
799            .spawn_broadcast()
800            .unwrap();
801
802        // Verify stdin starts as open.
803        assert_that(process.stdin().is_open()).is_true();
804
805        // Write to stdin.
806        let test_data = b"Hello from stdin!\n";
807        if let Some(stdin) = process.stdin().as_mut() {
808            stdin.write_all(test_data).await.unwrap();
809            stdin.flush().await.unwrap();
810        }
811
812        // Close stdin to signal EOF.
813        process.stdin().close();
814        assert_that(process.stdin().is_open()).is_false();
815
816        // Collect stdout.
817        let output = process
818            .wait_for_completion_with_output(
819                Some(Duration::from_secs(2)),
820                LineParsingOptions::default(),
821            )
822            .await
823            .unwrap();
824
825        assert_that(output.status.success()).is_true();
826        assert_that(&output.stdout).has_length(1);
827        assert_that(output.stdout[0].as_str()).is_equal_to("Hello from stdin!");
828    }
829
830    #[tokio::test]
831    async fn test_stdin_close_sends_eof() {
832        // Use `cat` which will exit when stdin is closed.
833        let cmd = tokio::process::Command::new("cat");
834        let mut process = crate::Process::new(cmd)
835            .name("cat")
836            .spawn_broadcast()
837            .unwrap();
838
839        // Close stdin immediately without writing.
840        process.stdin().close();
841        assert_that(process.stdin().is_open()).is_false();
842
843        // Process should terminate since it receives EOF.
844        let status = process
845            .wait_for_completion(Some(Duration::from_secs(2)))
846            .await
847            .unwrap();
848
849        assert_that(status.success()).is_true();
850    }
851
852    #[tokio::test]
853    async fn test_stdin_multiple_writes() {
854        let cmd = tokio::process::Command::new("cat");
855        let mut process = crate::Process::new(cmd)
856            .name("cat")
857            .spawn_broadcast()
858            .unwrap();
859
860        // Write multiple lines.
861        if let Some(stdin) = process.stdin().as_mut() {
862            stdin.write_all(b"Line 1\n").await.unwrap();
863            stdin.write_all(b"Line 2\n").await.unwrap();
864            stdin.write_all(b"Line 3\n").await.unwrap();
865            stdin.flush().await.unwrap();
866        }
867
868        process.stdin().close();
869
870        let output = process
871            .wait_for_completion_with_output(
872                Some(Duration::from_secs(2)),
873                LineParsingOptions::default(),
874            )
875            .await
876            .unwrap();
877
878        assert_that(&output.stdout).has_length(3);
879        assert_that(output.stdout[0].as_str()).is_equal_to("Line 1");
880        assert_that(output.stdout[1].as_str()).is_equal_to("Line 2");
881        assert_that(output.stdout[2].as_str()).is_equal_to("Line 3");
882    }
883
884    #[tokio::test]
885    async fn test_python_command_dispatch() {
886        let mut cmd = tokio::process::Command::new("python3");
887        cmd.arg("-i"); // Interactive mode.
888
889        let mut process = crate::Process::new(cmd).spawn_broadcast().unwrap();
890
891        // Monitor output.
892        let collector = process
893            .stdout()
894            .collect_lines_into_vec(LineParsingOptions::default());
895
896        // Send command to Python.
897        if let Some(stdin) = process.stdin().as_mut() {
898            stdin
899                .write_all(b"print('Hello from Python')\n")
900                .await
901                .unwrap();
902            stdin.flush().await.unwrap();
903        }
904
905        // Wait a bit for output.
906        tokio::time::sleep(Duration::from_millis(500)).await;
907
908        // We can either:
909        // - not close stdin and manually `terminate` the process or
910        // - close stdin, and wait for the process to naturally terminate (which python3 will).
911        process.stdin().close();
912        process
913            .wait_for_completion(Some(Duration::from_secs(1)))
914            .await
915            .unwrap();
916
917        let collected = collector.wait().await.unwrap();
918        assert_that(&collected).has_length(1);
919        assert_that(collected[0].as_str()).is_equal_to("Hello from Python");
920    }
921}