Skip to main content

tokio_process_tools/process_handle/
wait.rs

1use super::ProcessHandle;
2#[cfg(any(unix, windows))]
3use super::WaitForCompletionOrTerminateOptions;
4#[cfg(any(unix, windows))]
5use super::termination::GracefulTimeouts;
6use crate::error::{WaitError, WaitForCompletionResult};
7#[cfg(any(unix, windows))]
8use crate::error::{WaitForCompletionOrTerminateResult, WaitOrTerminateError};
9use crate::output_stream::OutputStream;
10use std::io;
11use std::process::ExitStatus;
12use std::time::Duration;
13
14#[cfg(any(unix, windows))]
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub(super) struct WaitOrTerminateOutcome {
17    pub(super) result: WaitForCompletionOrTerminateResult<ExitStatus>,
18    pub(super) output_collection_timeout_budget: Duration,
19}
20
21#[cfg(any(unix, windows))]
22fn wait_or_terminate_base_budget(wait_timeout: Duration, timeouts: GracefulTimeouts) -> Duration {
23    wait_timeout.saturating_add(timeouts.total())
24}
25
26impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
27where
28    Stdout: OutputStream,
29    Stderr: OutputStream,
30{
31    /// Successfully awaiting the completion of the process will unset the
32    /// "must be terminated" setting, as a successfully awaited process is already terminated.
33    /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
34    /// "must be terminated" panic being raised.
35    async fn wait(&mut self) -> io::Result<ExitStatus> {
36        self.stdin().close();
37        match self.child.wait().await {
38            Ok(status) => {
39                self.must_not_be_terminated();
40                Ok(status)
41            }
42            Err(err) => Err(err),
43        }
44    }
45
46    /// Wait for this process to run to completion within `timeout`.
47    ///
48    /// Any still-open stdin handle is closed before waiting begins, matching
49    /// [`tokio::process::Child::wait`] and helping avoid deadlocks where the child is waiting for
50    /// input while the parent is waiting for exit.
51    ///
52    /// If the timeout is reached before the process terminated, a timeout outcome is returned and the
53    /// process keeps running without being signalled or killed. Its stdin has still been closed
54    /// before the wait started.
55    /// Use [`ProcessHandle::wait_for_completion_or_terminate`] if you want immediate termination.
56    ///
57    /// This does not provide the processes output. Use [`ProcessHandle::stdout`] and
58    /// [`ProcessHandle::stderr`] to inspect, watch over, or capture the process output.
59    ///
60    /// # Errors
61    ///
62    /// Returns [`WaitError`] if waiting for the process fails.
63    pub async fn wait_for_completion(
64        &mut self,
65        timeout: Duration,
66    ) -> Result<WaitForCompletionResult, WaitError> {
67        self.wait_for_completion_inner(timeout).await
68    }
69
70    pub(super) async fn wait_for_completion_inner(
71        &mut self,
72        timeout: Duration,
73    ) -> Result<WaitForCompletionResult, WaitError> {
74        match tokio::time::timeout(timeout, self.wait()).await {
75            Ok(Ok(exit_status)) => Ok(WaitForCompletionResult::Completed(exit_status)),
76            Ok(Err(source)) => Err(WaitError::IoError {
77                process_name: self.name.clone(),
78                source,
79            }),
80            Err(_elapsed) => Ok(WaitForCompletionResult::Timeout { timeout }),
81        }
82    }
83
84    pub(super) async fn wait_for_completion_unbounded_inner(
85        &mut self,
86    ) -> Result<ExitStatus, WaitError> {
87        self.wait().await.map_err(|source| WaitError::IoError {
88            process_name: self.name.clone(),
89            source,
90        })
91    }
92
93    pub(super) async fn wait_for_exit_after_signal(
94        &mut self,
95        timeout: Duration,
96    ) -> Result<Option<ExitStatus>, WaitError> {
97        match self.wait_for_completion_inner(timeout).await? {
98            WaitForCompletionResult::Completed(exit_status) => Ok(Some(exit_status)),
99            WaitForCompletionResult::Timeout { .. } => Ok(None),
100        }
101    }
102
103    fn wait_timeout_error(timeout: Duration) -> io::Error {
104        io::Error::new(
105            io::ErrorKind::TimedOut,
106            format!("process did not complete within {timeout:?}"),
107        )
108    }
109
110    pub(super) fn wait_timeout_diagnostic(timeout: Duration) -> io::Error {
111        Self::wait_timeout_error(timeout)
112    }
113}
114
115#[cfg(any(unix, windows))]
116impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
117where
118    Stdout: OutputStream,
119    Stderr: OutputStream,
120{
121    fn terminated_after_timeout_result(
122        exit_status: ExitStatus,
123        timeout: Duration,
124        output_collection_timeout_budget: Duration,
125    ) -> WaitOrTerminateOutcome {
126        WaitOrTerminateOutcome {
127            result: WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
128                result: exit_status,
129                timeout,
130            },
131            output_collection_timeout_budget,
132        }
133    }
134
135    pub(super) fn exit_status_from_wait_or_terminate_result(
136        result: WaitForCompletionOrTerminateResult<ExitStatus>,
137    ) -> ExitStatus {
138        match result {
139            WaitForCompletionOrTerminateResult::Completed(exit_status)
140            | WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
141                result: exit_status,
142                ..
143            } => exit_status,
144        }
145    }
146
147    /// Wait for this process to run to completion within `wait_timeout`.
148    ///
149    /// Any still-open stdin handle is closed before the initial wait begins, matching
150    /// [`tokio::process::Child::wait`].
151    ///
152    /// If the timeout is reached before the process terminated normally, external termination is
153    /// forced through [`ProcessHandle::terminate`] and a
154    /// [`WaitForCompletionOrTerminateResult::TerminatedAfterTimeout`] outcome is returned when
155    /// cleanup succeeds. If waiting fails for a non-timeout reason, cleanup termination is still
156    /// attempted and the original wait failure is preserved in the returned error.
157    ///
158    /// Total wall-clock time can exceed `wait_timeout` plus the per-platform graceful budget
159    /// carried by `options.graceful_timeouts` (`interrupt_timeout + terminate_timeout` on Unix,
160    /// `graceful_timeout` on Windows) by one additional fixed 3-second wait when the force-kill
161    /// fallback is required.
162    ///
163    /// # Errors
164    ///
165    /// Returns [`WaitOrTerminateError`] if waiting fails or if timeout-triggered termination is
166    /// required and then fails.
167    pub async fn wait_for_completion_or_terminate(
168        &mut self,
169        options: WaitForCompletionOrTerminateOptions,
170    ) -> Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError> {
171        self.wait_for_completion_or_terminate_inner(options.wait_timeout, options.graceful_timeouts)
172            .await
173    }
174
175    pub(super) async fn wait_for_completion_or_terminate_inner_detailed(
176        &mut self,
177        wait_timeout: Duration,
178        timeouts: GracefulTimeouts,
179    ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
180        let output_collection_timeout_budget =
181            wait_or_terminate_base_budget(wait_timeout, timeouts);
182
183        match self.wait_for_completion_inner(wait_timeout).await {
184            Ok(WaitForCompletionResult::Completed(exit_status)) => Ok(WaitOrTerminateOutcome {
185                result: WaitForCompletionOrTerminateResult::Completed(exit_status),
186                output_collection_timeout_budget,
187            }),
188            Ok(WaitForCompletionResult::Timeout { timeout }) => {
189                self.terminate_after_wait_timeout_detailed(timeout, timeouts)
190                    .await
191            }
192            Err(wait_error) => {
193                self.terminate_after_wait_error_detailed(wait_error, wait_timeout, timeouts)
194                    .await
195            }
196        }
197    }
198
199    pub(super) async fn wait_for_completion_or_terminate_inner(
200        &mut self,
201        wait_timeout: Duration,
202        timeouts: GracefulTimeouts,
203    ) -> Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError> {
204        self.wait_for_completion_or_terminate_inner_detailed(wait_timeout, timeouts)
205            .await
206            .map(|outcome| outcome.result)
207    }
208
209    pub(super) async fn terminate_after_wait_timeout_detailed(
210        &mut self,
211        wait_timeout: Duration,
212        timeouts: GracefulTimeouts,
213    ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
214        let process_name = self.name.clone();
215        let output_collection_timeout_budget =
216            wait_or_terminate_base_budget(wait_timeout, timeouts);
217        match self.terminate_detailed(timeouts).await {
218            Ok(termination_outcome) => Ok(Self::terminated_after_timeout_result(
219                termination_outcome.exit_status,
220                wait_timeout,
221                output_collection_timeout_budget
222                    .saturating_add(termination_outcome.output_collection_timeout_extension),
223            )),
224            Err(termination_error) => Err(WaitOrTerminateError::TerminationAfterTimeoutFailed {
225                process_name,
226                timeout: wait_timeout,
227                termination_error,
228            }),
229        }
230    }
231
232    pub(super) async fn terminate_after_wait_error_detailed(
233        &mut self,
234        wait_error: WaitError,
235        _wait_timeout: Duration,
236        timeouts: GracefulTimeouts,
237    ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
238        let process_name = self.name.clone();
239
240        match self.terminate_detailed(timeouts).await {
241            Ok(termination_outcome) => Err(WaitOrTerminateError::WaitFailed {
242                process_name,
243                wait_error: Box::new(wait_error),
244                termination_status: termination_outcome.exit_status,
245            }),
246            Err(termination_error) => Err(WaitOrTerminateError::TerminationFailed {
247                process_name,
248                wait_error: Box::new(wait_error),
249                termination_error,
250            }),
251        }
252    }
253
254    #[cfg_attr(not(test), allow(dead_code))]
255    pub(super) async fn terminate_after_wait_error(
256        &mut self,
257        wait_error: WaitError,
258        timeouts: GracefulTimeouts,
259    ) -> Result<ExitStatus, WaitOrTerminateError> {
260        self.terminate_after_wait_error_detailed(wait_error, Duration::ZERO, timeouts)
261            .await
262            .map(|outcome| Self::exit_status_from_wait_or_terminate_result(outcome.result))
263    }
264}
265
266#[cfg(test)]
267mod tests {
268    use super::*;
269    #[cfg(any(unix, windows))]
270    use crate::test_support::default_graceful_timeouts;
271    use crate::test_support::{
272        line_collection_options, line_parsing_options, long_running_command,
273    };
274    use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, NumBytesExt};
275    use assertr::prelude::*;
276    use tokio::io::AsyncWriteExt;
277
278    #[cfg(any(unix, windows))]
279    fn wait_or_terminate_options(wait_timeout: Duration) -> WaitForCompletionOrTerminateOptions {
280        WaitForCompletionOrTerminateOptions {
281            wait_timeout,
282            graceful_timeouts: default_graceful_timeouts(),
283        }
284    }
285
286    #[tokio::test]
287    async fn wait_for_completion_disarms_cleanup_and_panic_guards() {
288        let mut process = crate::Process::new(long_running_command(Duration::from_millis(100)))
289            .name("long-running")
290            .stdout_and_stderr(|stream| {
291                stream
292                    .broadcast()
293                    .best_effort_delivery()
294                    .no_replay()
295                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
296                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
297            })
298            .spawn()
299            .unwrap();
300
301        process
302            .wait_for_completion(Duration::from_secs(2))
303            .await
304            .unwrap()
305            .expect_completed("process should complete");
306
307        assert_that!(process.is_drop_disarmed()).is_true();
308    }
309
310    #[tokio::test]
311    async fn wait_for_completion_closes_stdin_before_waiting() {
312        let cmd = tokio::process::Command::new("cat");
313        let mut process = crate::Process::new(cmd)
314            .name("cat")
315            .stdout_and_stderr(|stream| {
316                stream
317                    .broadcast()
318                    .best_effort_delivery()
319                    .no_replay()
320                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
321                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
322            })
323            .spawn()
324            .unwrap();
325
326        let collector = process
327            .stdout()
328            .collect_lines_into_vec(line_parsing_options(), line_collection_options());
329
330        let Some(stdin) = process.stdin().as_mut() else {
331            assert_that!(process.stdin().is_open()).fail("stdin should start open");
332            return;
333        };
334        stdin.write_all(b"wait closes stdin\n").await.unwrap();
335        stdin.flush().await.unwrap();
336
337        let status = process
338            .wait_for_completion(Duration::from_secs(2))
339            .await
340            .unwrap()
341            .expect_completed("process should complete");
342
343        assert_that!(status.success()).is_true();
344        assert_that!(process.stdin().is_open()).is_false();
345
346        let collected = collector.wait().await.unwrap();
347        assert_that!(collected.lines().len()).is_equal_to(1);
348        assert_that!(collected[0].as_str()).is_equal_to("wait closes stdin");
349    }
350
351    #[cfg(any(unix, windows))]
352    #[tokio::test]
353    async fn or_terminate_returns_wait_failure_after_cleanup() {
354        let mut process = crate::Process::new(long_running_command(Duration::from_secs(5)))
355            .name("long-running")
356            .stdout_and_stderr(|stream| {
357                stream
358                    .broadcast()
359                    .best_effort_delivery()
360                    .replay_last_bytes(1.megabytes())
361                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
362                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
363            })
364            .spawn()
365            .unwrap();
366
367        let wait_error = WaitError::IoError {
368            process_name: "long-running".into(),
369            source: io::Error::other("synthetic wait failure"),
370        };
371
372        let result = process
373            .terminate_after_wait_error(wait_error, default_graceful_timeouts())
374            .await;
375
376        assert_that!(process.is_drop_disarmed()).is_true();
377
378        let (wait_error, termination_status) = match result {
379            Err(WaitOrTerminateError::WaitFailed {
380                wait_error,
381                termination_status,
382                ..
383            }) => (wait_error, termination_status),
384            other => {
385                assert_that!(&other).fail(format_args!(
386                    "expected wait failure preserved after successful cleanup, got {other:?}"
387                ));
388                return;
389            }
390        };
391
392        assert_that!(termination_status.code()).is_none();
393
394        let WaitError::IoError { source, .. } = *wait_error;
395        assert_that!(source.to_string().as_str()).is_equal_to("synthetic wait failure");
396    }
397
398    #[cfg(any(unix, windows))]
399    #[tokio::test]
400    async fn wait_for_completion_or_terminate_terminates_after_timeout() {
401        let mut process = crate::Process::new(long_running_command(Duration::from_secs(5)))
402            .name("long-running")
403            .stdout_and_stderr(|stream| {
404                stream
405                    .broadcast()
406                    .best_effort_delivery()
407                    .no_replay()
408                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
409                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
410            })
411            .spawn()
412            .unwrap();
413
414        let result = process
415            .wait_for_completion_or_terminate(wait_or_terminate_options(Duration::from_millis(10)))
416            .await
417            .unwrap();
418        let status = match result {
419            WaitForCompletionOrTerminateResult::TerminatedAfterTimeout { result, timeout } => {
420                assert_that!(timeout).is_equal_to(Duration::from_millis(10));
421                result
422            }
423            other @ WaitForCompletionOrTerminateResult::Completed(_) => {
424                assert_that!(&other).fail(format_args!(
425                    "expected timeout-driven termination, got {other:?}"
426                ));
427                return;
428            }
429        };
430
431        assert_that!(status.success()).is_false();
432        assert_that!(process.is_drop_disarmed()).is_true();
433    }
434}