subprocess 1.0.3

Execution and control of child processes and pipelines.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
use std::ffi::OsString;
use std::fmt;
use std::fs::File;
use std::io::{self, Read, Write};
use std::ops::BitOr;
use std::path::Path;
use std::sync::Arc;

#[cfg(unix)]
mod os {
    /// Unix-specific pipeline options.
    #[derive(Clone, Default)]
    pub struct PipelineOsOptions {
        /// When true, the pipeline's children are placed in a process group
        /// led by the first child (applied in `Pipeline::start`).
        pub setpgid: bool,
    }
}

#[cfg(windows)]
mod os {
    /// Windows-specific pipeline options — currently none; this unit struct
    /// exists so `Pipeline` has the same shape on both platforms.
    #[derive(Clone, Default)]
    pub struct PipelineOsOptions;
}

use crate::communicate::Communicator;
use crate::exec::Redirection;
use crate::process::ExitStatus;
use crate::process::Process;

use crate::exec::{
    Capture, Exec, FromSink, FromSource, InputData, InputRedirection, ReadAdapter, ReadErrAdapter,
    WriteAdapter,
};
use crate::job::Job;

/// A builder for pipelines of subprocesses connected via pipes.
///
/// A pipeline is a sequence of two or more [`Exec`] commands connected via pipes.  Just
/// like in a Unix shell pipeline, each command receives standard input from the previous
/// command, and passes standard output to the next command.  Optionally, the standard
/// input of the first command can be provided from the outside, and the output of the
/// last command can be captured.
///
/// In most cases you do not need to create [`Pipeline`] instances directly; instead,
/// combine [`Exec`] instances using the `|` operator which produces `Pipeline`.
///
/// # Examples
///
/// Execute a pipeline and return the exit status of the last command:
///
/// ```no_run
/// # use subprocess::*;
/// # fn dummy() -> std::io::Result<()> {
/// let exit_status =
///   (Exec::shell("ls *.bak") | Exec::cmd("xargs").arg("rm")).join()?;
/// # Ok(())
/// # }
/// ```
///
/// Capture the pipeline's output:
///
/// ```no_run
/// # use subprocess::*;
/// # fn dummy() -> std::io::Result<()> {
/// let dir_checksum = {
///     Exec::shell("find . -type f") | Exec::cmd("sort") | Exec::cmd("sha1sum")
/// }.capture()?.stdout_str();
/// # Ok(())
/// # }
/// ```
#[must_use]
pub struct Pipeline {
    // The commands, in pipeline order.
    execs: Vec<Exec>,
    // Redirection applied to stdin of the first command at start().
    stdin_redirect: Arc<Redirection>,
    // Redirection applied to stdout of the last command at start().
    stdout: Arc<Redirection>,
    // Redirection applied to stderr of *every* command (see stderr_all()).
    stderr: Arc<Redirection>,
    // Data fed to the first command's stdin when set via stdin().
    stdin_data: Option<InputData>,
    // When true, join()/capture() error on non-zero exit of the last command.
    check_success: bool,
    // When true, the spawned processes are not waited for on drop.
    detached: bool,
    // Working directory applied to every command at start().
    cwd: Option<OsString>,
    #[allow(dead_code)]
    // OS-specific options; unused on Windows, hence the allow above.
    os_options: os::PipelineOsOptions,
}

impl Default for Pipeline {
    fn default() -> Pipeline {
        Pipeline::new()
    }
}

impl Pipeline {
    /// Creates a new empty pipeline.
    ///
    /// Use [`pipe`](Self::pipe) to add commands to the pipeline, or the `|` operator
    /// to combine `Exec` instances.
    ///
    /// An empty pipeline's `join()` returns success and `capture()` returns empty
    /// output. A single-command pipeline behaves like the command run on its own.
    pub fn new() -> Pipeline {
        Pipeline {
            execs: vec![],
            stdin_redirect: Arc::new(Redirection::None),
            stdout: Arc::new(Redirection::None),
            stderr: Arc::new(Redirection::None),
            stdin_data: None,
            check_success: false,
            detached: false,
            cwd: None,
            os_options: Default::default(),
        }
    }

