tokio_process_tools/
process_handle.rs

1use crate::output_stream::broadcast::BroadcastOutputStream;
2use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
3use crate::output_stream::{BackpressureControl, FromStreamOptions};
4use crate::panic_on_drop::PanicOnDrop;
5use crate::terminate_on_drop::TerminateOnDrop;
6use crate::{CollectorError, LineParsingOptions, OutputStream, signal};
7use std::borrow::Cow;
8use std::fmt::Debug;
9use std::io;
10use std::process::{ExitStatus, Stdio};
11use std::time::Duration;
12use thiserror::Error;
13use tokio::process::Child;
14
15#[derive(Debug, Error)]
16pub enum TerminationError {
17    #[error("Failed to send '{signal}' signal to process: {source}")]
18    SignallingFailed {
19        source: io::Error,
20        signal: &'static str,
21    },
22
23    #[error(
24        "Failed to terminate process. Graceful SIGINT termination failure: {not_terminated_after_sigint}. Graceful SIGTERM termination failure: {not_terminated_after_sigterm}. Forceful termination failure: {not_terminated_after_sigkill}"
25    )]
26    TerminationFailed {
27        not_terminated_after_sigint: io::Error,
28        not_terminated_after_sigterm: io::Error,
29        not_terminated_after_sigkill: io::Error,
30    },
31}
32
/// Result of probing whether a process is still running.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; carries the exit status it reported.
    Terminated(ExitStatus),

    /// The state could not be determined; carries the IO error that prevented it.
    Uncertain(io::Error),
}

impl RunningState {
    /// Collapses the state into a boolean.
    ///
    /// Returns `true` only for [`RunningState::Running`]. Both
    /// [`RunningState::Terminated`] and [`RunningState::Uncertain`] yield `false`.
    pub fn as_bool(&self) -> bool {
        matches!(self, RunningState::Running)
    }
}

