#![deny(future_incompatible)]
#![deny(nonstandard_style)]
#![deny(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
use pin_project_lite::pin_project;
use std::{
fmt,
future::Future,
io,
pin::Pin,
process::{ExitStatus, Stdio},
task::{Context, Poll},
};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, ChildStderr, ChildStdout, Command},
};
use tokio_stream::{Stream, wrappers::LinesStream};
use tokio_util::io::ReaderStream;
/// One event produced while streaming a child process.
///
/// `Out` is the payload type carried by the two output variants.
#[derive(Debug)]
pub enum Item<Out> {
/// A payload read from the child's standard output.
Stdout(Out),
/// A payload read from the child's standard error.
Stderr(Out),
/// The result of waiting for the child to exit; the stream yields nothing after it.
Done(io::Result<ExitStatus>),
}
impl<T> Item<T>
where
    T: std::ops::Deref,
{
    /// Returns the dereferenced payload when this item is `Item::Stdout`.
    ///
    /// Every other variant yields `None`.
    pub fn stdout(&self) -> Option<&T::Target> {
        if let Self::Stdout(payload) = self {
            Some(payload)
        } else {
            None
        }
    }

    /// Returns the dereferenced payload when this item is `Item::Stderr`.
    ///
    /// Every other variant yields `None`.
    pub fn stderr(&self) -> Option<&T::Target> {
        if let Self::Stderr(payload) = self {
            Some(payload)
        } else {
            None
        }
    }
}
/// Displays the payload of an output item; `Item::Done` renders as nothing.
impl<Out: fmt::Display> fmt::Display for Item<Out> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            // Both output variants forward to the payload's own `Display`,
            // so formatter flags (width, fill, ...) apply to the payload.
            Item::Stdout(payload) | Item::Stderr(payload) => fmt::Display::fmt(payload, f),
            // The exit status is deliberately not rendered.
            Item::Done(_) => Ok(()),
        }
    }
}
pin_project! {
/// Stream adapter that owns a spawned child together with optional readers for its
/// stdout and stderr.
///
/// `Sout` and `Serr` are the reader types wrapped around the child's stdout and stderr.
#[derive(Debug)]
pub struct ChildStream<Sout, Serr> {
// Child handle; becomes `None` once the stream has yielded `Item::Done`.
child: Option<Child>,
// Stdout reader; becomes `None` once it reports end-of-stream.
stdout: Option<Sout>,
// Stderr reader; becomes `None` once it reports end-of-stream.
stderr: Option<Serr>,
}
}
impl<Sout, Serr> ChildStream<Sout, Serr> {
    /// Borrows the wrapped child process, if the stream still holds it.
    ///
    /// Returns `None` once the stream has released the handle.
    pub fn child(&self) -> Option<&Child> {
        let Self { child, .. } = self;
        child.as_ref()
    }

