Skip to main content

tokio_process_tools/process_handle/output_collection/
mod.rs

1//! Helpers used by the staged [`crate::WaitForCompletion`] builder to drain stdout/stderr
2//! alongside process exit. The builder lives in `process_handle::wait_builder`; this module
3//! owns the supporting collector spawn and the timed drain orchestration.
4
5pub(crate) mod drain;
6pub(crate) mod options;
7
8use super::ProcessHandle;
9use crate::error::WaitWithOutputError;
10use crate::output_stream::consumer::{Consumer, Sink, spawn_consumer_sync};
11use crate::output_stream::event::Chunk;
12use crate::output_stream::line::adapter::ParseLines;
13use crate::output_stream::line::options::LineParsingOptions;
14use crate::output_stream::visitors::collect::CollectChunks;
15use crate::output_stream::{Next, Subscription};
16use crate::{
17    CollectedBytes, CollectedLines, LineCollectionOptions, OutputStream, RawCollectionOptions,
18    Subscribable,
19};
20use std::borrow::Cow;
21use std::process::ExitStatus;
22
23/// Full output of a process that terminated.
24///
25/// `Stdout` and `Stderr` describe the collected payload type for each stream. For example,
26/// line collection uses `ProcessOutput<CollectedLines>` and raw byte collection uses
27/// `ProcessOutput<CollectedBytes>`.
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct ProcessOutput<Stdout, Stderr = Stdout> {
30    /// Status the process exited with.
31    pub status: ExitStatus,
32
33    /// The process's collected output on its `stdout` stream.
34    pub stdout: Stdout,
35
36    /// The process's collected output on its `stderr` stream.
37    pub stderr: Stderr,
38}
39
40/// Spawn a consumer that collects line output into a [`CollectedLines`] sink. Used by the
41/// staged builder paths that take a `Subscribable` and can't call backend-specific methods.
42pub(crate) fn spawn_line_collector<S>(
43    stream_name: &'static str,
44    subscription: S,
45    parsing_options: LineParsingOptions,
46    collection_options: LineCollectionOptions,
47) -> Consumer<CollectedLines>
48where
49    S: Subscription,
50{
51    spawn_consumer_sync(
52        stream_name,
53        subscription,
54        ParseLines::collect(
55            parsing_options,
56            CollectedLines::new(),
57            move |line: Cow<'_, str>, sink: &mut CollectedLines| {
58                sink.push_line(line.into_owned(), collection_options);
59                Next::Continue
60            },
61        ),
62    )
63}
64
65/// Spawn a consumer that collects raw byte output into a [`CollectedBytes`] sink.
66pub(crate) fn spawn_chunk_collector<S>(
67    stream_name: &'static str,
68    subscription: S,
69    options: RawCollectionOptions,
70) -> Consumer<CollectedBytes>
71where
72    S: Subscription,
73{
74    spawn_consumer_sync(
75        stream_name,
76        subscription,
77        CollectChunks::builder()
78            .sink(CollectedBytes::new())
79            .f(move |chunk: Chunk, sink: &mut CollectedBytes| {
80                sink.push_chunk(chunk.as_ref(), options);
81            })
82            .build(),
83    )
84}
85
86impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
87where
88    Stdout: OutputStream + Subscribable,
89    Stderr: OutputStream + Subscribable,
90{
91    /// Subscribes to stdout and stderr, spawning a collector on each via the caller-supplied
92    /// factories. If the stderr subscription fails after the stdout collector has started, the
93    /// stdout collector is aborted (awaiting task join) before returning, so the stdout slot is
94    /// released by the time the error reaches the caller. `Drop` alone is not sufficient here:
95    /// for `SingleSubscriberOutputStream`, the consumer claim is only fully released after the
96    /// task has been joined.
97    pub(crate) async fn try_spawn_output_collectors<StdoutSink, StderrSink, FOut, FErr>(
98        &mut self,
99        spawn_stdout: FOut,
100        spawn_stderr: FErr,
101    ) -> Result<(Consumer<StdoutSink>, Consumer<StderrSink>), WaitWithOutputError>
102    where
103        StdoutSink: Sink,
104        StderrSink: Sink,
105        FOut: FnOnce(&'static str, Stdout::Subscription) -> Consumer<StdoutSink>,
106        FErr: FnOnce(&'static str, Stderr::Subscription) -> Consumer<StderrSink>,
107    {
108        let out_subscription = self.stdout().try_subscribe().map_err(|source| {
109            WaitWithOutputError::OutputCollectionStartFailed {
110                process_name: self.name.clone(),
111                source: Box::new(source),
112            }
113        })?;
114        let out_collector = spawn_stdout(self.stdout().name(), out_subscription);
115
116        let err_subscription = match self.stderr().try_subscribe() {
117            Ok(subscription) => subscription,
118            Err(source) => {
119                out_collector.abort().await;
120                return Err(WaitWithOutputError::OutputCollectionStartFailed {
121                    process_name: self.name.clone(),
122                    source: Box::new(source),
123                });
124            }
125        };
126        let err_collector = spawn_stderr(self.stderr().name(), err_subscription);
127
128        Ok((out_collector, err_collector))
129    }
130}