1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
use crate::env::{FileDescEnvironment, FileDescOpener, ReportErrorEnvironment, SubEnvironment};
use crate::error::IsFatalError;
use crate::io::Permissions;
use crate::spawn::swallow_non_fatal_errors;
use crate::{ExitStatus, Spawn, EXIT_ERROR, EXIT_SUCCESS, STDIN_FILENO, STDOUT_FILENO};
use futures_core::future::BoxFuture;
use futures_core::stream::Stream;
use futures_util::future::poll_fn;
use futures_util::stream::{FuturesUnordered, StreamExt};
use std::error::Error;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Spawns a pipeline of commands.
///
/// Every command's standard output is piped into the standard input of the
/// command that follows it. The first command inherits its standard input
/// from `env`, and the last command inherits its standard output from `env`.
///
/// When `invert_last_status` is `false`, the returned future resolves to the
/// exit status of the last command. When it is `true`, the result is
/// inverted: `EXIT_ERROR` if the last command succeeded, `EXIT_SUCCESS`
/// otherwise.
///
/// # Errors
///
/// Propagates whatever error `do_pipeline` returns.
pub async fn pipeline<S, I, E>(
    invert_last_status: bool,
    first: S,
    rest: I,
    env: &mut E,
) -> Result<BoxFuture<'static, ExitStatus>, S::Error>
where
    I: IntoIterator<Item = S>,
    S: Send + Sync + Spawn<E>,
    S::Error: From<io::Error> + IsFatalError,
    E: Send + FileDescEnvironment + FileDescOpener + ReportErrorEnvironment + SubEnvironment,
    E::FileHandle: From<E::OpenedFileHandle>,
{
    // Normalize the caller's collection into a plain iterator before handing
    // everything off to the real implementation.
    let remaining = rest.into_iter();
    do_pipeline(invert_last_status, first, remaining, env).await
}
async fn do_pipeline<S, I, E>(
invert_last_status: bool,
first: S,
mut rest: I,
orig_env: &mut E,
) -> Result<BoxFuture<'static, ExitStatus>, S::Error>
where
I: Iterator<Item = S>,
S: Send + Sync + Spawn<E>,
S::Error: From<io::Error> + IsFatalError,
E: Send + FileDescEnvironment + FileDescOpener + ReportErrorEnvironment + SubEnvironment,
E::FileHandle: From<E::OpenedFileHandle>,
{
// When we spawn each command in the pipeline, we'll pins them to their own
// (sub) environments.
//
// bash will apparently run each pipeline command in its own environment, thus
// no side-effects (e.g. setting variables) are reflected on the parent environment,
// (though this is probably a side effect of bash forking on each command).
//
// zsh, on the other hand, does persist side effects from individual commands
// to the parent environment. Although we could implement this behavior as well,
// it would require custom fiddling and book keeping with the environment (e.g.
// only swap the file descriptors between commands, but persist other things
// like variables), but this doesn't go well with our *generic* approach to everything.
//
// There is also a question of how useful something like `echo foo | var=value`
// even is, and whether such a command would even appear in regular scripts.
// Given that bash is pretty popular, and given that the POSIX spec is slient
// on how side-effects from pipelines should be handled, we have a pretty low
// risk of behaving differently than the script author intends, so we'll take
// bash's approach and spawn each command with its own environment and hide any
// lasting effects.
// Futures which are still holding an environment or a reference to the command being
// spawned and as such they cannot be treated as static (well, without imposing that
// bound on the caller).
let env_futures = FuturesUnordered::new();
let final_cmd_env_future: BoxFuture<'_, _> = if let Some(second) = rest.next() {
let mut next_in = {
// First command will automatically inherit the stdin of the
// parent environment, so no need to manually set it
let mut env = orig_env.sub_env();
let pipe = env.open_pipe()?;
env.set_file_desc(STDOUT_FILENO, pipe.writer.into(), Permissions::Write);
env_futures.push(spawn_and_swallow_errors(first, env));
pipe.reader
};
let mut last = second;
for next in rest {
let mut env = orig_env.sub_env();
let pipe = env.open_pipe()?;
env.set_file_desc(STDIN_FILENO, next_in.into(), Permissions::Read);
env.set_file_desc(STDOUT_FILENO, pipe.writer.into(), Permissions::Write);
next_in = pipe.reader;
env_futures.push(spawn_and_swallow_errors(last, env));
last = next;
}
let mut env = orig_env.sub_env();
env.set_file_desc(STDIN_FILENO, next_in.into(), Permissions::Read);
Box::pin(async move {
let ret = swallow_non_fatal_errors(last, &mut env).await;
drop(env);
ret
})
} else {
Box::pin(swallow_non_fatal_errors(first, orig_env))
};
// At this point every single command in the pipeline has been "spawned" into
// the first future which holds references to the command itself. We now have
// to poll all these futures until they resolve to their second layer, at which
// point everything should be 'static and the caller can drop their environment
// reference as well.
//
// The complication arises when we have to consider that if one "env future"
// resolves, we should start polling it's "static futue" while the other
// "env futures" are still pending. Consider a pipeline of one builtin utility
// whose output is being fed into a loop command. If we wait until the loop breaks
// to poll the builtin "static future", we could end up starving the loop and
// dead locking.
//
// Thus we have to keep polling *everything* until all "env futures" have resolved,
// at which point we can move into the second "static future" phase. But this requires
// doing some extra book keeping which happens below.
let mut env_futures = Box::pin(env_futures);
let mut static_futures = Box::pin(FuturesUnordered::new());
let mut final_cmd_state = FinalCmdState::EnvFuture(final_cmd_env_future);
poll_fn(|cx| {
let env_futures_done = loop {
match env_futures.as_mut().poll_next(cx) {
Poll::Ready(Some(sf)) => sf.map(|sf| static_futures.push(sf)),
Poll::Ready(None) => break true,
Poll::Pending => break false,
};
};
loop {
match &mut final_cmd_state {
FinalCmdState::EnvFuture(ef) => {
final_cmd_state = match ef.as_mut().poll(cx) {
Poll::Pending => break,
Poll::Ready(Ok(f)) => FinalCmdState::Maybe(MaybeDone::Future(f)),
Poll::Ready(Err(e)) => FinalCmdState::Error(e),
};
}
FinalCmdState::Error(_) => {}
FinalCmdState::Maybe(f) => {
let _ = Pin::new(f).poll(cx);
}
}
// Don't need references to any environments
// or commands any more, so bail!
if env_futures_done {
return Poll::Ready(());
}
}
// Still have pending futures, keep polling any static_futures so they
// can make progress.
while let Poll::Ready(Some(_exit)) = static_futures.as_mut().poll_next(cx) {}
Poll::Pending
})
.await;
let final_cmd = match final_cmd_state {
FinalCmdState::EnvFuture(_) => unreachable!(),
FinalCmdState::Maybe(m) => m,
FinalCmdState::Error(e) => {
// We need to return an error back to the caller, but before we
// do so, we should continue to poll any pending futures and give
// those commands a chance to complete (or die due to pipe errors)
// rather than abruptly dropping/killing them.
while let Some(_status) = static_futures.next().await {}
return Err(e);
}
};
Ok(Box::pin(async move {
let (_, final_status) = futures_util::join!(
async move { while let Some(_status) = static_futures.next().await {} },
final_cmd,
);
if invert_last_status {
if final_status.success() {
EXIT_ERROR
} else {
EXIT_SUCCESS
}
} else {
final_status
}
}))
}
/// Spawns `cmd` in the environment `env`, which this future owns.
///
/// Resolves to `Some` with the command's `'static` future when spawning
/// succeeds. When spawning fails, the error is reported through
/// `env.report_error` and the future resolves to `None`; the error is never
/// propagated to the caller.
async fn spawn_and_swallow_errors<S, E>(
    cmd: S,
    mut env: E,
) -> Option<BoxFuture<'static, ExitStatus>>
where
    S: Spawn<E>,
    S::Error: 'static + Send + Sync + Error,
    E: ReportErrorEnvironment,
{
    // Happy path first: hand the spawned future straight back.
    let err = match cmd.spawn(&mut env).await {
        Ok(spawned) => return Some(spawned),
        Err(err) => err,
    };

    env.report_error(&err).await;
    None
}
/// Tracks how far the pipeline's final command has progressed.
///
/// `EF` is the type of the outer "env future" and `ERR` is the error type
/// that future may resolve to.
enum FinalCmdState<EF, ERR> {
/// The outer future (with a reference to the environment and command)
/// is still pending.
EnvFuture(EF),
/// The "env future" resolved to an error; the error is kept here until it
/// can be returned.
Error(ERR),
/// The static future that the "env future" resolved to, or the final result
/// once that static future has completed.
Maybe(MaybeDone),
}
/// A command's static future together with a slot for its eventual result.
///
/// Once the wrapped future completes, its exit status is stored so that the
/// value can be polled again and keep yielding the same status.
enum MaybeDone {
/// The static future that the "env future" resolved to; still running.
Future(BoxFuture<'static, ExitStatus>),
/// The command has finished with the recorded exit status.
Done(ExitStatus),
}
impl Future for MaybeDone {
    type Output = ExitStatus;

    /// Polls the wrapped future, remembering its exit status on completion.
    ///
    /// Unlike most futures, this one may be polled again after it has
    /// completed: every later poll immediately yields the stored status.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let state = self.get_mut();

        let status = match state {
            // Already finished: replay the stored status.
            MaybeDone::Done(status) => return Poll::Ready(*status),
            MaybeDone::Future(fut) => match fut.as_mut().poll(cx) {
                Poll::Ready(status) => status,
                Poll::Pending => return Poll::Pending,
            },
        };

        // Drop the finished future and keep only its result.
        *state = MaybeDone::Done(status);
        Poll::Ready(status)
    }
}