tokio_process_tools/process_handle.rs
1use crate::error::{SpawnError, TerminationError, WaitError};
2use crate::output::Output;
3use crate::output_stream::broadcast::BroadcastOutputStream;
4use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::{BackpressureControl, FromStreamOptions};
6use crate::panic_on_drop::PanicOnDrop;
7use crate::terminate_on_drop::TerminateOnDrop;
8use crate::{LineParsingOptions, NumBytes, OutputStream, signal};
9use std::borrow::Cow;
10use std::fmt::Debug;
11use std::io;
12use std::process::{ExitStatus, Stdio};
13use std::time::Duration;
14use tokio::process::Child;
15
16const STDOUT_STREAM_NAME: &str = "stdout";
17const STDERR_STREAM_NAME: &str = "stderr";
18
19/// Maximum time to wait for process termination after sending SIGKILL.
20///
21/// This is a safety timeout since SIGKILL should terminate processes immediately,
22/// but there are rare cases where even SIGKILL may not work.
23const SIGKILL_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
24
25/// Represents the running state of a process.
26#[derive(Debug)]
27pub enum RunningState {
28 /// The process is still running.
29 Running,
30
31 /// The process has terminated with the given exit status.
32 Terminated(ExitStatus),
33
34 /// Failed to determine process state.
35 Uncertain(io::Error),
36}
37
38impl RunningState {
39 /// Returns `true` if the process is running, `false` otherwise.
40 pub fn as_bool(&self) -> bool {
41 match self {
42 RunningState::Running => true,
43 RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
44 }
45 }
46}
47
48impl From<RunningState> for bool {
49 fn from(is_running: RunningState) -> Self {
50 match is_running {
51 RunningState::Running => true,
52 RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
53 }
54 }
55}
56
57/// A handle to a spawned process with captured stdout/stderr streams.
58///
59/// This type provides methods for waiting on process completion, terminating the process,
60/// and accessing its output streams. By default, processes must be explicitly waited on
61/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
62///
63/// If applicable, a process handle can be wrapped in a [`TerminateOnDrop`] to be terminated
64/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
65#[derive(Debug)]
66pub struct ProcessHandle<O: OutputStream> {
67 pub(crate) name: Cow<'static, str>,
68 child: Child,
69 std_out_stream: O,
70 std_err_stream: O,
71 panic_on_drop: Option<PanicOnDrop>,
72}
73
74impl ProcessHandle<BroadcastOutputStream> {
75 /// Spawns a new process with broadcast output streams and custom channel capacities.
76 ///
77 /// This method is intended for internal use by the `Process` builder.
78 /// Users should use `Process::new(cmd).spawn_broadcast()` instead.
79 pub(crate) fn spawn_with_capacity(
80 name: impl Into<Cow<'static, str>>,
81 mut cmd: tokio::process::Command,
82 stdout_chunk_size: NumBytes,
83 stderr_chunk_size: NumBytes,
84 stdout_channel_capacity: usize,
85 stderr_channel_capacity: usize,
86 ) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
87 let process_name = name.into();
88 Self::prepare_command(&mut cmd)
89 .spawn()
90 .map(|child| {
91 Self::new_from_child_with_piped_io_and_capacity(
92 process_name.clone(),
93 child,
94 stdout_chunk_size,
95 stderr_chunk_size,
96 stdout_channel_capacity,
97 stderr_channel_capacity,
98 )
99 })
100 .map_err(|source| SpawnError::SpawnFailed {
101 process_name,
102 source,
103 })
104 }
105
106 fn new_from_child_with_piped_io_and_capacity(
107 name: impl Into<Cow<'static, str>>,
108 mut child: Child,
109 stdout_chunk_size: NumBytes,
110 stderr_chunk_size: NumBytes,
111 stdout_channel_capacity: usize,
112 stderr_channel_capacity: usize,
113 ) -> ProcessHandle<BroadcastOutputStream> {
114 let stdout = child
115 .stdout
116 .take()
117 .expect("Child process stdout wasn't captured");
118 let stderr = child
119 .stderr
120 .take()
121 .expect("Child process stderr wasn't captured");
122
123 let (child, std_out_stream, std_err_stream) = (
124 child,
125 BroadcastOutputStream::from_stream(
126 stdout,
127 "stdout",
128 FromStreamOptions {
129 chunk_size: stdout_chunk_size,
130 channel_capacity: stdout_channel_capacity,
131 },
132 ),
133 BroadcastOutputStream::from_stream(
134 stderr,
135 STDERR_STREAM_NAME,
136 FromStreamOptions {
137 chunk_size: stderr_chunk_size,
138 channel_capacity: stderr_channel_capacity,
139 },
140 ),
141 );
142
143 let mut this = ProcessHandle {
144 name: name.into(),
145 child,
146 std_out_stream,
147 std_err_stream,
148 panic_on_drop: None,
149 };
150 this.must_be_terminated();
151 this
152 }
153
154 /// Convenience function, waiting for the process to complete using
155 /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
156 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
157 ///
158 /// You may want to destructure this using:
159 /// ```no_run
160 /// # use tokio::process::Command;
161 /// # use tokio_process_tools::*;
162 /// # tokio_test::block_on(async {
163 /// # let mut proc = Process::new(Command::new("ls")).spawn_broadcast().unwrap();
164 /// let Output {
165 /// status,
166 /// stdout,
167 /// stderr
168 /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
169 /// # });
170 /// ```
171 pub async fn wait_for_completion_with_output(
172 &mut self,
173 timeout: Option<Duration>,
174 options: LineParsingOptions,
175 ) -> Result<Output, WaitError> {
176 let out_collector = self.stdout().collect_lines_into_vec(options);
177 let err_collector = self.stderr().collect_lines_into_vec(options);
178
179 let status = self.wait_for_completion(timeout).await?;
180
181 let stdout = out_collector.wait().await?;
182 let stderr = err_collector.wait().await?;
183
184 Ok(Output {
185 status,
186 stdout,
187 stderr,
188 })
189 }
190
191 /// Convenience function, waiting for the process to complete using
192 /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
193 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
194 pub async fn wait_for_completion_with_output_or_terminate(
195 &mut self,
196 wait_timeout: Duration,
197 interrupt_timeout: Duration,
198 terminate_timeout: Duration,
199 options: LineParsingOptions,
200 ) -> Result<Output, WaitError> {
201 let out_collector = self.stdout().collect_lines_into_vec(options);
202 let err_collector = self.stderr().collect_lines_into_vec(options);
203
204 let status = self
205 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
206 .await?;
207
208 let stdout = out_collector.wait().await?;
209 let stderr = err_collector.wait().await?;
210
211 Ok(Output {
212 status,
213 stdout,
214 stderr,
215 })
216 }
217}
218
219impl ProcessHandle<SingleSubscriberOutputStream> {
220 /// Spawns a new process with single subscriber output streams and custom channel capacities.
221 ///
222 /// This method is intended for internal use by the `Process` builder.
223 /// Users should use `Process::new(cmd).spawn_single_subscriber()` instead.
224 pub(crate) fn spawn_with_capacity(
225 name: impl Into<Cow<'static, str>>,
226 mut cmd: tokio::process::Command,
227 stdout_chunk_size: NumBytes,
228 stderr_chunk_size: NumBytes,
229 stdout_channel_capacity: usize,
230 stderr_channel_capacity: usize,
231 ) -> Result<Self, SpawnError> {
232 let process_name = name.into();
233 Self::prepare_command(&mut cmd)
234 .spawn()
235 .map(|child| {
236 Self::new_from_child_with_piped_io_and_capacity(
237 process_name.clone(),
238 child,
239 stdout_chunk_size,
240 stderr_chunk_size,
241 stdout_channel_capacity,
242 stderr_channel_capacity,
243 )
244 })
245 .map_err(|source| SpawnError::SpawnFailed {
246 process_name,
247 source,
248 })
249 }
250
251 fn new_from_child_with_piped_io_and_capacity(
252 name: impl Into<Cow<'static, str>>,
253 mut child: Child,
254 stdout_chunk_size: NumBytes,
255 stderr_chunk_size: NumBytes,
256 stdout_channel_capacity: usize,
257 stderr_channel_capacity: usize,
258 ) -> Self {
259 let stdout = child
260 .stdout
261 .take()
262 .expect("Child process stdout wasn't captured");
263 let stderr = child
264 .stderr
265 .take()
266 .expect("Child process stderr wasn't captured");
267
268 let (child, std_out_stream, std_err_stream) = (
269 child,
270 SingleSubscriberOutputStream::from_stream(
271 stdout,
272 STDOUT_STREAM_NAME,
273 BackpressureControl::DropLatestIncomingIfBufferFull,
274 FromStreamOptions {
275 chunk_size: stdout_chunk_size,
276 channel_capacity: stdout_channel_capacity,
277 },
278 ),
279 SingleSubscriberOutputStream::from_stream(
280 stderr,
281 STDERR_STREAM_NAME,
282 BackpressureControl::DropLatestIncomingIfBufferFull,
283 FromStreamOptions {
284 chunk_size: stderr_chunk_size,
285 channel_capacity: stderr_channel_capacity,
286 },
287 ),
288 );
289
290 let mut this = ProcessHandle {
291 name: name.into(),
292 child,
293 std_out_stream,
294 std_err_stream,
295 panic_on_drop: None,
296 };
297 this.must_be_terminated();
298 this
299 }
300
301 /// Convenience function, waiting for the process to complete using
302 /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
303 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
304 ///
305 /// You may want to destructure this using:
306 /// ```no_run
307 /// # use tokio_process_tools::*;
308 /// # tokio_test::block_on(async {
309 /// # let mut proc = Process::new(tokio::process::Command::new("ls")).spawn_broadcast().unwrap();
310 /// let Output {
311 /// status,
312 /// stdout,
313 /// stderr
314 /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
315 /// # });
316 /// ```
317 pub async fn wait_for_completion_with_output(
318 &mut self,
319 timeout: Option<Duration>,
320 options: LineParsingOptions,
321 ) -> Result<Output, WaitError> {
322 let out_collector = self.stdout().collect_lines_into_vec(options);
323 let err_collector = self.stderr().collect_lines_into_vec(options);
324
325 let status = self.wait_for_completion(timeout).await?;
326
327 let stdout = out_collector.wait().await?;
328 let stderr = err_collector.wait().await?;
329
330 Ok(Output {
331 status,
332 stdout,
333 stderr,
334 })
335 }
336
337 /// Convenience function, waiting for the process to complete using
338 /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
339 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
340 pub async fn wait_for_completion_with_output_or_terminate(
341 &mut self,
342 wait_timeout: Duration,
343 interrupt_timeout: Duration,
344 terminate_timeout: Duration,
345 options: LineParsingOptions,
346 ) -> Result<Output, WaitError> {
347 let out_collector = self.stdout().collect_lines_into_vec(options);
348 let err_collector = self.stderr().collect_lines_into_vec(options);
349
350 let status = self
351 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
352 .await?;
353
354 let stdout = out_collector.wait().await?;
355 let stderr = err_collector.wait().await?;
356
357 Ok(Output {
358 status,
359 stdout,
360 stderr,
361 })
362 }
363}
364
365impl<O: OutputStream> ProcessHandle<O> {
366 /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
367 /// which works more like `killpg`. Sending to the current process ID will likely trigger
368 /// undefined behavior of sending the event to every process that's attached to the console,
369 /// i.e. sending the event to group ID 0. Therefore, we create a new process group
370 /// for the child process we are about to spawn.
371 ///
372 /// See: https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6
373 fn prepare_platform_specifics(
374 command: &mut tokio::process::Command,
375 ) -> &mut tokio::process::Command {
376 #[cfg(windows)]
377 {
378 use windows_sys::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
379 command.creation_flags(CREATE_NEW_PROCESS_GROUP)
380 }
381 #[cfg(not(windows))]
382 {
383 command
384 }
385 }
386
387 fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
388 Self::prepare_platform_specifics(command)
389 .stdout(Stdio::piped())
390 .stderr(Stdio::piped())
391 // It is much too easy to leave dangling resources here and there.
392 // This library tries to make it clear and encourage users to terminate spawned
393 // processes appropriately. If not done so anyway, this acts as a "last resort"
394 // type of solution, less graceful as the `terminate_on_drop` effect but at least
395 // capable of cleaning up.
396 .kill_on_drop(true)
397 }
398
399 /// Returns the OS process ID if the process hasn't exited yet.
400 ///
401 /// Once this process has been polled to completion this will return None.
402 pub fn id(&self) -> Option<u32> {
403 self.child.id()
404 }
405
406 /// Checks if the process is currently running.
407 ///
408 /// Returns [`RunningState::Running`] if the process is still running,
409 /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
410 /// if the state could not be determined.
411 //noinspection RsSelfConvention
412 pub fn is_running(&mut self) -> RunningState {
413 match self.child.try_wait() {
414 Ok(None) => RunningState::Running,
415 Ok(Some(exit_status)) => {
416 self.must_not_be_terminated();
417 RunningState::Terminated(exit_status)
418 }
419 Err(err) => RunningState::Uncertain(err),
420 }
421 }
422
423 /// Returns a reference to the stdout stream.
424 ///
425 /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
426 /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
427 /// attempts will panic with a helpful error message).
428 pub fn stdout(&self) -> &O {
429 &self.std_out_stream
430 }
431
432 /// Returns a reference to the stderr stream.
433 ///
434 /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
435 /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
436 /// attempts will panic with a helpful error message).
437 pub fn stderr(&self) -> &O {
438 &self.std_err_stream
439 }
440
441 /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
442 ///
443 /// This method enables a safeguard that ensures that the process represented by this
444 /// `ProcessHandle` is properly terminated or awaited before being dropped.
445 /// If `must_be_terminated` is set and the `ProcessHandle` is
446 /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
447 /// prevent silent failure-states, ensuring that system resources are handled correctly.
448 ///
449 /// You typically do not need to call this, as every ProcessHandle is marked by default.
450 /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
451 /// process without terminating it.
452 ///
453 /// # Panic
454 ///
455 /// If the `ProcessHandle` is dropped without being awaited or terminated
456 /// after calling this method, a panic will occur with a descriptive message
457 /// to inform about the incorrect usage.
458 pub fn must_be_terminated(&mut self) {
459 self.panic_on_drop = Some(PanicOnDrop::new(
460 "tokio_process_tools::ProcessHandle",
461 "The process was not terminated.",
462 "Call `wait_for_completion` or `terminate` before the type is dropped!",
463 ));
464 }
465
466 /// Disables the panic-on-drop safeguard, allowing the spawned process to be kept running
467 /// uncontrolled in the background, while this handle can safely be dropped.
468 pub fn must_not_be_terminated(&mut self) {
469 if let Some(mut it) = self.panic_on_drop.take() {
470 it.defuse()
471 }
472 }
473
474 /// Wrap this process handle in a `TerminateOnDrop` instance, terminating the controlled process
475 /// automatically when this handle is dropped.
476 ///
477 /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
478 ///
479 /// Prefer manual termination of the process or awaiting it and relying on the (automatically
480 /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
481 /// nor terminated before being dropped.
482 pub fn terminate_on_drop(
483 self,
484 graceful_termination_timeout: Duration,
485 forceful_termination_timeout: Duration,
486 ) -> TerminateOnDrop<O> {
487 TerminateOnDrop {
488 process_handle: self,
489 interrupt_timeout: graceful_termination_timeout,
490 terminate_timeout: forceful_termination_timeout,
491 }
492 }
493
494 /// Manually send a `SIGINT` on unix or equivalent on Windows to this process.
495 ///
496 /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
497 pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
498 signal::send_interrupt(&self.child)
499 }
500
501 /// Manually send a `SIGTERM` on unix or equivalent on Windows to this process.
502 ///
503 /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
504 pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
505 signal::send_terminate(&self.child)
506 }
507
508 /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
509 /// doesn't run to completion after receiving any of the first two signals.
510 ///
511 /// This handle can be dropped safely after this call returned, no matter the outcome.
512 /// We accept that in extremely rare cases, failed `SIGKILL`, a rogue process may be left over.
513 pub async fn terminate(
514 &mut self,
515 interrupt_timeout: Duration,
516 terminate_timeout: Duration,
517 ) -> Result<ExitStatus, TerminationError> {
518 // Whether or not this function will ultimately succeed, we tried our best to terminate
519 // this process.
520 // Dropping this handle should not create any on-drop panic anymore.
521 // We accept that in extremely rare cases, failed `kill`, a rogue process may be left over.
522 self.must_not_be_terminated();
523
524 self.send_interrupt_signal()
525 .map_err(|err| TerminationError::SignallingFailed {
526 process_name: self.name.clone(),
527 source: err,
528 signal: "SIGINT",
529 })?;
530
531 match self.wait_for_completion(Some(interrupt_timeout)).await {
532 Ok(exit_status) => Ok(exit_status),
533 Err(not_terminated_after_sigint) => {
534 tracing::warn!(
535 process = %self.name,
536 error = %not_terminated_after_sigint,
537 "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
538 );
539
540 self.send_terminate_signal()
541 .map_err(|err| TerminationError::SignallingFailed {
542 process_name: self.name.clone(),
543 source: err,
544 signal: "SIGTERM",
545 })?;
546
547 match self.wait_for_completion(Some(terminate_timeout)).await {
548 Ok(exit_status) => Ok(exit_status),
549 Err(not_terminated_after_sigterm) => {
550 tracing::warn!(
551 process = %self.name,
552 error = %not_terminated_after_sigterm,
553 "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
554 );
555
556 match self.kill().await {
557 Ok(()) => {
558 // Note: A SIGKILL should typically (somewhat) immediately lead to
559 // termination of the process. But there are cases in which even
560 // a SIGKILL does not / cannot / will not kill a process.
561 // Something must have gone horribly wrong then...
562 // But: We do not want to wait indefinitely in case this happens
563 // and therefore wait (at max) for a fixed duration after any
564 // SIGKILL event.
565 match self.wait_for_completion(Some(SIGKILL_WAIT_TIMEOUT)).await {
566 Ok(exit_status) => Ok(exit_status),
567 Err(not_terminated_after_sigkill) => {
568 // Unlikely. See the note above.
569 tracing::error!(
570 "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
571 self.name
572 );
573 Err(TerminationError::TerminationFailed {
574 process_name: self.name.clone(),
575 sigint_error: not_terminated_after_sigint.to_string(),
576 sigterm_error: not_terminated_after_sigterm.to_string(),
577 sigkill_error: io::Error::new(
578 io::ErrorKind::TimedOut,
579 not_terminated_after_sigkill.to_string(),
580 ),
581 })
582 }
583 }
584 }
585 Err(kill_error) => {
586 tracing::error!(
587 process = %self.name,
588 error = %kill_error,
589 "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
590 );
591
592 Err(TerminationError::TerminationFailed {
593 process_name: self.name.clone(),
594 sigint_error: not_terminated_after_sigint.to_string(),
595 sigterm_error: not_terminated_after_sigterm.to_string(),
596 sigkill_error: kill_error,
597 })
598 }
599 }
600 }
601 }
602 }
603 }
604 }
605
606 /// Forces the process to exit. Most users should call [ProcessHandle::terminate] instead.
607 ///
608 /// This is equivalent to sending a SIGKILL on unix platforms followed by wait.
609 pub async fn kill(&mut self) -> io::Result<()> {
610 self.child.kill().await
611 }
612
613 /// Successfully awaiting the completion of the process will unset the
614 /// "must be terminated" setting, as a successfully awaited process is already terminated.
615 /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
616 /// "must be terminated" panic being raised.
617 async fn wait(&mut self) -> io::Result<ExitStatus> {
618 match self.child.wait().await {
619 Ok(status) => {
620 self.must_not_be_terminated();
621 Ok(status)
622 }
623 Err(err) => Err(err),
624 }
625 }
626
627 /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
628 ///
629 /// If the timeout is reached before the process terminated, an error is returned but the
630 /// process remains untouched / keeps running.
631 /// Use [ProcessHandle::wait_for_completion_or_terminate] if you want immediate termination.
632 ///
633 /// This does not provide the processes output. You can take a look at the convenience function
634 /// [ProcessHandle::<BroadcastOutputStream>::wait_for_completion_with_output] to see
635 /// how the [ProcessHandle::stdout] and [ProcessHandle::stderr] streams (also available in
636 /// *_mut variants) can be used to inspect / watch over / capture the processes output.
637 pub async fn wait_for_completion(
638 &mut self,
639 timeout: Option<Duration>,
640 ) -> Result<ExitStatus, WaitError> {
641 match timeout {
642 None => self.wait().await.map_err(|source| WaitError::IoError {
643 process_name: self.name.clone(),
644 source,
645 }),
646 Some(timeout_duration) => {
647 match tokio::time::timeout(timeout_duration, self.wait()).await {
648 Ok(Ok(exit_status)) => Ok(exit_status),
649 Ok(Err(source)) => Err(WaitError::IoError {
650 process_name: self.name.clone(),
651 source,
652 }),
653 Err(_elapsed) => Err(WaitError::Timeout {
654 process_name: self.name.clone(),
655 timeout: timeout_duration,
656 }),
657 }
658 }
659 }
660 }
661
662 /// Wait for this process to run to completion within `timeout`.
663 ///
664 /// If the timeout is reached before the process terminated normally, external termination of
665 /// the process is forced through [ProcessHandle::terminate].
666 ///
667 /// Note that this function may return `Ok` even though the timeout was reached, carrying the
668 /// exit status received after sending a termination signal!
669 pub async fn wait_for_completion_or_terminate(
670 &mut self,
671 wait_timeout: Duration,
672 interrupt_timeout: Duration,
673 terminate_timeout: Duration,
674 ) -> Result<ExitStatus, TerminationError> {
675 match self.wait_for_completion(Some(wait_timeout)).await {
676 Ok(exit_status) => Ok(exit_status),
677 Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
678 }
679 }
680
681 /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
682 /// stdout and stderr output streams.
683 pub fn into_inner(self) -> (Child, O, O) {
684 (self.child, self.std_out_stream, self.std_err_stream)
685 }
686}
687
688#[cfg(test)]
689mod tests {
690 use super::*;
691 use assertr::prelude::*;
692
693 #[tokio::test]
694 async fn test_termination() {
695 let mut cmd = tokio::process::Command::new("sleep");
696 cmd.arg("5");
697
698 let started_at = jiff::Zoned::now();
699 let mut handle = crate::Process::new(cmd)
700 .name("sleep")
701 .spawn_broadcast()
702 .unwrap();
703 tokio::time::sleep(Duration::from_millis(100)).await;
704 let exit_status = handle
705 .terminate(Duration::from_secs(1), Duration::from_secs(1))
706 .await
707 .unwrap();
708 let terminated_at = jiff::Zoned::now();
709
710 // We terminate after roughly 100 ms of waiting.
711 // Let's use a 50 ms grace period on the assertion taken up by performing the termination.
712 // We can increase this if the test should turn out to be flaky.
713 let ran_for = started_at.duration_until(&terminated_at);
714 assert_that(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);
715
716 // When terminated, we do not get an exit code (unix).
717 assert_that(exit_status.code()).is_none();
718 }
719}