Skip to main content

tokio_process_tools/process_handle/
wait.rs

1use super::ProcessHandle;
2use super::WaitForCompletionOrTerminateOptions;
3use crate::error::{
4    WaitError, WaitForCompletionOrTerminateResult, WaitForCompletionResult, WaitOrTerminateError,
5};
6use crate::output_stream::OutputStream;
7use std::io;
8use std::process::ExitStatus;
9use std::time::Duration;
10
/// Internal outcome of a wait-or-terminate operation.
///
/// Pairs the wait/terminate result with the timeout budget that was computed alongside it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct WaitOrTerminateOutcome {
    /// Whether the process completed on its own or was terminated after the wait timeout,
    /// carrying the resulting exit status either way.
    pub(super) result: WaitForCompletionOrTerminateResult<ExitStatus>,
    /// Saturating sum of the wait, interrupt, and terminate timeouts, extended by whatever
    /// additional time the termination step reports.
    ///
    /// NOTE(review): presumably used to bound subsequent output collection — confirm against
    /// the consumers of this field.
    pub(super) output_collection_timeout_budget: Duration,
}
16
/// Returns the combined time budget of a wait-or-terminate run.
///
/// The budget is `wait_timeout + interrupt_timeout + terminate_timeout`, computed with
/// saturating arithmetic so that very large timeouts clamp to [`Duration::MAX`] instead of
/// overflowing.
fn wait_or_terminate_base_budget(
    wait_timeout: Duration,
    interrupt_timeout: Duration,
    terminate_timeout: Duration,
) -> Duration {
    // Durations are never negative, so saturating addition gives the same clamped total
    // regardless of how the three terms are grouped.
    let termination_budget = interrupt_timeout.saturating_add(terminate_timeout);
    wait_timeout.saturating_add(termination_budget)
}
26
27impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
28where
29    Stdout: OutputStream,
30    Stderr: OutputStream,
31{
32    /// Successfully awaiting the completion of the process will unset the
33    /// "must be terminated" setting, as a successfully awaited process is already terminated.
34    /// Dropping this `ProcessHandle` after successfully calling `wait` should never lead to a
35    /// "must be terminated" panic being raised.
36    async fn wait(&mut self) -> io::Result<ExitStatus> {
37        self.stdin().close();
38        match self.child.wait().await {
39            Ok(status) => {
40                self.must_not_be_terminated();
41                Ok(status)
42            }
43            Err(err) => Err(err),
44        }
45    }
46
    /// Wait for this process to run to completion within `timeout`.
    ///
    /// Any still-open stdin handle is closed before waiting begins, matching
    /// [`tokio::process::Child::wait`] and helping avoid deadlocks where the child is waiting for
    /// input while the parent is waiting for exit.
    ///
    /// If the timeout is reached before the process has terminated, a timeout outcome is
    /// returned and the process keeps running without being signalled or killed. Its stdin has
    /// still been closed before the wait started.
    /// Use [`ProcessHandle::wait_for_completion_or_terminate`] if you want the process to be
    /// terminated when the timeout elapses.
    ///
    /// This does not provide the process's output. Use [`ProcessHandle::stdout`] and
    /// [`ProcessHandle::stderr`] to inspect, watch over, or capture the process output.
    ///
    /// # Errors
    ///
    /// Returns [`WaitError`] if waiting for the process fails.
    pub async fn wait_for_completion(
        &mut self,
        timeout: Duration,
    ) -> Result<WaitForCompletionResult, WaitError> {
        self.wait_for_completion_inner(timeout).await
    }
70
71    pub(super) async fn wait_for_completion_inner(
72        &mut self,
73        timeout: Duration,
74    ) -> Result<WaitForCompletionResult, WaitError> {
75        match tokio::time::timeout(timeout, self.wait()).await {
76            Ok(Ok(exit_status)) => Ok(WaitForCompletionResult::Completed(exit_status)),
77            Ok(Err(source)) => Err(WaitError::IoError {
78                process_name: self.name.clone(),
79                source,
80            }),
81            Err(_elapsed) => Ok(WaitForCompletionResult::Timeout { timeout }),
82        }
83    }
84
85    pub(super) async fn wait_for_completion_unbounded_inner(
86        &mut self,
87    ) -> Result<ExitStatus, WaitError> {
88        self.wait().await.map_err(|source| WaitError::IoError {
89            process_name: self.name.clone(),
90            source,
91        })
92    }
93
94    pub(super) async fn wait_for_exit_after_signal(
95        &mut self,
96        timeout: Duration,
97    ) -> Result<Option<ExitStatus>, WaitError> {
98        match self.wait_for_completion_inner(timeout).await? {
99            WaitForCompletionResult::Completed(exit_status) => Ok(Some(exit_status)),
100            WaitForCompletionResult::Timeout { .. } => Ok(None),
101        }
102    }
103
104    fn wait_timeout_error(timeout: Duration) -> io::Error {
105        io::Error::new(
106            io::ErrorKind::TimedOut,
107            format!("process did not complete within {timeout:?}"),
108        )
109    }
110
    /// Returns the timed-out [`io::Error`] produced by `wait_timeout_error` for `timeout`.
    ///
    /// This is the `pub(super)` entry point to that private constructor; it adds no behavior
    /// of its own.
    pub(super) fn wait_timeout_diagnostic(timeout: Duration) -> io::Error {
        Self::wait_timeout_error(timeout)
    }
114
115    fn terminated_after_timeout_result(
116        exit_status: ExitStatus,
117        timeout: Duration,
118        output_collection_timeout_budget: Duration,
119    ) -> WaitOrTerminateOutcome {
120        WaitOrTerminateOutcome {
121            result: WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
122                result: exit_status,
123                timeout,
124            },
125            output_collection_timeout_budget,
126        }
127    }
128
129    pub(super) fn exit_status_from_wait_or_terminate_result(
130        result: WaitForCompletionOrTerminateResult<ExitStatus>,
131    ) -> ExitStatus {
132        match result {
133            WaitForCompletionOrTerminateResult::Completed(exit_status)
134            | WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
135                result: exit_status,
136                ..
137            } => exit_status,
138        }
139    }
140
141    /// Wait for this process to run to completion within `timeout`.
142    ///
143    /// Any still-open stdin handle is closed before the initial wait begins, matching
144    /// [`tokio::process::Child::wait`].
145    ///
146    /// If the timeout is reached before the process terminated normally, external termination is
147    /// forced through [`ProcessHandle::terminate`] and a
148    /// [`WaitForCompletionOrTerminateResult::TerminatedAfterTimeout`] outcome is returned when
149    /// cleanup succeeds. If waiting fails for a non-timeout reason, cleanup termination is still
150    /// attempted and the original wait failure is preserved in the returned error.
151    /// Total wall-clock time can exceed
152    /// `wait_timeout + interrupt_timeout + terminate_timeout` by one additional fixed 3-second
153    /// wait when force-kill fallback is required.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`WaitOrTerminateError`] if waiting fails or if timeout-triggered termination is
158    /// required and then fails.
159    pub async fn wait_for_completion_or_terminate(
160        &mut self,
161        options: WaitForCompletionOrTerminateOptions,
162    ) -> Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError> {
163        self.wait_for_completion_or_terminate_inner(
164            options.wait_timeout,
165            options.interrupt_timeout,
166            options.terminate_timeout,
167        )
168        .await
169    }
170
171    pub(super) async fn wait_for_completion_or_terminate_inner_detailed(
172        &mut self,
173        wait_timeout: Duration,
174        interrupt_timeout: Duration,
175        terminate_timeout: Duration,
176    ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
177        let output_collection_timeout_budget =
178            wait_or_terminate_base_budget(wait_timeout, interrupt_timeout, terminate_timeout);
179
180        match self.wait_for_completion_inner(wait_timeout).await {
181            Ok(WaitForCompletionResult::Completed(exit_status)) => Ok(WaitOrTerminateOutcome {
182                result: WaitForCompletionOrTerminateResult::Completed(exit_status),
183                output_collection_timeout_budget,
184            }),
185            Ok(WaitForCompletionResult::Timeout { timeout }) => {
186                self.terminate_after_wait_timeout_detailed(
187                    timeout,
188                    interrupt_timeout,
189                    terminate_timeout,
190                )
191                .await
192            }
193            Err(wait_error) => {
194                self.terminate_after_wait_error_detailed(
195                    wait_error,
196                    wait_timeout,
197                    interrupt_timeout,
198                    terminate_timeout,
199                )
200                .await
201            }
202        }
203    }
204
205    pub(super) async fn wait_for_completion_or_terminate_inner(
206        &mut self,
207        wait_timeout: Duration,
208        interrupt_timeout: Duration,
209        terminate_timeout: Duration,
210    ) -> Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError> {
211        self.wait_for_completion_or_terminate_inner_detailed(
212            wait_timeout,
213            interrupt_timeout,
214            terminate_timeout,
215        )
216        .await
217        .map(|outcome| outcome.result)
218    }
219
220    pub(super) async fn terminate_after_wait_timeout_detailed(
221        &mut self,
222        wait_timeout: Duration,
223        interrupt_timeout: Duration,
224        terminate_timeout: Duration,
225    ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
226        let process_name = self.name.clone();
227        let output_collection_timeout_budget =
228            wait_or_terminate_base_budget(wait_timeout, interrupt_timeout, terminate_timeout);
229        match self
230            .terminate_detailed(interrupt_timeout, terminate_timeout)
231            .await
232        {
233            Ok(termination_outcome) => Ok(Self::terminated_after_timeout_result(
234                termination_outcome.exit_status,
235                wait_timeout,
236                output_collection_timeout_budget
237                    .saturating_add(termination_outcome.output_collection_timeout_extension),
238            )),
239            Err(termination_error) => Err(WaitOrTerminateError::TerminationAfterTimeoutFailed {
240                process_name,
241                timeout: wait_timeout,
242                termination_error,
243            }),
244        }
245    }
246
247    pub(super) async fn terminate_after_wait_error_detailed(
248        &mut self,
249        wait_error: WaitError,
250        _wait_timeout: Duration,
251        interrupt_timeout: Duration,
252        terminate_timeout: Duration,
253    ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
254        let process_name = self.name.clone();
255
256        match self
257            .terminate_detailed(interrupt_timeout, terminate_timeout)
258            .await
259        {
260            Ok(termination_outcome) => Err(WaitOrTerminateError::WaitFailed {
261                process_name,
262                wait_error: Box::new(wait_error),
263                termination_status: termination_outcome.exit_status,
264            }),
265            Err(termination_error) => Err(WaitOrTerminateError::TerminationFailed {
266                process_name,
267                wait_error: Box::new(wait_error),
268                termination_error,
269            }),
270        }
271    }
272
273    #[cfg_attr(not(test), allow(dead_code))]
274    pub(super) async fn terminate_after_wait_error(
275        &mut self,
276        wait_error: WaitError,
277        interrupt_timeout: Duration,
278        terminate_timeout: Duration,
279    ) -> Result<ExitStatus, WaitOrTerminateError> {
280        self.terminate_after_wait_error_detailed(
281            wait_error,
282            Duration::ZERO,
283            interrupt_timeout,
284            terminate_timeout,
285        )
286        .await
287        .map(|outcome| Self::exit_status_from_wait_or_terminate_result(outcome.result))
288    }
289}
290
291#[cfg(test)]
292mod tests {
293    use super::*;
294    use crate::test_support::{
295        line_collection_options, line_parsing_options, long_running_command,
296    };
297    use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, NumBytesExt};
298    use assertr::prelude::*;
299    use tokio::io::AsyncWriteExt;
300
301    fn wait_or_terminate_options(wait_timeout: Duration) -> WaitForCompletionOrTerminateOptions {
302        WaitForCompletionOrTerminateOptions {
303            wait_timeout,
304            interrupt_timeout: Duration::from_secs(1),
305            terminate_timeout: Duration::from_secs(1),
306        }
307    }
308
    /// A process that completes within the wait timeout must leave the handle drop-disarmed.
    #[tokio::test]
    async fn wait_for_completion_disarms_cleanup_and_panic_guards() {
        // NOTE(review): presumably a command that runs for roughly the given duration
        // (100 ms here) — confirm against `test_support::long_running_command`.
        let mut process = crate::Process::new(long_running_command(Duration::from_millis(100)))
            .name("long-running")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();

        // The 2 s timeout is well above the command's 100 ms argument, so completion is
        // expected.
        process
            .wait_for_completion(Duration::from_secs(2))
            .await
            .unwrap()
            .expect_completed("process should complete");

        // A successful wait must have disarmed the handle's drop-time guards.
        assert_that!(process.is_drop_disarmed()).is_true();
    }
