tokio_process_tools/process_handle/output_collection/
mod.rs1mod drain;
5pub(crate) mod options;
6pub(crate) mod output;
7#[cfg(test)]
8mod tests;
9
10#[cfg(any(unix, windows))]
11use self::drain::wait_for_completion_or_terminate_with_collectors;
12use self::drain::wait_for_completion_with_collectors;
13use self::output::ProcessOutput;
14use super::ProcessHandle;
15#[cfg(any(unix, windows))]
16use crate::error::WaitForCompletionOrTerminateResult;
17use crate::error::{WaitForCompletionResult, WaitWithOutputError};
18use crate::output_stream::consumer::{Consumer, spawn_consumer_sync};
19use crate::output_stream::event::Chunk;
20use crate::output_stream::line::adapter::LineAdapter;
21use crate::output_stream::line::options::LineParsingOptions;
22use crate::output_stream::visitors::collect::{CollectChunks, CollectLineSink};
23use crate::output_stream::{Next, Subscription, TrySubscribable};
24#[cfg(any(unix, windows))]
25use crate::process_handle::WaitForCompletionOrTerminateOptions;
26use crate::process_handle::output_collection::options::{LineOutputOptions, RawOutputOptions};
27use crate::{CollectedBytes, CollectedLines, LineCollectionOptions, RawCollectionOptions};
28use std::borrow::Cow;
29
30fn spawn_lines_into_vec_consumer<S>(
34 stream_name: &'static str,
35 subscription: S,
36 parsing_options: LineParsingOptions,
37 collection_options: LineCollectionOptions,
38) -> Consumer<CollectedLines>
39where
40 S: Subscription,
41{
42 spawn_consumer_sync(
43 stream_name,
44 subscription,
45 LineAdapter::new(
46 parsing_options,
47 CollectLineSink::new(
48 CollectedLines::new(),
49 move |line: Cow<'_, str>, sink: &mut CollectedLines| {
50 sink.push_line(line.into_owned(), collection_options);
51 Next::Continue
52 },
53 ),
54 ),
55 )
56}
57
58fn spawn_chunks_into_vec_consumer<S>(
60 stream_name: &'static str,
61 subscription: S,
62 options: RawCollectionOptions,
63) -> Consumer<CollectedBytes>
64where
65 S: Subscription,
66{
67 spawn_consumer_sync(
68 stream_name,
69 subscription,
70 CollectChunks::builder()
71 .sink(CollectedBytes::new())
72 .f(move |chunk: Chunk, sink: &mut CollectedBytes| {
73 sink.push_chunk(chunk.as_ref(), options);
74 })
75 .build(),
76 )
77}
78
79impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
80where
81 Stdout: TrySubscribable,
82 Stderr: TrySubscribable,
83{
84 pub async fn wait_for_completion_with_output(
99 &mut self,
100 timeout: std::time::Duration,
101 output_options: LineOutputOptions,
102 ) -> Result<WaitForCompletionResult<ProcessOutput<CollectedLines>>, WaitWithOutputError> {
103 let LineOutputOptions {
104 line_parsing_options,
105 stdout_collection_options,
106 stderr_collection_options,
107 } = output_options;
108 let stdout = self.stdout();
109 let out_subscription = stdout.try_subscribe().map_err(|source| {
110 WaitWithOutputError::OutputCollectionStartFailed {
111 process_name: self.name.clone(),
112 source,
113 }
114 })?;
115 let out_collector = spawn_lines_into_vec_consumer(
116 stdout.name(),
117 out_subscription,
118 line_parsing_options,
119 stdout_collection_options,
120 );
121 let stderr = self.stderr();
122 let err_subscription = match stderr.try_subscribe() {
123 Ok(subscription) => subscription,
124 Err(source) => {
125 out_collector.abort().await;
126 return Err(WaitWithOutputError::OutputCollectionStartFailed {
127 process_name: self.name.clone(),
128 source,
129 });
130 }
131 };
132 let err_collector = spawn_lines_into_vec_consumer(
133 stderr.name(),
134 err_subscription,
135 line_parsing_options,
136 stderr_collection_options,
137 );
138
139 let result =
140 wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
141 .await?;
142
143 Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
144 status,
145 stdout,
146 stderr,
147 }))
148 }
149
150 pub async fn wait_for_completion_with_raw_output(
162 &mut self,
163 timeout: std::time::Duration,
164 output_options: RawOutputOptions,
165 ) -> Result<WaitForCompletionResult<ProcessOutput<CollectedBytes>>, WaitWithOutputError> {
166 let RawOutputOptions {
167 stdout_collection_options,
168 stderr_collection_options,
169 } = output_options;
170 let stdout = self.stdout();
171 let out_subscription = stdout.try_subscribe().map_err(|source| {
172 WaitWithOutputError::OutputCollectionStartFailed {
173 process_name: self.name.clone(),
174 source,
175 }
176 })?;
177 let out_collector = spawn_chunks_into_vec_consumer(
178 stdout.name(),
179 out_subscription,
180 stdout_collection_options,
181 );
182 let stderr = self.stderr();
183 let err_subscription = match stderr.try_subscribe() {
184 Ok(subscription) => subscription,
185 Err(source) => {
186 out_collector.abort().await;
187 return Err(WaitWithOutputError::OutputCollectionStartFailed {
188 process_name: self.name.clone(),
189 source,
190 });
191 }
192 };
193 let err_collector = spawn_chunks_into_vec_consumer(
194 stderr.name(),
195 err_subscription,
196 stderr_collection_options,
197 );
198
199 let result =
200 wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
201 .await?;
202
203 Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
204 status,
205 stdout,
206 stderr,
207 }))
208 }
209
210 #[cfg(any(unix, windows))]
225 pub async fn wait_for_completion_with_output_or_terminate(
226 &mut self,
227 options: WaitForCompletionOrTerminateOptions,
228 output_options: LineOutputOptions,
229 ) -> Result<
230 WaitForCompletionOrTerminateResult<ProcessOutput<CollectedLines>>,
231 WaitWithOutputError,
232 > {
233 let LineOutputOptions {
234 line_parsing_options,
235 stdout_collection_options,
236 stderr_collection_options,
237 } = output_options;
238 let stdout = self.stdout();
239 let out_subscription = stdout.try_subscribe().map_err(|source| {
240 WaitWithOutputError::OutputCollectionStartFailed {
241 process_name: self.name.clone(),
242 source,
243 }
244 })?;
245 let out_collector = spawn_lines_into_vec_consumer(
246 stdout.name(),
247 out_subscription,
248 line_parsing_options,
249 stdout_collection_options,
250 );
251 let stderr = self.stderr();
252 let err_subscription = match stderr.try_subscribe() {
253 Ok(subscription) => subscription,
254 Err(source) => {
255 out_collector.abort().await;
256 return Err(WaitWithOutputError::OutputCollectionStartFailed {
257 process_name: self.name.clone(),
258 source,
259 });
260 }
261 };
262 let err_collector = spawn_lines_into_vec_consumer(
263 stderr.name(),
264 err_subscription,
265 line_parsing_options,
266 stderr_collection_options,
267 );
268
269 let result = wait_for_completion_or_terminate_with_collectors(
270 self,
271 options.wait_timeout,
272 options.graceful_timeouts,
273 out_collector,
274 err_collector,
275 )
276 .await?;
277
278 Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
279 status,
280 stdout,
281 stderr,
282 }))
283 }
284
285 #[cfg(any(unix, windows))]
300 pub async fn wait_for_completion_with_raw_output_or_terminate(
301 &mut self,
302 options: WaitForCompletionOrTerminateOptions,
303 output_options: RawOutputOptions,
304 ) -> Result<
305 WaitForCompletionOrTerminateResult<ProcessOutput<CollectedBytes>>,
306 WaitWithOutputError,
307 > {
308 let RawOutputOptions {
309 stdout_collection_options,
310 stderr_collection_options,
311 } = output_options;
312 let stdout = self.stdout();
313 let out_subscription = stdout.try_subscribe().map_err(|source| {
314 WaitWithOutputError::OutputCollectionStartFailed {
315 process_name: self.name.clone(),
316 source,
317 }
318 })?;
319 let out_collector = spawn_chunks_into_vec_consumer(
320 stdout.name(),
321 out_subscription,
322 stdout_collection_options,
323 );
324 let stderr = self.stderr();
325 let err_subscription = match stderr.try_subscribe() {
326 Ok(subscription) => subscription,
327 Err(source) => {
328 out_collector.abort().await;
329 return Err(WaitWithOutputError::OutputCollectionStartFailed {
330 process_name: self.name.clone(),
331 source,
332 });
333 }
334 };
335 let err_collector = spawn_chunks_into_vec_consumer(
336 stderr.name(),
337 err_subscription,
338 stderr_collection_options,
339 );
340
341 let result = wait_for_completion_or_terminate_with_collectors(
342 self,
343 options.wait_timeout,
344 options.graceful_timeouts,
345 out_collector,
346 err_collector,
347 )
348 .await?;
349
350 Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
351 status,
352 stdout,
353 stderr,
354 }))
355 }
356}