tokio_process_tools/process_handle.rs
1use crate::error::{SpawnError, TerminationError, WaitError};
2use crate::output::Output;
3use crate::output_stream::broadcast::BroadcastOutputStream;
4use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::{BackpressureControl, FromStreamOptions};
6use crate::panic_on_drop::PanicOnDrop;
7use crate::terminate_on_drop::TerminateOnDrop;
8use crate::{LineParsingOptions, NumBytes, OutputStream, signal};
9use std::borrow::Cow;
10use std::fmt::Debug;
11use std::io;
12use std::process::{ExitStatus, Stdio};
13use std::time::Duration;
14use tokio::process::{Child, ChildStdin};
15
/// Stream name passed along when the output stream for the child's stdout is created.
const STDOUT_STREAM_NAME: &str = "stdout";
/// Stream name passed along when the output stream for the child's stderr is created.
const STDERR_STREAM_NAME: &str = "stderr";

/// Upper bound for how long termination waits for the process to exit once a SIGKILL
/// was sent.
///
/// A SIGKILL is expected to end a process right away. In rare situations it does not,
/// and this limit keeps the termination logic from waiting forever in that case.
const SIGKILL_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
24
25/// Represents the stdin stream of a child process.
26///
27/// stdin is always configured as piped, so it starts as `Open` with a [`ChildStdin`] handle
28/// that can be used to write data to the process. It can be explicitly closed by calling
29/// [`Stdin::close`], after which it transitions to the `Closed` state.
30#[derive(Debug)]
31pub enum Stdin {
32 /// stdin is open and available for writing.
33 Open(ChildStdin),
34 /// stdin has been closed.
35 Closed,
36}
37
38impl Stdin {
39 /// Returns `true` if stdin is open and available for writing.
40 pub fn is_open(&self) -> bool {
41 matches!(self, Stdin::Open(_))
42 }
43
44 /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
45 pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
46 match self {
47 Stdin::Open(stdin) => Some(stdin),
48 Stdin::Closed => None,
49 }
50 }
51
52 /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
53 ///
54 /// This sends `EOF` to the child process. After calling this method, this stdin
55 /// will be in the `Closed` state and no further writes will be possible.
56 pub fn close(&mut self) {
57 *self = Stdin::Closed;
58 }
59}
60
/// Outcome of asking whether a process is still alive.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; its exit status is attached.
    Terminated(ExitStatus),

    /// The state could not be determined; the I/O error encountered is attached.
    Uncertain(io::Error),
}

impl RunningState {
    /// Collapses the state into a plain flag.
    ///
    /// Only [`RunningState::Running`] yields `true`. Both a terminated process and an
    /// undeterminable state yield `false`.
    pub fn as_bool(&self) -> bool {
        matches!(self, Self::Running)
    }
}

