tokio_process_tools/
process_handle.rs

1use crate::output_stream::broadcast::BroadcastOutputStream;
2use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
3use crate::output_stream::{BackpressureControl, FromStreamOptions};
4use crate::panic_on_drop::PanicOnDrop;
5use crate::terminate_on_drop::TerminateOnDrop;
6use crate::{CollectorError, LineParsingOptions, OutputStream, signal};
7use std::borrow::Cow;
8use std::fmt::Debug;
9use std::io;
10use std::process::{ExitStatus, Stdio};
11use std::time::Duration;
12use thiserror::Error;
13use tokio::process::Child;
14
15#[derive(Debug, Error)]
16pub enum TerminationError {
17    #[error("Failed to send '{signal}' signal to process: {source}")]
18    SignallingFailed {
19        source: io::Error,
20        signal: &'static str,
21    },
22
23    #[error(
24        "Failed to terminate process. Graceful SIGINT termination failure: {not_terminated_after_sigint}. Graceful SIGTERM termination failure: {not_terminated_after_sigterm}. Forceful termination failure: {not_terminated_after_sigkill}"
25    )]
26    TerminationFailed {
27        not_terminated_after_sigint: io::Error,
28        not_terminated_after_sigterm: io::Error,
29        not_terminated_after_sigkill: io::Error,
30    },
31}
32
/// Represents the running state of a process.
#[derive(Debug)]
pub enum RunningState {
    /// The process is still running.
    Running,

    /// The process has terminated with the given exit status.
    Terminated(ExitStatus),

    /// Failed to determine process state.
    Uncertain(io::Error),
}

impl RunningState {
    /// Collapses this state into a plain boolean.
    ///
    /// Only [`RunningState::Running`] maps to `true`. Both a terminated process and a state
    /// that could not be determined map to `false`.
    pub fn as_bool(&self) -> bool {
        // Every variant is listed explicitly, so adding a variant forces a decision here.
        match self {
            Self::Running => true,
            Self::Terminated(_) | Self::Uncertain(_) => false,
        }
    }
}

