1use std::future::{Future, IntoFuture};
27use std::pin::Pin;
28use std::time::Duration;
29
30use crate::error::{WaitError, WaitForCompletionResult, WaitWithOutputError};
31#[cfg(any(unix, windows))]
32use crate::error::{WaitForCompletionOrTerminateResult, WaitOrTerminateError};
33use crate::output_stream::line::options::LineParsingOptions;
34use crate::output_stream::{OutputStream, Subscribable};
35use crate::process_handle::ProcessHandle;
36#[cfg(any(unix, windows))]
37use crate::process_handle::output_collection::drain::wait_for_completion_or_terminate_with_collectors;
38use crate::process_handle::output_collection::drain::wait_for_completion_with_collectors;
39use crate::process_handle::output_collection::options::{LineOutputOptions, RawOutputOptions};
40use crate::process_handle::output_collection::{
41 ProcessOutput, spawn_chunk_collector, spawn_line_collector,
42};
43#[cfg(any(unix, windows))]
44use crate::process_handle::termination::GracefulShutdown;
45use crate::{CollectedBytes, CollectedLines};
46
47pub mod state {
55 #[cfg(any(unix, windows))]
56 use super::GracefulShutdown;
57 use super::{Duration, LineOutputOptions, LineParsingOptions, RawOutputOptions};
58
59 #[derive(Debug)]
61 pub struct NoOutput;
62
63 #[derive(Debug)]
65 pub struct LineOutput {
66 pub(super) eof_timeout: Duration,
67 pub(super) line_parsing_options: LineParsingOptions,
68 pub(super) options: LineOutputOptions,
69 }
70
71 #[derive(Debug)]
73 pub struct RawOutput {
74 pub(super) eof_timeout: Duration,
75 pub(super) options: RawOutputOptions,
76 }
77
78 #[derive(Debug)]
80 pub struct NoTerminate;
81
82 #[cfg(any(unix, windows))]
84 #[derive(Debug)]
85 pub struct WithTerminate {
86 pub(super) shutdown: GracefulShutdown,
87 }
88}
89
90#[cfg(any(unix, windows))]
91use state::WithTerminate;
92use state::{LineOutput, NoOutput, NoTerminate, RawOutput};
93
94#[must_use = "calling `wait_for_completion(...)` only configures the wait. \
104 `.await` the builder (or chain `.with_*_output(...)` / `.or_terminate(...)` first) \
105 to actually run it."]
106pub struct WaitForCompletion<'a, Stdout, Stderr, Output, Terminate>
107where
108 Stdout: OutputStream,
109 Stderr: OutputStream,
110{
111 handle: &'a mut ProcessHandle<Stdout, Stderr>,
112 timeout: Duration,
113 output: Output,
114 terminate: Terminate,
115}
116
117impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
118where
119 Stdout: OutputStream,
120 Stderr: OutputStream,
121{
122 pub fn wait_for_completion(
133 &mut self,
134 timeout: Duration,
135 ) -> WaitForCompletion<'_, Stdout, Stderr, NoOutput, NoTerminate> {
136 WaitForCompletion {
137 handle: self,
138 timeout,
139 output: NoOutput,
140 terminate: NoTerminate,
141 }
142 }
143}
144
145impl<'a, Stdout, Stderr> WaitForCompletion<'a, Stdout, Stderr, NoOutput, NoTerminate>
146where
147 Stdout: OutputStream,
148 Stderr: OutputStream,
149{
150 pub fn with_line_output(
164 self,
165 eof_timeout: Duration,
166 line_parsing_options: LineParsingOptions,
167 options: LineOutputOptions,
168 ) -> WaitForCompletion<'a, Stdout, Stderr, LineOutput, NoTerminate> {
169 WaitForCompletion {
170 handle: self.handle,
171 timeout: self.timeout,
172 output: LineOutput {
173 eof_timeout,
174 line_parsing_options,
175 options,
176 },
177 terminate: NoTerminate,
178 }
179 }
180
181 pub fn with_raw_output(
187 self,
188 eof_timeout: Duration,
189 options: RawOutputOptions,
190 ) -> WaitForCompletion<'a, Stdout, Stderr, RawOutput, NoTerminate> {
191 WaitForCompletion {
192 handle: self.handle,
193 timeout: self.timeout,
194 output: RawOutput {
195 eof_timeout,
196 options,
197 },
198 terminate: NoTerminate,
199 }
200 }
201}
202
203#[cfg(any(unix, windows))]
204impl<'a, Stdout, Stderr, Output> WaitForCompletion<'a, Stdout, Stderr, Output, NoTerminate>
205where
206 Stdout: OutputStream,
207 Stderr: OutputStream,
208{
209 pub fn or_terminate(
221 self,
222 shutdown: GracefulShutdown,
223 ) -> WaitForCompletion<'a, Stdout, Stderr, Output, WithTerminate> {
224 WaitForCompletion {
225 handle: self.handle,
226 timeout: self.timeout,
227 output: self.output,
228 terminate: WithTerminate { shutdown },
229 }
230 }
231}
232
/// Pinned, heap-allocated, `Send` future yielding `T` and valid for `'a`.
///
/// Used as the `IntoFuture::IntoFuture` type of every `WaitForCompletion` state so the
/// `async` block implementing the wait does not have to be nameable.
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
234
235impl<'a, Stdout, Stderr> IntoFuture for WaitForCompletion<'a, Stdout, Stderr, NoOutput, NoTerminate>
236where
237 Stdout: OutputStream + Send + 'a,
238 Stderr: OutputStream + Send + 'a,
239{
240 type Output = Result<WaitForCompletionResult, WaitError>;
241 type IntoFuture = BoxFut<'a, Self::Output>;
242
243 fn into_future(self) -> Self::IntoFuture {
244 Box::pin(async move { self.handle.wait_for_completion_inner(self.timeout).await })
245 }
246}
247
248#[cfg(any(unix, windows))]
249impl<'a, Stdout, Stderr> IntoFuture
250 for WaitForCompletion<'a, Stdout, Stderr, NoOutput, WithTerminate>
251where
252 Stdout: OutputStream + Send + 'a,
253 Stderr: OutputStream + Send + 'a,
254{
255 type Output = Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError>;
256 type IntoFuture = BoxFut<'a, Self::Output>;
257
258 fn into_future(self) -> Self::IntoFuture {
259 Box::pin(async move {
260 self.handle
261 .wait_for_completion_or_terminate_inner(self.timeout, self.terminate.shutdown)
262 .await
263 })
264 }
265}
266
267impl<'a, Stdout, Stderr> IntoFuture
268 for WaitForCompletion<'a, Stdout, Stderr, LineOutput, NoTerminate>
269where
270 Stdout: OutputStream + Subscribable + Send + 'a,
271 Stderr: OutputStream + Subscribable + Send + 'a,
272{
273 type Output =
274 Result<WaitForCompletionResult<ProcessOutput<CollectedLines>>, WaitWithOutputError>;
275 type IntoFuture = BoxFut<'a, Self::Output>;
276
277 fn into_future(self) -> Self::IntoFuture {
278 Box::pin(async move {
279 let LineOutputOptions {
280 stdout_collection_options,
281 stderr_collection_options,
282 } = self.output.options;
283 let line_parsing_options = self.output.line_parsing_options;
284
285 let (out_collector, err_collector) = self
286 .handle
287 .try_spawn_output_collectors(
288 |name, sub| {
289 spawn_line_collector(
290 name,
291 sub,
292 line_parsing_options,
293 stdout_collection_options,
294 )
295 },
296 |name, sub| {
297 spawn_line_collector(
298 name,
299 sub,
300 line_parsing_options,
301 stderr_collection_options,
302 )
303 },
304 )
305 .await?;
306
307 wait_for_completion_with_collectors(
308 self.handle,
309 self.timeout,
310 self.output.eof_timeout,
311 out_collector,
312 err_collector,
313 )
314 .await
315 })
316 }
317}
318
319impl<'a, Stdout, Stderr> IntoFuture
320 for WaitForCompletion<'a, Stdout, Stderr, RawOutput, NoTerminate>
321where
322 Stdout: OutputStream + Subscribable + Send + 'a,
323 Stderr: OutputStream + Subscribable + Send + 'a,
324{
325 type Output =
326 Result<WaitForCompletionResult<ProcessOutput<CollectedBytes>>, WaitWithOutputError>;
327 type IntoFuture = BoxFut<'a, Self::Output>;
328
329 fn into_future(self) -> Self::IntoFuture {
330 Box::pin(async move {
331 let RawOutputOptions {
332 stdout_collection_options,
333 stderr_collection_options,
334 } = self.output.options;
335
336 let (out_collector, err_collector) = self
337 .handle
338 .try_spawn_output_collectors(
339 |name, sub| spawn_chunk_collector(name, sub, stdout_collection_options),
340 |name, sub| spawn_chunk_collector(name, sub, stderr_collection_options),
341 )
342 .await?;
343
344 wait_for_completion_with_collectors(
345 self.handle,
346 self.timeout,
347 self.output.eof_timeout,
348 out_collector,
349 err_collector,
350 )
351 .await
352 })
353 }
354}
355
356#[cfg(any(unix, windows))]
357impl<'a, Stdout, Stderr> IntoFuture
358 for WaitForCompletion<'a, Stdout, Stderr, LineOutput, WithTerminate>
359where
360 Stdout: OutputStream + Subscribable + Send + 'a,
361 Stderr: OutputStream + Subscribable + Send + 'a,
362{
363 type Output = Result<
364 WaitForCompletionOrTerminateResult<ProcessOutput<CollectedLines>>,
365 WaitWithOutputError,
366 >;
367 type IntoFuture = BoxFut<'a, Self::Output>;
368
369 fn into_future(self) -> Self::IntoFuture {
370 Box::pin(async move {
371 let LineOutputOptions {
372 stdout_collection_options,
373 stderr_collection_options,
374 } = self.output.options;
375 let line_parsing_options = self.output.line_parsing_options;
376
377 let (out_collector, err_collector) = self
378 .handle
379 .try_spawn_output_collectors(
380 |name, sub| {
381 spawn_line_collector(
382 name,
383 sub,
384 line_parsing_options,
385 stdout_collection_options,
386 )
387 },
388 |name, sub| {
389 spawn_line_collector(
390 name,
391 sub,
392 line_parsing_options,
393 stderr_collection_options,
394 )
395 },
396 )
397 .await?;
398
399 wait_for_completion_or_terminate_with_collectors(
400 self.handle,
401 self.timeout,
402 self.terminate.shutdown,
403 self.output.eof_timeout,
404 out_collector,
405 err_collector,
406 )
407 .await
408 })
409 }
410}
411
412#[cfg(any(unix, windows))]
413impl<'a, Stdout, Stderr> IntoFuture
414 for WaitForCompletion<'a, Stdout, Stderr, RawOutput, WithTerminate>
415where
416 Stdout: OutputStream + Subscribable + Send + 'a,
417 Stderr: OutputStream + Subscribable + Send + 'a,
418{
419 type Output = Result<
420 WaitForCompletionOrTerminateResult<ProcessOutput<CollectedBytes>>,
421 WaitWithOutputError,
422 >;
423 type IntoFuture = BoxFut<'a, Self::Output>;
424
425 fn into_future(self) -> Self::IntoFuture {
426 Box::pin(async move {
427 let RawOutputOptions {
428 stdout_collection_options,
429 stderr_collection_options,
430 } = self.output.options;
431
432 let (out_collector, err_collector) = self
433 .handle
434 .try_spawn_output_collectors(
435 |name, sub| spawn_chunk_collector(name, sub, stdout_collection_options),
436 |name, sub| spawn_chunk_collector(name, sub, stderr_collection_options),
437 )
438 .await?;
439
440 wait_for_completion_or_terminate_with_collectors(
441 self.handle,
442 self.timeout,
443 self.terminate.shutdown,
444 self.output.eof_timeout,
445 out_collector,
446 err_collector,
447 )
448 .await
449 })
450 }
451}