Skip to main content

tokio_process_tools/process_handle/output_collection/
mod.rs

1//! Terminal `wait_for_completion_with_*output*` methods that drain stdout/stderr alongside
2//! process exit.
3
4mod drain;
5pub(crate) mod options;
6pub(crate) mod output;
7#[cfg(test)]
8mod tests;
9
10use self::drain::{
11    wait_for_completion_or_terminate_with_collectors, wait_for_completion_with_collectors,
12};
13use self::output::ProcessOutput;
14use super::ProcessHandle;
15use crate::error::{
16    WaitForCompletionOrTerminateResult, WaitForCompletionResult, WaitWithOutputError,
17};
18use crate::output_stream::consumer::{Consumer, spawn_consumer_sync};
19use crate::output_stream::event::Chunk;
20use crate::output_stream::line::adapter::LineAdapter;
21use crate::output_stream::line::options::LineParsingOptions;
22use crate::output_stream::visitors::collect::{CollectChunks, CollectLineSink};
23use crate::output_stream::{Next, Subscription, TrySubscribable};
24use crate::process_handle::WaitForCompletionOrTerminateOptions;
25use crate::process_handle::output_collection::options::{LineOutputOptions, RawOutputOptions};
26use crate::{CollectedBytes, CollectedLines, LineCollectionOptions, RawCollectionOptions};
27use std::borrow::Cow;
28
29/// Spawn a consumer that collects line output into a [`CollectedLines`] sink. Used by the
30/// generic `wait_for_completion_with_*output*` paths that take a `TrySubscribable` and can't
31/// call backend-specific methods.
32fn spawn_lines_into_vec_consumer<S>(
33    stream_name: &'static str,
34    subscription: S,
35    parsing_options: LineParsingOptions,
36    collection_options: LineCollectionOptions,
37) -> Consumer<CollectedLines>
38where
39    S: Subscription,
40{
41    spawn_consumer_sync(
42        stream_name,
43        subscription,
44        LineAdapter::new(
45            parsing_options,
46            CollectLineSink::new(
47                CollectedLines::new(),
48                move |line: Cow<'_, str>, sink: &mut CollectedLines| {
49                    sink.push_line(line.into_owned(), collection_options);
50                    Next::Continue
51                },
52            ),
53        ),
54    )
55}
56
57/// Spawn a consumer that collects raw byte output into a [`CollectedBytes`] sink.
58fn spawn_chunks_into_vec_consumer<S>(
59    stream_name: &'static str,
60    subscription: S,
61    options: RawCollectionOptions,
62) -> Consumer<CollectedBytes>
63where
64    S: Subscription,
65{
66    spawn_consumer_sync(
67        stream_name,
68        subscription,
69        CollectChunks::builder()
70            .sink(CollectedBytes::new())
71            .f(move |chunk: Chunk, sink: &mut CollectedBytes| {
72                sink.push_chunk(chunk.as_ref(), options);
73            })
74            .build(),
75    )
76}
77
78impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
79where
80    Stdout: TrySubscribable,
81    Stderr: TrySubscribable,
82{
83    /// Waits for the process to complete while collecting line output.
84    ///
85    /// Collectors are attached when this method is called. If the stream was configured with
86    /// `.no_replay()`, output produced before attachment may be discarded; configure replay before
87    /// spawning when startup output must be included.
88    /// Any still-open stdin handle is closed before the terminal wait begins, matching
89    /// [`tokio::process::Child::wait`].
90    /// `timeout` bounds both process completion and stdout/stderr collection.
91    ///
92    /// # Errors
93    ///
94    /// Returns [`WaitWithOutputError`] if waiting for the process or collecting output
95    /// fails after the process completes. Process timeout is returned as
96    /// [`WaitForCompletionResult::Timeout`].
97    pub async fn wait_for_completion_with_output(
98        &mut self,
99        timeout: std::time::Duration,
100        output_options: LineOutputOptions,
101    ) -> Result<WaitForCompletionResult<ProcessOutput<CollectedLines>>, WaitWithOutputError> {
102        let LineOutputOptions {
103            line_parsing_options,
104            stdout_collection_options,
105            stderr_collection_options,
106        } = output_options;
107        let stdout = self.stdout();
108        let out_subscription = stdout.try_subscribe().map_err(|source| {
109            WaitWithOutputError::OutputCollectionStartFailed {
110                process_name: self.name.clone(),
111                source,
112            }
113        })?;
114        let out_collector = spawn_lines_into_vec_consumer(
115            stdout.name(),
116            out_subscription,
117            line_parsing_options,
118            stdout_collection_options,
119        );
120        let stderr = self.stderr();
121        let err_subscription = match stderr.try_subscribe() {
122            Ok(subscription) => subscription,
123            Err(source) => {
124                out_collector.abort().await;
125                return Err(WaitWithOutputError::OutputCollectionStartFailed {
126                    process_name: self.name.clone(),
127                    source,
128                });
129            }
130        };
131        let err_collector = spawn_lines_into_vec_consumer(
132            stderr.name(),
133            err_subscription,
134            line_parsing_options,
135            stderr_collection_options,
136        );
137
138        let result =
139            wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
140                .await?;
141
142        Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
143            status,
144            stdout,
145            stderr,
146        }))
147    }
148
149    /// Waits for the process to complete while collecting raw byte output.
150    ///
151    /// Any still-open stdin handle is closed before the terminal wait begins, matching
152    /// [`tokio::process::Child::wait`].
153    /// `timeout` bounds both process completion and stdout/stderr collection.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`WaitWithOutputError`] if waiting for the process or collecting output
158    /// fails after the process completes. Process timeout is returned as
159    /// [`WaitForCompletionResult::Timeout`].
160    pub async fn wait_for_completion_with_raw_output(
161        &mut self,
162        timeout: std::time::Duration,
163        output_options: RawOutputOptions,
164    ) -> Result<WaitForCompletionResult<ProcessOutput<CollectedBytes>>, WaitWithOutputError> {
165        let RawOutputOptions {
166            stdout_collection_options,
167            stderr_collection_options,
168        } = output_options;
169        let stdout = self.stdout();
170        let out_subscription = stdout.try_subscribe().map_err(|source| {
171            WaitWithOutputError::OutputCollectionStartFailed {
172                process_name: self.name.clone(),
173                source,
174            }
175        })?;
176        let out_collector = spawn_chunks_into_vec_consumer(
177            stdout.name(),
178            out_subscription,
179            stdout_collection_options,
180        );
181        let stderr = self.stderr();
182        let err_subscription = match stderr.try_subscribe() {
183            Ok(subscription) => subscription,
184            Err(source) => {
185                out_collector.abort().await;
186                return Err(WaitWithOutputError::OutputCollectionStartFailed {
187                    process_name: self.name.clone(),
188                    source,
189                });
190            }
191        };
192        let err_collector = spawn_chunks_into_vec_consumer(
193            stderr.name(),
194            err_subscription,
195            stderr_collection_options,
196        );
197
198        let result =
199            wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
200                .await?;
201
202        Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
203            status,
204            stdout,
205            stderr,
206        }))
207    }
208
209    /// Waits for completion within `wait_timeout`, terminating the process if needed, while
210    /// collecting line output.
211    ///
212    /// Any still-open stdin handle is closed before the initial terminal wait begins, matching
213    /// [`tokio::process::Child::wait`].
214    /// Output collection is bounded by
215    /// `wait_timeout + interrupt_timeout + terminate_timeout`, plus a fixed 3-second post-kill
216    /// confirmation wait when force-kill fallback is required.
217    ///
218    /// # Errors
219    ///
220    /// Returns [`WaitWithOutputError`] if waiting, termination, or output
221    /// collection fails. Timeout-triggered cleanup success is returned as
222    /// [`WaitForCompletionOrTerminateResult::TerminatedAfterTimeout`].
223    pub async fn wait_for_completion_with_output_or_terminate(
224        &mut self,
225        options: WaitForCompletionOrTerminateOptions,
226        output_options: LineOutputOptions,
227    ) -> Result<
228        WaitForCompletionOrTerminateResult<ProcessOutput<CollectedLines>>,
229        WaitWithOutputError,
230    > {
231        let LineOutputOptions {
232            line_parsing_options,
233            stdout_collection_options,
234            stderr_collection_options,
235        } = output_options;
236        let stdout = self.stdout();
237        let out_subscription = stdout.try_subscribe().map_err(|source| {
238            WaitWithOutputError::OutputCollectionStartFailed {
239                process_name: self.name.clone(),
240                source,
241            }
242        })?;
243        let out_collector = spawn_lines_into_vec_consumer(
244            stdout.name(),
245            out_subscription,
246            line_parsing_options,
247            stdout_collection_options,
248        );
249        let stderr = self.stderr();
250        let err_subscription = match stderr.try_subscribe() {
251            Ok(subscription) => subscription,
252            Err(source) => {
253                out_collector.abort().await;
254                return Err(WaitWithOutputError::OutputCollectionStartFailed {
255                    process_name: self.name.clone(),
256                    source,
257                });
258            }
259        };
260        let err_collector = spawn_lines_into_vec_consumer(
261            stderr.name(),
262            err_subscription,
263            line_parsing_options,
264            stderr_collection_options,
265        );
266
267        let result = wait_for_completion_or_terminate_with_collectors(
268            self,
269            options.wait_timeout,
270            options.interrupt_timeout,
271            options.terminate_timeout,
272            out_collector,
273            err_collector,
274        )
275        .await?;
276
277        Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
278            status,
279            stdout,
280            stderr,
281        }))
282    }
283
284    /// Waits for completion within `wait_timeout`, terminating the process if needed, while
285    /// collecting raw byte output.
286    ///
287    /// Any still-open stdin handle is closed before the initial terminal wait begins, matching
288    /// [`tokio::process::Child::wait`].
289    /// Output collection is bounded by
290    /// `wait_timeout + interrupt_timeout + terminate_timeout`, plus a fixed 3-second post-kill
291    /// confirmation wait when force-kill fallback is required.
292    ///
293    /// # Errors
294    ///
295    /// Returns [`WaitWithOutputError`] if waiting, termination, or output
296    /// collection fails. Timeout-triggered cleanup success is returned as
297    /// [`WaitForCompletionOrTerminateResult::TerminatedAfterTimeout`].
298    pub async fn wait_for_completion_with_raw_output_or_terminate(
299        &mut self,
300        options: WaitForCompletionOrTerminateOptions,
301        output_options: RawOutputOptions,
302    ) -> Result<
303        WaitForCompletionOrTerminateResult<ProcessOutput<CollectedBytes>>,
304        WaitWithOutputError,
305    > {
306        let RawOutputOptions {
307            stdout_collection_options,
308            stderr_collection_options,
309        } = output_options;
310        let stdout = self.stdout();
311        let out_subscription = stdout.try_subscribe().map_err(|source| {
312            WaitWithOutputError::OutputCollectionStartFailed {
313                process_name: self.name.clone(),
314                source,
315            }
316        })?;
317        let out_collector = spawn_chunks_into_vec_consumer(
318            stdout.name(),
319            out_subscription,
320            stdout_collection_options,
321        );
322        let stderr = self.stderr();
323        let err_subscription = match stderr.try_subscribe() {
324            Ok(subscription) => subscription,
325            Err(source) => {
326                out_collector.abort().await;
327                return Err(WaitWithOutputError::OutputCollectionStartFailed {
328                    process_name: self.name.clone(),
329                    source,
330                });
331            }
332        };
333        let err_collector = spawn_chunks_into_vec_consumer(
334            stderr.name(),
335            err_subscription,
336            stderr_collection_options,
337        );
338
339        let result = wait_for_completion_or_terminate_with_collectors(
340            self,
341            options.wait_timeout,
342            options.interrupt_timeout,
343            options.terminate_timeout,
344            out_collector,
345            err_collector,
346        )
347        .await?;
348
349        Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
350            status,
351            stdout,
352            stderr,
353        }))
354    }
355}