/// Consuming conversion with the exact same mapping as [`RunningState::as_bool`].
impl From<RunningState> for bool {
    fn from(is_running: RunningState) -> Self {
        is_running.as_bool()
    }
}
63
64/// Errors that can occur when waiting for process output.
65#[derive(Debug, Error)]
66pub enum WaitError {
67    #[error("A general io error occurred")]
68    IoError(#[from] io::Error),
69
70    #[error("Collector failed")]
71    CollectorFailed(#[from] CollectorError),
72}
73
74#[derive(Debug)]
75pub struct ProcessHandle<O: OutputStream> {
76    pub(crate) name: Cow<'static, str>,
77    child: Child,
78    std_out_stream: O,
79    std_err_stream: O,
80    panic_on_drop: Option<PanicOnDrop>,
81}
82
83impl ProcessHandle<BroadcastOutputStream> {
84    pub fn spawn(
85        name: impl Into<Cow<'static, str>>,
86        cmd: tokio::process::Command,
87    ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
88        Self::spawn_with_capacity(name, cmd, 128, 128)
89    }
90
91    pub fn spawn_with_capacity(
92        name: impl Into<Cow<'static, str>>,
93        mut cmd: tokio::process::Command,
94        stdout_channel_capacity: usize,
95        stderr_channel_capacity: usize,
96    ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
97        Self::prepare_command(&mut cmd).spawn().map(|child| {
98            Self::new_from_child_with_piped_io_and_capacity(
99                name,
100                child,
101                stdout_channel_capacity,
102                stderr_channel_capacity,
103            )
104        })
105    }
106
107    fn new_from_child_with_piped_io_and_capacity(
108        name: impl Into<Cow<'static, str>>,
109        mut child: Child,
110        stdout_channel_capacity: usize,
111        stderr_channel_capacity: usize,
112    ) -> ProcessHandle<BroadcastOutputStream> {
113        let stdout = child
114            .stdout
115            .take()
116            .expect("Child process stdout wasn't captured");
117        let stderr = child
118            .stderr
119            .take()
120            .expect("Child process stderr wasn't captured");
121
122        let (child, std_out_stream, std_err_stream) = (
123            child,
124            BroadcastOutputStream::from_stream(
125                stdout,
126                FromStreamOptions {
127                    channel_capacity: stdout_channel_capacity,
128                    ..Default::default()
129                },
130            ),
131            BroadcastOutputStream::from_stream(
132                stderr,
133                FromStreamOptions {
134                    channel_capacity: stderr_channel_capacity,
135                    ..Default::default()
136                },
137            ),
138        );
139
140        let mut this = ProcessHandle {
141            name: name.into(),
142            child,
143            std_out_stream,
144            std_err_stream,
145            panic_on_drop: None,
146        };
147        this.must_be_terminated();
148        this
149    }
150
151    pub async fn wait_with_output(
152        &mut self,
153        options: LineParsingOptions,
154    ) -> Result<(ExitStatus, Vec<String>, Vec<String>), WaitError> {
155        let out_collector = self.std_out_stream.collect_lines_into_vec(options);
156        let err_collector = self.std_err_stream.collect_lines_into_vec(options);
157
158        let status = self.wait().await?;
159        let std_out = out_collector.cancel().await?;
160        let std_err = err_collector.cancel().await?;
161
162        Ok((status, std_out, std_err))
163    }
164}
165
166impl ProcessHandle<SingleSubscriberOutputStream> {
167    pub fn spawn(
168        name: impl Into<Cow<'static, str>>,
169        cmd: tokio::process::Command,
170    ) -> io::Result<Self> {
171        Self::spawn_with_capacity(name, cmd, 128, 128)
172    }
173
174    pub fn spawn_with_capacity(
175        name: impl Into<Cow<'static, str>>,
176        mut cmd: tokio::process::Command,
177        stdout_channel_capacity: usize,
178        stderr_channel_capacity: usize,
179    ) -> io::Result<Self> {
180        Self::prepare_command(&mut cmd).spawn().map(|child| {
181            Self::new_from_child_with_piped_io_and_capacity(
182                name,
183                child,
184                stdout_channel_capacity,
185                stderr_channel_capacity,
186            )
187        })
188    }
189
190    fn new_from_child_with_piped_io_and_capacity(
191        name: impl Into<Cow<'static, str>>,
192        mut child: Child,
193        stdout_channel_capacity: usize,
194        stderr_channel_capacity: usize,
195    ) -> Self {
196        let stdout = child
197            .stdout
198            .take()
199            .expect("Child process stdout wasn't captured");
200        let stderr = child
201            .stderr
202            .take()
203            .expect("Child process stderr wasn't captured");
204
205        let (child, std_out_stream, std_err_stream) = (
206            child,
207            SingleSubscriberOutputStream::from_stream(
208                stdout,
209                BackpressureControl::DropLatestIncomingIfBufferFull,
210                FromStreamOptions {
211                    channel_capacity: stdout_channel_capacity,
212                    ..Default::default()
213                },
214            ),
215            SingleSubscriberOutputStream::from_stream(
216                stderr,
217                BackpressureControl::DropLatestIncomingIfBufferFull,
218                FromStreamOptions {
219                    channel_capacity: stderr_channel_capacity,
220                    ..Default::default()
221                },
222            ),
223        );
224
225        let mut this = ProcessHandle {
226            name: name.into(),
227            child,
228            std_out_stream,
229            std_err_stream,
230            panic_on_drop: None,
231        };
232        this.must_be_terminated();
233        this
234    }
235
236    pub async fn wait_with_output(
237        &mut self,
238        options: LineParsingOptions,
239    ) -> Result<(ExitStatus, Vec<String>, Vec<String>), WaitError> {
240        let out_collector = self.std_out_stream.collect_lines_into_vec(options);
241        let err_collector = self.std_err_stream.collect_lines_into_vec(options);
242
243        let status = self.wait().await?;
244        let std_out = out_collector.cancel().await?;
245        let std_err = err_collector.cancel().await?;
246
247        Ok((status, std_out, std_err))
248    }
249}
250
251impl<O: OutputStream> ProcessHandle<O> {
252    /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
253    /// which works more like `killpg`. Sending to the current process ID will likely trigger
254    /// undefined behavior of sending the event to every process that's attached to the console,
255    /// i.e. sending the event to group ID 0. Therefore, we need to create a new process group
256    /// for the child process we are about to spawn.
257    ///
258    /// See: https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6
259    fn prepare_platform_specifics(
260        command: &mut tokio::process::Command,
261    ) -> &mut tokio::process::Command {
262        #[cfg(windows)]
263        {
264            use windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
265
266            let flag = if self.graceful_exit {
267                CREATE_NEW_PROCESS_GROUP.0
268            } else {
269                0
270            };
271            command.creation_flags(flag)
272        }
273        #[cfg(not(windows))]
274        {
275            command
276        }
277    }
278
279    fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
280        Self::prepare_platform_specifics(command)
281            .stdout(Stdio::piped())
282            .stderr(Stdio::piped())
283            // It is much too easy to leave dangling resources here and there.
284            // This library tries to make it clear and encourage users to terminate spawned
285            // processes appropriately. If not done so anyway, this acts as a "last resort"
286            // type of solution, less graceful as the `terminate_on_drop` effect but at least
287            // capable of cleaning up.
288            .kill_on_drop(true)
289    }
290
291    pub fn id(&self) -> Option<u32> {
292        self.child.id()
293    }
294
295    //noinspection RsSelfConvention
296    pub fn is_running(&mut self) -> RunningState {
297        match self.child.try_wait() {
298            Ok(None) => RunningState::Running,
299            Ok(Some(exit_status)) => {
300                self.must_not_be_terminated();
301                RunningState::Terminated(exit_status)
302            }
303            Err(err) => RunningState::Uncertain(err),
304        }
305    }
306
307    pub fn stdout(&self) -> &O {
308        &self.std_out_stream
309    }
310    pub fn stdout_mut(&mut self) -> &mut O {
311        &mut self.std_out_stream
312    }
313
314    pub fn stderr(&self) -> &O {
315        &self.std_err_stream
316    }
317
318    pub fn stderr_mut(&mut self) -> &mut O {
319        &mut self.std_err_stream
320    }
321
322    /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
323    ///
324    /// This method enables a safeguard that ensures that the process represented by this
325    /// `ProcessHandle` is properly terminated or awaited before being dropped.
326    /// If `must_be_terminated` is set and the `ProcessHandle` is
327    /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
328    /// prevent silent failure-states, ensuring that system resources are handled correctly.
329    ///
330    /// You typically do not need to call this, as every ProcessHandle is marked by default.
331    /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
332    /// process without terminating it.
333    ///
334    /// # Panic
335    ///
336    /// If the `ProcessHandle` is dropped without being awaited or terminated
337    /// after calling this method, a panic will occur with a descriptive message
338    /// to inform about the incorrect usage.
339    pub fn must_be_terminated(&mut self) {
340        self.panic_on_drop = Some(PanicOnDrop {
341            resource_name: "ProcessHandle".into(),
342            details: "Call `terminate()` before the type is dropped!".into(),
343            armed: true,
344        });
345    }
346
347    pub fn must_not_be_terminated(&mut self) {
348        if let Some(mut it) = self.panic_on_drop.take() {
349            it.defuse()
350        }
351    }
352
353    /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
354    ///
355    /// Prefer manual termination of the process or awaiting it and relying on the (automatically
356    /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
357    /// nor terminated before being dropped.
358    pub fn terminate_on_drop(
359        self,
360        graceful_termination_timeout: Duration,
361        forceful_termination_timeout: Duration,
362    ) -> TerminateOnDrop<O> {
363        TerminateOnDrop {
364            process_handle: self,
365            interrupt_timeout: graceful_termination_timeout,
366            terminate_timeout: forceful_termination_timeout,
367        }
368    }
369
370    /// Manually sed a `SIGINT` on unix or equivalent on Windows to this process.
371    ///
372    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
373    pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
374        signal::send_interrupt(&self.child)
375    }
376
377    /// Manually sed a `SIGTERM` on unix or equivalent on Windows to this process.
378    ///
379    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
380    pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
381        signal::send_terminate(&self.child)
382    }
383
384    /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
385    /// doesn't run to completion after receiving any of the first two signals.
386    pub async fn terminate(
387        &mut self,
388        interrupt_timeout: Duration,
389        terminate_timeout: Duration,
390    ) -> Result<ExitStatus, TerminationError> {
391        // Whether or not this function will ultimately succeed, we tried to terminate the process.
392        // Dropping this handle should not create any on-drop error anymore.
393        self.must_not_be_terminated();
394
395        self.send_interrupt_signal()
396            .map_err(|err| TerminationError::SignallingFailed {
397                source: err,
398                signal: "SIGINT",
399            })?;
400
401        match self.wait_for_completion(Some(interrupt_timeout)).await {
402            Ok(exit_status) => Ok(exit_status),
403            Err(not_terminated_after_sigint) => {
404                tracing::warn!(
405                    process = %self.name,
406                    error = %not_terminated_after_sigint,
407                    "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
408                );
409
410                self.send_terminate_signal()
411                    .map_err(|err| TerminationError::SignallingFailed {
412                        source: err,
413                        signal: "SIGTERM",
414                    })?;
415
416                match self.wait_for_completion(Some(terminate_timeout)).await {
417                    Ok(exit_status) => Ok(exit_status),
418                    Err(not_terminated_after_sigterm) => {
419                        tracing::warn!(
420                            process = %self.name,
421                            error = %not_terminated_after_sigterm,
422                            "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
423                        );
424
425                        match self.kill().await {
426                            Ok(()) => {
427                                // Note: A SIGKILL should typically (somewhat) immediately lead to
428                                // termination of the process. But there are cases in which even
429                                // a SIGKILL does not / cannot / will not kill a process.
430                                // Something must have gone horribly wrong then...
431                                // But: We do not want to wait indefinitely in case this happens
432                                // and therefore wait (at max) for a fixed three seconds after any
433                                // SIGKILL event.
434                                match self.wait_for_completion(Some(Duration::from_secs(3))).await {
435                                    Ok(exit_status) => Ok(exit_status),
436                                    Err(not_terminated_after_sigkill) => {
437                                        // Unlikely. See the note above.
438                                        tracing::error!(
439                                            "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
440                                            self.name
441                                        );
442                                        Err(TerminationError::TerminationFailed {
443                                            not_terminated_after_sigint,
444                                            not_terminated_after_sigterm,
445                                            not_terminated_after_sigkill,
446                                        })
447                                    }
448                                }
449                            }
450                            Err(not_terminated_after_sigkill) => {
451                                tracing::error!(
452                                    process = %self.name,
453                                    error = %not_terminated_after_sigkill,
454                                    "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
455                                );
456
457                                Err(TerminationError::TerminationFailed {
458                                    not_terminated_after_sigint,
459                                    not_terminated_after_sigterm,
460                                    not_terminated_after_sigkill,
461                                })
462                            }
463                        }
464                    }
465                }
466            }
467        }
468    }
469
470    pub async fn kill(&mut self) -> io::Result<()> {
471        self.child.kill().await
472    }
473
474    /// Successfully awaiting the completion of the process will unset the
475    /// "must be terminated" setting, as a successfully awaited process is already terminated.
476    /// Dropping this `ProcessHandle` after calling `wait` should never lead to a
477    /// "must be terminated" panic being raised.
478    async fn wait(&mut self) -> io::Result<ExitStatus> {
479        match self.child.wait().await {
480            Ok(status) => {
481                self.must_not_be_terminated();
482                Ok(status)
483            }
484            Err(err) => Err(err),
485        }
486    }
487
488    /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
489    pub async fn wait_for_completion(
490        &mut self,
491        timeout: Option<Duration>,
492    ) -> io::Result<ExitStatus> {
493        match timeout {
494            None => self.wait().await,
495            Some(timeout) => match tokio::time::timeout(timeout, self.wait()).await {
496                Ok(exit_status) => exit_status,
497                Err(err) => Err(err.into()),
498            },
499        }
500    }
501
502    pub async fn wait_for_completion_or_terminate(
503        &mut self,
504        wait_timeout: Duration,
505        interrupt_timeout: Duration,
506        terminate_timeout: Duration,
507    ) -> Result<ExitStatus, TerminationError> {
508        match self.wait_for_completion(Some(wait_timeout)).await {
509            Ok(exit_status) => Ok(exit_status),
510            Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
511        }
512    }
513
514    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
515    /// stdout and stderr output streams.
516    pub fn into_inner(self) -> (Child, O, O) {
517        (self.child, self.std_out_stream, self.std_err_stream)
518    }
519}
520
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;

    /// Spawns `sleep 5`, terminates it after roughly 100 ms and checks that termination
    /// happened promptly and without an exit code.
    #[tokio::test]
    async fn test_termination() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        // A monotonic clock is used, so wall-clock adjustments cannot skew the measurement.
        let started_at = std::time::Instant::now();
        let mut handle = ProcessHandle::<BroadcastOutputStream>::spawn("sleep", cmd).unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let exit_status = handle
            .terminate(Duration::from_secs(1), Duration::from_secs(1))
            .await
            .unwrap();
        let ran_for = started_at.elapsed();

        // We terminate after roughly 100 ms of waiting.
        // The assertion tolerates a deviation of up to 0.5 s, covering the time taken up by
        // spawning and terminating the process.
        // We can increase this if the test should turn out to be flaky.
        assert_that(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);

        // When terminated, we do not get an exit code (unix).
        assert_that(exit_status.code()).is_none();
    }
}