Skip to main content

tokio_process_tools/process_handle/output_collection/
mod.rs

1//! Terminal `wait_for_completion_with_*output*` methods that drain stdout/stderr alongside
2//! process exit.
3
4mod drain;
5pub(crate) mod options;
6pub(crate) mod output;
7#[cfg(test)]
8mod tests;
9
10#[cfg(any(unix, windows))]
11use self::drain::wait_for_completion_or_terminate_with_collectors;
12use self::drain::wait_for_completion_with_collectors;
13use self::output::ProcessOutput;
14use super::ProcessHandle;
15#[cfg(any(unix, windows))]
16use crate::error::WaitForCompletionOrTerminateResult;
17use crate::error::{WaitForCompletionResult, WaitWithOutputError};
18use crate::output_stream::consumer::{Consumer, spawn_consumer_sync};
19use crate::output_stream::event::Chunk;
20use crate::output_stream::line::adapter::LineAdapter;
21use crate::output_stream::line::options::LineParsingOptions;
22use crate::output_stream::visitors::collect::{CollectChunks, CollectLineSink};
23use crate::output_stream::{Next, Subscription, TrySubscribable};
24#[cfg(any(unix, windows))]
25use crate::process_handle::WaitForCompletionOrTerminateOptions;
26use crate::process_handle::output_collection::options::{LineOutputOptions, RawOutputOptions};
27use crate::{CollectedBytes, CollectedLines, LineCollectionOptions, RawCollectionOptions};
28use std::borrow::Cow;
29
30/// Spawn a consumer that collects line output into a [`CollectedLines`] sink. Used by the
31/// generic `wait_for_completion_with_*output*` paths that take a `TrySubscribable` and can't
32/// call backend-specific methods.
33fn spawn_lines_into_vec_consumer<S>(
34    stream_name: &'static str,
35    subscription: S,
36    parsing_options: LineParsingOptions,
37    collection_options: LineCollectionOptions,
38) -> Consumer<CollectedLines>
39where
40    S: Subscription,
41{
42    spawn_consumer_sync(
43        stream_name,
44        subscription,
45        LineAdapter::new(
46            parsing_options,
47            CollectLineSink::new(
48                CollectedLines::new(),
49                move |line: Cow<'_, str>, sink: &mut CollectedLines| {
50                    sink.push_line(line.into_owned(), collection_options);
51                    Next::Continue
52                },
53            ),
54        ),
55    )
56}
57
58/// Spawn a consumer that collects raw byte output into a [`CollectedBytes`] sink.
59fn spawn_chunks_into_vec_consumer<S>(
60    stream_name: &'static str,
61    subscription: S,
62    options: RawCollectionOptions,
63) -> Consumer<CollectedBytes>
64where
65    S: Subscription,
66{
67    spawn_consumer_sync(
68        stream_name,
69        subscription,
70        CollectChunks::builder()
71            .sink(CollectedBytes::new())
72            .f(move |chunk: Chunk, sink: &mut CollectedBytes| {
73                sink.push_chunk(chunk.as_ref(), options);
74            })
75            .build(),
76    )
77}
78
79impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
80where
81    Stdout: TrySubscribable,
82    Stderr: TrySubscribable,
83{
84    /// Waits for the process to complete while collecting line output.
85    ///
86    /// Collectors are attached when this method is called. If the stream was configured with
87    /// `.no_replay()`, output produced before attachment may be discarded; configure replay before
88    /// spawning when startup output must be included.
89    /// Any still-open stdin handle is closed before the terminal wait begins, matching
90    /// [`tokio::process::Child::wait`].
91    /// `timeout` bounds both process completion and stdout/stderr collection.
92    ///
93    /// # Errors
94    ///
95    /// Returns [`WaitWithOutputError`] if waiting for the process or collecting output
96    /// fails after the process completes. Process timeout is returned as
97    /// [`WaitForCompletionResult::Timeout`].
98    pub async fn wait_for_completion_with_output(
99        &mut self,
100        timeout: std::time::Duration,
101        output_options: LineOutputOptions,
102    ) -> Result<WaitForCompletionResult<ProcessOutput<CollectedLines>>, WaitWithOutputError> {
103        let LineOutputOptions {
104            line_parsing_options,
105            stdout_collection_options,
106            stderr_collection_options,
107        } = output_options;
108        let stdout = self.stdout();
109        let out_subscription = stdout.try_subscribe().map_err(|source| {
110            WaitWithOutputError::OutputCollectionStartFailed {
111                process_name: self.name.clone(),
112                source,
113            }
114        })?;
115        let out_collector = spawn_lines_into_vec_consumer(
116            stdout.name(),
117            out_subscription,
118            line_parsing_options,
119            stdout_collection_options,
120        );
121        let stderr = self.stderr();
122        let err_subscription = match stderr.try_subscribe() {
123            Ok(subscription) => subscription,
124            Err(source) => {
125                out_collector.abort().await;
126                return Err(WaitWithOutputError::OutputCollectionStartFailed {
127                    process_name: self.name.clone(),
128                    source,
129                });
130            }
131        };
132        let err_collector = spawn_lines_into_vec_consumer(
133            stderr.name(),
134            err_subscription,
135            line_parsing_options,
136            stderr_collection_options,
137        );
138
139        let result =
140            wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
141                .await?;
142
143        Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
144            status,
145            stdout,
146            stderr,
147        }))
148    }
149
150    /// Waits for the process to complete while collecting raw byte output.
151    ///
152    /// Any still-open stdin handle is closed before the terminal wait begins, matching
153    /// [`tokio::process::Child::wait`].
154    /// `timeout` bounds both process completion and stdout/stderr collection.
155    ///
156    /// # Errors
157    ///
158    /// Returns [`WaitWithOutputError`] if waiting for the process or collecting output
159    /// fails after the process completes. Process timeout is returned as
160    /// [`WaitForCompletionResult::Timeout`].
161    pub async fn wait_for_completion_with_raw_output(
162        &mut self,
163        timeout: std::time::Duration,
164        output_options: RawOutputOptions,
165    ) -> Result<WaitForCompletionResult<ProcessOutput<CollectedBytes>>, WaitWithOutputError> {
166        let RawOutputOptions {
167            stdout_collection_options,
168            stderr_collection_options,
169        } = output_options;
170        let stdout = self.stdout();
171        let out_subscription = stdout.try_subscribe().map_err(|source| {
172            WaitWithOutputError::OutputCollectionStartFailed {
173                process_name: self.name.clone(),
174                source,
175            }
176        })?;
177        let out_collector = spawn_chunks_into_vec_consumer(
178            stdout.name(),
179            out_subscription,
180            stdout_collection_options,
181        );
182        let stderr = self.stderr();
183        let err_subscription = match stderr.try_subscribe() {
184            Ok(subscription) => subscription,
185            Err(source) => {
186                out_collector.abort().await;
187                return Err(WaitWithOutputError::OutputCollectionStartFailed {
188                    process_name: self.name.clone(),
189                    source,
190                });
191            }
192        };
193        let err_collector = spawn_chunks_into_vec_consumer(
194            stderr.name(),
195            err_subscription,
196            stderr_collection_options,
197        );
198
199        let result =
200            wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
201                .await?;
202
203        Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
204            status,
205            stdout,
206            stderr,
207        }))
208    }
209
210    /// Waits for completion within `wait_timeout`, terminating the process if needed, while
211    /// collecting line output.
212    ///
213    /// Any still-open stdin handle is closed before the initial terminal wait begins, matching
214    /// [`tokio::process::Child::wait`].
215    /// Output collection is bounded by `wait_timeout` plus the per-platform graceful budget
216    /// (`interrupt_timeout + terminate_timeout` on Unix, `graceful_timeout` on Windows), plus a
217    /// fixed 3-second post-kill confirmation wait when force-kill fallback is required.
218    ///
219    /// # Errors
220    ///
221    /// Returns [`WaitWithOutputError`] if waiting, termination, or output
222    /// collection fails. Timeout-triggered cleanup success is returned as
223    /// [`WaitForCompletionOrTerminateResult::TerminatedAfterTimeout`].
224    #[cfg(any(unix, windows))]
225    pub async fn wait_for_completion_with_output_or_terminate(
226        &mut self,
227        options: WaitForCompletionOrTerminateOptions,
228        output_options: LineOutputOptions,
229    ) -> Result<
230        WaitForCompletionOrTerminateResult<ProcessOutput<CollectedLines>>,
231        WaitWithOutputError,
232    > {
233        let LineOutputOptions {
234            line_parsing_options,
235            stdout_collection_options,
236            stderr_collection_options,
237        } = output_options;
238        let stdout = self.stdout();
239        let out_subscription = stdout.try_subscribe().map_err(|source| {
240            WaitWithOutputError::OutputCollectionStartFailed {
241                process_name: self.name.clone(),
242                source,
243            }
244        })?;
245        let out_collector = spawn_lines_into_vec_consumer(
246            stdout.name(),
247            out_subscription,
248            line_parsing_options,
249            stdout_collection_options,
250        );
251        let stderr = self.stderr();
252        let err_subscription = match stderr.try_subscribe() {
253            Ok(subscription) => subscription,
254            Err(source) => {
255                out_collector.abort().await;
256                return Err(WaitWithOutputError::OutputCollectionStartFailed {
257                    process_name: self.name.clone(),
258                    source,
259                });
260            }
261        };
262        let err_collector = spawn_lines_into_vec_consumer(
263            stderr.name(),
264            err_subscription,
265            line_parsing_options,
266            stderr_collection_options,
267        );
268
269        let result = wait_for_completion_or_terminate_with_collectors(
270            self,
271            options.wait_timeout,
272            options.graceful_timeouts,
273            out_collector,
274            err_collector,
275        )
276        .await?;
277
278        Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
279            status,
280            stdout,
281            stderr,
282        }))
283    }
284
285    /// Waits for completion within `wait_timeout`, terminating the process if needed, while
286    /// collecting raw byte output.
287    ///
288    /// Any still-open stdin handle is closed before the initial terminal wait begins, matching
289    /// [`tokio::process::Child::wait`].
290    /// Output collection is bounded by `wait_timeout` plus the per-platform graceful budget
291    /// (`interrupt_timeout + terminate_timeout` on Unix, `graceful_timeout` on Windows), plus a
292    /// fixed 3-second post-kill confirmation wait when force-kill fallback is required.
293    ///
294    /// # Errors
295    ///
296    /// Returns [`WaitWithOutputError`] if waiting, termination, or output
297    /// collection fails. Timeout-triggered cleanup success is returned as
298    /// [`WaitForCompletionOrTerminateResult::TerminatedAfterTimeout`].
299    #[cfg(any(unix, windows))]
300    pub async fn wait_for_completion_with_raw_output_or_terminate(
301        &mut self,
302        options: WaitForCompletionOrTerminateOptions,
303        output_options: RawOutputOptions,
304    ) -> Result<
305        WaitForCompletionOrTerminateResult<ProcessOutput<CollectedBytes>>,
306        WaitWithOutputError,
307    > {
308        let RawOutputOptions {
309            stdout_collection_options,
310            stderr_collection_options,
311        } = output_options;
312        let stdout = self.stdout();
313        let out_subscription = stdout.try_subscribe().map_err(|source| {
314            WaitWithOutputError::OutputCollectionStartFailed {
315                process_name: self.name.clone(),
316                source,
317            }
318        })?;
319        let out_collector = spawn_chunks_into_vec_consumer(
320            stdout.name(),
321            out_subscription,
322            stdout_collection_options,
323        );
324        let stderr = self.stderr();
325        let err_subscription = match stderr.try_subscribe() {
326            Ok(subscription) => subscription,
327            Err(source) => {
328                out_collector.abort().await;
329                return Err(WaitWithOutputError::OutputCollectionStartFailed {
330                    process_name: self.name.clone(),
331                    source,
332                });
333            }
334        };
335        let err_collector = spawn_chunks_into_vec_consumer(
336            stderr.name(),
337            err_subscription,
338            stderr_collection_options,
339        );
340
341        let result = wait_for_completion_or_terminate_with_collectors(
342            self,
343            options.wait_timeout,
344            options.graceful_timeouts,
345            out_collector,
346            err_collector,
347        )
348        .await?;
349
350        Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
351            status,
352            stdout,
353            stderr,
354        }))
355    }
356}