    /// Adds `cmd` as the next stage of the pipeline.
    ///
    /// Builder-style alternative to combining commands with the `|` operator.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use subprocess::*;
    /// # fn dummy() -> std::io::Result<()> {
    /// let output = Pipeline::new()
    ///     .pipe(Exec::cmd("echo").arg("hello world"))
    ///     .pipe(Exec::cmd("wc").arg("-w"))
    ///     .capture()?
    ///     .stdout_str();
    /// # Ok(())
    /// # }
    /// ```
    pub fn pipe(mut self, cmd: Exec) -> Pipeline {
        self.execs.push(cmd);
        self
    }

    /// Specifies the source for the standard input of the first command in
    /// the pipeline.
    ///
    /// The source can be:
    ///
    /// * a [`Redirection`];
    /// * a `File`, which is a shorthand for `Redirection::File(file)`;
    /// * a `Vec<u8>`, `&str`, `&[u8]`, `Box<[u8]>`, or `[u8; N]`, which will set up a
    ///   `Redirection::Pipe` for stdin, feeding that data into the standard input of the
    ///   subprocess;
    /// * an [`InputData`], which also sets up a pipe, but wraps any reader and feeds its
    ///   content to the standard input of the subprocess. Use [`InputData::from_bytes`]
    ///   for in-memory byte containers not covered by the above, like `bytes::Bytes` or
    ///   `memmap2::Mmap`. Use [`InputData::from_reader`] for a custom `Read` that
    ///   generates or transforms data.
    ///
    /// If the child exits before consuming all input, the `BrokenPipe` error is silently
    /// ignored. Use the exit status and output to check if the child processed the input
    /// correctly.
    ///
    /// [`Redirection`]: enum.Redirection.html
    /// [`InputData`]: struct.InputData.html
    pub fn stdin<T>(mut self, stdin: T) -> Pipeline
    where
        InputRedirection: FromSource<T>,
    {
        // Normalize the source into a (redirection, optional data) pair, then
        // store both halves at once.
        let (redirect, data) = match InputRedirection::from_source(stdin) {
            InputRedirection::Redirection(r) => (r, None),
            InputRedirection::Data(d) => (Redirection::Pipe, Some(d)),
        };
        self.stdin_redirect = Arc::new(redirect);
        self.stdin_data = data;
        self
    }

    /// Specifies the sink for the standard output of the last command in the pipeline.
    ///
    /// The sink can be:
    ///
    /// * a [`Redirection`];
    /// * a `File`, which is a shorthand for `Redirection::File(file)`.
    ///
    /// [`Redirection`]: enum.Redirection.html
    pub fn stdout<T>(mut self, stdout: T) -> Pipeline
    where
        Redirection: FromSink<T>,
    {
        let sink = Redirection::from_sink(stdout);
        self.stdout = Arc::new(sink);
        self
    }

    /// Specifies the sink for the standard error of all commands in the pipeline.
    ///
    /// Unlike `stdout()`, which only affects the last command in the pipeline, this
    /// affects all commands.  The difference is because standard output is piped from one
    /// command to the next, so only the output of the last command is "free".  In
    /// contrast, the standard errors are not connected to each other and can be
    /// configured *en masse*.
    ///
    /// The sink can be:
    ///
    /// * a [`Redirection`];
    /// * a `File`, which is a shorthand for `Redirection::File(file)`.
    ///
    /// All `Redirection` variants are meaningful:
    ///
    /// * `Redirection::None` - inherit from the parent (the default)
    /// * `Redirection::Pipe` - funnel stderr of all commands into stderr obtained
    ///   with `capture()` or `communicate()`
    /// * `Redirection::Merge` - redirect stderr to stdout, like `2>&1` for each
    ///   command
    /// * `Redirection::File(f)` - redirect to a file
    /// * `Redirection::Null` - suppress stderr
    ///
    /// Note that this differs from the shell's `cmd1 | cmd2 2>file`, which only
    /// redirects stderr of the last command.  This method is equivalent to `(cmd1 |
    /// cmd2) 2>file`, but without the overhead of a subshell.
    ///
    /// If you pass `Redirection::Pipe`, the shared stderr read end
    /// will be available via [`Job::stderr`].
    ///
    /// [`Redirection`]: enum.Redirection.html
    pub fn stderr_all<T>(mut self, stderr: T) -> Pipeline
    where
        Redirection: FromSink<T>,
    {
        let sink = Redirection::from_sink(stderr);
        self.stderr = Arc::new(sink);
        self
    }

    /// Makes [`join`](Self::join) and [`capture`](Self::capture) report an
    /// error whenever the last command exits with a non-zero status.
    pub fn checked(mut self) -> Pipeline {
        self.check_success = true;
        self
    }

