Skip to main content

tokio_process_tools/
process_handle.rs

1use crate::error::{SpawnError, TerminationError, WaitError};
2use crate::output::{Output, RawOutput};
3use crate::output_stream::broadcast::BroadcastOutputStream;
4use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::{BackpressureControl, FromStreamOptions};
6use crate::panic_on_drop::PanicOnDrop;
7use crate::terminate_on_drop::TerminateOnDrop;
8use crate::{LineParsingOptions, NumBytes, OutputStream, signal};
9use std::borrow::Cow;
10use std::fmt::Debug;
11use std::io;
12use std::process::{ExitStatus, Stdio};
13use std::time::Duration;
14use tokio::process::{Child, ChildStdin};
15
/// Stream name handed to the output stream constructor for the child's stdout pipe.
const STDOUT_STREAM_NAME: &str = "stdout";
/// Stream name handed to the output stream constructor for the child's stderr pipe.
const STDERR_STREAM_NAME: &str = "stderr";
18
/// Per-stream settings used when spawning a process with single subscriber output streams.
///
/// One instance is supplied for stdout and one for stderr.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SingleSubscriberStreamConfig {
    /// Forwarded as `FromStreamOptions::chunk_size`; checked with `assert_non_zero` at spawn.
    pub(crate) chunk_size: NumBytes,
    /// Forwarded as `FromStreamOptions::channel_capacity`.
    pub(crate) channel_capacity: usize,
    /// Forwarded to `SingleSubscriberOutputStream::from_stream`.
    pub(crate) backpressure_control: BackpressureControl,
}
25
/// Upper bound on how long to wait for the process to exit after sending SIGKILL.
///
/// This is a safety timeout: SIGKILL should terminate a process immediately,
/// but in rare cases even SIGKILL may not take effect.
const SIGKILL_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
31
/// Represents the stdin stream of a child process.
///
/// stdin is always configured as piped, so it normally starts as `Open` with a [`ChildStdin`]
/// handle that can be used to write data to the process. If the spawned child exposes no stdin
/// handle, the state starts out as `Closed` instead. It can be explicitly closed by calling
/// [`Stdin::close`], after which it transitions to the `Closed` state.
#[derive(Debug)]
pub enum Stdin {
    /// stdin is open and available for writing.
    Open(ChildStdin),
    /// stdin has been closed; no handle is held anymore.
    Closed,
}
44
45impl Stdin {
46    /// Returns `true` if stdin is open and available for writing.
47    #[must_use]
48    pub fn is_open(&self) -> bool {
49        matches!(self, Stdin::Open(_))
50    }
51
52    /// Returns a mutable reference to the underlying [`ChildStdin`] if open, or `None` if closed.
53    pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
54        match self {
55            Stdin::Open(stdin) => Some(stdin),
56            Stdin::Closed => None,
57        }
58    }
59
60    /// Closes stdin by dropping the underlying [`ChildStdin`] handle.
61    ///
62    /// This sends `EOF` to the child process. After calling this method, this stdin
63    /// will be in the `Closed` state and no further writes will be possible.
64    pub fn close(&mut self) {
65        *self = Stdin::Closed;
66    }
67}
68
/// Outcome of probing whether a process is still alive.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process has exited; carries its exit status.
    Terminated(ExitStatus),

    /// The state could not be determined; carries the underlying I/O error.
    Uncertain(io::Error),
}

impl RunningState {
    /// Collapses the state into a flag: `true` only for [`RunningState::Running`].
    ///
    /// Both `Terminated` and `Uncertain` yield `false`.
    #[must_use]
    pub fn as_bool(&self) -> bool {
        matches!(self, RunningState::Running)
    }
}

