tokio_process_tools/process_handle.rs
1use crate::output::Output;
2use crate::output_stream::broadcast::BroadcastOutputStream;
3use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
4use crate::output_stream::{BackpressureControl, FromStreamOptions};
5use crate::panic_on_drop::PanicOnDrop;
6use crate::terminate_on_drop::TerminateOnDrop;
7use crate::{CollectorError, LineParsingOptions, OutputStream, signal};
8use std::borrow::Cow;
9use std::fmt::Debug;
10use std::io;
11use std::process::{ExitStatus, Stdio};
12use std::time::Duration;
13use thiserror::Error;
14use tokio::process::Child;
15
16/// Errors that can occur when terminating a process.
17#[derive(Debug, Error)]
18pub enum TerminationError {
19 /// Failed to send a signal to the process.
20 #[error("Failed to send '{signal}' signal to process: {source}")]
21 SignallingFailed {
22 /// The underlying IO error.
23 source: io::Error,
24 /// The signal that could not be sent.
25 signal: &'static str,
26 },
27
28 /// Failed to terminate the process after trying all signals (SIGINT, SIGTERM, SIGKILL).
29 #[error(
30 "Failed to terminate process. Graceful SIGINT termination failure: {not_terminated_after_sigint}. Graceful SIGTERM termination failure: {not_terminated_after_sigterm}. Forceful termination failure: {not_terminated_after_sigkill}"
31 )]
32 TerminationFailed {
33 /// Error from waiting after sending SIGINT.
34 not_terminated_after_sigint: io::Error,
35 /// Error from waiting after sending SIGTERM.
36 not_terminated_after_sigterm: io::Error,
37 /// Error from waiting after sending SIGKILL.
38 not_terminated_after_sigkill: io::Error,
39 },
40}
41
/// Snapshot of whether a process is (still) running.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; carries the exit status it reported.
    Terminated(ExitStatus),

    /// The state could not be determined; carries the IO error that prevented it.
    Uncertain(io::Error),
}
54
55impl RunningState {
56 /// Returns `true` if the process is running, `false` otherwise.
57 pub fn as_bool(&self) -> bool {
58 match self {
59 RunningState::Running => true,
60 RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
61 }
62 }
63}
64
65impl From<RunningState> for bool {
66 fn from(is_running: RunningState) -> Self {
67 match is_running {
68 RunningState::Running => true,
69 RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
70 }
71 }
72}
73
74/// Errors that can occur when waiting for process output.
75#[derive(Debug, Error)]
76pub enum WaitError {
77 /// A general IO error occurred.
78 #[error("A general io error occurred")]
79 IoError(#[from] io::Error),
80
81 /// Could not terminate the process.
82 #[error("Could not terminate the process")]
83 TerminationError(#[from] TerminationError),
84
85 /// Collector failed to collect output.
86 #[error("Collector failed to collect output")]
87 CollectorFailed(#[from] CollectorError),
88}
89
90/// A handle to a spawned process with captured stdout/stderr streams.
91///
92/// This type provides methods for waiting on process completion, terminating the process,
93/// and accessing its output streams. By default, processes must be explicitly waited on
94/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
95///
96/// If applicable, a process handle can be wrapped in a [`TerminateOnDrop`] to be terminated
97/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
98#[derive(Debug)]
99pub struct ProcessHandle<O: OutputStream> {
100 pub(crate) name: Cow<'static, str>,
101 child: Child,
102 std_out_stream: O,
103 std_err_stream: O,
104 panic_on_drop: Option<PanicOnDrop>,
105}
106
107impl ProcessHandle<BroadcastOutputStream> {
108 /// Spawns a new process with broadcast output streams.
109 ///
110 /// Uses a default channel capacity of 128 for both stdout and stderr.
111 pub fn spawn(
112 name: impl Into<Cow<'static, str>>,
113 cmd: tokio::process::Command,
114 ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
115 Self::spawn_with_capacity(name, cmd, 128, 128)
116 }
117
118 /// Spawns a new process with broadcast output streams and custom channel capacities.
119 pub fn spawn_with_capacity(
120 name: impl Into<Cow<'static, str>>,
121 mut cmd: tokio::process::Command,
122 stdout_channel_capacity: usize,
123 stderr_channel_capacity: usize,
124 ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
125 Self::prepare_command(&mut cmd).spawn().map(|child| {
126 Self::new_from_child_with_piped_io_and_capacity(
127 name,
128 child,
129 stdout_channel_capacity,
130 stderr_channel_capacity,
131 )
132 })
133 }
134
135 fn new_from_child_with_piped_io_and_capacity(
136 name: impl Into<Cow<'static, str>>,
137 mut child: Child,
138 stdout_channel_capacity: usize,
139 stderr_channel_capacity: usize,
140 ) -> ProcessHandle<BroadcastOutputStream> {
141 let stdout = child
142 .stdout
143 .take()
144 .expect("Child process stdout wasn't captured");
145 let stderr = child
146 .stderr
147 .take()
148 .expect("Child process stderr wasn't captured");
149
150 let (child, std_out_stream, std_err_stream) = (
151 child,
152 BroadcastOutputStream::from_stream(
153 stdout,
154 FromStreamOptions {
155 channel_capacity: stdout_channel_capacity,
156 ..Default::default()
157 },
158 ),
159 BroadcastOutputStream::from_stream(
160 stderr,
161 FromStreamOptions {
162 channel_capacity: stderr_channel_capacity,
163 ..Default::default()
164 },
165 ),
166 );
167
168 let mut this = ProcessHandle {
169 name: name.into(),
170 child,
171 std_out_stream,
172 std_err_stream,
173 panic_on_drop: None,
174 };
175 this.must_be_terminated();
176 this
177 }
178
179 /// Convenience function, waiting for the process to complete using
180 /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
181 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
182 ///
183 /// You may want to destructure this using:
184 /// ```no_run
185 /// # use tokio_process_tools::*;
186 /// # use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
187 /// # tokio_test::block_on(async {
188 /// # let mut proc = ProcessHandle::<SingleSubscriberOutputStream>::spawn("cmd", tokio::process::Command::new("ls")).unwrap();
189 /// let Output {
190 /// status,
191 /// stdout,
192 /// stderr
193 /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
194 /// # });
195 /// ```
196 pub async fn wait_for_completion_with_output(
197 &mut self,
198 timeout: Option<Duration>,
199 options: LineParsingOptions,
200 ) -> Result<Output, WaitError> {
201 let out_collector = self.stdout().collect_lines_into_vec(options);
202 let err_collector = self.stderr().collect_lines_into_vec(options);
203
204 let status = self.wait_for_completion(timeout).await?;
205
206 let stdout = out_collector.wait().await?;
207 let stderr = err_collector.wait().await?;
208
209 Ok(Output {
210 status,
211 stdout,
212 stderr,
213 })
214 }
215
216 /// Convenience function, waiting for the process to complete using
217 /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
218 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
219 pub async fn wait_for_completion_with_output_or_terminate(
220 &mut self,
221 wait_timeout: Duration,
222 interrupt_timeout: Duration,
223 terminate_timeout: Duration,
224 options: LineParsingOptions,
225 ) -> Result<Output, WaitError> {
226 let out_collector = self.stdout().collect_lines_into_vec(options);
227 let err_collector = self.stderr().collect_lines_into_vec(options);
228
229 let status = self
230 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
231 .await?;
232
233 let stdout = out_collector.wait().await?;
234 let stderr = err_collector.wait().await?;
235
236 Ok(Output {
237 status,
238 stdout,
239 stderr,
240 })
241 }
242}
243
244impl ProcessHandle<SingleSubscriberOutputStream> {
245 /// Spawns a new process with single subscriber output streams.
246 ///
247 /// Uses a default channel capacity of 128 for both stdout and stderr.
248 pub fn spawn(
249 name: impl Into<Cow<'static, str>>,
250 cmd: tokio::process::Command,
251 ) -> io::Result<Self> {
252 Self::spawn_with_capacity(name, cmd, 128, 128)
253 }
254
255 /// Spawns a new process with single subscriber output streams and custom channel capacities.
256 pub fn spawn_with_capacity(
257 name: impl Into<Cow<'static, str>>,
258 mut cmd: tokio::process::Command,
259 stdout_channel_capacity: usize,
260 stderr_channel_capacity: usize,
261 ) -> io::Result<Self> {
262 Self::prepare_command(&mut cmd).spawn().map(|child| {
263 Self::new_from_child_with_piped_io_and_capacity(
264 name,
265 child,
266 stdout_channel_capacity,
267 stderr_channel_capacity,
268 )
269 })
270 }
271
272 fn new_from_child_with_piped_io_and_capacity(
273 name: impl Into<Cow<'static, str>>,
274 mut child: Child,
275 stdout_channel_capacity: usize,
276 stderr_channel_capacity: usize,
277 ) -> Self {
278 let stdout = child
279 .stdout
280 .take()
281 .expect("Child process stdout wasn't captured");
282 let stderr = child
283 .stderr
284 .take()
285 .expect("Child process stderr wasn't captured");
286
287 let (child, std_out_stream, std_err_stream) = (
288 child,
289 SingleSubscriberOutputStream::from_stream(
290 stdout,
291 BackpressureControl::DropLatestIncomingIfBufferFull,
292 FromStreamOptions {
293 channel_capacity: stdout_channel_capacity,
294 ..Default::default()
295 },
296 ),
297 SingleSubscriberOutputStream::from_stream(
298 stderr,
299 BackpressureControl::DropLatestIncomingIfBufferFull,
300 FromStreamOptions {
301 channel_capacity: stderr_channel_capacity,
302 ..Default::default()
303 },
304 ),
305 );
306
307 let mut this = ProcessHandle {
308 name: name.into(),
309 child,
310 std_out_stream,
311 std_err_stream,
312 panic_on_drop: None,
313 };
314 this.must_be_terminated();
315 this
316 }
317
318 /// Convenience function, waiting for the process to complete using
319 /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
320 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
321 ///
322 /// You may want to destructure this using:
323 /// ```no_run
324 /// # use tokio_process_tools::*;
325 /// # use tokio_process_tools::single_subscriber::SingleSubscriberOutputStream;
326 /// # tokio_test::block_on(async {
327 /// # let mut proc = ProcessHandle::<SingleSubscriberOutputStream>::spawn("cmd", tokio::process::Command::new("ls")).unwrap();
328 /// let Output {
329 /// status,
330 /// stdout,
331 /// stderr
332 /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
333 /// # });
334 /// ```
335 pub async fn wait_for_completion_with_output(
336 &mut self,
337 timeout: Option<Duration>,
338 options: LineParsingOptions,
339 ) -> Result<Output, WaitError> {
340 let out_collector = self.stdout_mut().collect_lines_into_vec(options);
341 let err_collector = self.stderr_mut().collect_lines_into_vec(options);
342
343 let status = self.wait_for_completion(timeout).await?;
344
345 let stdout = out_collector.wait().await?;
346 let stderr = err_collector.wait().await?;
347
348 Ok(Output {
349 status,
350 stdout,
351 stderr,
352 })
353 }
354
355 /// Convenience function, waiting for the process to complete using
356 /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
357 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
358 pub async fn wait_for_completion_with_output_or_terminate(
359 &mut self,
360 wait_timeout: Duration,
361 interrupt_timeout: Duration,
362 terminate_timeout: Duration,
363 options: LineParsingOptions,
364 ) -> Result<Output, WaitError> {
365 let out_collector = self.stdout_mut().collect_lines_into_vec(options);
366 let err_collector = self.stderr_mut().collect_lines_into_vec(options);
367
368 let status = self
369 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
370 .await?;
371
372 let stdout = out_collector.wait().await?;
373 let stderr = err_collector.wait().await?;
374
375 Ok(Output {
376 status,
377 stdout,
378 stderr,
379 })
380 }
381}
382
383impl<O: OutputStream> ProcessHandle<O> {
384 /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
385 /// which works more like `killpg`. Sending to the current process ID will likely trigger
386 /// undefined behavior of sending the event to every process that's attached to the console,
387 /// i.e. sending the event to group ID 0. Therefore, we need to create a new process group
388 /// for the child process we are about to spawn.
389 ///
390 /// See: https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6
391 fn prepare_platform_specifics(
392 command: &mut tokio::process::Command,
393 ) -> &mut tokio::process::Command {
394 #[cfg(windows)]
395 {
396 use windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
397
398 let flag = if self.graceful_exit {
399 CREATE_NEW_PROCESS_GROUP.0
400 } else {
401 0
402 };
403 command.creation_flags(flag)
404 }
405 #[cfg(not(windows))]
406 {
407 command
408 }
409 }
410
411 fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
412 Self::prepare_platform_specifics(command)
413 .stdout(Stdio::piped())
414 .stderr(Stdio::piped())
415 // It is much too easy to leave dangling resources here and there.
416 // This library tries to make it clear and encourage users to terminate spawned
417 // processes appropriately. If not done so anyway, this acts as a "last resort"
418 // type of solution, less graceful as the `terminate_on_drop` effect but at least
419 // capable of cleaning up.
420 .kill_on_drop(true)
421 }
422
423 /// Returns the OS process ID if the process hasn't exited yet.
424 ///
425 /// Once this process has been polled to completion this will return None.
426 pub fn id(&self) -> Option<u32> {
427 self.child.id()
428 }
429
430 /// Checks if the process is currently running.
431 ///
432 /// Returns [`RunningState::Running`] if the process is still running,
433 /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
434 /// if the state could not be determined.
435 //noinspection RsSelfConvention
436 pub fn is_running(&mut self) -> RunningState {
437 match self.child.try_wait() {
438 Ok(None) => RunningState::Running,
439 Ok(Some(exit_status)) => {
440 self.must_not_be_terminated();
441 RunningState::Terminated(exit_status)
442 }
443 Err(err) => RunningState::Uncertain(err),
444 }
445 }
446
447 /// Returns a reference to the stdout stream.
448 pub fn stdout(&self) -> &O {
449 &self.std_out_stream
450 }
451
452 /// Returns a mutable reference to the stdout stream.
453 pub fn stdout_mut(&mut self) -> &mut O {
454 &mut self.std_out_stream
455 }
456
457 /// Returns a reference to the stderr stream.
458 pub fn stderr(&self) -> &O {
459 &self.std_err_stream
460 }
461
462 /// Returns a mutable reference to the stderr stream.
463 pub fn stderr_mut(&mut self) -> &mut O {
464 &mut self.std_err_stream
465 }
466
467 /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
468 ///
469 /// This method enables a safeguard that ensures that the process represented by this
470 /// `ProcessHandle` is properly terminated or awaited before being dropped.
471 /// If `must_be_terminated` is set and the `ProcessHandle` is
472 /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
473 /// prevent silent failure-states, ensuring that system resources are handled correctly.
474 ///
475 /// You typically do not need to call this, as every ProcessHandle is marked by default.
476 /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
477 /// process without terminating it.
478 ///
479 /// # Panic
480 ///
481 /// If the `ProcessHandle` is dropped without being awaited or terminated
482 /// after calling this method, a panic will occur with a descriptive message
483 /// to inform about the incorrect usage.
484 pub fn must_be_terminated(&mut self) {
485 self.panic_on_drop = Some(PanicOnDrop::new(
486 "tokio_process_tools::ProcessHandle",
487 "The process was not terminated.",
488 "Call `wait_for_completion` or `terminate` before the type is dropped!",
489 ));
490 }
491
492 /// Disables the panic-on-drop safeguard, allowing the spawned process to be kept running
493 /// uncontrolled in the background, while this handle can safely be dropped.
494 pub fn must_not_be_terminated(&mut self) {
495 if let Some(mut it) = self.panic_on_drop.take() {
496 it.defuse()
497 }
498 }
499
500 /// Wrap this process handle in a `TerminateOnDrop` instance, terminating the controlled process
501 /// automatically when this handle is dropped.
502 ///
503 /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
504 ///
505 /// Prefer manual termination of the process or awaiting it and relying on the (automatically
506 /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
507 /// nor terminated before being dropped.
508 pub fn terminate_on_drop(
509 self,
510 graceful_termination_timeout: Duration,
511 forceful_termination_timeout: Duration,
512 ) -> TerminateOnDrop<O> {
513 TerminateOnDrop {
514 process_handle: self,
515 interrupt_timeout: graceful_termination_timeout,
516 terminate_timeout: forceful_termination_timeout,
517 }
518 }
519
520 /// Manually sed a `SIGINT` on unix or equivalent on Windows to this process.
521 ///
522 /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
523 pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
524 signal::send_interrupt(&self.child)
525 }
526
527 /// Manually sed a `SIGTERM` on unix or equivalent on Windows to this process.
528 ///
529 /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
530 pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
531 signal::send_terminate(&self.child)
532 }
533
534 /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
535 /// doesn't run to completion after receiving any of the first two signals.
536 ///
537 /// This handle can be dropped safely after this call returned, no matter the outcome.
538 /// We accept that in extremely rare cases, failed `SIGKILL`, a rogue process may be left over.
539 pub async fn terminate(
540 &mut self,
541 interrupt_timeout: Duration,
542 terminate_timeout: Duration,
543 ) -> Result<ExitStatus, TerminationError> {
544 // Whether or not this function will ultimately succeed, we tried our best to terminate
545 // this process.
546 // Dropping this handle should not create any on-drop panic anymore.
547 // We accept that in extremely rare cases, failed `kill`, a rogue process may be left over.
548 self.must_not_be_terminated();
549
550 self.send_interrupt_signal()
551 .map_err(|err| TerminationError::SignallingFailed {
552 source: err,
553 signal: "SIGINT",
554 })?;
555
556 match self.wait_for_completion(Some(interrupt_timeout)).await {
557 Ok(exit_status) => Ok(exit_status),
558 Err(not_terminated_after_sigint) => {
559 tracing::warn!(
560 process = %self.name,
561 error = %not_terminated_after_sigint,
562 "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
563 );
564
565 self.send_terminate_signal()
566 .map_err(|err| TerminationError::SignallingFailed {
567 source: err,
568 signal: "SIGTERM",
569 })?;
570
571 match self.wait_for_completion(Some(terminate_timeout)).await {
572 Ok(exit_status) => Ok(exit_status),
573 Err(not_terminated_after_sigterm) => {
574 tracing::warn!(
575 process = %self.name,
576 error = %not_terminated_after_sigterm,
577 "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
578 );
579
580 match self.kill().await {
581 Ok(()) => {
582 // Note: A SIGKILL should typically (somewhat) immediately lead to
583 // termination of the process. But there are cases in which even
584 // a SIGKILL does not / cannot / will not kill a process.
585 // Something must have gone horribly wrong then...
586 // But: We do not want to wait indefinitely in case this happens
587 // and therefore wait (at max) for a fixed three seconds after any
588 // SIGKILL event.
589 match self.wait_for_completion(Some(Duration::from_secs(3))).await {
590 Ok(exit_status) => Ok(exit_status),
591 Err(not_terminated_after_sigkill) => {
592 // Unlikely. See the note above.
593 tracing::error!(
594 "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
595 self.name
596 );
597 Err(TerminationError::TerminationFailed {
598 not_terminated_after_sigint,
599 not_terminated_after_sigterm,
600 not_terminated_after_sigkill,
601 })
602 }
603 }
604 }
605 Err(not_terminated_after_sigkill) => {
606 tracing::error!(
607 process = %self.name,
608 error = %not_terminated_after_sigkill,
609 "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
610 );
611
612 Err(TerminationError::TerminationFailed {
613 not_terminated_after_sigint,
614 not_terminated_after_sigterm,
615 not_terminated_after_sigkill,
616 })
617 }
618 }
619 }
620 }
621 }
622 }
623 }
624
625 /// Forces the process to exit. Most users should call [ProcessHandle::terminate] instead.
626 ///
627 /// This is equivalent to sending a SIGKILL on unix platforms followed by wait.
628 pub async fn kill(&mut self) -> io::Result<()> {
629 self.child.kill().await
630 }
631
632 /// Successfully awaiting the completion of the process will unset the
633 /// "must be terminated" setting, as a successfully awaited process is already terminated.
634 /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
635 /// "must be terminated" panic being raised.
636 async fn wait(&mut self) -> io::Result<ExitStatus> {
637 match self.child.wait().await {
638 Ok(status) => {
639 self.must_not_be_terminated();
640 Ok(status)
641 }
642 Err(err) => Err(err),
643 }
644 }
645
646 /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
647 ///
648 /// If the timeout is reached before the process terminated, an error is returned but the
649 /// process remains untouched / keeps running.
650 /// Use [ProcessHandle::wait_for_completion_or_terminate] if you want immediate termination.
651 ///
652 /// This does not provide the processes output. You can take a look at the convenience function
653 /// [ProcessHandle::<BroadcastOutputStream>::wait_for_completion_with_output] to see
654 /// how the [ProcessHandle::stdout] and [ProcessHandle::stderr] streams (also available in
655 /// *_mut variants) can be used to inspect / watch over / capture the processes output.
656 pub async fn wait_for_completion(
657 &mut self,
658 timeout: Option<Duration>,
659 ) -> io::Result<ExitStatus> {
660 match timeout {
661 None => self.wait().await,
662 Some(timeout) => match tokio::time::timeout(timeout, self.wait()).await {
663 Ok(exit_status) => exit_status,
664 Err(err) => Err(err.into()),
665 },
666 }
667 }
668
669 /// Wait for this process to run to completion within `timeout`.
670 ///
671 /// If the timeout is reached before the process terminated normally, external termination of
672 /// the process is forced through [ProcessHandle::terminate].
673 ///
674 /// Note that this function may return `Ok` even though the timeout was reached, carrying the
675 /// exit status received after sending a termination signal!
676 pub async fn wait_for_completion_or_terminate(
677 &mut self,
678 wait_timeout: Duration,
679 interrupt_timeout: Duration,
680 terminate_timeout: Duration,
681 ) -> Result<ExitStatus, TerminationError> {
682 match self.wait_for_completion(Some(wait_timeout)).await {
683 Ok(exit_status) => Ok(exit_status),
684 Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
685 }
686 }
687
688 /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
689 /// stdout and stderr output streams.
690 pub fn into_inner(self) -> (Child, O, O) {
691 (self.child, self.std_out_stream, self.std_err_stream)
692 }
693}
694
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;

    /// Spawns `sleep 5`, terminates it after roughly 100 ms and checks both how long the
    /// process was alive and that no exit code is reported.
    #[tokio::test]
    async fn test_termination() {
        let mut sleep_cmd = tokio::process::Command::new("sleep");
        sleep_cmd.arg("5");

        let started_at = jiff::Zoned::now();
        let mut process =
            ProcessHandle::<BroadcastOutputStream>::spawn("sleep", sleep_cmd).unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let exit_status = process
            .terminate(Duration::from_secs(1), Duration::from_secs(1))
            .await
            .unwrap();
        let terminated_at = jiff::Zoned::now();

        // Termination is requested after roughly 100 ms, so about 0.1 s is expected.
        // The second argument (0.5) leaves headroom for the time the termination itself takes.
        // It can be increased should the test turn out to be flaky.
        let elapsed = started_at.duration_until(&terminated_at);
        assert_that(elapsed.as_secs_f32()).is_close_to(0.1, 0.5);

        // When terminated, we do not get an exit code (unix).
        assert_that(exit_status.code()).is_none();
    }
}
723}