grep_cli/
process.rs

1use std::{
2    io::{self, Read},
3    process,
4};
5
6/// An error that can occur while running a command and reading its output.
7///
8/// This error can be seamlessly converted to an `io::Error` via a `From`
9/// implementation.
10#[derive(Debug)]
11pub struct CommandError {
12    kind: CommandErrorKind,
13}
14
/// The internal representation of a `CommandError`.
#[derive(Debug)]
enum CommandErrorKind {
    /// An I/O failure, such as a spawn error or an error reading stderr.
    Io(io::Error),
    /// The raw bytes read from the command's stderr, which may be empty.
    Stderr(Vec<u8>),
}
20
21impl CommandError {
22    /// Create an error from an I/O error.
23    pub(crate) fn io(ioerr: io::Error) -> CommandError {
24        CommandError { kind: CommandErrorKind::Io(ioerr) }
25    }
26
27    /// Create an error from the contents of stderr (which may be empty).
28    pub(crate) fn stderr(bytes: Vec<u8>) -> CommandError {
29        CommandError { kind: CommandErrorKind::Stderr(bytes) }
30    }
31
32    /// Returns true if and only if this error has empty data from stderr.
33    pub(crate) fn is_empty(&self) -> bool {
34        match self.kind {
35            CommandErrorKind::Stderr(ref bytes) => bytes.is_empty(),
36            _ => false,
37        }
38    }
39}
40
41impl std::error::Error for CommandError {}
42
43impl std::fmt::Display for CommandError {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self.kind {
46            CommandErrorKind::Io(ref e) => e.fmt(f),
47            CommandErrorKind::Stderr(ref bytes) => {
48                let msg = String::from_utf8_lossy(bytes);
49                if msg.trim().is_empty() {
50                    write!(f, "<stderr is empty>")
51                } else {
52                    let div = "-".repeat(79);
53                    write!(
54                        f,
55                        "\n{div}\n{msg}\n{div}",
56                        div = div,
57                        msg = msg.trim()
58                    )
59                }
60            }
61        }
62    }
63}
64
65impl From<io::Error> for CommandError {
66    fn from(ioerr: io::Error) -> CommandError {
67        CommandError { kind: CommandErrorKind::Io(ioerr) }
68    }
69}
70
71impl From<CommandError> for io::Error {
72    fn from(cmderr: CommandError) -> io::Error {
73        match cmderr.kind {
74            CommandErrorKind::Io(ioerr) => ioerr,
75            CommandErrorKind::Stderr(_) => {
76                io::Error::new(io::ErrorKind::Other, cmderr)
77            }
78        }
79    }
80}
81
/// Configures and builds a streaming reader for process output.
///
/// By default, the child's stderr is read asynchronously (see
/// [`CommandReaderBuilder::async_stderr`]).
#[derive(Clone, Debug)]
pub struct CommandReaderBuilder {
    /// When true, stderr is drained on a separate thread while stdout is
    /// being read. When false, stderr is only read when the child is reaped.
    async_stderr: bool,
}