impl From<RunningState> for bool {
    /// Consuming conversion with the same mapping as [`RunningState::as_bool`].
    fn from(is_running: RunningState) -> Self {
        is_running.as_bool()
    }
}
101
/// A handle to a spawned process with captured stdout/stderr streams.
///
/// This type provides methods for waiting on process completion, terminating the process,
/// and accessing its output streams. By default, processes must be explicitly waited on
/// or terminated before being dropped (see [`ProcessHandle::must_be_terminated`]).
///
/// If applicable, a process handle can be wrapped in a [`TerminateOnDrop`] to be terminated
/// automatically upon being dropped. Note that this requires a multi-threaded runtime!
#[derive(Debug)]
pub struct ProcessHandle<O: OutputStream> {
    /// Process name; attached to log events and error values produced by this handle.
    pub(crate) name: Cow<'static, str>,
    /// The underlying tokio child process.
    child: Child,
    /// The child's stdin, or [`Stdin::Closed`] if it was closed or no handle was available.
    std_in: Stdin,
    /// Output stream built from the child's stdout pipe.
    std_out_stream: O,
    /// Output stream built from the child's stderr pipe.
    std_err_stream: O,
    /// When `true`, dropping the handle attempts to kill the child (see the `Drop` impl).
    cleanup_on_drop: bool,
    /// Guard installed by `must_be_terminated` and defused/removed by `defuse_drop_panic`.
    panic_on_drop: Option<PanicOnDrop>,
}
120
121impl<O: OutputStream> Drop for ProcessHandle<O> {
122    fn drop(&mut self) {
123        if self.cleanup_on_drop {
124            // We want users to explicitly await or terminate spawned processes.
125            // If not done so, kill the process now to have some sort of last-resort cleanup.
126            // A separate panic-on-drop guard may additionally raise a panic to signal the misuse.
127            if let Err(err) = self.child.start_kill() {
128                tracing::warn!(
129                    process = %self.name,
130                    error = %err,
131                    "Failed to kill process while dropping an armed ProcessHandle"
132                );
133            }
134        }
135    }
136}
137
138impl ProcessHandle<BroadcastOutputStream> {
139    /// Spawns a new process with broadcast output streams and custom channel capacities.
140    ///
141    /// This method is intended for internal use by the `Process` builder.
142    /// Users should use `Process::new(cmd).spawn_broadcast()` instead.
143    pub(crate) fn spawn_with_capacity(
144        name: impl Into<Cow<'static, str>>,
145        mut cmd: tokio::process::Command,
146        stdout_chunk_size: NumBytes,
147        stderr_chunk_size: NumBytes,
148        stdout_channel_capacity: usize,
149        stderr_channel_capacity: usize,
150    ) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
151        stdout_chunk_size.assert_non_zero("stdout_chunk_size");
152        stderr_chunk_size.assert_non_zero("stderr_chunk_size");
153
154        let process_name = name.into();
155        Self::prepare_command(&mut cmd)
156            .spawn()
157            .map(|child| {
158                Self::new_from_child_with_piped_io_and_capacity(
159                    process_name.clone(),
160                    child,
161                    stdout_chunk_size,
162                    stderr_chunk_size,
163                    stdout_channel_capacity,
164                    stderr_channel_capacity,
165                )
166            })
167            .map_err(|source| SpawnError::SpawnFailed {
168                process_name,
169                source,
170            })
171    }
172
173    fn new_from_child_with_piped_io_and_capacity(
174        name: impl Into<Cow<'static, str>>,
175        mut child: Child,
176        stdout_chunk_size: NumBytes,
177        stderr_chunk_size: NumBytes,
178        stdout_channel_capacity: usize,
179        stderr_channel_capacity: usize,
180    ) -> ProcessHandle<BroadcastOutputStream> {
181        let std_in = match child.stdin.take() {
182            Some(stdin) => Stdin::Open(stdin),
183            None => Stdin::Closed,
184        };
185        let stdout = child
186            .stdout
187            .take()
188            .expect("Child process stdout wasn't captured");
189        let stderr = child
190            .stderr
191            .take()
192            .expect("Child process stderr wasn't captured");
193
194        let std_out_stream = BroadcastOutputStream::from_stream(
195            stdout,
196            STDOUT_STREAM_NAME,
197            FromStreamOptions {
198                chunk_size: stdout_chunk_size,
199                channel_capacity: stdout_channel_capacity,
200            },
201        );
202        let std_err_stream = BroadcastOutputStream::from_stream(
203            stderr,
204            STDERR_STREAM_NAME,
205            FromStreamOptions {
206                chunk_size: stderr_chunk_size,
207                channel_capacity: stderr_channel_capacity,
208            },
209        );
210
211        let mut this = ProcessHandle {
212            name: name.into(),
213            child,
214            std_in,
215            std_out_stream,
216            std_err_stream,
217            cleanup_on_drop: false,
218            panic_on_drop: None,
219        };
220        this.must_be_terminated();
221        this
222    }
223
224    /// Convenience function, waiting for the process to complete using
225    /// [`ProcessHandle::wait_for_completion`] while collecting both `stdout` and `stderr`
226    /// into individual `Vec<String>` collections using the provided [`LineParsingOptions`].
227    ///
228    /// You may want to destructure this using:
229    /// ```no_run
230    /// # use tokio::process::Command;
231    /// # use tokio_process_tools::*;
232    /// # tokio_test::block_on(async {
233    /// # let mut proc = Process::new(Command::new("ls")).spawn_broadcast().unwrap();
234    /// let Output {
235    ///     status,
236    ///     stdout,
237    ///     stderr
238    /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
239    /// # });
240    /// ```
241    ///
242    /// # Errors
243    ///
244    /// Returns [`WaitError`] if waiting for the process or collecting output fails.
245    pub async fn wait_for_completion_with_output(
246        &mut self,
247        timeout: Option<Duration>,
248        options: LineParsingOptions,
249    ) -> Result<Output, WaitError> {
250        let out_collector = self.stdout().collect_lines_into_vec(options);
251        let err_collector = self.stderr().collect_lines_into_vec(options);
252
253        let status = self.wait_for_completion(timeout).await?;
254
255        let stdout = out_collector.wait().await?;
256        let stderr = err_collector.wait().await?;
257
258        Ok(Output {
259            status,
260            stdout,
261            stderr,
262        })
263    }
264
265    /// Convenience function, waiting for the process to complete using
266    /// [`ProcessHandle::wait_for_completion`] while collecting both `stdout` and `stderr`
267    /// into raw byte vectors.
268    ///
269    /// # Errors
270    ///
271    /// Returns [`WaitError`] if waiting for the process or collecting output fails.
272    pub async fn wait_for_completion_with_raw_output(
273        &mut self,
274        timeout: Option<Duration>,
275    ) -> Result<RawOutput, WaitError> {
276        let out_collector = self.stdout().collect_chunks_into_vec();
277        let err_collector = self.stderr().collect_chunks_into_vec();
278
279        let status = self.wait_for_completion(timeout).await?;
280
281        let stdout = out_collector.wait().await?;
282        let stderr = err_collector.wait().await?;
283
284        Ok(RawOutput {
285            status,
286            stdout,
287            stderr,
288        })
289    }
290
291    /// Convenience function, waiting for the process to complete using
292    /// [`ProcessHandle::wait_for_completion_or_terminate`] while collecting both `stdout` and `stderr`
293    /// into individual `Vec<String>` collections using the provided [`LineParsingOptions`].
294    ///
295    /// # Errors
296    ///
297    /// Returns [`WaitError`] if waiting for the process, terminating it, or collecting output fails.
298    pub async fn wait_for_completion_with_output_or_terminate(
299        &mut self,
300        wait_timeout: Duration,
301        interrupt_timeout: Duration,
302        terminate_timeout: Duration,
303        options: LineParsingOptions,
304    ) -> Result<Output, WaitError> {
305        let out_collector = self.stdout().collect_lines_into_vec(options);
306        let err_collector = self.stderr().collect_lines_into_vec(options);
307
308        let status = self
309            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
310            .await?;
311
312        let stdout = out_collector.wait().await?;
313        let stderr = err_collector.wait().await?;
314
315        Ok(Output {
316            status,
317            stdout,
318            stderr,
319        })
320    }
321
322    /// Convenience function, waiting for the process to complete using
323    /// [`ProcessHandle::wait_for_completion_or_terminate`] while collecting both `stdout` and
324    /// `stderr` into raw byte vectors.
325    ///
326    /// # Errors
327    ///
328    /// Returns [`WaitError`] if waiting for the process, terminating it, or collecting output fails.
329    pub async fn wait_for_completion_with_raw_output_or_terminate(
330        &mut self,
331        wait_timeout: Duration,
332        interrupt_timeout: Duration,
333        terminate_timeout: Duration,
334    ) -> Result<RawOutput, WaitError> {
335        let out_collector = self.stdout().collect_chunks_into_vec();
336        let err_collector = self.stderr().collect_chunks_into_vec();
337
338        let status = self
339            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
340            .await?;
341
342        let stdout = out_collector.wait().await?;
343        let stderr = err_collector.wait().await?;
344
345        Ok(RawOutput {
346            status,
347            stdout,
348            stderr,
349        })
350    }
351}
352
353impl ProcessHandle<SingleSubscriberOutputStream> {
354    /// Spawns a new process with single subscriber output streams and custom channel capacities.
355    ///
356    /// This method is intended for internal use by the `Process` builder.
357    /// Users should use `Process::new(cmd).spawn_single_subscriber()` instead.
358    pub(crate) fn spawn_with_capacity(
359        name: impl Into<Cow<'static, str>>,
360        mut cmd: tokio::process::Command,
361        stdout_config: SingleSubscriberStreamConfig,
362        stderr_config: SingleSubscriberStreamConfig,
363    ) -> Result<Self, SpawnError> {
364        stdout_config
365            .chunk_size
366            .assert_non_zero("stdout_config.chunk_size");
367        stderr_config
368            .chunk_size
369            .assert_non_zero("stderr_config.chunk_size");
370
371        let process_name = name.into();
372        Self::prepare_command(&mut cmd)
373            .spawn()
374            .map(|child| {
375                Self::new_from_child_with_piped_io_and_capacity(
376                    process_name.clone(),
377                    child,
378                    stdout_config,
379                    stderr_config,
380                )
381            })
382            .map_err(|source| SpawnError::SpawnFailed {
383                process_name,
384                source,
385            })
386    }
387
388    fn new_from_child_with_piped_io_and_capacity(
389        name: impl Into<Cow<'static, str>>,
390        mut child: Child,
391        stdout_config: SingleSubscriberStreamConfig,
392        stderr_config: SingleSubscriberStreamConfig,
393    ) -> Self {
394        let std_in = match child.stdin.take() {
395            Some(stdin) => Stdin::Open(stdin),
396            None => Stdin::Closed,
397        };
398        let stdout = child
399            .stdout
400            .take()
401            .expect("Child process stdout wasn't captured");
402        let stderr = child
403            .stderr
404            .take()
405            .expect("Child process stderr wasn't captured");
406
407        let std_out_stream = SingleSubscriberOutputStream::from_stream(
408            stdout,
409            STDOUT_STREAM_NAME,
410            stdout_config.backpressure_control,
411            FromStreamOptions {
412                chunk_size: stdout_config.chunk_size,
413                channel_capacity: stdout_config.channel_capacity,
414            },
415        );
416        let std_err_stream = SingleSubscriberOutputStream::from_stream(
417            stderr,
418            STDERR_STREAM_NAME,
419            stderr_config.backpressure_control,
420            FromStreamOptions {
421                chunk_size: stderr_config.chunk_size,
422                channel_capacity: stderr_config.channel_capacity,
423            },
424        );
425
426        let mut this = ProcessHandle {
427            name: name.into(),
428            child,
429            std_in,
430            std_out_stream,
431            std_err_stream,
432            cleanup_on_drop: false,
433            panic_on_drop: None,
434        };
435        this.must_be_terminated();
436        this
437    }
438
439    /// Convenience function, waiting for the process to complete using
440    /// [`ProcessHandle::wait_for_completion`] while collecting both `stdout` and `stderr`
441    /// into individual `Vec<String>` collections using the provided [`LineParsingOptions`].
442    ///
443    /// You may want to destructure this using:
444    /// ```no_run
445    /// # use tokio_process_tools::*;
446    /// # tokio_test::block_on(async {
447    /// # let mut proc = Process::new(tokio::process::Command::new("ls")).spawn_broadcast().unwrap();
448    /// let Output {
449    ///     status,
450    ///     stdout,
451    ///     stderr
452    /// } = proc.wait_for_completion_with_output(None, LineParsingOptions::default()).await.unwrap();
453    /// # });
454    /// ```
455    ///
456    /// # Errors
457    ///
458    /// Returns [`WaitError`] if waiting for the process or collecting output fails.
459    pub async fn wait_for_completion_with_output(
460        &mut self,
461        timeout: Option<Duration>,
462        options: LineParsingOptions,
463    ) -> Result<Output, WaitError> {
464        let out_collector = self.stdout().collect_lines_into_vec(options);
465        let err_collector = self.stderr().collect_lines_into_vec(options);
466
467        let status = self.wait_for_completion(timeout).await?;
468
469        let stdout = out_collector.wait().await?;
470        let stderr = err_collector.wait().await?;
471
472        Ok(Output {
473            status,
474            stdout,
475            stderr,
476        })
477    }
478
479    /// Convenience function, waiting for the process to complete using
480    /// [`ProcessHandle::wait_for_completion`] while collecting both `stdout` and `stderr`
481    /// into raw byte vectors.
482    ///
483    /// # Errors
484    ///
485    /// Returns [`WaitError`] if waiting for the process or collecting output fails.
486    pub async fn wait_for_completion_with_raw_output(
487        &mut self,
488        timeout: Option<Duration>,
489    ) -> Result<RawOutput, WaitError> {
490        let out_collector = self.stdout().collect_chunks_into_vec();
491        let err_collector = self.stderr().collect_chunks_into_vec();
492
493        let status = self.wait_for_completion(timeout).await?;
494
495        let stdout = out_collector.wait().await?;
496        let stderr = err_collector.wait().await?;
497
498        Ok(RawOutput {
499            status,
500            stdout,
501            stderr,
502        })
503    }
504
505    /// Convenience function, waiting for the process to complete using
506    /// [`ProcessHandle::wait_for_completion_or_terminate`] while collecting both `stdout` and `stderr`
507    /// into individual `Vec<String>` collections using the provided [`LineParsingOptions`].
508    ///
509    /// # Errors
510    ///
511    /// Returns [`WaitError`] if waiting for the process, terminating it, or collecting output fails.
512    pub async fn wait_for_completion_with_output_or_terminate(
513        &mut self,
514        wait_timeout: Duration,
515        interrupt_timeout: Duration,
516        terminate_timeout: Duration,
517        options: LineParsingOptions,
518    ) -> Result<Output, WaitError> {
519        let out_collector = self.stdout().collect_lines_into_vec(options);
520        let err_collector = self.stderr().collect_lines_into_vec(options);
521
522        let status = self
523            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
524            .await?;
525
526        let stdout = out_collector.wait().await?;
527        let stderr = err_collector.wait().await?;
528
529        Ok(Output {
530            status,
531            stdout,
532            stderr,
533        })
534    }
535
536    /// Convenience function, waiting for the process to complete using
537    /// [`ProcessHandle::wait_for_completion_or_terminate`] while collecting both `stdout` and
538    /// `stderr` into raw byte vectors.
539    ///
540    /// # Errors
541    ///
542    /// Returns [`WaitError`] if waiting for the process, terminating it, or collecting output fails.
543    pub async fn wait_for_completion_with_raw_output_or_terminate(
544        &mut self,
545        wait_timeout: Duration,
546        interrupt_timeout: Duration,
547        terminate_timeout: Duration,
548    ) -> Result<RawOutput, WaitError> {
549        let out_collector = self.stdout().collect_chunks_into_vec();
550        let err_collector = self.stderr().collect_chunks_into_vec();
551
552        let status = self
553            .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
554            .await?;
555
556        let stdout = out_collector.wait().await?;
557        let stderr = err_collector.wait().await?;
558
559        Ok(RawOutput {
560            status,
561            stdout,
562            stderr,
563        })
564    }
565}
566
567impl<O: OutputStream> ProcessHandle<O> {
    /// Applies platform-dependent spawn settings to `command` and hands the same command back.
    ///
    /// On Windows, you can only send `CTRL_C_EVENT` and `CTRL_BREAK_EVENT` to process groups,
    /// which works more like `killpg`. Sending to the current process ID will likely trigger
    /// undefined behavior of sending the event to every process that's attached to the console,
    /// i.e. sending the event to group ID 0. Therefore, we create a new process group
    /// for the child process we are about to spawn.
    ///
    /// On all other platforms the command is returned unmodified.
    ///
    /// See: <https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6>
    fn prepare_platform_specifics(
        command: &mut tokio::process::Command,
    ) -> &mut tokio::process::Command {
        // Windows: spawn the child into its own process group.
        #[cfg(windows)]
        {
            use windows_sys::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
            command.creation_flags(CREATE_NEW_PROCESS_GROUP)
        }
        // Everywhere else: nothing to adjust.
        #[cfg(not(windows))]
        {
            command
        }
    }
588
589    fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
590        Self::prepare_platform_specifics(command)
591            .stdin(Stdio::piped())
592            .stdout(Stdio::piped())
593            .stderr(Stdio::piped())
594            // ProcessHandle itself performs the last-resort cleanup while its panic-on-drop guard
595            // is armed. Keeping Tokio's unconditional kill-on-drop disabled ensures that
596            // `must_not_be_terminated()` can really opt out.
597            .kill_on_drop(false)
598    }
599
    /// Returns the OS process ID if the process hasn't exited yet.
    ///
    /// Once this process has been polled to completion this will return None.
    /// The value is taken directly from the underlying [`Child::id`].
    pub fn id(&self) -> Option<u32> {
        self.child.id()
    }
