tokio_process_stream/lib.rs
1// Copyright (C) 2022 Leandro Lisboa Penz <lpenz@lpenz.org>
2// This file is subject to the terms and conditions defined in
3// file 'LICENSE', which is part of this source code package.
4
5#![deny(future_incompatible)]
6#![deny(nonstandard_style)]
7#![deny(missing_docs)]
8#![deny(rustdoc::broken_intra_doc_links)]
9
10//! tokio-process-stream is a simple crate that wraps a [`tokio::process`] into a
11//! [`tokio::stream`]
12//!
13//! Having a stream interface to processes is useful when we have multiple sources of data that
14//! we want to merge and start processing from a single entry point.
15//!
16//! This crate provides a [`futures::stream::Stream`] wrapper for [`tokio::process::Child`]. The
17//! main struct is [`ProcessLineStream`], which implements the trait, yielding one [`Item`] enum
18//! at a time, each containing one line from either stdout ([`Item::Stdout`]) or stderr
19//! ([`Item::Stderr`]) of the underlying process until it exits. At this point, the stream
20//! yields a single [`Item::Done`] and finishes.
21//!
22//! Example usage:
23//!
24//! ```rust
25//! use tokio_process_stream::ProcessLineStream;
26//! use tokio::process::Command;
27//! use tokio_stream::StreamExt;
28//! use std::error::Error;
29//!
30//! #[tokio::main]
31//! async fn main() -> Result<(), Box<dyn Error>> {
32//! let mut sleep_cmd = Command::new("sleep");
33//! sleep_cmd.args(&["1"]);
34//! let ls_cmd = Command::new("ls");
35//!
36//! let sleep_procstream = ProcessLineStream::try_from(sleep_cmd)?;
37//! let ls_procstream = ProcessLineStream::try_from(ls_cmd)?;
38//! let mut procstream = sleep_procstream.merge(ls_procstream);
39//!
40//! while let Some(item) = procstream.next().await {
41//! println!("{:?}", item);
42//! }
43//!
44//! Ok(())
45//! }
46//! ```
47//!
48//! # Streaming chunks
49//!
50//! It is also possible to stream `Item<Bytes>` chunks with [`ProcessChunkStream`].
51//!
52//! ```rust
53//! use tokio_process_stream::{Item, ProcessChunkStream};
54//! use tokio::process::Command;
55//! use tokio_stream::StreamExt;
56//! use std::error::Error;
57//!
58//! #[tokio::main]
59//! async fn main() -> Result<(), Box<dyn Error>> {
60//! let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
61//! .arg("-c")
62//! .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
63//! .try_into()?;
64//!
65//! while let Some(item) = procstream.next().await {
66//! println!("{:?}", item);
67//! }
68//! Ok(())
69//! }
70//! ```
71
72use pin_project_lite::pin_project;
73use std::{
74 fmt,
75 future::Future,
76 io,
77 pin::Pin,
78 process::{ExitStatus, Stdio},
79 task::{Context, Poll},
80};
81use tokio::{
82 io::{AsyncBufReadExt, BufReader},
83 process::{Child, ChildStderr, ChildStdout, Command},
84};
85use tokio_stream::{Stream, wrappers::LinesStream};
86use tokio_util::io::ReaderStream;
87
88/// [`ProcessStream`] output.
89#[derive(Debug)]
90pub enum Item<Out> {
91 /// A stdout chunk printed by the process.
92 Stdout(Out),
93 /// A stderr chunk printed by the process.
94 Stderr(Out),
95 /// The [`ExitStatus`], yielded after the process exits.
96 Done(io::Result<ExitStatus>),
97}
98
99impl<T> Item<T>
100where
101 T: std::ops::Deref,
102{
103 /// Returns a [`Item::Stdout`] dereference, otherwise `None`.
104 pub fn stdout(&self) -> Option<&T::Target> {
105 match self {
106 Self::Stdout(s) => Some(s),
107 _ => None,
108 }
109 }
110
111 /// Returns a [`Item::Stderr`] dereference, otherwise `None`.
112 pub fn stderr(&self) -> Option<&T::Target> {
113 match self {
114 Self::Stderr(s) => Some(s),
115 _ => None,
116 }
117 }
118}
119
120impl<Out: fmt::Display> fmt::Display for Item<Out> {
121 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122 match self {
123 Item::Stdout(s) => fmt::Display::fmt(&s, f),
124 Item::Stderr(s) => fmt::Display::fmt(&s, f),
125 _ => Ok(()),
126 }
127 }
128}
129
130pin_project! {
131/// The main tokio-process-stream struct, which implements the
132/// [`Stream`](tokio_stream::Stream) trait
133#[derive(Debug)]
134pub struct ChildStream<Sout, Serr> {
135 child: Option<Child>,
136 stdout: Option<Sout>,
137 stderr: Option<Serr>,
138}
139}
140
141impl<Sout, Serr> ChildStream<Sout, Serr> {
142 /// Return a reference to the child object
143 pub fn child(&self) -> Option<&Child> {
144 self.child.as_ref()
145 }
146
147 /// Return a mutable reference to the child object
148 pub fn child_mut(&mut self) -> Option<&mut Child> {
149 self.child.as_mut()
150 }
151}
152
153impl<Sout, Serr> TryFrom<Command> for ChildStream<Sout, Serr>
154where
155 ChildStream<Sout, Serr>: From<Child>,
156{
157 type Error = io::Error;
158 fn try_from(mut command: Command) -> io::Result<Self> {
159 Self::try_from(&mut command)
160 }
161}
162
163impl<Sout, Serr> TryFrom<&mut Command> for ChildStream<Sout, Serr>
164where
165 ChildStream<Sout, Serr>: From<Child>,
166{
167 type Error = io::Error;
168 fn try_from(command: &mut Command) -> io::Result<Self> {
169 Ok(command
170 .stdout(Stdio::piped())
171 .stderr(Stdio::piped())
172 .spawn()?
173 .into())
174 }
175}
176
177impl<T, Sout, Serr> Stream for ChildStream<Sout, Serr>
178where
179 Sout: Stream<Item = io::Result<T>> + std::marker::Unpin,
180 Serr: Stream<Item = io::Result<T>> + std::marker::Unpin,
181{
182 type Item = Item<T>;
183
184 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185 if self.child.is_none() {
186 // Keep returning None after we are done and everything is dropped
187 return Poll::Ready(None);
188 }
189 let this = self.project();
190 if let Some(stderr) = this.stderr {
191 match Pin::new(stderr).poll_next(cx) {
192 Poll::Ready(Some(line)) => {
193 return Poll::Ready(Some(Item::Stderr(line.unwrap())));
194 }
195 Poll::Ready(None) => {
196 *this.stderr = None;
197 }
198 Poll::Pending => {}
199 }
200 }
201 if let Some(stdout) = this.stdout {
202 match Pin::new(stdout).poll_next(cx) {
203 Poll::Ready(Some(line)) => {
204 return Poll::Ready(Some(Item::Stdout(line.unwrap())));
205 }
206 Poll::Ready(None) => {
207 *this.stdout = None;
208 }
209 Poll::Pending => {}
210 }
211 }
212 if this.stdout.is_none() && this.stderr.is_none() {
213 // Streams closed, all that is left is waiting for the child to exit:
214 if let Some(mut child) = std::mem::take(&mut *this.child) {
215 if let Poll::Ready(sts) = Pin::new(&mut Box::pin(child.wait())).poll(cx) {
216 return Poll::Ready(Some(Item::Done(sts)));
217 }
218 // Sometimes the process can close stdout+stderr before it's ready to be
219 // 'wait'ed. To handle that, we put child back in this:
220 *this.child = Some(child);
221 }
222 }
223 Poll::Pending
224 }
225}
226
227/// [`ChildStream`] that produces lines.
228pub type ProcessLineStream =
229 ChildStream<LinesStream<BufReader<ChildStdout>>, LinesStream<BufReader<ChildStderr>>>;
230
231/// Alias for [`ProcessLineStream`].
232pub type ProcessStream = ProcessLineStream;
233
234impl From<Child> for ProcessLineStream {
235 fn from(mut child: Child) -> Self {
236 let stdout = child
237 .stdout
238 .take()
239 .map(|s| LinesStream::new(BufReader::new(s).lines()));
240 let stderr = child
241 .stderr
242 .take()
243 .map(|s| LinesStream::new(BufReader::new(s).lines()));
244 Self {
245 child: Some(child),
246 stdout,
247 stderr,
248 }
249 }
250}
251
252/// [`ChildStream`] that produces chunks that may part of a line or multiple lines.
253///
254/// # Example
255/// ```
256/// use tokio_process_stream::{Item, ProcessChunkStream};
257/// use tokio::process::Command;
258/// use tokio_stream::StreamExt;
259///
260/// # #[tokio::main]
261/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
262/// // Example of a process that prints onto a single line using '\r'.
263/// let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
264/// .arg("-c")
265/// .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
266/// .try_into()?;
267///
268/// assert_eq!(
269/// procstream.next().await.as_ref().and_then(|n| n.stdout()),
270/// Some(b"1/2" as _)
271/// );
272/// assert_eq!(
273/// procstream.next().await.as_ref().and_then(|n| n.stdout()),
274/// Some(b"\r2/2 done\n" as _)
275/// );
276/// assert!(matches!(procstream.next().await, Some(Item::Done(_))));
277/// # Ok(()) }
278/// ```
279pub type ProcessChunkStream =
280 ChildStream<ReaderStream<BufReader<ChildStdout>>, ReaderStream<BufReader<ChildStderr>>>;
281
282impl From<Child> for ProcessChunkStream {
283 fn from(mut child: Child) -> Self {
284 let stdout = child
285 .stdout
286 .take()
287 .map(|s| ReaderStream::new(BufReader::new(s)));
288 let stderr = child
289 .stderr
290 .take()
291 .map(|s| ReaderStream::new(BufReader::new(s)));
292 Self {
293 child: Some(child),
294 stdout,
295 stderr,
296 }
297 }
298}