tokio_process_tools/process_handle/
wait.rs1use super::ProcessHandle;
2#[cfg(any(unix, windows))]
3use super::WaitForCompletionOrTerminateOptions;
4#[cfg(any(unix, windows))]
5use super::termination::GracefulTimeouts;
6use crate::error::{WaitError, WaitForCompletionResult};
7#[cfg(any(unix, windows))]
8use crate::error::{WaitForCompletionOrTerminateResult, WaitOrTerminateError};
9use crate::output_stream::OutputStream;
10use std::io;
11use std::process::ExitStatus;
12use std::time::Duration;
13
14#[cfg(any(unix, windows))]
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub(super) struct WaitOrTerminateOutcome {
17 pub(super) result: WaitForCompletionOrTerminateResult<ExitStatus>,
18 pub(super) output_collection_timeout_budget: Duration,
19}
20
21#[cfg(any(unix, windows))]
22fn wait_or_terminate_base_budget(wait_timeout: Duration, timeouts: GracefulTimeouts) -> Duration {
23 wait_timeout.saturating_add(timeouts.total())
24}
25
26impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
27where
28 Stdout: OutputStream,
29 Stderr: OutputStream,
30{
31 async fn wait(&mut self) -> io::Result<ExitStatus> {
36 self.stdin().close();
37 match self.child.wait().await {
38 Ok(status) => {
39 self.must_not_be_terminated();
40 Ok(status)
41 }
42 Err(err) => Err(err),
43 }
44 }
45
46 pub async fn wait_for_completion(
64 &mut self,
65 timeout: Duration,
66 ) -> Result<WaitForCompletionResult, WaitError> {
67 self.wait_for_completion_inner(timeout).await
68 }
69
70 pub(super) async fn wait_for_completion_inner(
71 &mut self,
72 timeout: Duration,
73 ) -> Result<WaitForCompletionResult, WaitError> {
74 match tokio::time::timeout(timeout, self.wait()).await {
75 Ok(Ok(exit_status)) => Ok(WaitForCompletionResult::Completed(exit_status)),
76 Ok(Err(source)) => Err(WaitError::IoError {
77 process_name: self.name.clone(),
78 source,
79 }),
80 Err(_elapsed) => Ok(WaitForCompletionResult::Timeout { timeout }),
81 }
82 }
83
84 pub(super) async fn wait_for_completion_unbounded_inner(
85 &mut self,
86 ) -> Result<ExitStatus, WaitError> {
87 self.wait().await.map_err(|source| WaitError::IoError {
88 process_name: self.name.clone(),
89 source,
90 })
91 }
92
93 pub(super) async fn wait_for_exit_after_signal(
94 &mut self,
95 timeout: Duration,
96 ) -> Result<Option<ExitStatus>, WaitError> {
97 match self.wait_for_completion_inner(timeout).await? {
98 WaitForCompletionResult::Completed(exit_status) => Ok(Some(exit_status)),
99 WaitForCompletionResult::Timeout { .. } => Ok(None),
100 }
101 }
102
103 fn wait_timeout_error(timeout: Duration) -> io::Error {
104 io::Error::new(
105 io::ErrorKind::TimedOut,
106 format!("process did not complete within {timeout:?}"),
107 )
108 }
109
110 pub(super) fn wait_timeout_diagnostic(timeout: Duration) -> io::Error {
111 Self::wait_timeout_error(timeout)
112 }
113}
114
115#[cfg(any(unix, windows))]
116impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
117where
118 Stdout: OutputStream,
119 Stderr: OutputStream,
120{
121 fn terminated_after_timeout_result(
122 exit_status: ExitStatus,
123 timeout: Duration,
124 output_collection_timeout_budget: Duration,
125 ) -> WaitOrTerminateOutcome {
126 WaitOrTerminateOutcome {
127 result: WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
128 result: exit_status,
129 timeout,
130 },
131 output_collection_timeout_budget,
132 }
133 }
134
135 pub(super) fn exit_status_from_wait_or_terminate_result(
136 result: WaitForCompletionOrTerminateResult<ExitStatus>,
137 ) -> ExitStatus {
138 match result {
139 WaitForCompletionOrTerminateResult::Completed(exit_status)
140 | WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
141 result: exit_status,
142 ..
143 } => exit_status,
144 }
145 }
146
147 pub async fn wait_for_completion_or_terminate(
168 &mut self,
169 options: WaitForCompletionOrTerminateOptions,
170 ) -> Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError> {
171 self.wait_for_completion_or_terminate_inner(options.wait_timeout, options.graceful_timeouts)
172 .await
173 }
174
175 pub(super) async fn wait_for_completion_or_terminate_inner_detailed(
176 &mut self,
177 wait_timeout: Duration,
178 timeouts: GracefulTimeouts,
179 ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
180 let output_collection_timeout_budget =
181 wait_or_terminate_base_budget(wait_timeout, timeouts);
182
183 match self.wait_for_completion_inner(wait_timeout).await {
184 Ok(WaitForCompletionResult::Completed(exit_status)) => Ok(WaitOrTerminateOutcome {
185 result: WaitForCompletionOrTerminateResult::Completed(exit_status),
186 output_collection_timeout_budget,
187 }),
188 Ok(WaitForCompletionResult::Timeout { timeout }) => {
189 self.terminate_after_wait_timeout_detailed(timeout, timeouts)
190 .await
191 }
192 Err(wait_error) => {
193 self.terminate_after_wait_error_detailed(wait_error, wait_timeout, timeouts)
194 .await
195 }
196 }
197 }
198
199 pub(super) async fn wait_for_completion_or_terminate_inner(
200 &mut self,
201 wait_timeout: Duration,
202 timeouts: GracefulTimeouts,
203 ) -> Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError> {
204 self.wait_for_completion_or_terminate_inner_detailed(wait_timeout, timeouts)
205 .await
206 .map(|outcome| outcome.result)
207 }
208
209 pub(super) async fn terminate_after_wait_timeout_detailed(
210 &mut self,
211 wait_timeout: Duration,
212 timeouts: GracefulTimeouts,
213 ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
214 let process_name = self.name.clone();
215 let output_collection_timeout_budget =
216 wait_or_terminate_base_budget(wait_timeout, timeouts);
217 match self.terminate_detailed(timeouts).await {
218 Ok(termination_outcome) => Ok(Self::terminated_after_timeout_result(
219 termination_outcome.exit_status,
220 wait_timeout,
221 output_collection_timeout_budget
222 .saturating_add(termination_outcome.output_collection_timeout_extension),
223 )),
224 Err(termination_error) => Err(WaitOrTerminateError::TerminationAfterTimeoutFailed {
225 process_name,
226 timeout: wait_timeout,
227 termination_error,
228 }),
229 }
230 }
231
232 pub(super) async fn terminate_after_wait_error_detailed(
233 &mut self,
234 wait_error: WaitError,
235 _wait_timeout: Duration,
236 timeouts: GracefulTimeouts,
237 ) -> Result<WaitOrTerminateOutcome, WaitOrTerminateError> {
238 let process_name = self.name.clone();
239
240 match self.terminate_detailed(timeouts).await {
241 Ok(termination_outcome) => Err(WaitOrTerminateError::WaitFailed {
242 process_name,
243 wait_error: Box::new(wait_error),
244 termination_status: termination_outcome.exit_status,
245 }),
246 Err(termination_error) => Err(WaitOrTerminateError::TerminationFailed {
247 process_name,
248 wait_error: Box::new(wait_error),
249 termination_error,
250 }),
251 }
252 }
253
254 #[cfg_attr(not(test), allow(dead_code))]
255 pub(super) async fn terminate_after_wait_error(
256 &mut self,
257 wait_error: WaitError,
258 timeouts: GracefulTimeouts,
259 ) -> Result<ExitStatus, WaitOrTerminateError> {
260 self.terminate_after_wait_error_detailed(wait_error, Duration::ZERO, timeouts)
261 .await
262 .map(|outcome| Self::exit_status_from_wait_or_terminate_result(outcome.result))
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(any(unix, windows))]
    use crate::test_support::default_graceful_timeouts;
    use crate::test_support::{
        line_collection_options, line_parsing_options, long_running_command,
    };
    use crate::{DEFAULT_MAX_BUFFERED_CHUNKS, DEFAULT_READ_CHUNK_SIZE, NumBytesExt};
    use assertr::prelude::*;
    use tokio::io::AsyncWriteExt;

    /// Builds wait-or-terminate options from `wait_timeout` and the default
    /// graceful timeouts used by the test suite.
    #[cfg(any(unix, windows))]
    fn wait_or_terminate_options(wait_timeout: Duration) -> WaitForCompletionOrTerminateOptions {
        let graceful_timeouts = default_graceful_timeouts();
        WaitForCompletionOrTerminateOptions {
            wait_timeout,
            graceful_timeouts,
        }
    }

    /// A process that completes within the timeout leaves the handle with its
    /// drop guards disarmed.
    #[tokio::test]
    async fn wait_for_completion_disarms_cleanup_and_panic_guards() {
        let command = long_running_command(Duration::from_millis(100));
        let mut handle = crate::Process::new(command)
            .name("long-running")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();

        handle
            .wait_for_completion(Duration::from_secs(2))
            .await
            .unwrap()
            .expect_completed("process should complete");

        assert_that!(handle.is_drop_disarmed()).is_true();
    }

    /// Waiting closes stdin first, so `cat` sees the end of its input, echoes
    /// the single line it received and exits successfully.
    #[tokio::test]
    async fn wait_for_completion_closes_stdin_before_waiting() {
        let mut handle = crate::Process::new(tokio::process::Command::new("cat"))
            .name("cat")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();

        // Start collecting before anything is written to the child.
        let collector = handle
            .stdout()
            .collect_lines_into_vec(line_parsing_options(), line_collection_options());

        let Some(stdin) = handle.stdin().as_mut() else {
            assert_that!(handle.stdin().is_open()).fail("stdin should start open");
            return;
        };
        stdin.write_all(b"wait closes stdin\n").await.unwrap();
        stdin.flush().await.unwrap();

        let completion = handle
            .wait_for_completion(Duration::from_secs(2))
            .await
            .unwrap();
        let status = completion.expect_completed("process should complete");

        assert_that!(status.success()).is_true();
        assert_that!(handle.stdin().is_open()).is_false();

        let collected = collector.wait().await.unwrap();
        assert_that!(collected.lines().len()).is_equal_to(1);
        assert_that!(collected[0].as_str()).is_equal_to("wait closes stdin");
    }

    /// After a (synthetic) wait failure the process is terminated and the
    /// original wait error is reported together with the termination status.
    #[cfg(any(unix, windows))]
    #[tokio::test]
    async fn or_terminate_returns_wait_failure_after_cleanup() {
        let command = long_running_command(Duration::from_secs(5));
        let mut handle = crate::Process::new(command)
            .name("long-running")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .replay_last_bytes(1.megabytes())
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();

        let synthetic_failure = WaitError::IoError {
            process_name: "long-running".into(),
            source: io::Error::other("synthetic wait failure"),
        };

        let result = handle
            .terminate_after_wait_error(synthetic_failure, default_graceful_timeouts())
            .await;

        assert_that!(handle.is_drop_disarmed()).is_true();

        let (wait_error, termination_status) = match result {
            Err(WaitOrTerminateError::WaitFailed {
                wait_error,
                termination_status,
                ..
            }) => (wait_error, termination_status),
            other => {
                assert_that!(&other).fail(format_args!(
                    "expected wait failure preserved after successful cleanup, got {other:?}"
                ));
                return;
            }
        };

        assert_that!(termination_status.code()).is_none();

        let WaitError::IoError { source, .. } = *wait_error;
        assert_that!(source.to_string().as_str()).is_equal_to("synthetic wait failure");
    }

    /// A process that outlives the wait timeout is terminated, and the result
    /// reports the timeout that elapsed.
    #[cfg(any(unix, windows))]
    #[tokio::test]
    async fn wait_for_completion_or_terminate_terminates_after_timeout() {
        let command = long_running_command(Duration::from_secs(5));
        let mut handle = crate::Process::new(command)
            .name("long-running")
            .stdout_and_stderr(|stream| {
                stream
                    .broadcast()
                    .best_effort_delivery()
                    .no_replay()
                    .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
                    .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
            })
            .spawn()
            .unwrap();

        let options = wait_or_terminate_options(Duration::from_millis(10));
        let result = handle
            .wait_for_completion_or_terminate(options)
            .await
            .unwrap();

        let status = match result {
            WaitForCompletionOrTerminateResult::TerminatedAfterTimeout { result, timeout } => {
                assert_that!(timeout).is_equal_to(Duration::from_millis(10));
                result
            }
            other @ WaitForCompletionOrTerminateResult::Completed(_) => {
                assert_that!(&other).fail(format_args!(
                    "expected timeout-driven termination, got {other:?}"
                ));
                return;
            }
        };

        assert_that!(status.success()).is_false();
        assert_that!(handle.is_drop_disarmed()).is_true();
    }
}