606
607    fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
608        match self.child.try_wait() {
609            Ok(Some(exit_status)) => {
610                self.must_not_be_terminated();
611                Ok(Some(exit_status))
612            }
613            Ok(None) => Ok(None),
614            Err(err) => Err(err),
615        }
616    }
617
618    fn signalling_failed_or_reap(
619        &mut self,
620        signal: &'static str,
621        source: io::Error,
622    ) -> Result<ExitStatus, TerminationError> {
623        match self.try_reap_exit_status() {
624            Ok(Some(exit_status)) => Ok(exit_status),
625            Ok(None) | Err(_) => Err(TerminationError::SignallingFailed {
626                process_name: self.name.clone(),
627                source,
628                signal,
629            }),
630        }
631    }
632
633    /// Checks if the process is currently running.
634    ///
635    /// Returns [`RunningState::Running`] if the process is still running,
636    /// [`RunningState::Terminated`] if it has exited, or [`RunningState::Uncertain`]
637    /// if the state could not be determined.
638    //noinspection RsSelfConvention
639    pub fn is_running(&mut self) -> RunningState {
640        match self.try_reap_exit_status() {
641            Ok(None) => RunningState::Running,
642            Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
643            Err(err) => RunningState::Uncertain(err),
644        }
645    }
646
    /// Returns a mutable reference to the (potentially already closed) stdin stream.
    ///
    /// Use this to write data to the child process's stdin. While open, the wrapped
    /// [`ChildStdin`] is reachable through [`Stdin::as_mut`]; it implements
    /// [`tokio::io::AsyncWrite`], allowing you to use methods like `write_all()` and `flush()`.
    /// Call [`Stdin::close`] on the returned value to close stdin.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use tokio::process::Command;
    /// # use tokio_process_tools::*;
    /// # use tokio::io::AsyncWriteExt;
    /// # tokio_test::block_on(async {
    /// // Whether we `spawn_broadcast` or `spawn_single_subscriber` does not make a difference here.
    /// let mut process = Process::new(Command::new("cat"))
    ///     .spawn_broadcast()
    ///     .unwrap();
    ///
    /// // Write to stdin.
    /// if let Some(stdin) = process.stdin().as_mut() {
    ///     stdin.write_all(b"Hello, process!\n").await.unwrap();
    ///     stdin.flush().await.unwrap();
    /// }
    ///
    /// // Close stdin to signal EOF.
    /// process.stdin().close();
    /// # });
    /// ```
    pub fn stdin(&mut self) -> &mut Stdin {
        &mut self.std_in
    }