/// Consuming conversion with the same mapping as [`RunningState::as_bool`].
impl From<RunningState> for bool {
    fn from(is_running: RunningState) -> Self {
        is_running.as_bool()
    }
}
63
64/// Errors that can occur when waiting for process output.
65#[derive(Debug, Error)]
66pub enum WaitError {
67    #[error("A general io error occurred")]
68    IoError(#[from] io::Error),
69
70    #[error("Collector failed")]
71    CollectorFailed(#[from] CollectorError),
72}
73
74#[derive(Debug)]
75pub struct ProcessHandle<O: OutputStream> {
76    pub(crate) name: Cow<'static, str>,
77    child: Child,
78    std_out_stream: O,
79    std_err_stream: O,
80    panic_on_drop: Option<PanicOnDrop>,
81}
82
83impl ProcessHandle<BroadcastOutputStream> {
84    pub fn spawn(
85        name: impl Into<Cow<'static, str>>,
86        cmd: tokio::process::Command,
87    ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
88        Self::spawn_with_capacity(name, cmd, 128, 128)
89    }
90
91    pub fn spawn_with_capacity(
92        name: impl Into<Cow<'static, str>>,
93        mut cmd: tokio::process::Command,
94        stdout_channel_capacity: usize,
95        stderr_channel_capacity: usize,
96    ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
97        Self::prepare_command(&mut cmd).spawn().map(|child| {
98            Self::new_from_child_with_piped_io_and_capacity(
99                name,
100                child,
101                stdout_channel_capacity,
102                stderr_channel_capacity,
103            )
104        })
105    }
106
107    fn new_from_child_with_piped_io_and_capacity(
108        name: impl Into<Cow<'static, str>>,
109        mut child: Child,
110        stdout_channel_capacity: usize,
111        stderr_channel_capacity: usize,
112    ) -> ProcessHandle<BroadcastOutputStream> {
113        let stdout = child
114            .stdout
115            .take()
116            .expect("Child process stdout wasn't captured");
117        let stderr = child
118            .stderr
119            .take()
120            .expect("Child process stderr wasn't captured");
121
122        let (child, std_out_stream, std_err_stream) = (
123            child,
124            BroadcastOutputStream::from_stream(
125                stdout,
126                FromStreamOptions {
127                    channel_capacity: stdout_channel_capacity,
128                    ..Default::default()
129                },
130            ),
131            BroadcastOutputStream::from_stream(
132                stderr,
133                FromStreamOptions {
134                    channel_capacity: stderr_channel_capacity,
135                    ..Default::default()
136                },
137            ),
138        );
139
140        let mut this = ProcessHandle {
141            name: name.into(),
142            child,
143            std_out_stream,
144            std_err_stream,
145            panic_on_drop: None,
146        };
147        this.must_be_terminated();
148        this
149    }
150
151    pub async fn wait_with_output(
152        &mut self,
153        options: LineParsingOptions,
154    ) -> Result<(ExitStatus, Vec<String>, Vec<String>), WaitError> {
155        let out_collector = self.std_out_stream.collect_lines_into_vec(options);
156        let err_collector = self.std_err_stream.collect_lines_into_vec(options);
157
158        let status = self.wait().await?;
159        let std_out = out_collector.cancel().await?;
160        let std_err = err_collector.cancel().await?;
161
162        Ok((status, std_out, std_err))
163    }
164}
165
166impl ProcessHandle<SingleSubscriberOutputStream> {
167    pub fn spawn(
168        name: impl Into<Cow<'static, str>>,
169        cmd: tokio::process::Command,
170    ) -> io::Result<Self> {
171        Self::spawn_with_capacity(name, cmd, 128, 128)
172    }
173
174    pub fn spawn_with_capacity(
175        name: impl Into<Cow<'static, str>>,
176        mut cmd: tokio::process::Command,
177        stdout_channel_capacity: usize,
178        stderr_channel_capacity: usize,
179    ) -> io::Result<Self> {
180        Self::prepare_command(&mut cmd).spawn().map(|child| {
181            Self::new_from_child_with_piped_io_and_capacity(
182                name,
183                child,
184                stdout_channel_capacity,
185                stderr_channel_capacity,
186            )
187        })
188    }
189
190    fn new_from_child_with_piped_io_and_capacity(
191        name: impl Into<Cow<'static, str>>,
192        mut child: Child,
193        stdout_channel_capacity: usize,
194        stderr_channel_capacity: usize,
195    ) -> Self {
196        let stdout = child
197            .stdout
198            .take()
199            .expect("Child process stdout wasn't captured");
200        let stderr = child
201            .stderr
202            .take()
203            .expect("Child process stderr wasn't captured");
204
205        let (child, std_out_stream, std_err_stream) = (
206            child,
207            SingleSubscriberOutputStream::from_stream(
208                stdout,
209                BackpressureControl::DropLatestIncomingIfBufferFull,
210                FromStreamOptions {
211                    channel_capacity: stdout_channel_capacity,
212                    ..Default::default()
213                },
214            ),
215            SingleSubscriberOutputStream::from_stream(
216                stderr,
217                BackpressureControl::DropLatestIncomingIfBufferFull,
218                FromStreamOptions {
219                    channel_capacity: stderr_channel_capacity,
220                    ..Default::default()
221                },
222            ),
223        );
224
225        let mut this = ProcessHandle {
226            name: name.into(),
227            child,
228            std_out_stream,
229            std_err_stream,
230            panic_on_drop: None,
231        };
232        this.must_be_terminated();
233        this
234    }
235
236    pub async fn wait_with_output(
237        &mut self,
238        options: LineParsingOptions,
239    ) -> Result<(ExitStatus, Vec<String>, Vec<String>), WaitError> {
240        let out_collector = self.std_out_stream.collect_lines_into_vec(options);
241        let err_collector = self.std_err_stream.collect_lines_into_vec(options);
242
243        let status = self.wait().await?;
244        let std_out = out_collector.cancel().await?;
245        let std_err = err_collector.cancel().await?;
246
247        Ok((status, std_out, std_err))
248    }
249}
250
251impl<O: OutputStream> ProcessHandle<O> {
252    /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
253    /// which works more like `killpg`. Sending to the current process ID will likely trigger
254    /// undefined behavior of sending the event to every process that's attached to the console,
255    /// i.e. sending the event to group ID 0. Therefore, we need to create a new process group
256    /// for the child process we are about to spawn.
257    ///
258    /// See: https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6
259    fn prepare_platform_specifics(
260        command: &mut tokio::process::Command,
261    ) -> &mut tokio::process::Command {
262        #[cfg(windows)]
263        {
264            use windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
265
266            let flag = if self.graceful_exit {
267                CREATE_NEW_PROCESS_GROUP.0
268            } else {
269                0
270            };
271            command.creation_flags(flag)
272        }
273        #[cfg(not(windows))]
274        {
275            command
276        }
277    }
278
279    fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
280        Self::prepare_platform_specifics(command)
281            .stdout(Stdio::piped())
282            .stderr(Stdio::piped())
283            // It is much too easy to leave dangling resources here and there.
284            // This library tries to make it clear and encourage users to terminate spawned
285            // processes appropriately. If not done so anyway, this acts as a "last resort"
286            // type of solution, less graceful as the `terminate_on_drop` effect but at least
287            // capable of cleaning up.
288            .kill_on_drop(true)
289    }
290
291    pub fn id(&self) -> Option<u32> {
292        self.child.id()
293    }
294
295    //noinspection RsSelfConvention
296    pub fn is_running(&mut self) -> RunningState {
297        match self.child.try_wait() {
298            Ok(None) => RunningState::Running,
299            Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
300            Err(err) => RunningState::Uncertain(err),
301        }
302    }
303
304    pub fn stdout(&self) -> &O {
305        &self.std_out_stream
306    }
307    pub fn stdout_mut(&mut self) -> &mut O {
308        &mut self.std_out_stream
309    }
310
311    pub fn stderr(&self) -> &O {
312        &self.std_err_stream
313    }
314
315    pub fn stderr_mut(&mut self) -> &mut O {
316        &mut self.std_err_stream
317    }
318
319    /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
320    ///
321    /// This method enables a safeguard that ensures that the process represented by this
322    /// `ProcessHandle` is properly terminated or awaited before being dropped.
323    /// If `must_be_terminated` is set and the `ProcessHandle` is
324    /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
325    /// prevent silent failure-states, ensuring that system resources are handled correctly.
326    ///
327    /// You typically do not need to call this, as every ProcessHandle is marked by default.
328    /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
329    /// process without terminating it.
330    ///
331    /// # Panic
332    ///
333    /// If the `ProcessHandle` is dropped without being awaited or terminated
334    /// after calling this method, a panic will occur with a descriptive message
335    /// to inform about the incorrect usage.
336    pub fn must_be_terminated(&mut self) {
337        self.panic_on_drop = Some(PanicOnDrop {
338            resource_name: "ProcessHandle".into(),
339            details: "Call `terminate()` before the type is dropped!".into(),
340            armed: true,
341        });
342    }
343
344    pub fn must_not_be_terminated(&mut self) {
345        if let Some(mut it) = self.panic_on_drop.take() {
346            it.defuse()
347        }
348    }
349
350    /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
351    ///
352    /// Prefer manual termination of the process or awaiting it and relying on the (automatically
353    /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
354    /// nor terminated before being dropped.
355    pub fn terminate_on_drop(
356        self,
357        graceful_termination_timeout: Duration,
358        forceful_termination_timeout: Duration,
359    ) -> TerminateOnDrop<O> {
360        TerminateOnDrop {
361            process_handle: self,
362            interrupt_timeout: graceful_termination_timeout,
363            terminate_timeout: forceful_termination_timeout,
364        }
365    }
366
367    /// Manually sed a `SIGINT` on unix or equivalent on Windows to this process.
368    ///
369    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
370    pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
371        signal::send_interrupt(&self.child)
372    }
373
374    /// Manually sed a `SIGTERM` on unix or equivalent on Windows to this process.
375    ///
376    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
377    pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
378        signal::send_terminate(&self.child)
379    }
380
381    /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
382    /// doesn't run to completion after receiving any of the first two signals.
383    pub async fn terminate(
384        &mut self,
385        interrupt_timeout: Duration,
386        terminate_timeout: Duration,
387    ) -> Result<ExitStatus, TerminationError> {
388        // Whether or not this function will ultimately succeed, we tried to terminate the process.
389        // Dropping this handle should not create any on-drop error anymore.
390        self.must_not_be_terminated();
391
392        self.send_interrupt_signal()
393            .map_err(|err| TerminationError::SignallingFailed {
394                source: err,
395                signal: "SIGINT",
396            })?;
397
398        match self.wait_for_completion(Some(interrupt_timeout)).await {
399            Ok(exit_status) => Ok(exit_status),
400            Err(not_terminated_after_sigint) => {
401                tracing::warn!(
402                    process = %self.name,
403                    error = %not_terminated_after_sigint,
404                    "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
405                );
406
407                self.send_terminate_signal()
408                    .map_err(|err| TerminationError::SignallingFailed {
409                        source: err,
410                        signal: "SIGTERM",
411                    })?;
412
413                match self.wait_for_completion(Some(terminate_timeout)).await {
414                    Ok(exit_status) => Ok(exit_status),
415                    Err(not_terminated_after_sigterm) => {
416                        tracing::warn!(
417                            process = %self.name,
418                            error = %not_terminated_after_sigterm,
419                            "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
420                        );
421
422                        match self.kill().await {
423                            Ok(()) => {
424                                // Note: A SIGKILL should typically (somewhat) immediately lead to
425                                // termination of the process. But there are cases in which even
426                                // a SIGKILL does not / cannot / will not kill a process.
427                                // Something must have gone horribly wrong then...
428                                // But: We do not want to wait indefinitely in case this happens
429                                // and therefore wait (at max) for a fixed three seconds after any
430                                // SIGKILL event.
431                                match self.wait_for_completion(Some(Duration::from_secs(3))).await {
432                                    Ok(exit_status) => Ok(exit_status),
433                                    Err(not_terminated_after_sigkill) => {
434                                        // Unlikely. See the note above.
435                                        tracing::error!(
436                                            "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
437                                            self.name
438                                        );
439                                        Err(TerminationError::TerminationFailed {
440                                            not_terminated_after_sigint,
441                                            not_terminated_after_sigterm,
442                                            not_terminated_after_sigkill,
443                                        })
444                                    }
445                                }
446                            }
447                            Err(not_terminated_after_sigkill) => {
448                                tracing::error!(
449                                    process = %self.name,
450                                    error = %not_terminated_after_sigkill,
451                                    "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
452                                );
453
454                                Err(TerminationError::TerminationFailed {
455                                    not_terminated_after_sigint,
456                                    not_terminated_after_sigterm,
457                                    not_terminated_after_sigkill,
458                                })
459                            }
460                        }
461                    }
462                }
463            }
464        }
465    }
466
467    pub async fn kill(&mut self) -> io::Result<()> {
468        self.child.kill().await
469    }
470
471    /// Successfully awaiting the completion of the process will unset the
472    /// "must be terminated" setting, as a successfully awaited process is already terminated.
473    /// Dropping this `ProcessHandle` after calling `wait` should never lead to a
474    /// "must be terminated" panic being raised.
475    async fn wait(&mut self) -> io::Result<ExitStatus> {
476        match self.child.wait().await {
477            Ok(status) => {
478                self.must_not_be_terminated();
479                Ok(status)
480            }
481            Err(err) => Err(err),
482        }
483    }
484
485    /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
486    pub async fn wait_for_completion(
487        &mut self,
488        timeout: Option<Duration>,
489    ) -> io::Result<ExitStatus> {
490        match timeout {
491            None => self.wait().await,
492            Some(timeout) => match tokio::time::timeout(timeout, self.wait()).await {
493                Ok(exit_status) => exit_status,
494                Err(err) => Err(err.into()),
495            },
496        }
497    }
498
499    pub async fn wait_for_completion_or_terminate(
500        &mut self,
501        wait_timeout: Duration,
502        interrupt_timeout: Duration,
503        terminate_timeout: Duration,
504    ) -> Result<ExitStatus, TerminationError> {
505        match self.wait_for_completion(Some(wait_timeout)).await {
506            Ok(exit_status) => Ok(exit_status),
507            Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
508        }
509    }
510
511    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
512    /// stdout and stderr output streams.
513    pub fn into_inner(self) -> (Child, O, O) {
514        (self.child, self.std_out_stream, self.std_err_stream)
515    }
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;

    /// Spawns `sleep 5`, terminates it after roughly 100 ms and checks both the elapsed time
    /// and the missing exit code.
    #[tokio::test]
    async fn test_termination() {
        let mut sleep_cmd = tokio::process::Command::new("sleep");
        sleep_cmd.arg("5");

        let spawned_at = jiff::Zoned::now();
        let mut process =
            ProcessHandle::<BroadcastOutputStream>::spawn("sleep", sleep_cmd).unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let termination = process
            .terminate(Duration::from_secs(1), Duration::from_secs(1))
            .await;
        let exit_status = termination.unwrap();
        let finished_at = jiff::Zoned::now();

        // Termination is requested after roughly 100 ms of waiting.
        // The assertion allows a deviation of up to 0.5 s around that value to account for
        // the time taken by the termination itself.
        // This can be increased if the test should turn out to be flaky.
        let elapsed = spawned_at.duration_until(&finished_at);
        assert_that(elapsed.as_secs_f32()).is_close_to(0.1, 0.5);

        // When terminated, we do not get an exit code (unix).
        assert_that(exit_status.code()).is_none();
    }
}