use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::time::Duration;
use crate::error::{WaitError, WaitForCompletionResult, WaitWithOutputError};
#[cfg(any(unix, windows))]
use crate::error::{WaitForCompletionOrTerminateResult, WaitOrTerminateError};
use crate::output_stream::line::options::LineParsingOptions;
use crate::output_stream::{OutputStream, Subscribable};
use crate::process_handle::ProcessHandle;
#[cfg(any(unix, windows))]
use crate::process_handle::output_collection::drain::wait_for_completion_or_terminate_with_collectors;
use crate::process_handle::output_collection::drain::wait_for_completion_with_collectors;
use crate::process_handle::output_collection::options::{LineOutputOptions, RawOutputOptions};
use crate::process_handle::output_collection::{
ProcessOutput, spawn_chunk_collector, spawn_line_collector,
};
#[cfg(any(unix, windows))]
use crate::process_handle::termination::GracefulShutdown;
use crate::{CollectedBytes, CollectedLines};
pub mod state {
#[cfg(any(unix, windows))]
use super::GracefulShutdown;
use super::{Duration, LineOutputOptions, LineParsingOptions, RawOutputOptions};
#[derive(Debug)]
pub struct NoOutput;
#[derive(Debug)]
pub struct LineOutput {
pub(super) eof_timeout: Duration,
pub(super) line_parsing_options: LineParsingOptions,
pub(super) options: LineOutputOptions,
}
#[derive(Debug)]
pub struct RawOutput {
pub(super) eof_timeout: Duration,
pub(super) options: RawOutputOptions,
}
#[derive(Debug)]
pub struct NoTerminate;
#[cfg(any(unix, windows))]
#[derive(Debug)]
pub struct WithTerminate {
pub(super) shutdown: GracefulShutdown,
}
}
#[cfg(any(unix, windows))]
use state::WithTerminate;
use state::{LineOutput, NoOutput, NoTerminate, RawOutput};
#[must_use = "calling `wait_for_completion(...)` only configures the wait. \
`.await` the builder (or chain `.with_*_output(...)` / `.or_terminate(...)` first) \
to actually run it."]
pub struct WaitForCompletion<'a, Stdout, Stderr, Output, Terminate>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
handle: &'a mut ProcessHandle<Stdout, Stderr>,
timeout: Duration,
output: Output,
terminate: Terminate,
}
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
pub fn wait_for_completion(
&mut self,
timeout: Duration,
) -> WaitForCompletion<'_, Stdout, Stderr, NoOutput, NoTerminate> {
WaitForCompletion {
handle: self,
timeout,
output: NoOutput,
terminate: NoTerminate,
}
}
}
impl<'a, Stdout, Stderr> WaitForCompletion<'a, Stdout, Stderr, NoOutput, NoTerminate>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
pub fn with_line_output(
self,
eof_timeout: Duration,
line_parsing_options: LineParsingOptions,
options: LineOutputOptions,
) -> WaitForCompletion<'a, Stdout, Stderr, LineOutput, NoTerminate> {
WaitForCompletion {
handle: self.handle,
timeout: self.timeout,
output: LineOutput {
eof_timeout,
line_parsing_options,
options,
},
terminate: NoTerminate,
}
}
pub fn with_raw_output(
self,
eof_timeout: Duration,
options: RawOutputOptions,
) -> WaitForCompletion<'a, Stdout, Stderr, RawOutput, NoTerminate> {
WaitForCompletion {
handle: self.handle,
timeout: self.timeout,
output: RawOutput {
eof_timeout,
options,
},
terminate: NoTerminate,
}
}
}
#[cfg(any(unix, windows))]
impl<'a, Stdout, Stderr, Output> WaitForCompletion<'a, Stdout, Stderr, Output, NoTerminate>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
pub fn or_terminate(
self,
shutdown: GracefulShutdown,
) -> WaitForCompletion<'a, Stdout, Stderr, Output, WithTerminate> {
WaitForCompletion {
handle: self.handle,
timeout: self.timeout,
output: self.output,
terminate: WithTerminate { shutdown },
}
}
}
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
impl<'a, Stdout, Stderr> IntoFuture for WaitForCompletion<'a, Stdout, Stderr, NoOutput, NoTerminate>
where
Stdout: OutputStream + Send + 'a,
Stderr: OutputStream + Send + 'a,
{
type Output = Result<WaitForCompletionResult, WaitError>;
type IntoFuture = BoxFut<'a, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move { self.handle.wait_for_completion_inner(self.timeout).await })
}
}
#[cfg(any(unix, windows))]
impl<'a, Stdout, Stderr> IntoFuture
for WaitForCompletion<'a, Stdout, Stderr, NoOutput, WithTerminate>
where
Stdout: OutputStream + Send + 'a,
Stderr: OutputStream + Send + 'a,
{
type Output = Result<WaitForCompletionOrTerminateResult, WaitOrTerminateError>;
type IntoFuture = BoxFut<'a, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
self.handle
.wait_for_completion_or_terminate_inner(self.timeout, self.terminate.shutdown)
.await
})
}
}
impl<'a, Stdout, Stderr> IntoFuture
for WaitForCompletion<'a, Stdout, Stderr, LineOutput, NoTerminate>
where
Stdout: OutputStream + Subscribable + Send + 'a,
Stderr: OutputStream + Subscribable + Send + 'a,
{
type Output =
Result<WaitForCompletionResult<ProcessOutput<CollectedLines>>, WaitWithOutputError>;
type IntoFuture = BoxFut<'a, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let LineOutputOptions {
stdout_collection_options,
stderr_collection_options,
} = self.output.options;
let line_parsing_options = self.output.line_parsing_options;
let (out_collector, err_collector) = self
.handle
.try_spawn_output_collectors(
|name, sub| {
spawn_line_collector(
name,
sub,
line_parsing_options,
stdout_collection_options,
)
},
|name, sub| {
spawn_line_collector(
name,
sub,
line_parsing_options,
stderr_collection_options,
)
},
)
.await?;
wait_for_completion_with_collectors(
self.handle,
self.timeout,
self.output.eof_timeout,
out_collector,
err_collector,
)
.await
})
}
}
impl<'a, Stdout, Stderr> IntoFuture
for WaitForCompletion<'a, Stdout, Stderr, RawOutput, NoTerminate>
where
Stdout: OutputStream + Subscribable + Send + 'a,
Stderr: OutputStream + Subscribable + Send + 'a,
{
type Output =
Result<WaitForCompletionResult<ProcessOutput<CollectedBytes>>, WaitWithOutputError>;
type IntoFuture = BoxFut<'a, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let RawOutputOptions {
stdout_collection_options,
stderr_collection_options,
} = self.output.options;
let (out_collector, err_collector) = self
.handle
.try_spawn_output_collectors(
|name, sub| spawn_chunk_collector(name, sub, stdout_collection_options),
|name, sub| spawn_chunk_collector(name, sub, stderr_collection_options),
)
.await?;
wait_for_completion_with_collectors(
self.handle,
self.timeout,
self.output.eof_timeout,
out_collector,
err_collector,
)
.await
})
}
}
#[cfg(any(unix, windows))]
impl<'a, Stdout, Stderr> IntoFuture
for WaitForCompletion<'a, Stdout, Stderr, LineOutput, WithTerminate>
where
Stdout: OutputStream + Subscribable + Send + 'a,
Stderr: OutputStream + Subscribable + Send + 'a,
{
type Output = Result<
WaitForCompletionOrTerminateResult<ProcessOutput<CollectedLines>>,
WaitWithOutputError,
>;
type IntoFuture = BoxFut<'a, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let LineOutputOptions {
stdout_collection_options,
stderr_collection_options,
} = self.output.options;
let line_parsing_options = self.output.line_parsing_options;
let (out_collector, err_collector) = self
.handle
.try_spawn_output_collectors(
|name, sub| {
spawn_line_collector(
name,
sub,
line_parsing_options,
stdout_collection_options,
)
},
|name, sub| {
spawn_line_collector(
name,
sub,
line_parsing_options,
stderr_collection_options,
)
},
)
.await?;
wait_for_completion_or_terminate_with_collectors(
self.handle,
self.timeout,
self.terminate.shutdown,
self.output.eof_timeout,
out_collector,
err_collector,
)
.await
})
}
}
#[cfg(any(unix, windows))]
impl<'a, Stdout, Stderr> IntoFuture
for WaitForCompletion<'a, Stdout, Stderr, RawOutput, WithTerminate>
where
Stdout: OutputStream + Subscribable + Send + 'a,
Stderr: OutputStream + Subscribable + Send + 'a,
{
type Output = Result<
WaitForCompletionOrTerminateResult<ProcessOutput<CollectedBytes>>,
WaitWithOutputError,
>;
type IntoFuture = BoxFut<'a, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let RawOutputOptions {
stdout_collection_options,
stderr_collection_options,
} = self.output.options;
let (out_collector, err_collector) = self
.handle
.try_spawn_output_collectors(
|name, sub| spawn_chunk_collector(name, sub, stdout_collection_options),
|name, sub| spawn_chunk_collector(name, sub, stderr_collection_options),
)
.await?;
wait_for_completion_or_terminate_with_collectors(
self.handle,
self.timeout,
self.terminate.shutdown,
self.output.eof_timeout,
out_collector,
err_collector,
)
.await
})
}
}