mod drain;
pub(crate) mod options;
pub(crate) mod output;
#[cfg(test)]
mod tests;
use self::drain::{
wait_for_completion_or_terminate_with_collectors, wait_for_completion_with_collectors,
};
use self::output::ProcessOutput;
use super::ProcessHandle;
use crate::error::{
WaitForCompletionOrTerminateResult, WaitForCompletionResult, WaitWithOutputError,
};
use crate::output_stream::consumer::{Consumer, spawn_consumer_sync};
use crate::output_stream::event::Chunk;
use crate::output_stream::line::adapter::LineAdapter;
use crate::output_stream::line::options::LineParsingOptions;
use crate::output_stream::visitors::collect::{CollectChunks, CollectLineSink};
use crate::output_stream::{Next, Subscription, TrySubscribable};
use crate::process_handle::WaitForCompletionOrTerminateOptions;
use crate::process_handle::output_collection::options::{LineOutputOptions, RawOutputOptions};
use crate::{CollectedBytes, CollectedLines, LineCollectionOptions, RawCollectionOptions};
use std::borrow::Cow;
/// Spawns a consumer on `subscription` that gathers lines into a [`CollectedLines`].
///
/// The visitor handed to `spawn_consumer_sync` is a [`LineAdapter`] built from
/// `parsing_options`, wrapping a [`CollectLineSink`] that starts out with an empty
/// [`CollectedLines`]. Each line delivered to the sink is converted to an owned
/// `String` and pushed together with `collection_options`.
///
/// `stream_name` is forwarded unchanged to `spawn_consumer_sync`.
fn spawn_lines_into_vec_consumer<S>(
    stream_name: &'static str,
    subscription: S,
    parsing_options: LineParsingOptions,
    collection_options: LineCollectionOptions,
) -> Consumer<CollectedLines>
where
    S: Subscription,
{
    let line_sink = CollectLineSink::new(
        CollectedLines::new(),
        move |text: Cow<'_, str>, collected: &mut CollectedLines| {
            let owned_line = text.into_owned();
            collected.push_line(owned_line, collection_options);
            // The callback itself never asks the consumer to stop.
            Next::Continue
        },
    );
    let line_visitor = LineAdapter::new(parsing_options, line_sink);

    spawn_consumer_sync(stream_name, subscription, line_visitor)
}
/// Spawns a consumer on `subscription` that gathers raw chunks into a [`CollectedBytes`].
///
/// The visitor is assembled with the [`CollectChunks`] builder: it starts from an
/// empty [`CollectedBytes`] sink and, for every [`Chunk`] it is given, pushes the
/// chunk's bytes (via `as_ref()`) into the sink together with `options`.
///
/// `stream_name` is forwarded unchanged to `spawn_consumer_sync`.
fn spawn_chunks_into_vec_consumer<S>(
    stream_name: &'static str,
    subscription: S,
    options: RawCollectionOptions,
) -> Consumer<CollectedBytes>
where
    S: Subscription,
{
    let chunk_visitor = CollectChunks::builder()
        .sink(CollectedBytes::new())
        .f(move |piece: Chunk, collected: &mut CollectedBytes| {
            collected.push_chunk(piece.as_ref(), options);
        })
        .build();

    spawn_consumer_sync(stream_name, subscription, chunk_visitor)
}
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
Stdout: TrySubscribable,
Stderr: TrySubscribable,
{
pub async fn wait_for_completion_with_output(
&mut self,
timeout: std::time::Duration,
output_options: LineOutputOptions,
) -> Result<WaitForCompletionResult<ProcessOutput<CollectedLines>>, WaitWithOutputError> {
let LineOutputOptions {
line_parsing_options,
stdout_collection_options,
stderr_collection_options,
} = output_options;
let stdout = self.stdout();
let out_subscription = stdout.try_subscribe().map_err(|source| {
WaitWithOutputError::OutputCollectionStartFailed {
process_name: self.name.clone(),
source,
}
})?;
let out_collector = spawn_lines_into_vec_consumer(
stdout.name(),
out_subscription,
line_parsing_options,
stdout_collection_options,
);
let stderr = self.stderr();
let err_subscription = match stderr.try_subscribe() {
Ok(subscription) => subscription,
Err(source) => {
out_collector.abort().await;
return Err(WaitWithOutputError::OutputCollectionStartFailed {
process_name: self.name.clone(),
source,
});
}
};
let err_collector = spawn_lines_into_vec_consumer(
stderr.name(),
err_subscription,
line_parsing_options,
stderr_collection_options,
);
let result =
wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
.await?;
Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
status,
stdout,
stderr,
}))
}
pub async fn wait_for_completion_with_raw_output(
&mut self,
timeout: std::time::Duration,
output_options: RawOutputOptions,
) -> Result<WaitForCompletionResult<ProcessOutput<CollectedBytes>>, WaitWithOutputError> {
let RawOutputOptions {
stdout_collection_options,
stderr_collection_options,
} = output_options;
let stdout = self.stdout();
let out_subscription = stdout.try_subscribe().map_err(|source| {
WaitWithOutputError::OutputCollectionStartFailed {
process_name: self.name.clone(),
source,
}
})?;
let out_collector = spawn_chunks_into_vec_consumer(
stdout.name(),
out_subscription,
stdout_collection_options,
);
let stderr = self.stderr();
let err_subscription = match stderr.try_subscribe() {
Ok(subscription) => subscription,
Err(source) => {
out_collector.abort().await;
return Err(WaitWithOutputError::OutputCollectionStartFailed {
process_name: self.name.clone(),
source,
});
}
};
let err_collector = spawn_chunks_into_vec_consumer(
stderr.name(),
err_subscription,
stderr_collection_options,
);
let result =
wait_for_completion_with_collectors(self, timeout, out_collector, err_collector)
.await?;
Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
status,
stdout,
stderr,
}))
}
pub async fn wait_for_completion_with_output_or_terminate(
&mut self,
options: WaitForCompletionOrTerminateOptions,
output_options: LineOutputOptions,
) -> Result<
WaitForCompletionOrTerminateResult<ProcessOutput<CollectedLines>>,
WaitWithOutputError,
> {
let LineOutputOptions {
line_parsing_options,
stdout_collection_options,
stderr_collection_options,
} = output_options;
let stdout = self.stdout();
let out_subscription = stdout.try_subscribe().map_err(|source| {
WaitWithOutputError::OutputCollectionStartFailed {
process_name: self.name.clone(),
source,
}
})?;
let out_collector = spawn_lines_into_vec_consumer(
stdout.name(),
out_subscription,
line_parsing_options,
stdout_collection_options,
);
let stderr = self.stderr();
let err_subscription = match stderr.try_subscribe() {
Ok(subscription) => subscription,
Err(source) => {
out_collector.abort().await;
return Err(WaitWithOutputError::OutputCollectionStartFailed {
process_name: self.name.clone(),
source,
});
}
};
let err_collector = spawn_lines_into_vec_consumer(
stderr.name(),
err_subscription,
line_parsing_options,
stderr_collection_options,
);
let result = wait_for_completion_or_terminate_with_collectors(
self,
options.wait_timeout,
options.interrupt_timeout,
options.terminate_timeout,
out_collector,
err_collector,
)
.await?;
Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
status,
stdout,
stderr,
}))
}
pub async fn wait_for_completion_with_raw_output_or_terminate(
&mut self,
options: WaitForCompletionOrTerminateOptions,
output_options: RawOutputOptions,
) -> Result<
WaitForCompletionOrTerminateResult<ProcessOutput<CollectedBytes>>,
WaitWithOutputError,
> {
let RawOutputOptions {
stdout_collection_options,
stderr_collection_options,
} = output_options;
let stdout = self.stdout();
let out_subscription = stdout.try_subscribe().map_err(|source| {
WaitWithOutputError::OutputCollectionStartFailed {
process_name: self.name.clone(),
source,
}
})?;
let out_collector = spawn_chunks_into_vec_consumer(
stdout.name(),
out_subscription,
stdout_collection_options,
);
let stderr = self.stderr();
let err_subscription = match stderr.try_subscribe() {
Ok(subscription) => subscription,
Err(source) => {
out_collector.abort().await;
return Err(WaitWithOutputError::OutputCollectionStartFailed {
process_name: self.name.clone(),
source,
});
}
};
let err_collector = spawn_chunks_into_vec_consumer(
stderr.name(),
err_subscription,
stderr_collection_options,
);
let result = wait_for_completion_or_terminate_with_collectors(
self,
options.wait_timeout,
options.interrupt_timeout,
options.terminate_timeout,
out_collector,
err_collector,
)
.await?;
Ok(result.map(|(status, stdout, stderr)| ProcessOutput {
status,
stdout,
stderr,
}))
}
}