tokio_process_stream/
lib.rs

1// Copyright (C) 2022 Leandro Lisboa Penz <lpenz@lpenz.org>
2// This file is subject to the terms and conditions defined in
3// file 'LICENSE', which is part of this source code package.
4
5#![deny(future_incompatible)]
6#![deny(nonstandard_style)]
7#![deny(missing_docs)]
8#![deny(rustdoc::broken_intra_doc_links)]
9
10//! tokio-process-stream is a simple crate that wraps a [`tokio::process`] into a
11//! [`tokio::stream`]
12//!
13//! Having a stream interface to processes is useful when we have multiple sources of data that
14//! we want to merge and start processing from a single entry point.
15//!
16//! This crate provides a [`futures::stream::Stream`] wrapper for [`tokio::process::Child`]. The
17//! main struct is [`ProcessLineStream`], which implements the trait, yielding one [`Item`] enum
18//! at a time, each containing one line from either stdout ([`Item::Stdout`]) or stderr
19//! ([`Item::Stderr`]) of the underlying process until it exits. At this point, the stream
20//! yields a single [`Item::Done`] and finishes.
21//!
22//! Example usage:
23//!
24//! ```rust
25//! use tokio_process_stream::ProcessLineStream;
26//! use tokio::process::Command;
27//! use tokio_stream::StreamExt;
28//! use std::error::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Box<dyn Error>> {
32//!     let mut sleep_cmd = Command::new("sleep");
33//!     sleep_cmd.args(&["1"]);
34//!     let ls_cmd = Command::new("ls");
35//!
36//!     let sleep_procstream = ProcessLineStream::try_from(sleep_cmd)?;
37//!     let ls_procstream = ProcessLineStream::try_from(ls_cmd)?;
38//!     let mut procstream = sleep_procstream.merge(ls_procstream);
39//!
40//!     while let Some(item) = procstream.next().await {
41//!         println!("{:?}", item);
42//!     }
43//!
44//!     Ok(())
45//! }
46//! ```
47//!
48//! # Streaming chunks
49//!
50//! It is also possible to stream `Item<Bytes>` chunks with [`ProcessChunkStream`].
51//!
52//! ```rust
53//! use tokio_process_stream::{Item, ProcessChunkStream};
54//! use tokio::process::Command;
55//! use tokio_stream::StreamExt;
56//! use std::error::Error;
57//!
58//! #[tokio::main]
59//! async fn main() -> Result<(), Box<dyn Error>> {
60//!     let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
61//!         .arg("-c")
62//!         .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
63//!         .try_into()?;
64//!
65//!     while let Some(item) = procstream.next().await {
66//!         println!("{:?}", item);
67//!     }
68//!     Ok(())
69//! }
70//! ```
71
72use pin_project_lite::pin_project;
73use std::{
74    fmt,
75    future::Future,
76    io,
77    pin::Pin,
78    process::{ExitStatus, Stdio},
79    task::{Context, Poll},
80};
81use tokio::{
82    io::{AsyncBufReadExt, BufReader},
83    process::{Child, ChildStderr, ChildStdout, Command},
84};
85use tokio_stream::{Stream, wrappers::LinesStream};
86use tokio_util::io::ReaderStream;
87
88/// [`ProcessStream`] output.
89#[derive(Debug)]
90pub enum Item<Out> {
91    /// A stdout chunk printed by the process.
92    Stdout(Out),
93    /// A stderr chunk printed by the process.
94    Stderr(Out),
95    /// The [`ExitStatus`], yielded after the process exits.
96    Done(io::Result<ExitStatus>),
97}
98
99impl<T> Item<T>
100where
101    T: std::ops::Deref,
102{
103    /// Returns a [`Item::Stdout`] dereference, otherwise `None`.
104    pub fn stdout(&self) -> Option<&T::Target> {
105        match self {
106            Self::Stdout(s) => Some(s),
107            _ => None,
108        }
109    }
110
111    /// Returns a [`Item::Stderr`] dereference, otherwise `None`.
112    pub fn stderr(&self) -> Option<&T::Target> {
113        match self {
114            Self::Stderr(s) => Some(s),
115            _ => None,
116        }
117    }
118}
119
120impl<Out: fmt::Display> fmt::Display for Item<Out> {
121    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122        match self {
123            Item::Stdout(s) => fmt::Display::fmt(&s, f),
124            Item::Stderr(s) => fmt::Display::fmt(&s, f),
125            _ => Ok(()),
126        }
127    }
128}
129
130pin_project! {
131/// The main tokio-process-stream struct, which implements the
132/// [`Stream`](tokio_stream::Stream) trait
133#[derive(Debug)]
134pub struct ChildStream<Sout, Serr> {
135    child: Option<Child>,
136    stdout: Option<Sout>,
137    stderr: Option<Serr>,
138}
139}
140
141impl<Sout, Serr> ChildStream<Sout, Serr> {
142    /// Return a reference to the child object
143    pub fn child(&self) -> Option<&Child> {
144        self.child.as_ref()
145    }
146
147    /// Return a mutable reference to the child object
148    pub fn child_mut(&mut self) -> Option<&mut Child> {
149        self.child.as_mut()
150    }
151}
152
153impl<Sout, Serr> TryFrom<Command> for ChildStream<Sout, Serr>
154where
155    ChildStream<Sout, Serr>: From<Child>,
156{
157    type Error = io::Error;
158    fn try_from(mut command: Command) -> io::Result<Self> {
159        Self::try_from(&mut command)
160    }
161}
162
163impl<Sout, Serr> TryFrom<&mut Command> for ChildStream<Sout, Serr>
164where
165    ChildStream<Sout, Serr>: From<Child>,
166{
167    type Error = io::Error;
168    fn try_from(command: &mut Command) -> io::Result<Self> {
169        Ok(command
170            .stdout(Stdio::piped())
171            .stderr(Stdio::piped())
172            .spawn()?
173            .into())
174    }
175}
176
177impl<T, Sout, Serr> Stream for ChildStream<Sout, Serr>
178where
179    Sout: Stream<Item = io::Result<T>> + std::marker::Unpin,
180    Serr: Stream<Item = io::Result<T>> + std::marker::Unpin,
181{
182    type Item = Item<T>;
183
184    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185        if self.child.is_none() {
186            // Keep returning None after we are done and everything is dropped
187            return Poll::Ready(None);
188        }
189        let this = self.project();
190        if let Some(stderr) = this.stderr {
191            match Pin::new(stderr).poll_next(cx) {
192                Poll::Ready(Some(line)) => {
193                    return Poll::Ready(Some(Item::Stderr(line.unwrap())));
194                }
195                Poll::Ready(None) => {
196                    *this.stderr = None;
197                }
198                Poll::Pending => {}
199            }
200        }
201        if let Some(stdout) = this.stdout {
202            match Pin::new(stdout).poll_next(cx) {
203                Poll::Ready(Some(line)) => {
204                    return Poll::Ready(Some(Item::Stdout(line.unwrap())));
205                }
206                Poll::Ready(None) => {
207                    *this.stdout = None;
208                }
209                Poll::Pending => {}
210            }
211        }
212        if this.stdout.is_none() && this.stderr.is_none() {
213            // Streams closed, all that is left is waiting for the child to exit:
214            if let Some(mut child) = std::mem::take(&mut *this.child) {
215                if let Poll::Ready(sts) = Pin::new(&mut Box::pin(child.wait())).poll(cx) {
216                    return Poll::Ready(Some(Item::Done(sts)));
217                }
218                // Sometimes the process can close stdout+stderr before it's ready to be
219                // 'wait'ed. To handle that, we put child back in this:
220                *this.child = Some(child);
221            }
222        }
223        Poll::Pending
224    }
225}
226
227/// [`ChildStream`] that produces lines.
228pub type ProcessLineStream =
229    ChildStream<LinesStream<BufReader<ChildStdout>>, LinesStream<BufReader<ChildStderr>>>;
230
231/// Alias for [`ProcessLineStream`].
232pub type ProcessStream = ProcessLineStream;
233
234impl From<Child> for ProcessLineStream {
235    fn from(mut child: Child) -> Self {
236        let stdout = child
237            .stdout
238            .take()
239            .map(|s| LinesStream::new(BufReader::new(s).lines()));
240        let stderr = child
241            .stderr
242            .take()
243            .map(|s| LinesStream::new(BufReader::new(s).lines()));
244        Self {
245            child: Some(child),
246            stdout,
247            stderr,
248        }
249    }
250}
251
252/// [`ChildStream`] that produces chunks that may part of a line or multiple lines.
253///
254/// # Example
255/// ```
256/// use tokio_process_stream::{Item, ProcessChunkStream};
257/// use tokio::process::Command;
258/// use tokio_stream::StreamExt;
259///
260/// # #[tokio::main]
261/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
262/// // Example of a process that prints onto a single line using '\r'.
263/// let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
264///     .arg("-c")
265///     .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
266///     .try_into()?;
267///
268/// assert_eq!(
269///     procstream.next().await.as_ref().and_then(|n| n.stdout()),
270///     Some(b"1/2" as _)
271/// );
272/// assert_eq!(
273///     procstream.next().await.as_ref().and_then(|n| n.stdout()),
274///     Some(b"\r2/2 done\n" as _)
275/// );
276/// assert!(matches!(procstream.next().await, Some(Item::Done(_))));
277/// # Ok(()) }
278/// ```
279pub type ProcessChunkStream =
280    ChildStream<ReaderStream<BufReader<ChildStdout>>, ReaderStream<BufReader<ChildStderr>>>;
281
282impl From<Child> for ProcessChunkStream {
283    fn from(mut child: Child) -> Self {
284        let stdout = child
285            .stdout
286            .take()
287            .map(|s| ReaderStream::new(BufReader::new(s)));
288        let stderr = child
289            .stderr
290            .take()
291            .map(|s| ReaderStream::new(BufReader::new(s)));
292        Self {
293            child: Some(child),
294            stdout,
295            stderr,
296        }
297    }
298}