Skip to main content

tokio_process_tools/process_handle/
wait_builder.rs

1//! Staged builder fusing plain wait, output collection, and graceful termination on a single
2//! entry point: [`ProcessHandle::wait_for_completion`].
3//!
4//! The builder is configured by chaining `.with_line_output(...)` or `.with_raw_output(...)`
5//! and / or `.or_terminate(...)` (in that order, output before terminate) and is run by
6//! `.await`-ing it. The matrix of six combinations (no / line / raw output, plain wait /
7//! wait-or-terminate) is encoded in the type system so each chain compiles to exactly one
8//! terminal action.
9//!
10//! ```ignore
11//! // no output, no terminate
12//! handle.wait_for_completion(timeout).await?;
13//!
14//! // line output
15//! handle.wait_for_completion(timeout)
16//!     .with_line_output(eof_timeout, parsing_options, line_options)
17//!     .await?;
18//!
19//! // line output + graceful termination on timeout
20//! handle.wait_for_completion(timeout)
21//!     .with_line_output(eof_timeout, parsing_options, line_options)
22//!     .or_terminate(shutdown)
23//!     .await?;
24//! ```
25
26use std::future::{Future, IntoFuture};
27use std::pin::Pin;
28use std::time::Duration;
29
30use crate::error::{WaitError, WaitForCompletionResult, WaitWithOutputError};
31#[cfg(any(unix, windows))]
32use crate::error::{WaitForCompletionOrTerminateResult, WaitOrTerminateError};
33use crate::output_stream::line::options::LineParsingOptions;
34use crate::output_stream::{OutputStream, Subscribable};
35use crate::process_handle::ProcessHandle;
36#[cfg(any(unix, windows))]
37use crate::process_handle::output_collection::drain::wait_for_completion_or_terminate_with_collectors;
38use crate::process_handle::output_collection::drain::wait_for_completion_with_collectors;
39use crate::process_handle::output_collection::options::{LineOutputOptions, RawOutputOptions};
40use crate::process_handle::output_collection::{
41    ProcessOutput, spawn_chunk_collector, spawn_line_collector,
42};
43#[cfg(any(unix, windows))]
44use crate::process_handle::termination::GracefulShutdown;
45use crate::{CollectedBytes, CollectedLines};
46
47/// Type-state markers for [`WaitForCompletion`].
48///
49/// The two state axes (`Output` and `Terminate`) gate which builder methods compile and which
50/// `IntoFuture` impl the chain resolves to. The carried data on the populated states (eof
51/// timeout, options, shutdown) is internal; users construct these states via
52/// [`WaitForCompletion::with_line_output`], [`WaitForCompletion::with_raw_output`], and
53/// [`WaitForCompletion::or_terminate`].
54pub mod state {
55    #[cfg(any(unix, windows))]
56    use super::GracefulShutdown;
57    use super::{Duration, LineOutputOptions, LineParsingOptions, RawOutputOptions};
58
59    /// No output collection has been configured.
60    #[derive(Debug)]
61    pub struct NoOutput;
62
63    /// Line-mode output collection has been configured.
64    #[derive(Debug)]
65    pub struct LineOutput {
66        pub(super) eof_timeout: Duration,
67        pub(super) line_parsing_options: LineParsingOptions,
68        pub(super) options: LineOutputOptions,
69    }
70
71    /// Raw byte output collection has been configured.
72    #[derive(Debug)]
73    pub struct RawOutput {
74        pub(super) eof_timeout: Duration,
75        pub(super) options: RawOutputOptions,
76    }
77
78    /// No graceful termination on timeout.
79    #[derive(Debug)]
80    pub struct NoTerminate;
81
82    /// Graceful termination on timeout has been configured.
83    #[cfg(any(unix, windows))]
84    #[derive(Debug)]
85    pub struct WithTerminate {
86        pub(super) shutdown: GracefulShutdown,
87    }
88}
89
90#[cfg(any(unix, windows))]
91use state::WithTerminate;
92use state::{LineOutput, NoOutput, NoTerminate, RawOutput};
93
94/// Staged builder returned by [`ProcessHandle::wait_for_completion`].
95///
96/// Compose one wait, optionally one output mode, and optionally graceful termination on
97/// timeout, then `.await` to run it. See the module-level docs for examples.
98///
99/// Stdin is closed before the wait begins (matching [`tokio::process::Child::wait`]) on
100/// every variant. If the wait times out without `.or_terminate(...)`, the process keeps
101/// running; with `.or_terminate(...)`, cleanup is forced through
102/// [`ProcessHandle::terminate`].
103#[must_use = "calling `wait_for_completion(...)` only configures the wait. \
104              `.await` the builder (or chain `.with_*_output(...)` / `.or_terminate(...)` first) \
105              to actually run it."]
106pub struct WaitForCompletion<'a, Stdout, Stderr, Output, Terminate>
107where
108    Stdout: OutputStream,
109    Stderr: OutputStream,
110{
111    handle: &'a mut ProcessHandle<Stdout, Stderr>,
112    timeout: Duration,
113    output: Output,
114    terminate: Terminate,
115}
116
117impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
118where
119    Stdout: OutputStream,
120    Stderr: OutputStream,
121{
122    /// Begin a staged wait for the process to run to completion within `timeout`.
123    ///
124    /// `.await` the returned builder to wait without collecting output. Chain
125    /// `.with_line_output(...)` or `.with_raw_output(...)` to also drain stdout / stderr into a
126    /// [`ProcessOutput`], and / or `.or_terminate(...)` to force graceful cleanup if the wait
127    /// times out. Output collection must be configured before termination.
128    ///
129    /// Any still-open stdin handle is closed before the terminal wait begins, matching
130    /// [`tokio::process::Child::wait`] and helping avoid deadlocks where the child is waiting
131    /// for input while the parent is waiting for exit.
132    pub fn wait_for_completion(
133        &mut self,
134        timeout: Duration,
135    ) -> WaitForCompletion<'_, Stdout, Stderr, NoOutput, NoTerminate> {
136        WaitForCompletion {
137            handle: self,
138            timeout,
139            output: NoOutput,
140            terminate: NoTerminate,
141        }
142    }
143}
144
145impl<'a, Stdout, Stderr> WaitForCompletion<'a, Stdout, Stderr, NoOutput, NoTerminate>
146where
147    Stdout: OutputStream,
148    Stderr: OutputStream,
149{
150    /// Configure line-mode output collection.
151    ///
152    /// Collectors are attached when this method is called. If the stream was configured with
153    /// `.no_replay()`, output produced before attachment may be discarded; configure replay
154    /// before spawning when startup output must be included.
155    ///
156    /// `eof_timeout` bounds the additional post-exit wait for stdout / stderr consumers to
157    /// observe EOF after process completion (or, when paired with `.or_terminate(...)`, after
158    /// cleanup termination). It is a single budget shared by stdout and stderr: when one
159    /// stream finishes early, the surviving stream's drain is still bounded by the original
160    /// `eof_timeout` measured from process exit, not restarted from the first stream's EOF.
161    /// A stream still producing output once the budget is exhausted is aborted and
162    /// [`WaitWithOutputError::OutputCollectionTimeout`] is returned.
163    pub fn with_line_output(
164        self,
165        eof_timeout: Duration,
166        line_parsing_options: LineParsingOptions,
167        options: LineOutputOptions,
168    ) -> WaitForCompletion<'a, Stdout, Stderr, LineOutput, NoTerminate> {
169        WaitForCompletion {
170            handle: self.handle,
171            timeout: self.timeout,
172            output: LineOutput {
173                eof_timeout,
174                line_parsing_options,
175                options,
176            },
177            terminate: NoTerminate,
178        }
179    }
180
181    /// Configure raw byte output collection.
182    ///
183    /// Use this when the child's output is not UTF-8 line-oriented (binary blobs, framed
184    /// protocols, anything where line parsing would corrupt bytes). `eof_timeout` behaves the
185    /// same as in [`Self::with_line_output`].
186    pub fn with_raw_output(
187        self,
188        eof_timeout: Duration,
189        options: RawOutputOptions,
190    ) -> WaitForCompletion<'a, Stdout, Stderr, RawOutput, NoTerminate> {
191        WaitForCompletion {
192            handle: self.handle,
193            timeout: self.timeout,
194            output: RawOutput {
195                eof_timeout,
196                options,
197            },
198            terminate: NoTerminate,
199        }
200    }
201}
202
203#[cfg(any(unix, windows))]
204impl<'a, Stdout, Stderr, Output> WaitForCompletion<'a, Stdout, Stderr, Output, NoTerminate>
205where
206    Stdout: OutputStream,
207    Stderr: OutputStream,
208{
209    /// Force graceful termination if the wait times out.
210    ///
211    /// On a wait timeout (or a non-timeout wait failure), [`ProcessHandle::terminate`] runs
212    /// the supplied `shutdown` sequence and the result is reported as
213    /// [`WaitForCompletionOrTerminateResult::TerminatedAfterTimeout`] on success, or as a
214    /// [`WaitOrTerminateError`] / [`WaitWithOutputError`] when termination itself fails.
215    ///
216    /// Total wall-clock time can exceed the wait timeout plus the per-platform graceful
217    /// budget carried by `shutdown` (the sum of every Unix phase timeout, or `timeout` on
218    /// Windows) by one additional fixed 3-second wait when the force-kill fallback is
219    /// required.
220    pub fn or_terminate(
221        self,
222        shutdown: GracefulShutdown,
223    ) -> WaitForCompletion<'a, Stdout, Stderr, Output, WithTerminate> {
224        WaitForCompletion {
225            handle: self.handle,
226            timeout: self.timeout,
227            output: self.output,
228            terminate: WithTerminate { shutdown },
229        }
230    }
231}
232
233type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
234
235impl<'a, Stdout, Stderr> IntoFuture for WaitForCompletion<'a, Stdout, Stderr, NoOutput, NoTerminate>
236where
237    Stdout: OutputStream + Send + 'a,
238    Stderr: OutputStream + Send + 'a,
239{
240    type Output = Result<WaitForCompletionResult, WaitError>;
241    type IntoFuture = BoxFut<'a, Self::Output>;
242
243    fn into_future(self) -> Self::IntoFuture {
244        Box::pin(async move { self.handle.wait_for_completion_inner(self.timeout).await })
245    }
246}
247
248#[cfg(any(unix, windows))]
249impl<'a, Stdout, Stderr> IntoFuture
250    for WaitForCompletion<'a, Stdout, Stderr, NoOutput, WithTerminate>
251where
252    Stdout: OutputStream + Send + 'a,
253    Stderr: OutputStream + Send + 'a,
254{
255    type Output = Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError>;
256    type IntoFuture = BoxFut<'a, Self::Output>;
257
258    fn into_future(self) -> Self::IntoFuture {
259        Box::pin(async move {
260            self.handle
261                .wait_for_completion_or_terminate_inner(self.timeout, self.terminate.shutdown)
262                .await
263        })
264    }
265}
266
267impl<'a, Stdout, Stderr> IntoFuture
268    for WaitForCompletion<'a, Stdout, Stderr, LineOutput, NoTerminate>
269where
270    Stdout: OutputStream + Subscribable + Send + 'a,
271    Stderr: OutputStream + Subscribable + Send + 'a,
272{
273    type Output =
274        Result<WaitForCompletionResult<ProcessOutput<CollectedLines>>, WaitWithOutputError>;
275    type IntoFuture = BoxFut<'a, Self::Output>;
276
277    fn into_future(self) -> Self::IntoFuture {
278        Box::pin(async move {
279            let LineOutputOptions {
280                stdout_collection_options,
281                stderr_collection_options,
282            } = self.output.options;
283            let line_parsing_options = self.output.line_parsing_options;
284
285            let (out_collector, err_collector) = self
286                .handle
287                .try_spawn_output_collectors(
288                    |name, sub| {
289                        spawn_line_collector(
290                            name,
291                            sub,
292                            line_parsing_options,
293                            stdout_collection_options,
294                        )
295                    },
296                    |name, sub| {
297                        spawn_line_collector(
298                            name,
299                            sub,
300                            line_parsing_options,
301                            stderr_collection_options,
302                        )
303                    },
304                )
305                .await?;
306
307            wait_for_completion_with_collectors(
308                self.handle,
309                self.timeout,
310                self.output.eof_timeout,
311                out_collector,
312                err_collector,
313            )
314            .await
315        })
316    }
317}
318
319impl<'a, Stdout, Stderr> IntoFuture
320    for WaitForCompletion<'a, Stdout, Stderr, RawOutput, NoTerminate>
321where
322    Stdout: OutputStream + Subscribable + Send + 'a,
323    Stderr: OutputStream + Subscribable + Send + 'a,
324{
325    type Output =
326        Result<WaitForCompletionResult<ProcessOutput<CollectedBytes>>, WaitWithOutputError>;
327    type IntoFuture = BoxFut<'a, Self::Output>;
328
329    fn into_future(self) -> Self::IntoFuture {
330        Box::pin(async move {
331            let RawOutputOptions {
332                stdout_collection_options,
333                stderr_collection_options,
334            } = self.output.options;
335
336            let (out_collector, err_collector) = self
337                .handle
338                .try_spawn_output_collectors(
339                    |name, sub| spawn_chunk_collector(name, sub, stdout_collection_options),
340                    |name, sub| spawn_chunk_collector(name, sub, stderr_collection_options),
341                )
342                .await?;
343
344            wait_for_completion_with_collectors(
345                self.handle,
346                self.timeout,
347                self.output.eof_timeout,
348                out_collector,
349                err_collector,
350            )
351            .await
352        })
353    }
354}
355
356#[cfg(any(unix, windows))]
357impl<'a, Stdout, Stderr> IntoFuture
358    for WaitForCompletion<'a, Stdout, Stderr, LineOutput, WithTerminate>
359where
360    Stdout: OutputStream + Subscribable + Send + 'a,
361    Stderr: OutputStream + Subscribable + Send + 'a,
362{
363    type Output = Result<
364        WaitForCompletionOrTerminateResult<ProcessOutput<CollectedLines>>,
365        WaitWithOutputError,
366    >;
367    type IntoFuture = BoxFut<'a, Self::Output>;
368
369    fn into_future(self) -> Self::IntoFuture {
370        Box::pin(async move {
371            let LineOutputOptions {
372                stdout_collection_options,
373                stderr_collection_options,
374            } = self.output.options;
375            let line_parsing_options = self.output.line_parsing_options;
376
377            let (out_collector, err_collector) = self
378                .handle
379                .try_spawn_output_collectors(
380                    |name, sub| {
381                        spawn_line_collector(
382                            name,
383                            sub,
384                            line_parsing_options,
385                            stdout_collection_options,
386                        )
387                    },
388                    |name, sub| {
389                        spawn_line_collector(
390                            name,
391                            sub,
392                            line_parsing_options,
393                            stderr_collection_options,
394                        )
395                    },
396                )
397                .await?;
398
399            wait_for_completion_or_terminate_with_collectors(
400                self.handle,
401                self.timeout,
402                self.terminate.shutdown,
403                self.output.eof_timeout,
404                out_collector,
405                err_collector,
406            )
407            .await
408        })
409    }
410}
411
412#[cfg(any(unix, windows))]
413impl<'a, Stdout, Stderr> IntoFuture
414    for WaitForCompletion<'a, Stdout, Stderr, RawOutput, WithTerminate>
415where
416    Stdout: OutputStream + Subscribable + Send + 'a,
417    Stderr: OutputStream + Subscribable + Send + 'a,
418{
419    type Output = Result<
420        WaitForCompletionOrTerminateResult<ProcessOutput<CollectedBytes>>,
421        WaitWithOutputError,
422    >;
423    type IntoFuture = BoxFut<'a, Self::Output>;
424
425    fn into_future(self) -> Self::IntoFuture {
426        Box::pin(async move {
427            let RawOutputOptions {
428                stdout_collection_options,
429                stderr_collection_options,
430            } = self.output.options;
431
432            let (out_collector, err_collector) = self
433                .handle
434                .try_spawn_output_collectors(
435                    |name, sub| spawn_chunk_collector(name, sub, stdout_collection_options),
436                    |name, sub| spawn_chunk_collector(name, sub, stderr_collection_options),
437                )
438                .await?;
439
440            wait_for_completion_or_terminate_with_collectors(
441                self.handle,
442                self.timeout,
443                self.terminate.shutdown,
444                self.output.eof_timeout,
445                out_collector,
446                err_collector,
447            )
448            .await
449        })
450    }
451}