pub(crate) mod drain;
pub(crate) mod options;
use super::ProcessHandle;
use crate::error::WaitWithOutputError;
use crate::output_stream::consumer::{Consumer, Sink, spawn_consumer_sync};
use crate::output_stream::event::Chunk;
use crate::output_stream::line::adapter::ParseLines;
use crate::output_stream::line::options::LineParsingOptions;
use crate::output_stream::visitors::collect::CollectChunks;
use crate::output_stream::{Next, Subscription};
use crate::{
CollectedBytes, CollectedLines, LineCollectionOptions, OutputStream, RawCollectionOptions,
Subscribable,
};
use std::borrow::Cow;
use std::process::ExitStatus;
/// Bundles a process's exit status with the values held for its stdout and
/// stderr streams.
///
/// Both stream payloads are generic; `Stderr` defaults to the same type as
/// `Stdout`, so `ProcessOutput<T>` is shorthand for `ProcessOutput<T, T>`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ProcessOutput<Stdout, Stderr = Stdout> {
    /// Exit status reported for the process.
    pub status: ExitStatus,
    /// Value associated with the process's standard output.
    pub stdout: Stdout,
    /// Value associated with the process's standard error.
    pub stderr: Stderr,
}
/// Spawns a consumer that parses `subscription` into lines and gathers them
/// into a [`CollectedLines`] sink.
///
/// * `stream_name` - name forwarded to `spawn_consumer_sync` for the consumer.
/// * `subscription` - the stream subscription to read from.
/// * `parsing_options` - handed to [`ParseLines::collect`] to configure line parsing.
/// * `collection_options` - passed to `CollectedLines::push_line` together with
///   every parsed line.
///
/// Returns the [`Consumer`] produced by `spawn_consumer_sync`.
pub(crate) fn spawn_line_collector<S: Subscription>(
    stream_name: &'static str,
    subscription: S,
    parsing_options: LineParsingOptions,
    collection_options: LineCollectionOptions,
) -> Consumer<CollectedLines> {
    let line_visitor = ParseLines::collect(
        parsing_options,
        CollectedLines::new(),
        move |parsed: Cow<'_, str>, collected: &mut CollectedLines| {
            // The sink stores owned strings, so a borrowed line is copied here.
            let owned_line = parsed.into_owned();
            collected.push_line(owned_line, collection_options);
            // This callback never asks the parser to stop.
            Next::Continue
        },
    );

    spawn_consumer_sync(stream_name, subscription, line_visitor)
}
/// Spawns a consumer that gathers the raw chunks of `subscription` into a
/// [`CollectedBytes`] sink.
///
/// * `stream_name` - name forwarded to `spawn_consumer_sync` for the consumer.
/// * `subscription` - the stream subscription to read from.
/// * `options` - passed to `CollectedBytes::push_chunk` together with every chunk.
///
/// Returns the [`Consumer`] produced by `spawn_consumer_sync`.
pub(crate) fn spawn_chunk_collector<S: Subscription>(
    stream_name: &'static str,
    subscription: S,
    options: RawCollectionOptions,
) -> Consumer<CollectedBytes> {
    let chunk_visitor = CollectChunks::builder()
        .sink(CollectedBytes::new())
        .f(move |incoming: Chunk, collected: &mut CollectedBytes| {
            collected.push_chunk(incoming.as_ref(), options);
        })
        .build();

    spawn_consumer_sync(stream_name, subscription, chunk_visitor)
}
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
    Stdout: OutputStream + Subscribable,
    Stderr: OutputStream + Subscribable,
{
    /// Subscribes to stdout and stderr and spawns one collector per stream.
    ///
    /// The stdout subscription is taken first and passed, with the stdout
    /// stream's name, to `spawn_stdout`. The stderr subscription is then taken
    /// and passed, with the stderr stream's name, to `spawn_stderr`.
    ///
    /// # Errors
    ///
    /// Returns [`WaitWithOutputError::OutputCollectionStartFailed`], carrying
    /// this handle's name and the boxed subscription error, when either
    /// `try_subscribe` call fails. If the stderr subscription fails, the
    /// stdout collector that was already spawned is aborted (and the abort
    /// awaited) before the error is returned.
    pub(crate) async fn try_spawn_output_collectors<StdoutSink, StderrSink, FOut, FErr>(
        &mut self,
        spawn_stdout: FOut,
        spawn_stderr: FErr,
    ) -> Result<(Consumer<StdoutSink>, Consumer<StderrSink>), WaitWithOutputError>
    where
        StdoutSink: Sink,
        StderrSink: Sink,
        FOut: FnOnce(&'static str, Stdout::Subscription) -> Consumer<StdoutSink>,
        FErr: FnOnce(&'static str, Stderr::Subscription) -> Consumer<StderrSink>,
    {
        // Nothing has been spawned yet, so a stdout failure needs no cleanup.
        let stdout_subscription = match self.stdout().try_subscribe() {
            Ok(subscription) => subscription,
            Err(source) => {
                return Err(WaitWithOutputError::OutputCollectionStartFailed {
                    process_name: self.name.clone(),
                    source: Box::new(source),
                });
            }
        };
        let stdout_collector = spawn_stdout(self.stdout().name(), stdout_subscription);

        match self.stderr().try_subscribe() {
            Ok(stderr_subscription) => {
                let stderr_collector = spawn_stderr(self.stderr().name(), stderr_subscription);
                Ok((stdout_collector, stderr_collector))
            }
            Err(source) => {
                // Stop the stdout collector that is already running before
                // reporting the failure.
                stdout_collector.abort().await;
                Err(WaitWithOutputError::OutputCollectionStartFailed {
                    process_name: self.name.clone(),
                    source: Box::new(source),
                })
            }
        }
    }
}