    /// Mutably borrows the wrapped child process, if the stream still holds it.
    ///
    /// Returns `None` once the stream has released the handle.
    pub fn child_mut(&mut self) -> Option<&mut Child> {
        let Self { child, .. } = self;
        child.as_mut()
    }
}
/// Spawns an owned `Command` and wraps the resulting child in a `ChildStream`.
impl<Sout, Serr> TryFrom<Command> for ChildStream<Sout, Serr>
where
    ChildStream<Sout, Serr>: From<Child>,
{
    type Error = io::Error;

    /// Delegates to the `&mut Command` conversion.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by that conversion.
    fn try_from(mut command: Command) -> io::Result<Self> {
        <Self as TryFrom<&mut Command>>::try_from(&mut command)
    }
}
/// Spawns a borrowed `Command` and wraps the resulting child in a `ChildStream`.
impl<Sout, Serr> TryFrom<&mut Command> for ChildStream<Sout, Serr>
where
    ChildStream<Sout, Serr>: From<Child>,
{
    type Error = io::Error;

    /// Configures both stdout and stderr of `command` as pipes, spawns it, and converts the
    /// child into the stream.
    ///
    /// Any stdout/stderr configuration previously set on `command` is overwritten.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` reported by `spawn`.
    fn try_from(command: &mut Command) -> io::Result<Self> {
        let spawned = command
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;
        Ok(Self::from(spawned))
    }
}
/// Merges a child's stderr and stdout into one stream of `Item`s that ends with `Item::Done`.
///
/// Each poll tries stderr first and stdout second. A pipe that reports end-of-stream is
/// dropped and never polled again. Once both pipes are gone the child is waited on and the
/// outcome is yielded as `Item::Done`; afterwards the stream only returns `None`.
///
/// A read error on either pipe is reported as `Item::Done(Err(_))` and also terminates the
/// stream, rather than panicking inside `poll_next`.
impl<T, Sout, Serr> Stream for ChildStream<Sout, Serr>
where
    Sout: Stream<Item = io::Result<T>> + std::marker::Unpin,
    Serr: Stream<Item = io::Result<T>> + std::marker::Unpin,
{
    type Item = Item<T>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The child slot is emptied when `Item::Done` is yielded, so an empty slot means
        // the stream has already finished: keep answering `None`.
        if self.child.is_none() {
            return Poll::Ready(None);
        }
        let this = self.project();

        if let Some(stderr) = this.stderr.as_mut() {
            match Pin::new(stderr).poll_next(cx) {
                Poll::Ready(Some(Ok(payload))) => {
                    return Poll::Ready(Some(Item::Stderr(payload)));
                }
                Poll::Ready(Some(Err(err))) => {
                    // Surface the read error as the final item. Releasing the child handle
                    // (without waiting on it) is what marks the stream as finished.
                    *this.child = None;
                    return Poll::Ready(Some(Item::Done(Err(err))));
                }
                Poll::Ready(None) => *this.stderr = None,
                Poll::Pending => {}
            }
        }

        if let Some(stdout) = this.stdout.as_mut() {
            match Pin::new(stdout).poll_next(cx) {
                Poll::Ready(Some(Ok(payload))) => {
                    return Poll::Ready(Some(Item::Stdout(payload)));
                }
                Poll::Ready(Some(Err(err))) => {
                    // Same handling as a stderr read error.
                    *this.child = None;
                    return Poll::Ready(Some(Item::Done(Err(err))));
                }
                Poll::Ready(None) => *this.stdout = None,
                Poll::Pending => {}
            }
        }

        // Only wait for the exit status after both pipes have been drained.
        if this.stdout.is_none() && this.stderr.is_none() {
            if let Some(child) = this.child.as_mut() {
                // Pin the short-lived wait future on the stack instead of boxing it on every
                // poll. The inner scope ends its borrow of the child before the slot is
                // cleared below.
                let status = {
                    let mut wait = std::pin::pin!(child.wait());
                    wait.as_mut().poll(cx)
                };
                if let Poll::Ready(status) = status {
                    *this.child = None;
                    return Poll::Ready(Some(Item::Done(status)));
                }
            }
        }

        Poll::Pending
    }
}
/// A `ChildStream` whose stdout and stderr are each wrapped in a `LinesStream` over a
/// `BufReader`.
pub type ProcessLineStream =
ChildStream<LinesStream<BufReader<ChildStdout>>, LinesStream<BufReader<ChildStderr>>>;
/// Alias of `ProcessLineStream`.
pub type ProcessStream = ProcessLineStream;
/// Wraps an already-spawned child in a line-oriented stream.
impl From<Child> for ProcessLineStream {
    /// Takes the stdout and stderr handles out of `child` and wraps each one in a buffered
    /// `LinesStream`.
    ///
    /// A handle that is absent on `child` leaves the matching side of the stream empty.
    fn from(mut child: Child) -> Self {
        let stdout = child
            .stdout
            .take()
            .map(BufReader::new)
            .map(|reader| LinesStream::new(reader.lines()));
        let stderr = child
            .stderr
            .take()
            .map(BufReader::new)
            .map(|reader| LinesStream::new(reader.lines()));

        Self {
            stdout,
            stderr,
            child: Some(child),
        }
    }
}
/// A `ChildStream` whose stdout and stderr are each wrapped in a `ReaderStream` over a
/// `BufReader`.
pub type ProcessChunkStream =
ChildStream<ReaderStream<BufReader<ChildStdout>>, ReaderStream<BufReader<ChildStderr>>>;
/// Wraps an already-spawned child in a `ReaderStream`-based stream.
impl From<Child> for ProcessChunkStream {
    /// Takes the stdout and stderr handles out of `child` and wraps each one in a buffered
    /// `ReaderStream`.
    ///
    /// A handle that is absent on `child` leaves the matching side of the stream empty.
    fn from(mut child: Child) -> Self {
        let stdout = child
            .stdout
            .take()
            .map(BufReader::new)
            .map(ReaderStream::new);
        let stderr = child
            .stderr
            .take()
            .map(BufReader::new)
            .map(ReaderStream::new);

        Self {
            stdout,
            stderr,
            child: Some(child),
        }
    }
}