1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
use std::fs::File;
use std::io;
use std::io::{ErrorKind, Read, Write};
use std::time::{Duration, Instant};
use crate::communicate::Communicator;
use crate::exec::{Capture, InputData};
use crate::process::{ExitStatus, Process};
/// Interface to a started process or pipeline.
///
/// Created by [`Exec::start`](crate::Exec::start) or
/// [`Pipeline::start`](crate::Pipeline::start).
///
/// When dropped, waits for all processes to finish unless [`detached`](Self::detach).
#[derive(Debug)]
#[non_exhaustive]
pub struct Job {
// Pipe fields are declared before `processes` so that they are dropped
// first, allowing children to receive EOF and exit before
// `Process::drop` waits on them.
/// Write end of the first process's stdin pipe, if stdin was `Pipe`.
pub stdin: Option<File>,
/// Read end of the last process's stdout pipe, if stdout was `Pipe`.
pub stdout: Option<File>,
/// Read end of the shared stderr pipe, if stderr was `Pipe`.
pub stderr: Option<File>,
/// Data to feed to the first process's stdin, set by
/// [`Exec::stdin`](crate::Exec::stdin) or
/// [`Pipeline::stdin`](crate::Pipeline::stdin).
pub stdin_data: InputData,
/// Whether to return an error on non-zero exit status.
pub check_success: bool,
/// Started processes, in pipeline order.
pub processes: Vec<Process>,
}
impl Job {
/// Creates a [`Communicator`] from the pipe ends.
///
/// The communicator takes ownership of `stdin`, `stdout`, and `stderr`, leaving them
/// as `None`. Only streams that were redirected to a pipe will be available to the
/// communicator.
pub fn communicate(&mut self) -> io::Result<Communicator> {
Communicator::new(
self.stdin.take(),
self.stdout.take(),
self.stderr.take(),
std::mem::take(&mut self.stdin_data),
)
}
/// Terminates all processes in the pipeline.
///
/// Delegates to [`Process::terminate()`] on each process, which sends `SIGTERM` on
/// Unix and calls `TerminateProcess` on Windows. Already reaped processes are
/// silently skipped.
pub fn terminate(&self) -> io::Result<()> {
for p in &self.processes {
p.terminate()?;
}
Ok(())
}
/// Waits for all processes to finish and returns the last process's exit status.
///
/// If no processes have been started (empty pipeline), returns a successful exit
/// status.
///
/// Unlike [`join`](Self::join), this does not consume `self`, does not close the pipe
/// ends, and ignores `check_success`.
pub fn wait(&self) -> io::Result<ExitStatus> {
let mut status = ExitStatus::from_raw(0);
for p in &self.processes {
status = p.wait()?;
}
Ok(status)
}
/// Returns the PID of the last process in the pipeline.
///
/// For a single command started with
/// [`Exec::start`](crate::Exec::start), this is the PID of that
/// command. For a pipeline, this is the PID of the last command.
///
/// # Panics
///
/// Panics if no processes have been started because this was created by an empty
/// `Pipeline`.
pub fn pid(&self) -> u32 {
self.processes.last().unwrap().pid()
}
/// Returns the PIDs of all processes in the pipeline, in pipeline order.
///
/// If the job was started by a single process, this will return its pid. It will be
/// empty for a job started by an empty pipeline.
pub fn pids(&self) -> Vec<u32> {
self.processes.iter().map(|p| p.pid()).collect()
}
/// Kill all processes in the pipeline.
///
/// Delegates to [`Process::kill()`] on each process, which sends `SIGKILL` on Unix
/// and calls `TerminateProcess` on Windows. Already reaped processes are silently
/// skipped.
pub fn kill(&self) -> io::Result<()> {
for p in &self.processes {
p.kill()?;
}
Ok(())
}
/// Detach all processes in the pipeline.
///
/// After detaching, the processes will not be waited for in drop.
pub fn detach(&self) {
for p in &self.processes {
p.detach();
}
}
/// Poll all processes for completion without blocking.
///
/// Returns `Some(exit_status)` of the last process if all processes have finished, or
/// `None` if any process is still running. If no processes have been started (empty
/// pipeline), returns `Some` with a successful exit status.
pub fn poll(&self) -> Option<ExitStatus> {
let mut status = Some(ExitStatus::from_raw(0));
for p in &self.processes {
status = Some(p.poll()?);
}
status
}
/// Like [`wait`](Self::wait), but with a timeout.
///
/// Returns `Ok(None)` if the processes don't finish within the given duration.
pub fn wait_timeout(&self, timeout: Duration) -> io::Result<Option<ExitStatus>> {
let deadline = Instant::now() + timeout;
let mut status = ExitStatus::from_raw(0);
for p in &self.processes {
match p.wait_timeout(deadline.saturating_duration_since(Instant::now()))? {
Some(s) => status = s,
None => return Ok(None),
}
}
Ok(Some(status))
}
/// Closes the pipe ends, waits for all processes to finish, and returns the exit
/// status of the last process.
///
/// If input or output was redirected to pipe, this close input and drain the output
/// as needed.
pub fn join(mut self) -> io::Result<ExitStatus> {
// Communicator::read_to() feeds stdin to the subprocess and drains its
// output/error, if configured as pipe.
self.communicate()?
.read_to(std::io::sink(), std::io::sink())?;
let status = self.wait()?;
if self.check_success && !status.success() {
return Err(io::Error::other(format!("command failed: {status}")));
}
Ok(status)
}
/// Like [`join`](Self::join), but with a timeout.
///
/// Returns an error of kind `ErrorKind::TimedOut` if the processes don't finish
/// within the given duration.
pub fn join_timeout(mut self, timeout: Duration) -> io::Result<ExitStatus> {
let deadline = Instant::now() + timeout;
self.communicate()?
.limit_time(timeout)
.read_to(std::io::sink(), std::io::sink())?;
let status = self
.wait_timeout(deadline.saturating_duration_since(Instant::now()))?
.ok_or_else(|| io::Error::from(ErrorKind::TimedOut))?;
if self.check_success && !status.success() {
return Err(io::Error::other(format!("command failed: {status}")));
}
Ok(status)
}
/// Captures the output and waits for the process(es) to finish.
///
/// Only streams that were redirected to a pipe will produce data; non-piped streams
/// will result in empty bytes in `Capture`.
pub fn capture(mut self) -> io::Result<Capture> {
let mut comm = self.communicate()?;
let (stdout, stderr) = comm.read()?;
let capture = Capture {
stdout,
stderr,
exit_status: self.wait()?,
};
if self.check_success && !capture.success() {
return Err(io::Error::other(format!(
"command failed: {}",
capture.exit_status
)));
}
Ok(capture)
}
/// Like [`capture`](Self::capture), but with a timeout.
///
/// Returns an error of kind `ErrorKind::TimedOut` if the processes don't finish
/// within the given duration.
pub fn capture_timeout(mut self, timeout: Duration) -> io::Result<Capture> {
let deadline = Instant::now() + timeout;
let mut comm = self.communicate()?.limit_time(timeout);
let (stdout, stderr) = comm.read()?;
let exit_status = self
.wait_timeout(deadline.saturating_duration_since(Instant::now()))?
.ok_or_else(|| io::Error::from(ErrorKind::TimedOut))?;
let capture = Capture {
stdout,
stderr,
exit_status,
};
if self.check_success && !capture.success() {
return Err(io::Error::other(format!(
"command failed: {}",
capture.exit_status
)));
}
Ok(capture)
}
}
#[derive(Debug)]
pub(crate) struct ReadAdapter(pub(crate) Job);
impl Read for ReadAdapter {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.stdout.as_mut().unwrap().read(buf)
}
}
#[derive(Debug)]
pub(crate) struct ReadErrAdapter(pub(crate) Job);
impl Read for ReadErrAdapter {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.stderr.as_mut().unwrap().read(buf)
}
}
#[derive(Debug)]
pub(crate) struct WriteAdapter(pub(crate) Job);
impl Write for WriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.stdin.as_mut().unwrap().write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.0.stdin.as_mut().unwrap().flush()
}
}