impl From<RunningState> for bool {
    /// Converts using the same rule as [`RunningState::as_bool`].
    fn from(state: RunningState) -> Self {
        state.as_bool()
    }
}
92
93/// A handle to a spawned process with captured stdout/stderr streams.
94///
95/// This type provides methods for waiting on process completion, terminating the process,
96/// and accessing its output streams. By default, processes must be explicitly waited on
97/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
98///
99/// If applicable, a process handle can be wrapped in a [`TerminateOnDrop`] to be terminated
100/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
101#[derive(Debug)]
102pub struct ProcessHandle<O: OutputStream> {
103 pub(crate) name: Cow<'static, str>,
104 child: Child,
105 std_in: Stdin,
106 std_out_stream: O,
107 std_err_stream: O,
108 panic_on_drop: Option<PanicOnDrop>,
109}
110
111impl ProcessHandle<BroadcastOutputStream> {
112 /// Spawns a new process with broadcast output streams and custom channel capacities.
113 ///
114 /// This method is intended for internal use by the `Process` builder.
115 /// Users should use `Process::new(cmd).spawn_broadcast()` instead.
116 pub(crate) fn spawn_with_capacity(
117 name: impl Into<Cow<'static, str>>,
118 mut cmd: tokio::process::Command,
119 stdout_chunk_size: NumBytes,
120 stderr_chunk_size: NumBytes,
121 stdout_channel_capacity: usize,
122 stderr_channel_capacity: usize,
123 ) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
124 let process_name = name.into();
125 Self::prepare_command(&mut cmd)
126 .spawn()
127 .map(|child| {
128 Self::new_from_child_with_piped_io_and_capacity(
129 process_name.clone(),
130 child,
131 stdout_chunk_size,
132 stderr_chunk_size,
133 stdout_channel_capacity,
134 stderr_channel_capacity,
135 )
136 })
137 .map_err(|source| SpawnError::SpawnFailed {
138 process_name,
139 source,
140 })
141 }
142
143 fn new_from_child_with_piped_io_and_capacity(
144 name: impl Into<Cow<'static, str>>,
145 mut child: Child,
146 stdout_chunk_size: NumBytes,
147 stderr_chunk_size: NumBytes,
148 stdout_channel_capacity: usize,
149 stderr_channel_capacity: usize,
150 ) -> ProcessHandle<BroadcastOutputStream> {
151 let std_in = match child.stdin.take() {
152 Some(stdin) => Stdin::Open(stdin),
153 None => Stdin::Closed,
154 };
155 let stdout = child
156 .stdout
157 .take()
158 .expect("Child process stdout wasn't captured");
159 let stderr = child
160 .stderr
161 .take()
162 .expect("Child process stderr wasn't captured");
163
164 let std_out_stream = BroadcastOutputStream::from_stream(
165 stdout,
166 STDOUT_STREAM_NAME,
167 FromStreamOptions {
168 chunk_size: stdout_chunk_size,
169 channel_capacity: stdout_channel_capacity,
170 },
171 );
172 let std_err_stream = BroadcastOutputStream::from_stream(
173 stderr,
174 STDERR_STREAM_NAME,
175 FromStreamOptions {
176 chunk_size: stderr_chunk_size,
177 channel_capacity: stderr_channel_capacity,
178 },
179 );
180
181 let mut this = ProcessHandle {
182 name: name.into(),
183 child,
184 std_in,
185 std_out_stream,
186 std_err_stream,
187 panic_on_drop: None,
188 };
189 this.must_be_terminated();
190 this
191 }
192
193 /// Convenience function, waiting for the process to complete using
194 /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
195 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
196 ///
197 /// You may want to destructure this using:
198 /// ```no_run
199 /// # use tokio::process::Command;
200 /// # use tokio_process_tools::*;
201 /// # tokio_test::block_on(async {
202 /// # let mut proc = Process::new(Command::new("ls")).spawn_broadcast().unwrap();
203 /// let Output {
204 /// status,
205 /// stdout,
206 /// stderr
207 /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
208 /// # });
209 /// ```
210 pub async fn wait_for_completion_with_output(
211 &mut self,
212 timeout: Option<Duration>,
213 options: LineParsingOptions,
214 ) -> Result<Output, WaitError> {
215 let out_collector = self.stdout().collect_lines_into_vec(options);
216 let err_collector = self.stderr().collect_lines_into_vec(options);
217
218 let status = self.wait_for_completion(timeout).await?;
219
220 let stdout = out_collector.wait().await?;
221 let stderr = err_collector.wait().await?;
222
223 Ok(Output {
224 status,
225 stdout,
226 stderr,
227 })
228 }
229
230 /// Convenience function, waiting for the process to complete using
231 /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
232 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
233 pub async fn wait_for_completion_with_output_or_terminate(
234 &mut self,
235 wait_timeout: Duration,
236 interrupt_timeout: Duration,
237 terminate_timeout: Duration,
238 options: LineParsingOptions,
239 ) -> Result<Output, WaitError> {
240 let out_collector = self.stdout().collect_lines_into_vec(options);
241 let err_collector = self.stderr().collect_lines_into_vec(options);
242
243 let status = self
244 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
245 .await?;
246
247 let stdout = out_collector.wait().await?;
248 let stderr = err_collector.wait().await?;
249
250 Ok(Output {
251 status,
252 stdout,
253 stderr,
254 })
255 }
256}
257
258impl ProcessHandle<SingleSubscriberOutputStream> {
259 /// Spawns a new process with single subscriber output streams and custom channel capacities.
260 ///
261 /// This method is intended for internal use by the `Process` builder.
262 /// Users should use `Process::new(cmd).spawn_single_subscriber()` instead.
263 pub(crate) fn spawn_with_capacity(
264 name: impl Into<Cow<'static, str>>,
265 mut cmd: tokio::process::Command,
266 stdout_chunk_size: NumBytes,
267 stderr_chunk_size: NumBytes,
268 stdout_channel_capacity: usize,
269 stderr_channel_capacity: usize,
270 ) -> Result<Self, SpawnError> {
271 let process_name = name.into();
272 Self::prepare_command(&mut cmd)
273 .spawn()
274 .map(|child| {
275 Self::new_from_child_with_piped_io_and_capacity(
276 process_name.clone(),
277 child,
278 stdout_chunk_size,
279 stderr_chunk_size,
280 stdout_channel_capacity,
281 stderr_channel_capacity,
282 )
283 })
284 .map_err(|source| SpawnError::SpawnFailed {
285 process_name,
286 source,
287 })
288 }
289
290 fn new_from_child_with_piped_io_and_capacity(
291 name: impl Into<Cow<'static, str>>,
292 mut child: Child,
293 stdout_chunk_size: NumBytes,
294 stderr_chunk_size: NumBytes,
295 stdout_channel_capacity: usize,
296 stderr_channel_capacity: usize,
297 ) -> Self {
298 let std_in = match child.stdin.take() {
299 Some(stdin) => Stdin::Open(stdin),
300 None => Stdin::Closed,
301 };
302 let stdout = child
303 .stdout
304 .take()
305 .expect("Child process stdout wasn't captured");
306 let stderr = child
307 .stderr
308 .take()
309 .expect("Child process stderr wasn't captured");
310
311 let std_out_stream = SingleSubscriberOutputStream::from_stream(
312 stdout,
313 STDOUT_STREAM_NAME,
314 BackpressureControl::DropLatestIncomingIfBufferFull,
315 FromStreamOptions {
316 chunk_size: stdout_chunk_size,
317 channel_capacity: stdout_channel_capacity,
318 },
319 );
320 let std_err_stream = SingleSubscriberOutputStream::from_stream(
321 stderr,
322 STDERR_STREAM_NAME,
323 BackpressureControl::DropLatestIncomingIfBufferFull,
324 FromStreamOptions {
325 chunk_size: stderr_chunk_size,
326 channel_capacity: stderr_channel_capacity,
327 },
328 );
329
330 let mut this = ProcessHandle {
331 name: name.into(),
332 child,
333 std_in,
334 std_out_stream,
335 std_err_stream,
336 panic_on_drop: None,
337 };
338 this.must_be_terminated();
339 this
340 }
341
342 /// Convenience function, waiting for the process to complete using
343 /// [ProcessHandle::wait_for_completion] while collecting both `stdout` and `stderr`
344 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
345 ///
346 /// You may want to destructure this using:
347 /// ```no_run
348 /// # use tokio_process_tools::*;
349 /// # tokio_test::block_on(async {
350 /// # let mut proc = Process::new(tokio::process::Command::new("ls")).spawn_broadcast().unwrap();
351 /// let Output {
352 /// status,
353 /// stdout,
354 /// stderr
355 /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
356 /// # });
357 /// ```
358 pub async fn wait_for_completion_with_output(
359 &mut self,
360 timeout: Option<Duration>,
361 options: LineParsingOptions,
362 ) -> Result<Output, WaitError> {
363 let out_collector = self.stdout().collect_lines_into_vec(options);
364 let err_collector = self.stderr().collect_lines_into_vec(options);
365
366 let status = self.wait_for_completion(timeout).await?;
367
368 let stdout = out_collector.wait().await?;
369 let stderr = err_collector.wait().await?;
370
371 Ok(Output {
372 status,
373 stdout,
374 stderr,
375 })
376 }
377
378 /// Convenience function, waiting for the process to complete using
379 /// [ProcessHandle::wait_for_completion_or_terminate] while collecting both `stdout` and `stderr`
380 /// into individual `Vec<String>` collections using the provided [LineParsingOptions].
381 pub async fn wait_for_completion_with_output_or_terminate(
382 &mut self,
383 wait_timeout: Duration,
384 interrupt_timeout: Duration,
385 terminate_timeout: Duration,
386 options: LineParsingOptions,
387 ) -> Result<Output, WaitError> {
388 let out_collector = self.stdout().collect_lines_into_vec(options);
389 let err_collector = self.stderr().collect_lines_into_vec(options);
390
391 let status = self
392 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
393 .await?;
394
395 let stdout = out_collector.wait().await?;
396 let stderr = err_collector.wait().await?;
397
398 Ok(Output {
399 status,
400 stdout,
401 stderr,
402 })
403 }
404}
405
406impl<O: OutputStream> ProcessHandle<O> {
407 /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
408 /// which works more like `killpg`. Sending to the current process ID will likely trigger
409 /// undefined behavior of sending the event to every process that's attached to the console,
410 /// i.e. sending the event to group ID 0. Therefore, we create a new process group
411 /// for the child process we are about to spawn.
412 ///
413 /// See: https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6
414 fn prepare_platform_specifics(
415 command: &mut tokio::process::Command,
416 ) -> &mut tokio::process::Command {
417 #[cfg(windows)]
418 {
419 use windows_sys::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
420 command.creation_flags(CREATE_NEW_PROCESS_GROUP)
421 }
422 #[cfg(not(windows))]
423 {
424 command
425 }
426 }
427
428 fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
429 Self::prepare_platform_specifics(command)
430 .stdin(Stdio::piped())
431 .stdout(Stdio::piped())
432 .stderr(Stdio::piped())
433 // It is much too easy to leave dangling resources here and there.
434 // This library tries to make it clear and encourage users to terminate spawned
435 // processes appropriately. If not done so anyway, this acts as a "last resort"
436 // type of solution, less graceful as the `terminate_on_drop` effect but at least
437 // capable of cleaning up.
438 .kill_on_drop(true)
439 }
440
441 /// Returns the OS process ID if the process hasn't exited yet.
442 ///
443 /// Once this process has been polled to completion this will return None.
444 pub fn id(&self) -> Option<u32> {
445 self.child.id()
446 }
447
448 /// Checks if the process is currently running.
449 ///
450 /// Returns [`RunningState::Running`] if the process is still running,
451 /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
452 /// if the state could not be determined.
453 //noinspection RsSelfConvention
454 pub fn is_running(&mut self) -> RunningState {
455 match self.child.try_wait() {
456 Ok(None) => RunningState::Running,
457 Ok(Some(exit_status)) => {
458 self.must_not_be_terminated();
459 RunningState::Terminated(exit_status)
460 }
461 Err(err) => RunningState::Uncertain(err),
462 }
463 }
464
465 /// Returns a mutable reference to the (potentially already closed) stdin stream.
466 ///
467 /// Use this to write data to the child process's stdin. The stdin stream implements
468 /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
469 ///
470 /// # Example
471 ///
472 /// ```no_run
473 /// # use tokio::process::Command;
474 /// # use tokio_process_tools::*;
475 /// # use tokio::io::AsyncWriteExt;
476 /// # tokio_test::block_on(async {
477 /// // Whether we `spawn_broadcast` or `spawn_single_subscriber` does not make a difference here.
478 /// let mut process = Process::new(Command::new("cat"))
479 /// .spawn_broadcast()
480 /// .unwrap();
481 ///
482 /// // Write to stdin.
483 /// if let Some(stdin) = process.stdin().as_mut() {
484 /// stdin.write_all(b"Hello, process!\n").await.unwrap();
485 /// stdin.flush().await.unwrap();
486 /// }
487 ///
488 /// // Close stdin to signal EOF.
489 /// process.stdin().close();
490 /// # });
491 /// ```
492 pub fn stdin(&mut self) -> &mut Stdin {
493 &mut self.std_in
494 }
495
496 /// Returns a reference to the stdout stream.
497 ///
498 /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
499 /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
500 /// attempts will panic with a helpful error message).
501 pub fn stdout(&self) -> &O {
502 &self.std_out_stream
503 }
504
505 /// Returns a reference to the stderr stream.
506 ///
507 /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
508 /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
509 /// attempts will panic with a helpful error message).
510 pub fn stderr(&self) -> &O {
511 &self.std_err_stream
512 }
513
514 /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
515 ///
516 /// This method enables a safeguard that ensures that the process represented by this
517 /// `ProcessHandle` is properly terminated or awaited before being dropped.
518 /// If `must_be_terminated` is set and the `ProcessHandle` is
519 /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
520 /// prevent silent failure-states, ensuring that system resources are handled correctly.
521 ///
522 /// You typically do not need to call this, as every ProcessHandle is marked by default.
523 /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
524 /// process without terminating it.
525 ///
526 /// # Panic
527 ///
528 /// If the `ProcessHandle` is dropped without being awaited or terminated
529 /// after calling this method, a panic will occur with a descriptive message
530 /// to inform about the incorrect usage.
531 pub fn must_be_terminated(&mut self) {
532 self.panic_on_drop = Some(PanicOnDrop::new(
533 "tokio_process_tools::ProcessHandle",
534 "The process was not terminated.",
535 "Call `wait_for_completion` or `terminate` before the type is dropped!",
536 ));
537 }
538
539 /// Disables the panic-on-drop safeguard, allowing the spawned process to be kept running
540 /// uncontrolled in the background, while this handle can safely be dropped.
541 pub fn must_not_be_terminated(&mut self) {
542 if let Some(mut it) = self.panic_on_drop.take() {
543 it.defuse()
544 }
545 }
546
547 /// Wrap this process handle in a `TerminateOnDrop` instance, terminating the controlled process
548 /// automatically when this handle is dropped.
549 ///
550 /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
551 ///
552 /// Prefer manual termination of the process or awaiting it and relying on the (automatically
553 /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
554 /// nor terminated before being dropped.
555 pub fn terminate_on_drop(
556 self,
557 graceful_termination_timeout: Duration,
558 forceful_termination_timeout: Duration,
559 ) -> TerminateOnDrop<O> {
560 TerminateOnDrop {
561 process_handle: self,
562 interrupt_timeout: graceful_termination_timeout,
563 terminate_timeout: forceful_termination_timeout,
564 }
565 }
566
567 /// Manually send a `SIGINT` on unix or equivalent on Windows to this process.
568 ///
569 /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
570 pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
571 signal::send_interrupt(&self.child)
572 }
573
574 /// Manually send a `SIGTERM` on unix or equivalent on Windows to this process.
575 ///
576 /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
577 pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
578 signal::send_terminate(&self.child)
579 }
580
581 /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
582 /// doesn't run to completion after receiving any of the first two signals.
583 ///
584 /// This handle can be dropped safely after this call returned, no matter the outcome.
585 /// We accept that in extremely rare cases, failed `SIGKILL`, a rogue process may be left over.
586 pub async fn terminate(
587 &mut self,
588 interrupt_timeout: Duration,
589 terminate_timeout: Duration,
590 ) -> Result<ExitStatus, TerminationError> {
591 // Whether or not this function will ultimately succeed, we tried our best to terminate
592 // this process.
593 // Dropping this handle should not create any on-drop panic anymore.
594 // We accept that in extremely rare cases, failed `kill`, a rogue process may be left over.
595 self.must_not_be_terminated();
596
597 self.send_interrupt_signal()
598 .map_err(|err| TerminationError::SignallingFailed {
599 process_name: self.name.clone(),
600 source: err,
601 signal: "SIGINT",
602 })?;
603
604 match self.wait_for_completion(Some(interrupt_timeout)).await {
605 Ok(exit_status) => Ok(exit_status),
606 Err(not_terminated_after_sigint) => {
607 tracing::warn!(
608 process = %self.name,
609 error = %not_terminated_after_sigint,
610 "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
611 );
612
613 self.send_terminate_signal()
614 .map_err(|err| TerminationError::SignallingFailed {
615 process_name: self.name.clone(),
616 source: err,
617 signal: "SIGTERM",
618 })?;
619
620 match self.wait_for_completion(Some(terminate_timeout)).await {
621 Ok(exit_status) => Ok(exit_status),
622 Err(not_terminated_after_sigterm) => {
623 tracing::warn!(
624 process = %self.name,
625 error = %not_terminated_after_sigterm,
626 "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
627 );
628
629 match self.kill().await {
630 Ok(()) => {
631 // Note: A SIGKILL should typically (somewhat) immediately lead to
632 // termination of the process. But there are cases in which even
633 // a SIGKILL does not / cannot / will not kill a process.
634 // Something must have gone horribly wrong then...
635 // But: We do not want to wait indefinitely in case this happens
636 // and therefore wait (at max) for a fixed duration after any
637 // SIGKILL event.
638 match self.wait_for_completion(Some(SIGKILL_WAIT_TIMEOUT)).await {
639 Ok(exit_status) => Ok(exit_status),
640 Err(not_terminated_after_sigkill) => {
641 // Unlikely. See the note above.
642 tracing::error!(
643 "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
644 self.name
645 );
646 Err(TerminationError::TerminationFailed {
647 process_name: self.name.clone(),
648 sigint_error: not_terminated_after_sigint.to_string(),
649 sigterm_error: not_terminated_after_sigterm.to_string(),
650 sigkill_error: io::Error::new(
651 io::ErrorKind::TimedOut,
652 not_terminated_after_sigkill.to_string(),
653 ),
654 })
655 }
656 }
657 }
658 Err(kill_error) => {
659 tracing::error!(
660 process = %self.name,
661 error = %kill_error,
662 "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
663 );
664
665 Err(TerminationError::TerminationFailed {
666 process_name: self.name.clone(),
667 sigint_error: not_terminated_after_sigint.to_string(),
668 sigterm_error: not_terminated_after_sigterm.to_string(),
669 sigkill_error: kill_error,
670 })
671 }
672 }
673 }
674 }
675 }
676 }
677 }
678
679 /// Forces the process to exit. Most users should call [ProcessHandle::terminate] instead.
680 ///
681 /// This is equivalent to sending a SIGKILL on unix platforms followed by wait.
682 pub async fn kill(&mut self) -> io::Result<()> {
683 self.child.kill().await
684 }
685
686 /// Successfully awaiting the completion of the process will unset the
687 /// "must be terminated" setting, as a successfully awaited process is already terminated.
688 /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
689 /// "must be terminated" panic being raised.
690 async fn wait(&mut self) -> io::Result<ExitStatus> {
691 match self.child.wait().await {
692 Ok(status) => {
693 self.must_not_be_terminated();
694 Ok(status)
695 }
696 Err(err) => Err(err),
697 }
698 }
699
700 /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
701 ///
702 /// If the timeout is reached before the process terminated, an error is returned but the
703 /// process remains untouched / keeps running.
704 /// Use [ProcessHandle::wait_for_completion_or_terminate] if you want immediate termination.
705 ///
706 /// This does not provide the processes output. You can take a look at the convenience function
707 /// [ProcessHandle::<BroadcastOutputStream>::wait_for_completion_with_output] to see
708 /// how the [ProcessHandle::stdout] and [ProcessHandle::stderr] streams (also available in
709 /// *_mut variants) can be used to inspect / watch over / capture the processes output.
710 pub async fn wait_for_completion(
711 &mut self,
712 timeout: Option<Duration>,
713 ) -> Result<ExitStatus, WaitError> {
714 match timeout {
715 None => self.wait().await.map_err(|source| WaitError::IoError {
716 process_name: self.name.clone(),
717 source,
718 }),
719 Some(timeout_duration) => {
720 match tokio::time::timeout(timeout_duration, self.wait()).await {
721 Ok(Ok(exit_status)) => Ok(exit_status),
722 Ok(Err(source)) => Err(WaitError::IoError {
723 process_name: self.name.clone(),
724 source,
725 }),
726 Err(_elapsed) => Err(WaitError::Timeout {
727 process_name: self.name.clone(),
728 timeout: timeout_duration,
729 }),
730 }
731 }
732 }
733 }
734
735 /// Wait for this process to run to completion within `timeout`.
736 ///
737 /// If the timeout is reached before the process terminated normally, external termination of
738 /// the process is forced through [ProcessHandle::terminate].
739 ///
740 /// Note that this function may return `Ok` even though the timeout was reached, carrying the
741 /// exit status received after sending a termination signal!
742 pub async fn wait_for_completion_or_terminate(
743 &mut self,
744 wait_timeout: Duration,
745 interrupt_timeout: Duration,
746 terminate_timeout: Duration,
747 ) -> Result<ExitStatus, TerminationError> {
748 match self.wait_for_completion(Some(wait_timeout)).await {
749 Ok(exit_status) => Ok(exit_status),
750 Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
751 }
752 }
753
754 /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
755 /// stdout and stderr output streams.
756 pub fn into_inner(self) -> (Child, O, O) {
757 (self.child, self.std_out_stream, self.std_err_stream)
758 }
759}
760
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;
    use tokio::io::AsyncWriteExt;

    /// A long-running `sleep` must be stopped promptly by `terminate`.
    #[tokio::test]
    async fn test_termination() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        // Elapsed time is measured with the monotonic clock, which is not affected by
        // wall-clock adjustments.
        let started_at = std::time::Instant::now();
        let mut handle = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let exit_status = handle
            .terminate(Duration::from_secs(1), Duration::from_secs(1))
            .await
            .unwrap();
        let ran_for = started_at.elapsed();

        // We terminate after roughly 100 ms of waiting.
        // The assertion tolerates a deviation of up to 0.5 s for performing the termination.
        // We can increase this if the test should turn out to be flaky.
        assert_that(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);

        // When terminated, we do not get an exit code (unix).
        assert_that(exit_status.code()).is_none();
    }

    /// Data written to stdin of `cat` must come back on its stdout.
    #[tokio::test]
    async fn test_stdin_write_and_read() {
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        // Verify stdin starts as open.
        assert_that(process.stdin().is_open()).is_true();

        // Write to stdin.
        let test_data = b"Hello from stdin!\n";
        if let Some(stdin) = process.stdin().as_mut() {
            stdin.write_all(test_data).await.unwrap();
            stdin.flush().await.unwrap();
        }

        // Close stdin to signal EOF.
        process.stdin().close();
        assert_that(process.stdin().is_open()).is_false();

        // Collect stdout.
        let output = process
            .wait_for_completion_with_output(
                Some(Duration::from_secs(2)),
                LineParsingOptions::default(),
            )
            .await
            .unwrap();

        assert_that(output.status.success()).is_true();
        assert_that(&output.stdout).has_length(1);
        assert_that(output.stdout[0].as_str()).is_equal_to("Hello from stdin!");
    }

    /// Closing stdin must deliver EOF, which makes `cat` exit on its own.
    #[tokio::test]
    async fn test_stdin_close_sends_eof() {
        // Use `cat` which will exit when stdin is closed.
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        // Close stdin immediately without writing.
        process.stdin().close();
        assert_that(process.stdin().is_open()).is_false();

        // Process should terminate since it receives EOF.
        let status = process
            .wait_for_completion(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that(status.success()).is_true();
    }

    /// Several consecutive writes must all arrive, in order.
    #[tokio::test]
    async fn test_stdin_multiple_writes() {
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        // Write multiple lines.
        if let Some(stdin) = process.stdin().as_mut() {
            stdin.write_all(b"Line 1\n").await.unwrap();
            stdin.write_all(b"Line 2\n").await.unwrap();
            stdin.write_all(b"Line 3\n").await.unwrap();
            stdin.flush().await.unwrap();
        }

        process.stdin().close();

        let output = process
            .wait_for_completion_with_output(
                Some(Duration::from_secs(2)),
                LineParsingOptions::default(),
            )
            .await
            .unwrap();

        assert_that(&output.stdout).has_length(3);
        assert_that(output.stdout[0].as_str()).is_equal_to("Line 1");
        assert_that(output.stdout[1].as_str()).is_equal_to("Line 2");
        assert_that(output.stdout[2].as_str()).is_equal_to("Line 3");
    }

    /// A command sent to an interactive `python3` must produce the expected output.
    #[tokio::test]
    async fn test_python_command_dispatch() {
        let mut cmd = tokio::process::Command::new("python3");
        cmd.arg("-i"); // Interactive mode.

        let mut process = crate::Process::new(cmd).spawn_broadcast().unwrap();

        // Monitor output.
        let collector = process
            .stdout()
            .collect_lines_into_vec(LineParsingOptions::default());

        // Send command to Python.
        if let Some(stdin) = process.stdin().as_mut() {
            stdin
                .write_all(b"print('Hello from Python')\n")
                .await
                .unwrap();
            stdin.flush().await.unwrap();
        }

        // Wait a bit for output.
        tokio::time::sleep(Duration::from_millis(500)).await;

        // We can either:
        // - not close stdin and manually `terminate` the process or
        // - close stdin, and wait for the process to naturally terminate (which python3 will).
        process.stdin().close();
        process
            .wait_for_completion(Some(Duration::from_secs(1)))
            .await
            .unwrap();

        let collected = collector.wait().await.unwrap();
        assert_that(&collected).has_length(1);
        assert_that(collected[0].as_str()).is_equal_to("Hello from Python");
    }
}
921}