332
    /// `wait_for_completion` must close stdin before waiting, so that a child reading from
    /// stdin can finish, and output written before the close must still be collected.
    #[tokio::test]
    async fn wait_for_completion_closes_stdin_before_waiting() {
        // NOTE(review): relies on a `cat` executable being available on the test host.
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();

        // Start collecting stdout lines before anything is written to the child.
        let collector = process
            .stdout()
            .collect_lines_into_vec(line_parsing_options(), line_collection_options());

        // Fail the test with a clear message if stdin is not open right after spawning.
        let Some(stdin) = process.stdin().as_mut() else {
            assert_that!(process.stdin().is_open()).fail("stdin should start open");
            return;
        };
        stdin.write_all(b"wait closes stdin\n").await.unwrap();
        stdin.flush().await.unwrap();

        // `cat` is expected to keep running until its stdin is closed, so completing within
        // the timeout indicates that the wait closed stdin.
        let status = process
            .wait_for_completion(Duration::from_secs(2))
            .await
            .unwrap()
            .expect_completed("process should complete");

        assert_that!(status.success()).is_true();
        assert_that!(process.stdin().is_open()).is_false();

        // The single line written before the wait must have been echoed and collected.
        let collected = collector.wait().await.unwrap();
        assert_that!(collected.lines().len()).is_equal_to(1);
        assert_that!(collected[0].as_str()).is_equal_to("wait closes stdin");
    }