    /// Sets the working directory used by every command in the pipeline.
    ///
    /// When not set, commands inherit the parent's current working directory.
    pub fn cwd(mut self, dir: impl AsRef<Path>) -> Pipeline {
        self.cwd = Some(dir.as_ref().as_os_str().to_os_string());
        self
    }

    /// Marks the pipeline's processes as initially detached.
    ///
    /// Detached means that when the objects owning the processes go out of
    /// scope, the processes will not be waited for.
    pub fn detached(mut self) -> Pipeline {
        self.detached = true;
        self
    }

    /// Requests that the pipeline's children be placed in their own process
    /// group; consumed by `start()` when spawning (Unix only).
    #[cfg(unix)]
    pub(crate) fn set_setpgid(&mut self, value: bool) {
        self.os_options.setpgid = value;
    }

    /// Panics (naming `meth` in the message) if input data was supplied via
    /// `stdin()`; guards terminators that cannot feed input data themselves.
    fn check_no_stdin_data(&self, meth: &str) {
        assert!(
            self.stdin_data.is_none(),
            "{} called with input data specified",
            meth
        );
    }

    /// Turns the pipeline-level stderr redirection into a per-command one,
    /// installing it on every command. Returns the pipe's read end when the
    /// requested redirection was `Redirection::Pipe`.
    fn setup_stderr(&mut self) -> io::Result<Option<File>> {
        // Take the pipeline-level setting, resetting the field to None.
        let taken = std::mem::replace(&mut self.stderr, Arc::new(Redirection::None));
        if let Redirection::None = *taken {
            // Nothing to distribute; children inherit the parent's stderr.
            return Ok(None);
        }

        // Pipe gets translated into one shared pipe whose write end is handed
        // to every command as a File redirection; anything else is shared
        // verbatim. Either way, all commands end up with the same Arc.
        let mut read_end = None;
        let shared = if let Redirection::Pipe = *taken {
            let (read, write) = crate::spawn::make_pipe()?;
            read_end = Some(read);
            Arc::new(Redirection::File(write))
        } else {
            taken
        };
        self.execs
            .iter_mut()
            .for_each(|exec| exec.stderr_redirect = Arc::clone(&shared));
        Ok(read_end)
    }

    // Terminators:

    /// Starts all commands in the pipeline and returns a [`Job`] with the running
    /// processes and their pipe ends.
    ///
    /// If some command fails to start, the remaining commands will not be started, and
    /// the appropriate error will be returned.  The commands that have already started
    /// will be waited to finish (but will probably exit immediately due to missing
    /// output), except for the ones for which `detached()` was called.  This is
    /// equivalent to what the shell does.
    pub fn start(mut self) -> io::Result<Job> {
        // An empty pipeline yields a Job with no processes and no pipe ends.
        if self.execs.is_empty() {
            return Ok(Job {
                stdin: None,
                stdout: None,
                stderr: None,
                stdin_data: InputData::default(),
                check_success: self.check_success,
                processes: vec![],
            });
        }

        // Reject per-command redirections that would conflict with the
        // pipeline-level stdin/stdout installed further below.
        if self.execs.first().unwrap().stdin_is_set() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "stdin of the first command is already redirected; \
                 use Pipeline::stdin() to redirect pipeline input",
            ));
        }
        if self.execs.last().unwrap().stdout_is_set() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "stdout of the last command is already redirected; \
                 use Pipeline::stdout() to redirect pipeline output",
            ));
        }

        // Process-group placement is a whole-pipeline decision; per-command
        // setpgid would put commands in different groups.
        #[cfg(unix)]
        if self.execs.iter().any(|e| e.setpgid_is_set()) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "setpgid on individual commands in a pipeline is not \
                 supported; use Pipeline::setpgid() to put the pipeline \
                 in a process group",
            ));
        }

        // Distribute the pipeline-level stderr to all commands; `stderr` is
        // the shared read end if Redirection::Pipe was requested.
        let stderr = self.setup_stderr()?;

        // Propagate pipeline-level cwd/detached to every command.
        if let Some(dir) = &self.cwd {
            self.execs = self.execs.into_iter().map(|cmd| cmd.cwd(dir)).collect();
        }
        if self.detached {
            self.execs = self.execs.into_iter().map(|cmd| cmd.detached()).collect();
        }

        self.execs.first_mut().unwrap().stdin_redirect = self.stdin_redirect;

        self.execs.last_mut().unwrap().stdout_redirect = self.stdout;

        let cnt = self.execs.len();
        let mut processes = Vec::<Process>::new();
        let mut pipeline_stdin = None;
        let mut pipeline_stdout = None;
        // stdout read end of the previously spawned command, consumed as the
        // next command's stdin.
        let mut prev_stdout: Option<File> = None;
        #[cfg(unix)]
        let mut first_pid: u32 = 0;

        for (idx, mut exec) in self.execs.into_iter().enumerate() {
            if let Some(prev_out) = prev_stdout.take() {
                exec = exec.stdin(prev_out);
            }
            // Every command except the last pipes its stdout to the next one.
            if idx != cnt - 1 {
                exec = exec.stdout(Redirection::Pipe);
            }
            #[cfg(unix)]
            if self.os_options.setpgid {
                // spawn() uses an exec-fail pipe, so it blocks until the child has called
                // setpgid and exec'd. By the time we fork the second child, the first
                // child's group already exists.
                if idx == 0 {
                    exec.set_pgid_value(0);
                } else {
                    exec.set_pgid_value(first_pid);
                }
            }
            let result = exec.spawn()?;
            if idx == 0 {
                // The first command's stdin pipe (if any) is the pipeline's.
                pipeline_stdin = result.stdin;
                #[cfg(unix)]
                if self.os_options.setpgid {
                    first_pid = result.process.pid();
                }
            }
            if idx == cnt - 1 {
                // The last command's stdout pipe (if any) is the pipeline's.
                pipeline_stdout = result.stdout;
            } else {
                prev_stdout = result.stdout;
            }
            processes.push(result.process);
        }

        Ok(Job {
            stdin: pipeline_stdin,
            stdout: pipeline_stdout,
            stderr,
            stdin_data: self.stdin_data.unwrap_or_default(),
            check_success: self.check_success,
            processes,
        })
    }

    /// Runs the pipeline to completion and returns the exit status of its
    /// last command.
    pub fn join(self) -> io::Result<ExitStatus> {
        let job = self.start()?;
        job.join()
    }

    /// Starts the pipeline and returns a `Read` implementation that streams
    /// the standard output of the last command.
    ///
    /// `stdout(Redirection::Pipe)` is configured automatically; there is no
    /// need to set it up beforehand.
    ///
    /// Dropping the returned value waits for the pipeline to finish.  Call
    /// `detached()` first if that is undesirable.
    ///
    /// # Panics
    ///
    /// Panics if input data was specified with [`stdin`](Self::stdin).  Use
    /// [`capture`](Self::capture) or [`communicate`](Self::communicate) to both
    /// feed input and read output.
    pub fn stream_stdout(self) -> io::Result<impl Read> {
        self.check_no_stdin_data("stream_stdout");
        let job = self.stdout(Redirection::Pipe).start()?;
        Ok(ReadAdapter(job))
    }

    /// Starts the pipeline and returns a `Read` implementation that streams
    /// the standard error of all commands in the pipeline.
    ///
    /// `stderr_all(Redirection::Pipe)` is configured automatically; there is
    /// no need to set it up beforehand.
    ///
    /// Note that this redirects stderr of all commands in the pipeline, not just
    /// the last one.  This differs from the shell's `cmd1 | cmd2 2>file`, which
    /// only redirects stderr of the last command.  This method is equivalent to
    /// `(cmd1 | cmd2) 2>file`, but without the overhead of a subshell.
    ///
    /// Dropping the returned value waits for the pipeline to finish.  Call
    /// `detached()` first if that is undesirable.
    ///
    /// # Panics
    ///
    /// Panics if input data was specified with [`stdin`](Self::stdin).  Use
    /// [`capture`](Self::capture) or [`communicate`](Self::communicate) to both
    /// feed input and read output.
    pub fn stream_stderr_all(self) -> io::Result<impl Read> {
        self.check_no_stdin_data("stream_stderr_all");
        let job = self.stderr_all(Redirection::Pipe).start()?;
        Ok(ReadErrAdapter(job))
    }

    /// Starts the pipeline and returns a `Write` implementation that feeds
    /// the standard input of the first command.
    ///
    /// `stdin(Redirection::Pipe)` is configured automatically; there is no
    /// need to set it up beforehand.
    ///
    /// Dropping the returned value waits for the pipeline to finish.  Call
    /// `detached()` first if that is undesirable.
    ///
    /// # Panics
    ///
    /// Panics if input data was specified with [`stdin`](Self::stdin).
    pub fn stream_stdin(self) -> io::Result<impl Write> {
        self.check_no_stdin_data("stream_stdin");
        let job = self.stdin(Redirection::Pipe).start()?;
        Ok(WriteAdapter(job))
    }

    /// Starts the pipeline and returns a `Communicator` handle.
    ///
    /// Unless already configured, stdout and stderr are redirected to pipes.  If you
    /// need different redirection (e.g. `stderr_all(Merge)`), set it up before
    /// calling this method and it will be preserved.
    ///
    /// Compared to `capture()`, this offers more choice in how communication is
    /// performed, such as read size limit and timeout.  Unlike `capture()`, this
    /// method doesn't wait for the pipeline to finish, effectively detaching it.
    pub fn communicate(mut self) -> io::Result<Communicator> {
        self = self.detached();
        // Default both output streams to pipes, keeping any explicit setup.
        if let Redirection::None = *self.stdout {
            self = self.stdout(Redirection::Pipe);
        }
        if let Redirection::None = *self.stderr {
            self = self.stderr_all(Redirection::Pipe);
        }
        self.start()?.communicate()
    }

    /// Starts the pipeline, collects its output, and waits for it to finish.
    ///
    /// The return value provides the standard output and standard error as bytes or
    /// optionally strings, as well as the exit status.
    ///
    /// Unless already configured, stdout and stderr are redirected to pipes so they
    /// can be captured. If you need different redirection (e.g. `stderr_all(Merge)`),
    /// set it up before calling this method and it will be preserved.
    ///
    /// This method waits for the pipeline to finish, rather than simply waiting for
    /// its standard streams to close.  If this is undesirable, use `detached()`.
    pub fn capture(mut self) -> io::Result<Capture> {
        // Default both output streams to pipes, keeping any explicit setup.
        if let Redirection::None = *self.stdout {
            self = self.stdout(Redirection::Pipe);
        }
        if let Redirection::None = *self.stderr {
            self = self.stderr_all(Redirection::Pipe);
        }
        self.start()?.capture()
    }
}

