tokio_process_tools/process_handle/output_collection/
mod.rs1pub(crate) mod drain;
6pub(crate) mod options;
7
8use super::ProcessHandle;
9use crate::error::WaitWithOutputError;
10use crate::output_stream::consumer::{Consumer, Sink, spawn_consumer_sync};
11use crate::output_stream::event::Chunk;
12use crate::output_stream::line::adapter::ParseLines;
13use crate::output_stream::line::options::LineParsingOptions;
14use crate::output_stream::visitors::collect::CollectChunks;
15use crate::output_stream::{Next, Subscription};
16use crate::{
17 CollectedBytes, CollectedLines, LineCollectionOptions, OutputStream, RawCollectionOptions,
18 Subscribable,
19};
20use std::borrow::Cow;
21use std::process::ExitStatus;
22
/// Exit status of a process bundled with one value per output stream.
///
/// The struct is generic over the representation of each stream's value;
/// `Stderr` defaults to the same type as `Stdout`, so `ProcessOutput<T>` is
/// shorthand for `ProcessOutput<T, T>`.
///
/// NOTE(review): presumably `Stdout`/`Stderr` are instantiated with the
/// collected output types (e.g. `CollectedLines` / `CollectedBytes`) — confirm
/// against the call sites that construct this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessOutput<Stdout, Stderr = Stdout> {
    /// Exit status reported for the process.
    pub status: ExitStatus,

    /// Value associated with the process's standard output stream.
    pub stdout: Stdout,

    /// Value associated with the process's standard error stream.
    pub stderr: Stderr,
}
39
40pub(crate) fn spawn_line_collector<S>(
43 stream_name: &'static str,
44 subscription: S,
45 parsing_options: LineParsingOptions,
46 collection_options: LineCollectionOptions,
47) -> Consumer<CollectedLines>
48where
49 S: Subscription,
50{
51 spawn_consumer_sync(
52 stream_name,
53 subscription,
54 ParseLines::collect(
55 parsing_options,
56 CollectedLines::new(),
57 move |line: Cow<'_, str>, sink: &mut CollectedLines| {
58 sink.push_line(line.into_owned(), collection_options);
59 Next::Continue
60 },
61 ),
62 )
63}
64
65pub(crate) fn spawn_chunk_collector<S>(
67 stream_name: &'static str,
68 subscription: S,
69 options: RawCollectionOptions,
70) -> Consumer<CollectedBytes>
71where
72 S: Subscription,
73{
74 spawn_consumer_sync(
75 stream_name,
76 subscription,
77 CollectChunks::builder()
78 .sink(CollectedBytes::new())
79 .f(move |chunk: Chunk, sink: &mut CollectedBytes| {
80 sink.push_chunk(chunk.as_ref(), options);
81 })
82 .build(),
83 )
84}
85
86impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
87where
88 Stdout: OutputStream + Subscribable,
89 Stderr: OutputStream + Subscribable,
90{
91 pub(crate) async fn try_spawn_output_collectors<StdoutSink, StderrSink, FOut, FErr>(
98 &mut self,
99 spawn_stdout: FOut,
100 spawn_stderr: FErr,
101 ) -> Result<(Consumer<StdoutSink>, Consumer<StderrSink>), WaitWithOutputError>
102 where
103 StdoutSink: Sink,
104 StderrSink: Sink,
105 FOut: FnOnce(&'static str, Stdout::Subscription) -> Consumer<StdoutSink>,
106 FErr: FnOnce(&'static str, Stderr::Subscription) -> Consumer<StderrSink>,
107 {
108 let out_subscription = self.stdout().try_subscribe().map_err(|source| {
109 WaitWithOutputError::OutputCollectionStartFailed {
110 process_name: self.name.clone(),
111 source: Box::new(source),
112 }
113 })?;
114 let out_collector = spawn_stdout(self.stdout().name(), out_subscription);
115
116 let err_subscription = match self.stderr().try_subscribe() {
117 Ok(subscription) => subscription,
118 Err(source) => {
119 out_collector.abort().await;
120 return Err(WaitWithOutputError::OutputCollectionStartFailed {
121 process_name: self.name.clone(),
122 source: Box::new(source),
123 });
124 }
125 };
126 let err_collector = spawn_stderr(self.stderr().name(), err_subscription);
127
128 Ok((out_collector, err_collector))
129 }
130}