tokio_process_tools/process_handle/
wait.rs1use super::ProcessHandle;
2use super::WaitForCompletionOrTerminateOptions;
3use crate::error::{
4 WaitError, WaitForCompletionOrTerminateResult, WaitForCompletionResult, WaitOrTerminateError,
5};
6use crate::output_stream::OutputStream;
7use std::io;
8use std::process::ExitStatus;
9use std::time::Duration;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub(super) struct WaitOrTerminateOutcome {
13 pub(super) result: WaitForCompletionOrTerminateResult<ExitStatus>,
14 pub(super) output_collection_timeout_budget: Duration,
15}
16
17fn wait_or_terminate_base_budget(
18 wait_timeout: Duration,
19 interrupt_timeout: Duration,
20 terminate_timeout: Duration,
21) -> Duration {
22 wait_timeout
23 .saturating_add(interrupt_timeout)
24 .saturating_add(terminate_timeout)
25}
26
27impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
28where
29 Stdout: OutputStream,
30 Stderr: OutputStream,
31{
32 async fn wait(&mut self) -> io::Result<ExitStatus> {
37 self.stdin().close();
38 match self.child.wait().await {
39 Ok(status) => {
40 self.must_not_be_terminated();
41 Ok(status)
42 }
43 Err(err) => Err(err),
44 }
45 }
46
47 pub async fn wait_for_completion(
65 &mut self,
66 timeout: Duration,
67 ) -> Result<WaitForCompletionResult, WaitError> {
68 self.wait_for_completion_inner(timeout).await
69 }
70
71 pub(super) async fn wait_for_completion_inner(
72 &mut self,
73 timeout: Duration,
74 ) -> Result<WaitForCompletionResult, WaitError> {
75 match tokio::time::timeout(timeout, self.wait()).await {
76 Ok(Ok(exit_status)) => Ok(WaitForCompletionResult::Completed(exit_status)),
77 Ok(Err(source)) => Err(WaitError::IoError {
78 process_name: self.name.clone(),
79 source,
80 }),
81 Err(_elapsed) => Ok(WaitForCompletionResult::Timeout { timeout }),
82 }
83 }
84
85 pub(super) async fn wait_for_completion_unbounded_inner(
86 &mut self,
87 ) -> Result<ExitStatus, WaitError> {
88 self.wait().await.map_err(|source| WaitError::IoError {
89 process_name: self.name.clone(),
90 source,
91 })
92 }
93
94 pub(super) async fn wait_for_exit_after_signal(
95 &mut self,
96 timeout: Duration,
97 ) -> Result<Option<ExitStatus>, WaitError> {
98 match self.wait_for_completion_inner(timeout).await? {
99 WaitForCompletionResult::Completed(exit_status) => Ok(Some(exit_status)),
100 WaitForCompletionResult::Timeout { .. } => Ok(None),
101 }
102 }
103
104 fn wait_timeout_error(timeout: Duration) -> io::Error {
105 io::Error::new(
106 io::ErrorKind::TimedOut,
107 format!("process did not complete within {timeout:?}"),
108 )
109 }
110
111 pub(super) fn wait_timeout_diagnostic(timeout: Duration) -> io::Error {
112 Self::wait_timeout_error(timeout)
113 }
114
115 fn terminated_after_timeout_result(
116 exit_status: ExitStatus,
117 timeout: Duration,
118 output_collection_timeout_budget: Duration,
119 ) -> WaitOrTerminateOutcome {
120 WaitOrTerminateOutcome {
121 result: WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
122 result: exit_status,
123 timeout,
124 },
125 output_collection_timeout_budget,
126 }
127 }
128
129 pub(super) fn exit_status_from_wait_or_terminate_result(
130 result: WaitForCompletionOrTerminateResult<ExitStatus>,
131 ) -> ExitStatus {
132 match result {
133 WaitForCompletionOrTerminateResult::Completed(exit_status)
134 | WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
135 result: exit_status,
136 ..
137 } => exit_status,
138 }
139 }
140
141 pub async fn wait_for_completion_or_terminate(
160 &mut self,
161 options: WaitForCompletionOrTerminateOptions,
162 ) -> Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError> {
163 self.wait_for_completion_or_terminate_inner(
164 options.wait_timeout,
165 options.interrupt_timeout,
166 options.terminate_timeout,
167 )
168 .await
169 }
170
171 pub(super) async fn wait_for_completion_or_terminate_inner_detailed(
172 &mut self,
173 wait_timeout: Duration,
174 interrupt_timeout: Duration,
175 terminate_timeout: Duration,
176 ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
177 let output_collection_timeout_budget =
178 wait_or_terminate_base_budget(wait_timeout, interrupt_timeout, terminate_timeout);
179
180 match self.wait_for_completion_inner(wait_timeout).await {
181 Ok(WaitForCompletionResult::Completed(exit_status)) => Ok(WaitOrTerminateOutcome {
182 result: WaitForCompletionOrTerminateResult::Completed(exit_status),
183 output_collection_timeout_budget,
184 }),
185 Ok(WaitForCompletionResult::Timeout { timeout }) => {
186 self.terminate_after_wait_timeout_detailed(
187 timeout,
188 interrupt_timeout,
189 terminate_timeout,
190 )
191 .await
192 }
193 Err(wait_error) => {
194 self.terminate_after_wait_error_detailed(
195 wait_error,
196 wait_timeout,
197 interrupt_timeout,
198 terminate_timeout,
199 )
200 .await
201 }
202 }
203 }
204
205 pub(super) async fn wait_for_completion_or_terminate_inner(
206 &mut self,
207 wait_timeout: Duration,
208 interrupt_timeout: Duration,
209 terminate_timeout: Duration,
210 ) -> Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError> {
211 self.wait_for_completion_or_terminate_inner_detailed(
212 wait_timeout,
213 interrupt_timeout,
214 terminate_timeout,
215 )
216 .await
217 .map(|outcome| outcome.result)
218 }
219
220 pub(super) async fn terminate_after_wait_timeout_detailed(
221 &mut self,
222 wait_timeout: Duration,
223 interrupt_timeout: Duration,
224 terminate_timeout: Duration,
225 ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
226 let process_name = self.name.clone();
227 let output_collection_timeout_budget =
228 wait_or_terminate_base_budget(wait_timeout, interrupt_timeout, terminate_timeout);
229 match self
230 .terminate_detailed(interrupt_timeout, terminate_timeout)
231 .await
232 {
233 Ok(termination_outcome) => Ok(Self::terminated_after_timeout_result(
234 termination_outcome.exit_status,
235 wait_timeout,
236 output_collection_timeout_budget
237 .saturating_add(termination_outcome.output_collection_timeout_extension),
238 )),
239 Err(termination_error) => Err(WaitOrTerminateError::TerminationAfterTimeoutFailed {
240 process_name,
241 timeout: wait_timeout,
242 termination_error,
243 }),
244 }
245 }
246
247 pub(super) async fn terminate_after_wait_error_detailed(
248 &mut self,
249 wait_error: WaitError,
250 _wait_timeout: Duration,
251 interrupt_timeout: Duration,
252 terminate_timeout: Duration,
253 ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
254 let process_name = self.name.clone();
255
256 match self
257 .terminate_detailed(interrupt_timeout, terminate_timeout)
258 .await
259 {
260 Ok(termination_outcome) => Err(WaitOrTerminateError::WaitFailed {
261 process_name,
262 wait_error: Box::new(wait_error),
263 termination_status: termination_outcome.exit_status,
264 }),
265 Err(termination_error) => Err(WaitOrTerminateError::TerminationFailed {
266 process_name,
267 wait_error: Box::new(wait_error),
268 termination_error,
269 }),
270 }
271 }
272
273 #[cfg_attr(not(test), allow(dead_code))]
274 pub(super) async fn terminate_after_wait_error(
275 &mut self,
276 wait_error: WaitError,
277 interrupt_timeout: Duration,
278 terminate_timeout: Duration,
279 ) -> Result<ExitStatus, WaitOrTerminateError> {
280 self.terminate_after_wait_error_detailed(
281 wait_error,
282 Duration::ZERO,
283 interrupt_timeout,
284 terminate_timeout,
285 )
286 .await
287 .map(|outcome| Self::exit_status_from_wait_or_terminate_result(outcome.result))
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294 use crate::test_support::{
295 line_collection_options, line_parsing_options, long_running_command,
296 };
297 use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, NumBytesExt};
298 use assertr::prelude::*;
299 use tokio::io::AsyncWriteExt;
300
301 fn wait_or_terminate_options(wait_timeout: Duration) -> WaitForCompletionOrTerminateOptions {
302 WaitForCompletionOrTerminateOptions {
303 wait_timeout,
304 interrupt_timeout: Duration::from_secs(1),
305 terminate_timeout: Duration::from_secs(1),
306 }
307 }
308
309 #[tokio::test]
310 async fn wait_for_completion_disarms_cleanup_and_panic_guards() {
311 let mut process = crate::Process::new(long_running_command(Duration::from_millis(100)))
312 .name("long-running")
313 .stdout_and_stderr(|stream| {
314 stream
315 .broadcast()
316 .best_effort_delivery()
317 .no_replay()
318 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
319 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
320 })
321 .spawn()
322 .unwrap();
323
324 process
325 .wait_for_completion(Duration::from_secs(2))
326 .await
327 .unwrap()
328 .expect_completed("process should complete");
329
330 assert_that!(process.is_drop_disarmed()).is_true();
331 }
332
333 #[tokio::test]
334 async fn wait_for_completion_closes_stdin_before_waiting() {
335 let cmd = tokio::process::Command::new("cat");
336 let mut process = crate::Process::new(cmd)
337 .name("cat")
338 .stdout_and_stderr(|stream| {
339 stream
340 .broadcast()
341 .best_effort_delivery()
342 .no_replay()
343 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
344 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
345 })
346 .spawn()
347 .unwrap();
348
349 let collector = process
350 .stdout()
351 .collect_lines_into_vec(line_parsing_options(), line_collection_options());
352
353 let Some(stdin) = process.stdin().as_mut() else {
354 assert_that!(process.stdin().is_open()).fail("stdin should start open");
355 return;
356 };
357 stdin.write_all(b"wait closes stdin\n").await.unwrap();
358 stdin.flush().await.unwrap();
359
360 let status = process
361 .wait_for_completion(Duration::from_secs(2))
362 .await
363 .unwrap()
364 .expect_completed("process should complete");
365
366 assert_that!(status.success()).is_true();
367 assert_that!(process.stdin().is_open()).is_false();
368
369 let collected = collector.wait().await.unwrap();
370 assert_that!(collected.lines().len()).is_equal_to(1);
371 assert_that!(collected[0].as_str()).is_equal_to("wait closes stdin");
372 }
373
374 #[tokio::test]
375 async fn or_terminate_returns_wait_failure_after_cleanup() {
376 let mut process = crate::Process::new(long_running_command(Duration::from_secs(5)))
377 .name("long-running")
378 .stdout_and_stderr(|stream| {
379 stream
380 .broadcast()
381 .best_effort_delivery()
382 .replay_last_bytes(1.megabytes())
383 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
384 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
385 })
386 .spawn()
387 .unwrap();
388
389 let wait_error = WaitError::IoError {
390 process_name: "long-running".into(),
391 source: io::Error::other("synthetic wait failure"),
392 };
393
394 let result = process
395 .terminate_after_wait_error(wait_error, Duration::from_secs(1), Duration::from_secs(1))
396 .await;
397
398 assert_that!(process.is_drop_disarmed()).is_true();
399
400 let (wait_error, termination_status) = match result {
401 Err(WaitOrTerminateError::WaitFailed {
402 wait_error,
403 termination_status,
404 ..
405 }) => (wait_error, termination_status),
406 other => {
407 assert_that!(&other).fail(format_args!(
408 "expected wait failure preserved after successful cleanup, got {other:?}"
409 ));
410 return;
411 }
412 };
413
414 assert_that!(termination_status.code()).is_none();
415
416 let WaitError::IoError { source, .. } = *wait_error;
417 assert_that!(source.to_string().as_str()).is_equal_to("synthetic wait failure");
418 }
419
420 #[tokio::test]
421 async fn wait_for_completion_or_terminate_terminates_after_timeout() {
422 let mut process = crate::Process::new(long_running_command(Duration::from_secs(5)))
423 .name("long-running")
424 .stdout_and_stderr(|stream| {
425 stream
426 .broadcast()
427 .best_effort_delivery()
428 .no_replay()
429 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
430 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
431 })
432 .spawn()
433 .unwrap();
434
435 let result = process
436 .wait_for_completion_or_terminate(wait_or_terminate_options(Duration::from_millis(10)))
437 .await
438 .unwrap();
439 let status = match result {
440 WaitForCompletionOrTerminateResult::TerminatedAfterTimeout { result, timeout } => {
441 assert_that!(timeout).is_equal_to(Duration::from_millis(10));
442 result
443 }
444 other @ WaitForCompletionOrTerminateResult::Completed(_) => {
445 assert_that!(&other).fail(format_args!(
446 "expected timeout-driven termination, got {other:?}"
447 ));
448 return;
449 }
450 };
451
452 assert_that!(status.success()).is_false();
453 assert_that!(process.is_drop_disarmed()).is_true();
454 }
455}