impl BitOr<Exec> for Pipeline {
    type Output = Pipeline;

    /// Appends `rhs` as the pipeline's next command, returning the extended
    /// pipeline.
    fn bitor(self, rhs: Exec) -> Pipeline {
        self.pipe(rhs)
    }
}

impl BitOr for Pipeline {
    type Output = Pipeline;

    /// Appends the commands from `rhs` to this pipeline.
    ///
    /// Only the commands of `rhs` are taken; its other pipeline-level
    /// settings (cwd, stdout, etc.) are discarded.
    fn bitor(mut self, rhs: Pipeline) -> Pipeline {
        self.execs.extend(rhs.execs);
        self
    }
}

impl FromIterator<Exec> for Pipeline {
    /// Creates a pipeline from an iterator of commands.
    ///
    /// The iterator may yield any number of commands, including zero or one.
    /// An empty pipeline returns success on `join()` and empty output on
    /// `capture()`. A single-command pipeline behaves like running that
    /// command directly.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use subprocess::{Exec, Pipeline};
    ///
    /// let commands = vec![
    ///   Exec::shell("echo tset"),
    ///   Exec::shell("tr '[:lower:]' '[:upper:]'"),
    ///   Exec::shell("rev")
    /// ];
    ///
    /// let pipeline: Pipeline = commands.into_iter().collect();
    /// let output = pipeline.capture().unwrap().stdout_str();
    /// assert_eq!(output, "TEST\n");
    /// ```
    fn from_iter<I: IntoIterator<Item = Exec>>(iter: I) -> Self {
        Pipeline {
            execs: iter.into_iter().collect(),
            stdin_redirect: Arc::new(Redirection::None),
            stdout: Arc::new(Redirection::None),
            stderr: Arc::new(Redirection::None),
            stdin_data: None,
            check_success: false,
            detached: false,
            cwd: None,
            os_options: Default::default(),
        }
    }
}

impl fmt::Debug for Pipeline {
    /// Formats the pipeline as its commands' lossy command lines joined
    /// with " | ", shell-style.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let cmdline = self
            .execs
            .iter()
            .map(|cmd| cmd.to_cmdline_lossy())
            .collect::<Vec<_>>()
            .join(" | ");
        write!(f, "Pipeline {{ {} }}", cmdline)
    }
}