373
    /// When waiting fails but cleanup termination succeeds, the original wait error must be
    /// preserved in `WaitOrTerminateError::WaitFailed` and the handle must be drop-disarmed.
    #[tokio::test]
    async fn or_terminate_returns_wait_failure_after_cleanup() {
        let mut process = crate::Process::new(long_running_command(Duration::from_secs(5)))
            .name("long-running")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .replay_last_bytes(1.megabytes())
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();

        // A real wait failure is not provoked here; a synthetic one is injected instead.
        let wait_error = WaitError::IoError {
            process_name: "long-running".into(),
            source: io::Error::other("synthetic wait failure"),
        };

        let result = process
            .terminate_after_wait_error(wait_error, Duration::from_secs(1), Duration::from_secs(1))
            .await;

        // Cleanup termination must have disarmed the handle's drop-time guards.
        assert_that!(process.is_drop_disarmed()).is_true();

        let (wait_error, termination_status) = match result {
            Err(WaitOrTerminateError::WaitFailed {
                wait_error,
                termination_status,
                ..
            }) => (wait_error, termination_status),
            other => {
                assert_that!(&other).fail(format_args!(
                    "expected wait failure preserved after successful cleanup, got {other:?}"
                ));
                return;
            }
        };

        // NOTE(review): no exit code is expected, presumably because the process is ended by a
        // signal rather than exiting on its own — confirm this holds on non-Unix platforms.
        assert_that!(termination_status.code()).is_none();

        // The injected error must come back unchanged.
        let WaitError::IoError { source, .. } = *wait_error;
        assert_that!(source.to_string().as_str()).is_equal_to("synthetic wait failure");
    }