677
    /// Returns a reference to the output stream built from the child's stdout.
    ///
    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
    /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
    /// attempts will panic with a helpful error message).
    pub fn stdout(&self) -> &O {
        &self.std_out_stream
    }
686
    /// Returns a reference to the output stream built from the child's stderr.
    ///
    /// For `BroadcastOutputStream`, this allows creating multiple concurrent consumers.
    /// For `SingleSubscriberOutputStream`, only one consumer can be created (subsequent
    /// attempts will panic with a helpful error message).
    pub fn stderr(&self) -> &O {
        &self.std_err_stream
    }
695
696    /// Sets a panic-on-drop mechanism for this `ProcessHandle`.
697    ///
698    /// This method enables a safeguard that ensures that the process represented by this
699    /// `ProcessHandle` is properly terminated or awaited before being dropped.
700    /// If `must_be_terminated` is set and the `ProcessHandle` is
701    /// dropped without invoking `terminate()` or `wait()`, an intentional panic will occur to
702    /// prevent silent failure-states, ensuring that system resources are handled correctly.
703    ///
704    /// You typically do not need to call this, as every `ProcessHandle` is marked by default.
705    /// Call `must_not_be_terminated` to clear this safeguard to explicitly allow dropping the
706    /// process without terminating it.
707    ///
708    /// # Panic
709    ///
710    /// If the `ProcessHandle` is dropped without being awaited or terminated
711    /// after calling this method, a panic will occur with a descriptive message
712    /// to inform about the incorrect usage.
713    pub fn must_be_terminated(&mut self) {
714        self.cleanup_on_drop = true;
715        self.panic_on_drop = Some(PanicOnDrop::new(
716            "tokio_process_tools::ProcessHandle",
717            "The process was not terminated.",
718            "Call `wait_for_completion` or `terminate` before the type is dropped!",
719        ));
720    }
721
722    /// Disables the kill/panic-on-drop safeguards for this handle.
723    ///
724    /// Dropping the handle after calling this method will no longer signal, kill, or panic.
725    /// However, this does **not** keep the library-owned stdio pipes alive. If the child still
726    /// depends on stdin, stdout, or stderr being open, dropping the handle may still affect it.
727    ///
728    /// Use plain [`tokio::process::Command`] directly when you need a child process that can
729    /// outlive the original handle without depending on captured stdio pipes.
730    pub fn must_not_be_terminated(&mut self) {
731        self.cleanup_on_drop = false;
732        self.defuse_drop_panic();
733    }
734
735    fn defuse_drop_panic(&mut self) {
736        if let Some(mut it) = self.panic_on_drop.take() {
737            it.defuse();
738        }
739    }
740
741    /// Wrap this process handle in a `TerminateOnDrop` instance, terminating the controlled process
742    /// automatically when this handle is dropped.
743    ///
744    /// **SAFETY: This only works when your code is running in a multithreaded tokio runtime!**
745    ///
746    /// Prefer manual termination of the process or awaiting it and relying on the (automatically
747    /// configured) `must_be_terminated` logic, raising a panic when a process was neither awaited
748    /// nor terminated before being dropped.
749    pub fn terminate_on_drop(
750        self,
751        graceful_termination_timeout: Duration,
752        forceful_termination_timeout: Duration,
753    ) -> TerminateOnDrop<O> {
754        TerminateOnDrop {
755            process_handle: self,
756            interrupt_timeout: graceful_termination_timeout,
757            terminate_timeout: forceful_termination_timeout,
758        }
759    }
760
    /// Manually send a `SIGINT` on unix or equivalent on Windows to this process.
    ///
    /// This only sends the signal (via `signal::send_interrupt`); it does not wait for the
    /// process to exit.
    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
    ///
    /// # Errors
    ///
    /// Returns an error if the platform signal could not be sent.
    pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
        signal::send_interrupt(&self.child)
    }
