use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::sync::mpsc;
pub enum PipeEvent {
Stdout(String),
Stderr(String),
StdoutEof,
StderrEof,
StdoutErr(std::io::Error),
StderrErr(std::io::Error),
}
pub fn spawn_pipe_reader<O, E>(
stdout: O,
stderr: E,
) -> mpsc::UnboundedReceiver<PipeEvent>
where
O: AsyncRead + Unpin + Send + 'static,
E: AsyncRead + Unpin + Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
let stdout_tx = tx.clone();
tokio::spawn(async move {
read_lines(
stdout,
stdout_tx,
PipeEvent::Stdout,
PipeEvent::StdoutEof,
PipeEvent::StdoutErr,
)
.await;
});
tokio::spawn(async move {
read_lines(
stderr,
tx,
PipeEvent::Stderr,
PipeEvent::StderrEof,
PipeEvent::StderrErr,
)
.await;
});
rx
}
async fn read_lines<R, F, G>(
reader: R,
tx: mpsc::UnboundedSender<PipeEvent>,
line_event: F,
eof_event: PipeEvent,
err_event_ctor: G,
) where
R: AsyncRead + Unpin,
F: Fn(String) -> PipeEvent,
G: FnOnce(std::io::Error) -> PipeEvent,
{
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
let _ = tx.send(eof_event);
return;
}
Ok(_) => {
let trimmed = line.trim_end_matches(['\r', '\n']).to_string();
if tx.send(line_event(trimmed)).is_err() {
return;
}
}
Err(e) => {
let _ = tx.send(err_event_ctor(e));
return;
}
}
}
}