419
    /// A process that outlives the wait timeout must be terminated, reported as
    /// `TerminatedAfterTimeout` with the original timeout, and leave the handle drop-disarmed.
    #[tokio::test]
    async fn wait_for_completion_or_terminate_terminates_after_timeout() {
        let mut process = crate::Process::new(long_running_command(Duration::from_secs(5)))
            .name("long-running")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();

        // The 10 ms wait timeout is far below the command's 5 s argument, so the timeout path
        // is expected to be taken.
        let result = process
            .wait_for_completion_or_terminate(wait_or_terminate_options(Duration::from_millis(10)))
            .await
            .unwrap();
        let status = match result {
            WaitForCompletionOrTerminateResult::TerminatedAfterTimeout { result, timeout } => {
                // The reported timeout must be the wait timeout that was passed in.
                assert_that!(timeout).is_equal_to(Duration::from_millis(10));
                result
            }
            other @ WaitForCompletionOrTerminateResult::Completed(_) => {
                assert_that!(&other).fail(format_args!(
                    "expected timeout-driven termination, got {other:?}"
                ));
                return;
            }
        };

        // A terminated process must not report success, and the handle must be disarmed.
        assert_that!(status.success()).is_false();
        assert_that!(process.is_drop_disarmed()).is_true();
    }
455}