771
    /// Manually send a `SIGTERM` on unix or equivalent on Windows to this process.
    ///
    /// This only sends the signal (via `signal::send_terminate`); it does not wait for the
    /// process to exit.
    /// Prefer to call `terminate` instead, if you want to make sure this process is terminated.
    ///
    /// # Errors
    ///
    /// Returns an error if the platform signal could not be sent.
    pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
        signal::send_terminate(&self.child)
    }
782
783    /// Terminates this process by sending a `SIGINT`, `SIGTERM` or even a `SIGKILL` if the process
784    /// doesn't run to completion after receiving any of the first two signals.
785    ///
786    /// This handle can be dropped safely after this call returned, no matter the outcome.
787    /// We accept that in extremely rare cases, failed `SIGKILL`, a rogue process may be left over.
788    ///
789    /// # Errors
790    ///
791    /// Returns [`TerminationError`] if signalling or waiting for process termination fails.
792    pub async fn terminate(
793        &mut self,
794        interrupt_timeout: Duration,
795        terminate_timeout: Duration,
796    ) -> Result<ExitStatus, TerminationError> {
797        // Whether or not this function will ultimately succeed, we tried our best to terminate
798        // this process.
799        // Dropping this handle should not create any on-drop panic anymore.
800        // We accept that in extremely rare cases, failed `kill`, a rogue process may be left over.
801        self.defuse_drop_panic();
802
803        if let Some(exit_status) =
804            self.try_reap_exit_status()
805                .map_err(|source| TerminationError::SignallingFailed {
806                    process_name: self.name.clone(),
807                    source,
808                    signal: "SIGINT",
809                })?
810        {
811            return Ok(exit_status);
812        }
813
814        if let Err(err) = self.send_interrupt_signal() {
815            return self.signalling_failed_or_reap("SIGINT", err);
816        }
817
818        match self.wait_for_completion(Some(interrupt_timeout)).await {
819            Ok(exit_status) => Ok(exit_status),
820            Err(not_terminated_after_sigint) => {
821                tracing::warn!(
822                    process = %self.name,
823                    error = %not_terminated_after_sigint,
824                    "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
825                );
826
827                if let Err(err) = self.send_terminate_signal() {
828                    return self.signalling_failed_or_reap("SIGTERM", err);
829                }
830
831                match self.wait_for_completion(Some(terminate_timeout)).await {
832                    Ok(exit_status) => Ok(exit_status),
833                    Err(not_terminated_after_sigterm) => {
834                        tracing::warn!(
835                            process = %self.name,
836                            error = %not_terminated_after_sigterm,
837                            "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
838                        );
839
840                        match self.kill().await {
841                            Ok(()) => {
842                                // Note: A SIGKILL should typically (somewhat) immediately lead to
843                                // termination of the process. But there are cases in which even
844                                // a SIGKILL does not / cannot / will not kill a process.
845                                // Something must have gone horribly wrong then...
846                                // But: We do not want to wait indefinitely in case this happens
847                                // and therefore wait (at max) for a fixed duration after any
848                                // SIGKILL event.
849                                match self.wait_for_completion(Some(SIGKILL_WAIT_TIMEOUT)).await {
850                                    Ok(exit_status) => Ok(exit_status),
851                                    Err(not_terminated_after_sigkill) => {
852                                        // Unlikely. See the note above.
853                                        tracing::error!(
854                                            "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
855                                            self.name
856                                        );
857                                        Err(TerminationError::TerminationFailed {
858                                            process_name: self.name.clone(),
859                                            sigint_error: not_terminated_after_sigint.to_string(),
860                                            sigterm_error: not_terminated_after_sigterm.to_string(),
861                                            sigkill_error: io::Error::new(
862                                                io::ErrorKind::TimedOut,
863                                                not_terminated_after_sigkill.to_string(),
864                                            ),
865                                        })
866                                    }
867                                }
868                            }
869                            Err(kill_error) => {
870                                if let Ok(Some(exit_status)) = self.try_reap_exit_status() {
871                                    return Ok(exit_status);
872                                }
873                                tracing::error!(
874                                    process = %self.name,
875                                    error = %kill_error,
876                                    "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
877                                );
878
879                                Err(TerminationError::TerminationFailed {
880                                    process_name: self.name.clone(),
881                                    sigint_error: not_terminated_after_sigint.to_string(),
882                                    sigterm_error: not_terminated_after_sigterm.to_string(),
883                                    sigkill_error: kill_error,
884                                })
885                            }
886                        }
887                    }
888                }
889            }
890        }
891    }
892
893    /// Forces the process to exit. Most users should call [`ProcessHandle::terminate`] instead.
894    ///
895    /// This is equivalent to sending a SIGKILL on unix platforms followed by wait.
896    ///
897    /// # Errors
898    ///
899    /// Returns an error if Tokio cannot kill the child process.
900    pub async fn kill(&mut self) -> io::Result<()> {
901        self.child.kill().await
902    }
903
904    /// Successfully awaiting the completion of the process will unset the
905    /// "must be terminated" setting, as a successfully awaited process is already terminated.
906    /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
907    /// "must be terminated" panic being raised.
908    async fn wait(&mut self) -> io::Result<ExitStatus> {
909        match self.child.wait().await {
910            Ok(status) => {
911                self.must_not_be_terminated();
912                Ok(status)
913            }
914            Err(err) => Err(err),
915        }
916    }
917
918    /// Wait for this process to run to completion. Within `timeout`, if set, or unbound otherwise.
919    ///
920    /// If the timeout is reached before the process terminated, an error is returned but the
921    /// process remains untouched / keeps running.
922    /// Use [`ProcessHandle::wait_for_completion_or_terminate`] if you want immediate termination.
923    ///
924    /// This does not provide the processes output. You can take a look at the convenience function
925    /// [`ProcessHandle::<BroadcastOutputStream>::wait_for_completion_with_output`] to see
926    /// how the [`ProcessHandle::stdout`] and [`ProcessHandle::stderr`] streams (also available in
927    /// *_mut variants) can be used to inspect / watch over / capture the processes output.
928    ///
929    /// # Errors
930    ///
931    /// Returns [`WaitError`] if waiting for the process fails or the timeout elapses.
932    pub async fn wait_for_completion(
933        &mut self,
934        timeout: Option<Duration>,
935    ) -> Result<ExitStatus, WaitError> {
936        match timeout {
937            None => self.wait().await.map_err(|source| WaitError::IoError {
938                process_name: self.name.clone(),
939                source,
940            }),
941            Some(timeout_duration) => {
942                match tokio::time::timeout(timeout_duration, self.wait()).await {
943                    Ok(Ok(exit_status)) => Ok(exit_status),
944                    Ok(Err(source)) => Err(WaitError::IoError {
945                        process_name: self.name.clone(),
946                        source,
947                    }),
948                    Err(_elapsed) => Err(WaitError::Timeout {
949                        process_name: self.name.clone(),
950                        timeout: timeout_duration,
951                    }),
952                }
953            }
954        }
955    }
956
957    /// Wait for this process to run to completion within `timeout`.
958    ///
959    /// If the timeout is reached before the process terminated normally, external termination of
960    /// the process is forced through [`ProcessHandle::terminate`].
961    ///
962    /// Note that this function may return `Ok` even though the timeout was reached, carrying the
963    /// exit status received after sending a termination signal!
964    ///
965    /// # Errors
966    ///
967    /// Returns [`TerminationError`] if termination is required and then fails.
968    pub async fn wait_for_completion_or_terminate(
969        &mut self,
970        wait_timeout: Duration,
971        interrupt_timeout: Duration,
972        terminate_timeout: Duration,
973    ) -> Result<ExitStatus, TerminationError> {
974        match self.wait_for_completion(Some(wait_timeout)).await {
975            Ok(exit_status) => Ok(exit_status),
976            Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
977        }
978    }
979
980    /// Consumes this handle to provide the wrapped `tokio::process::Child` instance as well as the
981    /// stdout and stderr output streams.
982    pub fn into_inner(mut self) -> (Child, O, O) {
983        self.must_not_be_terminated();
984        let mut this = std::mem::ManuallyDrop::new(self);
985
986        unsafe {
987            let child = std::ptr::read(&raw const this.child);
988            let stdout = std::ptr::read(&raw const this.std_out_stream);
989            let stderr = std::ptr::read(&raw const this.std_err_stream);
990
991            std::ptr::drop_in_place(&raw mut this.name);
992            // `ChildStdin` is stored separately from `child`, so we still need to drop it here.
993            std::ptr::drop_in_place(&raw mut this.std_in);
994            std::ptr::drop_in_place(&raw mut this.panic_on_drop);
995
996            (child, stdout, stderr)
997        }
998    }
999}
1000
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;
    use std::fs;
    use std::sync::{Arc, Mutex};
    use tokio::io::AsyncWriteExt;

    use crate::Next;

    /// Terminating a long-running process returns promptly and yields a signal-based exit status.
    #[tokio::test]
    async fn test_termination() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        // Use the monotonic clock: wall-clock time may jump while the test is running.
        let started_at = std::time::Instant::now();
        let mut handle = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let exit_status = handle
            .terminate(Duration::from_secs(1), Duration::from_secs(1))
            .await
            .unwrap();
        let ran_for = started_at.elapsed();

        // We terminate after roughly 100 ms of waiting.
        // The assertion tolerates a deviation of up to 500 ms, which covers the time taken up by
        // spawning the process and performing the termination.
        // We can increase this if the test should turn out to be flaky.
        assert_that!(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);

        // When terminated, we do not get an exit code (unix).
        assert_that!(exit_status.code()).is_none();
    }

    /// `terminate` reports the regular exit status of a process that already exited on its own.
    #[tokio::test]
    async fn terminate_returns_normal_exit_when_process_already_exited() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c").arg("exit 0");

        let mut handle = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;

        let exit_status = handle
            .terminate(Duration::from_millis(50), Duration::from_millis(50))
            .await
            .unwrap();

        assert_that!(exit_status.success()).is_true();
    }

    /// Data written to stdin is echoed by `cat` and shows up as a line on stdout.
    #[tokio::test]
    async fn test_stdin_write_and_read() {
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        // Verify stdin starts as open.
        assert_that!(process.stdin().is_open()).is_true();

        // Write to stdin.
        let test_data = b"Hello from stdin!\n";
        if let Some(stdin) = process.stdin().as_mut() {
            stdin.write_all(test_data).await.unwrap();
            stdin.flush().await.unwrap();
        }

        // Close stdin to signal EOF.
        process.stdin().close();
        assert_that!(process.stdin().is_open()).is_false();

        // Collect stdout.
        let output = process
            .wait_for_completion_with_output(
                Some(Duration::from_secs(2)),
                LineParsingOptions::default(),
            )
            .await
            .unwrap();

        assert_that!(output.status.success()).is_true();
        assert_that!(&output.stdout).has_length(1);
        assert_that!(output.stdout[0].as_str()).is_equal_to("Hello from stdin!");
    }

    /// Closing stdin makes `cat` see EOF and exit successfully.
    #[tokio::test]
    async fn test_stdin_close_sends_eof() {
        // Use `cat` which will exit when stdin is closed.
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        // Close stdin immediately without writing.
        process.stdin().close();
        assert_that!(process.stdin().is_open()).is_false();

        // Process should terminate since it receives EOF.
        let status = process
            .wait_for_completion(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that!(status.success()).is_true();
    }

    /// Several consecutive writes to stdin arrive as separate lines, in order.
    #[tokio::test]
    async fn test_stdin_multiple_writes() {
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        // Write multiple lines.
        if let Some(stdin) = process.stdin().as_mut() {
            stdin.write_all(b"Line 1\n").await.unwrap();
            stdin.write_all(b"Line 2\n").await.unwrap();
            stdin.write_all(b"Line 3\n").await.unwrap();
            stdin.flush().await.unwrap();
        }

        process.stdin().close();

        let output = process
            .wait_for_completion_with_output(
                Some(Duration::from_secs(2)),
                LineParsingOptions::default(),
            )
            .await
            .unwrap();

        assert_that!(&output.stdout).has_length(3);
        assert_that!(output.stdout[0].as_str()).is_equal_to("Line 1");
        assert_that!(output.stdout[1].as_str()).is_equal_to("Line 2");
        assert_that!(output.stdout[2].as_str()).is_equal_to("Line 3");
    }

    /// Commands sent to a shell through stdin are executed and their output is collected.
    #[tokio::test]
    async fn test_shell_command_dispatch() {
        let cmd = tokio::process::Command::new("sh");

        let mut process = crate::Process::new(cmd).spawn_broadcast().unwrap();

        // Monitor output.
        let collector = process
            .stdout()
            .collect_lines_into_vec(LineParsingOptions::default());

        // Send commands to the shell.
        if let Some(stdin) = process.stdin().as_mut() {
            stdin
                .write_all(b"printf 'Hello from shell\\n'\nexit\n")
                .await
                .unwrap();
            stdin.flush().await.unwrap();
        }

        // Wait a bit for output.
        tokio::time::sleep(Duration::from_millis(500)).await;

        process.stdin().close();
        process
            .wait_for_completion(Some(Duration::from_secs(1)))
            .await
            .unwrap();

        let collected = collector.wait().await.unwrap();
        assert_that!(&collected).has_length(1);
        assert_that!(collected[0].as_str()).is_equal_to("Hello from shell");
    }

    /// `into_inner` on a still-running process must not trigger the on-drop panic.
    #[tokio::test]
    async fn test_into_inner_defuses_panic_guard() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let process = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();

        let (mut child, _stdout, _stderr) = process.into_inner();
        child.kill().await.unwrap();
        let _status = child.wait().await.unwrap();
    }

    #[tokio::test]
    async fn test_into_inner_with_owned_name_drops_owned_string() {
        // Regression test: `into_inner` manually destructures the handle through
        // `ManuallyDrop`. If the `name` field isn't explicitly dropped, a
        // `Cow::Owned(String)` allocation leaks. Forcing the owned variant here
        // exercises the path that the static-`&str` test above doesn't.
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let process = crate::Process::new(cmd)
            .with_name(format!("sleeper-{}", 7))
            .spawn_broadcast()
            .unwrap();

        let (mut child, _stdout, _stderr) = process.into_inner();
        child.kill().await.unwrap();
        let _status = child.wait().await.unwrap();
    }

    /// `defuse_drop_panic` removes the panic guard but leaves cleanup-on-drop enabled.
    #[tokio::test]
    async fn test_defusing_drop_panic_keeps_cleanup_guard_armed() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let mut process = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();

        assert_that!(process.cleanup_on_drop).is_true();
        assert_that!(
            process
                .panic_on_drop
                .as_ref()
                .is_some_and(PanicOnDrop::is_armed)
        )
        .is_true();

        process.defuse_drop_panic();

        assert_that!(process.cleanup_on_drop).is_true();
        assert_that!(&process.panic_on_drop).is_none();

        process.kill().await.unwrap();
        process.wait_for_completion(None).await.unwrap();
    }

    /// A successful `wait_for_completion` disables both the cleanup and the panic guard.
    #[tokio::test]
    async fn test_wait_for_completion_disarms_cleanup_and_panic_guards() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("0.1");

        let mut process = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();

        process
            .wait_for_completion(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that!(process.cleanup_on_drop).is_false();
        assert_that!(&process.panic_on_drop).is_none();
    }

    /// After `must_not_be_terminated`, dropping the handle leaves the process running.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_must_not_be_terminated_allows_process_to_survive_handle_drop() {
        use nix::errno::Errno;
        use nix::sys::signal::{self, Signal};
        use nix::sys::wait::waitpid;
        use nix::unistd::Pid;

        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let mut process = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();
        let pid = process.id().unwrap();

        process.must_not_be_terminated();
        assert_that!(process.cleanup_on_drop).is_false();
        assert_that!(&process.panic_on_drop).is_none();
        drop(process);

        // Sending "no signal" only probes whether the process still exists.
        let pid = Pid::from_raw(pid.cast_signed());
        assert_that!(signal::kill(pid, None).is_ok()).is_true();

        // Clean up the surviving process ourselves.
        signal::kill(pid, Signal::SIGKILL).unwrap();
        match waitpid(pid, None) {
            Ok(_) | Err(Errno::ECHILD) => {}
            Err(err) => panic!("waitpid failed: {err}"),
        }
    }

    /// Dropping the handle closes stdin even after `must_not_be_terminated` was called.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_must_not_be_terminated_still_closes_stdin_on_drop() {
        use nix::sys::wait::waitpid;
        use nix::unistd::Pid;
        use tempfile::tempdir;

        let temp_dir = tempdir().unwrap();
        let output_path = temp_dir.path().join("stdin-result.txt");
        // Escape single quotes so that the path can be embedded in a single-quoted shell word.
        let output_file = output_path.to_str().unwrap().replace('\'', "'\"'\"'");

        // `cat` only returns once stdin reached EOF; the marker file is written afterwards.
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c")
            .arg(format!("cat >/dev/null; printf eof > '{output_file}'"));

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();
        let pid = Pid::from_raw(process.id().unwrap().cast_signed());

        process.must_not_be_terminated();
        drop(process);

        tokio::time::timeout(
            Duration::from_secs(2),
            tokio::task::spawn_blocking(move || waitpid(pid, None)),
        )
        .await
        .unwrap()
        .unwrap()
        .unwrap();

        assert_that!(fs::read_to_string(&output_path).unwrap()).is_equal_to("eof");
    }

    /// Dropping the handle after `must_not_be_terminated` lets a constantly writing process
    /// (`yes`) come to an end within the timeout.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_must_not_be_terminated_does_not_keep_stdout_pipe_alive() {
        use nix::sys::wait::waitpid;
        use nix::unistd::Pid;

        let mut cmd = tokio::process::Command::new("yes");
        cmd.arg("tick");

        let mut process = crate::Process::new(cmd)
            .name("yes")
            .spawn_broadcast()
            .unwrap();
        let pid = Pid::from_raw(process.id().unwrap().cast_signed());

        process.must_not_be_terminated();
        drop(process);

        tokio::time::timeout(
            Duration::from_secs(2),
            tokio::task::spawn_blocking(move || waitpid(pid, None)),
        )
        .await
        .unwrap()
        .unwrap()
        .unwrap();
    }

    /// A final line without a trailing newline is still part of the collected output.
    #[tokio::test]
    async fn test_wait_for_completion_with_output_preserves_unterminated_final_line() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c").arg("printf tail");

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();

        let output = process
            .wait_for_completion_with_output(
                Some(Duration::from_secs(2)),
                LineParsingOptions::default(),
            )
            .await
            .unwrap();

        assert_that!(output.status.success()).is_true();
        assert_that!(output.stdout).contains_exactly(["tail"]);
        assert_that!(output.stderr).is_empty();
    }

    /// Raw output of a broadcast handle keeps the emitted bytes, including inner newlines.
    #[tokio::test]
    async fn test_broadcast_wait_for_completion_with_raw_output_preserves_bytes() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c")
            .arg("printf 'out\nraw'; printf 'err\nraw' >&2");

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();

        let output = process
            .wait_for_completion_with_raw_output(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that!(output.status.success()).is_true();
        assert_that!(output.stdout).is_equal_to(b"out\nraw".to_vec());
        assert_that!(output.stderr).is_equal_to(b"err\nraw".to_vec());
    }

    /// Raw output of a single-subscriber handle keeps the emitted bytes, including inner
    /// newlines.
    #[tokio::test]
    async fn test_single_subscriber_wait_for_completion_with_raw_output_preserves_bytes() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c")
            .arg("printf 'out\nraw'; printf 'err\nraw' >&2");

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_single_subscriber()
            .unwrap();

        let output = process
            .wait_for_completion_with_raw_output(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that!(output.status.success()).is_true();
        assert_that!(output.stdout).is_equal_to(b"out\nraw".to_vec());
        assert_that!(output.stderr).is_equal_to(b"err\nraw".to_vec());
    }

    /// The async line inspector also sees a final line that lacks a trailing newline.
    #[tokio::test]
    async fn test_inspect_lines_async_preserves_unterminated_final_line() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c").arg("printf tail");

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();

        let seen = Arc::new(Mutex::new(Vec::<String>::new()));
        let seen_in_task = Arc::clone(&seen);
        let inspector = process.stdout().inspect_lines_async(
            move |line| {
                let seen = Arc::clone(&seen_in_task);
                let line = line.into_owned();
                async move {
                    seen.lock().expect("lock").push(line);
                    Next::Continue
                }
            },
            LineParsingOptions::default(),
        );

        process
            .wait_for_completion(Some(Duration::from_secs(2)))
            .await
            .unwrap();
        inspector.wait().await.unwrap();

        let seen = seen.lock().expect("lock").clone();
        assert_that!(seen).contains_exactly(["tail"]);
    }
}