tokio_process_tools/process_handle/output_collection/
mod.rs1mod drain;
5pub(crate) mod options;
6pub(crate) mod output;
7#[cfg(test)]
8mod tests;
9
10use self::drain::{
11 wait_for_completion_or_terminate_with_collectors, wait_for_completion_with_collectors,
12};
13use self::output::ProcessOutput;
14use super::ProcessHandle;
15use crate::error::{
16 WaitForCompletionOrTerminateResult, WaitForCompletionResult, WaitWithOutputError,
17};
18use crate::output_stream::consumer::{Consumer, spawn_consumer_sync};
19use crate::output_stream::event::Chunk;
20use crate::output_stream::line::adapter::LineAdapter;
21use crate::output_stream::line::options::LineParsingOptions;
22use crate::output_stream::visitors::collect::{CollectChunks, CollectLineSink};
23use crate::output_stream::{Next, Subscription, TrySubscribable};
24use crate::process_handle::WaitForCompletionOrTerminateOptions;
25use crate::process_handle::output_collection::options::{LineOutputOptions, RawOutputOptions};
26use crate::{CollectedBytes, CollectedLines, LineCollectionOptions, RawCollectionOptions};
27use std::borrow::Cow;
28
29fn spawn_lines_into_vec_consumer<S>(
33 stream_name: &'static str,
34 subscription: S,
35 parsing_options: LineParsingOptions,
36 collection_options: LineCollectionOptions,
37) -> Consumer<CollectedLines>
38where
39 S: Subscription,
40{
41 spawn_consumer_sync(
42 stream_name,
43 subscription,
44 LineAdapter::new(
45 parsing_options,
46 CollectLineSink::new(
47 CollectedLines::new(),
48 move |line: Cow<'_, str>, sink: &mut CollectedLines| {
49 sink.push_line(line.into_owned(), collection_options);
50 Next::Continue
51 },
52 ),
53 ),
54 )
55}
56
57fn spawn_chunks_into_vec_consumer<S>(
59 stream_name: &'static str,
60 subscription: S,
61 options: RawCollectionOptions,
62) -> Consumer<CollectedBytes>
63where
64 S: Subscription,
65{
66 spawn_consumer_sync(
67 stream_name,
68 subscription,
69 CollectChunks::builder()
70 .sink(CollectedBytes::new())
71 .f(move |chunk: Chunk, sink: &mut CollectedBytes| {
72 sink.push_chunk(chunk.as_ref(), options);
73 })
74 .build(),
75 )
76}
77
78impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
79where
80 Stdout: TrySubscribable,
81 Stderr: TrySubscribable,
82{
83 pub async fn wait_for_completion_with_output(
98 &mut self,
99 timeout: std::time::Duration,
100 output_options: LineOutputOptions,
101 ) -> Result<WaitForCompletionResult<ProcessOutput<CollectedLines>>, WaitWithOutputError> {
102 let LineOutputOptions {
103 line_parsing_options,
104 stdout_collection_options,
105 stderr_collection_options,
106 } = output_options;
107 let stdout = self.stdout();
108 let out_subscription = stdout.try_subscribe().map_err(|source| {
109 WaitWithOutputError::OutputCollectionStartFailed {
110 process_name: self.name.clone(),
111 source,
112 }
113 })?;
114 let out_collector = spawn_lines_into_vec_consumer(
115 stdout.name(),
116 out_subscription,
117 line_parsing_options,
118 stdout_collection_options,
119 );
120 let stderr = self.stderr();
121 let err_subscription = match stderr.try_subscribe() {
122 Ok(subscription) => subscription,
123 Err(source) => {
124 out_collector.abort().await;
125 return Err(WaitWithOutputError::OutputCollectionStartFailed {
126 process_name: self.name.clone(),
127 source,
128 });
129 }
130 };
131 let err_collector = spawn_lines_into_vec_consumer(
132 stderr.name(),
133 err_subscription,
134 line_parsing_options,
135 stderr_collection_options,
136 );
137
138 let result =
139 wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
140 .await?;
141
142 Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
143 status,
144 stdout,
145 stderr,
146 }))
147 }
148
149 pub async fn wait_for_completion_with_raw_output(
161 &mut self,
162 timeout: std::time::Duration,
163 output_options: RawOutputOptions,
164 ) -> Result<WaitForCompletionResult<ProcessOutput<CollectedBytes>>, WaitWithOutputError> {
165 let RawOutputOptions {
166 stdout_collection_options,
167 stderr_collection_options,
168 } = output_options;
169 let stdout = self.stdout();
170 let out_subscription = stdout.try_subscribe().map_err(|source| {
171 WaitWithOutputError::OutputCollectionStartFailed {
172 process_name: self.name.clone(),
173 source,
174 }
175 })?;
176 let out_collector = spawn_chunks_into_vec_consumer(
177 stdout.name(),
178 out_subscription,
179 stdout_collection_options,
180 );
181 let stderr = self.stderr();
182 let err_subscription = match stderr.try_subscribe() {
183 Ok(subscription) => subscription,
184 Err(source) => {
185 out_collector.abort().await;
186 return Err(WaitWithOutputError::OutputCollectionStartFailed {
187 process_name: self.name.clone(),
188 source,
189 });
190 }
191 };
192 let err_collector = spawn_chunks_into_vec_consumer(
193 stderr.name(),
194 err_subscription,
195 stderr_collection_options,
196 );
197
198 let result =
199 wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
200 .await?;
201
202 Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
203 status,
204 stdout,
205 stderr,
206 }))
207 }
208
209 pub async fn wait_for_completion_with_output_or_terminate(
224 &mut self,
225 options: WaitForCompletionOrTerminateOptions,
226 output_options: LineOutputOptions,
227 ) -> Result<
228 WaitForCompletionOrTerminateResult<ProcessOutput<CollectedLines>>,
229 WaitWithOutputError,
230 > {
231 let LineOutputOptions {
232 line_parsing_options,
233 stdout_collection_options,
234 stderr_collection_options,
235 } = output_options;
236 let stdout = self.stdout();
237 let out_subscription = stdout.try_subscribe().map_err(|source| {
238 WaitWithOutputError::OutputCollectionStartFailed {
239 process_name: self.name.clone(),
240 source,
241 }
242 })?;
243 let out_collector = spawn_lines_into_vec_consumer(
244 stdout.name(),
245 out_subscription,
246 line_parsing_options,
247 stdout_collection_options,
248 );
249 let stderr = self.stderr();
250 let err_subscription = match stderr.try_subscribe() {
251 Ok(subscription) => subscription,
252 Err(source) => {
253 out_collector.abort().await;
254 return Err(WaitWithOutputError::OutputCollectionStartFailed {
255 process_name: self.name.clone(),
256 source,
257 });
258 }
259 };
260 let err_collector = spawn_lines_into_vec_consumer(
261 stderr.name(),
262 err_subscription,
263 line_parsing_options,
264 stderr_collection_options,
265 );
266
267 let result = wait_for_completion_or_terminate_with_collectors(
268 self,
269 options.wait_timeout,
270 options.interrupt_timeout,
271 options.terminate_timeout,
272 out_collector,
273 err_collector,
274 )
275 .await?;
276
277 Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
278 status,
279 stdout,
280 stderr,
281 }))
282 }
283
284 pub async fn wait_for_completion_with_raw_output_or_terminate(
299 &mut self,
300 options: WaitForCompletionOrTerminateOptions,
301 output_options: RawOutputOptions,
302 ) -> Result<
303 WaitForCompletionOrTerminateResult<ProcessOutput<CollectedBytes>>,
304 WaitWithOutputError,
305 > {
306 let RawOutputOptions {
307 stdout_collection_options,
308 stderr_collection_options,
309 } = output_options;
310 let stdout = self.stdout();
311 let out_subscription = stdout.try_subscribe().map_err(|source| {
312 WaitWithOutputError::OutputCollectionStartFailed {
313 process_name: self.name.clone(),
314 source,
315 }
316 })?;
317 let out_collector = spawn_chunks_into_vec_consumer(
318 stdout.name(),
319 out_subscription,
320 stdout_collection_options,
321 );
322 let stderr = self.stderr();
323 let err_subscription = match stderr.try_subscribe() {
324 Ok(subscription) => subscription,
325 Err(source) => {
326 out_collector.abort().await;
327 return Err(WaitWithOutputError::OutputCollectionStartFailed {
328 process_name: self.name.clone(),
329 source,
330 });
331 }
332 };
333 let err_collector = spawn_chunks_into_vec_consumer(
334 stderr.name(),
335 err_subscription,
336 stderr_collection_options,
337 );
338
339 let result = wait_for_completion_or_terminate_with_collectors(
340 self,
341 options.wait_timeout,
342 options.interrupt_timeout,
343 options.terminate_timeout,
344 out_collector,
345 err_collector,
346 )
347 .await?;
348
349 Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
350 status,
351 stdout,
352 stderr,
353 }))
354 }
355}