impl Default for CommandReaderBuilder {
    /// Returns a builder with asynchronous stderr reading enabled.
    ///
    /// This is implemented by hand because a derived `Default` would set
    /// `async_stderr` to `false`. That contradicts the documented default and
    /// leaves noisy children open to deadlocking on a full stderr pipe.
    fn default() -> CommandReaderBuilder {
        CommandReaderBuilder { async_stderr: true }
    }
}
87
88impl CommandReaderBuilder {
89    /// Create a new builder with the default configuration.
90    pub fn new() -> CommandReaderBuilder {
91        CommandReaderBuilder::default()
92    }
93
94    /// Build a new streaming reader for the given command's output.
95    ///
96    /// The caller should set everything that's required on the given command
97    /// before building a reader, such as its arguments, environment and
98    /// current working directory. Settings such as the stdout and stderr (but
99    /// not stdin) pipes will be overridden so that they can be controlled by
100    /// the reader.
101    ///
102    /// If there was a problem spawning the given command, then its error is
103    /// returned.
104    pub fn build(
105        &self,
106        command: &mut process::Command,
107    ) -> Result<CommandReader, CommandError> {
108        let mut child = command
109            .stdout(process::Stdio::piped())
110            .stderr(process::Stdio::piped())
111            .spawn()?;
112        let stderr = if self.async_stderr {
113            StderrReader::r#async(child.stderr.take().unwrap())
114        } else {
115            StderrReader::sync(child.stderr.take().unwrap())
116        };
117        Ok(CommandReader { child, stderr, eof: false })
118    }
119
120    /// When enabled, the reader will asynchronously read the contents of the
121    /// command's stderr output. When disabled, stderr is only read after the
122    /// stdout stream has been exhausted (or if the process quits with an error
123    /// code).
124    ///
125    /// Note that when enabled, this may require launching an additional
126    /// thread in order to read stderr. This is done so that the process being
127    /// executed is never blocked from writing to stdout or stderr. If this is
128    /// disabled, then it is possible for the process to fill up the stderr
129    /// buffer and deadlock.
130    ///
131    /// This is enabled by default.
132    pub fn async_stderr(&mut self, yes: bool) -> &mut CommandReaderBuilder {
133        self.async_stderr = yes;
134        self
135    }
136}
137
138/// A streaming reader for a command's output.
139///
140/// The purpose of this reader is to provide an easy way to execute processes
141/// whose stdout is read in a streaming way while also making the processes'
142/// stderr available when the process fails with an exit code. This makes it
143/// possible to execute processes while surfacing the underlying failure mode
144/// in the case of an error.
145///
146/// Moreover, by default, this reader will asynchronously read the processes'
147/// stderr. This prevents subtle deadlocking bugs for noisy processes that
148/// write a lot to stderr. Currently, the entire contents of stderr is read
149/// on to the heap.
150///
151/// # Example
152///
153/// This example shows how to invoke `gzip` to decompress the contents of a
154/// file. If the `gzip` command reports a failing exit status, then its stderr
155/// is returned as an error.
156///
157/// ```no_run
158/// use std::{io::Read, process::Command};
159///
160/// use grep_cli::CommandReader;
161///
162/// let mut cmd = Command::new("gzip");
163/// cmd.arg("-d").arg("-c").arg("/usr/share/man/man1/ls.1.gz");
164///
165/// let mut rdr = CommandReader::new(&mut cmd)?;
166/// let mut contents = vec![];
167/// rdr.read_to_end(&mut contents)?;
168/// # Ok::<(), Box<dyn std::error::Error>>(())
169/// ```
170#[derive(Debug)]
171pub struct CommandReader {
172    child: process::Child,
173    stderr: StderrReader,
174    /// This is set to true once 'read' returns zero bytes. When this isn't
175    /// set and we close the reader, then we anticipate a pipe error when
176    /// reaping the child process and silence it.
177    eof: bool,
178}
179
180impl CommandReader {
181    /// Create a new streaming reader for the given command using the default
182    /// configuration.
183    ///
184    /// The caller should set everything that's required on the given command
185    /// before building a reader, such as its arguments, environment and
186    /// current working directory. Settings such as the stdout and stderr (but
187    /// not stdin) pipes will be overridden so that they can be controlled by
188    /// the reader.
189    ///
190    /// If there was a problem spawning the given command, then its error is
191    /// returned.
192    ///
193    /// If the caller requires additional configuration for the reader
194    /// returned, then use [`CommandReaderBuilder`].
195    pub fn new(
196        cmd: &mut process::Command,
197    ) -> Result<CommandReader, CommandError> {
198        CommandReaderBuilder::new().build(cmd)
199    }
200
201    /// Closes the CommandReader, freeing any resources used by its underlying
202    /// child process. If the child process exits with a nonzero exit code, the
203    /// returned Err value will include its stderr.
204    ///
205    /// `close` is idempotent, meaning it can be safely called multiple times.
206    /// The first call closes the CommandReader and any subsequent calls do
207    /// nothing.
208    ///
209    /// This method should be called after partially reading a file to prevent
210    /// resource leakage. However there is no need to call `close` explicitly
211    /// if your code always calls `read` to EOF, as `read` takes care of
212    /// calling `close` in this case.
213    ///
214    /// `close` is also called in `drop` as a last line of defense against
215    /// resource leakage. Any error from the child process is then printed as a
216    /// warning to stderr. This can be avoided by explicitly calling `close`
217    /// before the CommandReader is dropped.
218    pub fn close(&mut self) -> io::Result<()> {
219        // Dropping stdout closes the underlying file descriptor, which should
220        // cause a well-behaved child process to exit. If child.stdout is None
221        // we assume that close() has already been called and do nothing.
222        let stdout = match self.child.stdout.take() {
223            None => return Ok(()),
224            Some(stdout) => stdout,
225        };
226        drop(stdout);
227        if self.child.wait()?.success() {
228            Ok(())
229        } else {
230            let err = self.stderr.read_to_end();
231            // In the specific case where we haven't consumed the full data
232            // from the child process, then closing stdout above results in
233            // a pipe signal being thrown in most cases. But I don't think
234            // there is any reliable and portable way of detecting it. Instead,
235            // if we know we haven't hit EOF (so we anticipate a broken pipe
236            // error) and if stderr otherwise doesn't have anything on it, then
237            // we assume total success.
238            if !self.eof && err.is_empty() {
239                return Ok(());
240            }
241            Err(io::Error::from(err))
242        }
243    }
244}
245
246impl Drop for CommandReader {
247    fn drop(&mut self) {
248        if let Err(error) = self.close() {
249            log::warn!("{}", error);
250        }
251    }
252}
253
254impl io::Read for CommandReader {
255    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
256        let stdout = match self.child.stdout {
257            None => return Ok(0),
258            Some(ref mut stdout) => stdout,
259        };
260        let nread = stdout.read(buf)?;
261        if nread == 0 {
262            self.eof = true;
263            self.close().map(|_| 0)
264        } else {
265            Ok(nread)
266        }
267    }
268}
269
270/// A reader that encapsulates the asynchronous or synchronous reading of
271/// stderr.
272#[derive(Debug)]
273enum StderrReader {
274    Async(Option<std::thread::JoinHandle<CommandError>>),
275    Sync(process::ChildStderr),
276}
277
278impl StderrReader {
279    /// Create a reader for stderr that reads contents asynchronously.
280    fn r#async(mut stderr: process::ChildStderr) -> StderrReader {
281        let handle =
282            std::thread::spawn(move || stderr_to_command_error(&mut stderr));
283        StderrReader::Async(Some(handle))
284    }
285
286    /// Create a reader for stderr that reads contents synchronously.
287    fn sync(stderr: process::ChildStderr) -> StderrReader {
288        StderrReader::Sync(stderr)
289    }
290
291    /// Consumes all of stderr on to the heap and returns it as an error.
292    ///
293    /// If there was a problem reading stderr itself, then this returns an I/O
294    /// command error.
295    fn read_to_end(&mut self) -> CommandError {
296        match *self {
297            StderrReader::Async(ref mut handle) => {
298                let handle = handle
299                    .take()
300                    .expect("read_to_end cannot be called more than once");
301                handle.join().expect("stderr reading thread does not panic")
302            }
303            StderrReader::Sync(ref mut stderr) => {
304                stderr_to_command_error(stderr)
305            }
306        }
307    }
308}
309
310fn stderr_to_command_error(stderr: &mut process::ChildStderr) -> CommandError {
311    let mut bytes = vec![];
312    match stderr.read_to_end(&mut bytes) {
313        Ok(_) => CommandError::stderr(bytes),
314        Err(err) => CommandError::io(err),
315    }
316}