1use crate::{byte_stream::convert_file, shell_error::io::IoError, ShellError, Span};
2use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
3
4use os_pipe::PipeReader;
5use std::{
6 fmt::Debug,
7 io::{self, Read},
8 sync::mpsc::{self, Receiver, RecvError, TryRecvError},
9 thread,
10};
11
12pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
13 match status {
14 ExitStatus::Exited(exit_code) => {
15 if ignore_error {
16 Ok(())
17 } else if let Ok(exit_code) = exit_code.try_into() {
18 Err(ShellError::NonZeroExitCode { exit_code, span })
19 } else {
20 Ok(())
21 }
22 }
23 #[cfg(unix)]
24 ExitStatus::Signaled {
25 signal,
26 core_dumped,
27 } => {
28 use nix::sys::signal::Signal;
29
30 let sig = Signal::try_from(signal);
31
32 if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
33 Ok(())
35 } else {
36 let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
37 Err(if core_dumped {
38 ShellError::CoreDumped {
39 signal_name,
40 signal,
41 span,
42 }
43 } else {
44 ShellError::TerminatedBySignal {
45 signal_name,
46 signal,
47 span,
48 }
49 })
50 }
51 }
52 }
53}
54
55#[derive(Debug)]
56enum ExitStatusFuture {
57 Finished(Result<ExitStatus, Box<ShellError>>),
58 Running(Receiver<io::Result<ExitStatus>>),
59}
60
61impl ExitStatusFuture {
62 fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
63 match self {
64 ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
65 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
66 ExitStatusFuture::Running(receiver) => {
67 let code = match receiver.recv() {
68 #[cfg(unix)]
69 Ok(Ok(
70 status @ ExitStatus::Signaled {
71 core_dumped: true, ..
72 },
73 )) => {
74 check_ok(status, false, span)?;
75 Ok(status)
76 }
77 Ok(Ok(status)) => Ok(status),
78 Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
79 err.kind(),
80 span,
81 None,
82 "failed to get exit code",
83 ))),
84 Err(err @ RecvError) => Err(ShellError::GenericError {
85 error: err.to_string(),
86 msg: "failed to get exit code".into(),
87 span: span.into(),
88 help: None,
89 inner: vec![],
90 }),
91 };
92
93 *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
94
95 code
96 }
97 }
98 }
99
100 fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
101 match self {
102 ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
103 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
104 ExitStatusFuture::Running(receiver) => {
105 let code = match receiver.try_recv() {
106 Ok(Ok(status)) => Ok(Some(status)),
107 Ok(Err(err)) => Err(ShellError::GenericError {
108 error: err.to_string(),
109 msg: "failed to get exit code".to_string(),
110 span: span.into(),
111 help: None,
112 inner: vec![],
113 }),
114 Err(TryRecvError::Disconnected) => Err(ShellError::GenericError {
115 error: "receiver disconnected".to_string(),
116 msg: "failed to get exit code".into(),
117 span: span.into(),
118 help: None,
119 inner: vec![],
120 }),
121 Err(TryRecvError::Empty) => Ok(None),
122 };
123
124 if let Some(code) = code.clone().transpose() {
125 *self = ExitStatusFuture::Finished(code.map_err(Box::new));
126 }
127
128 code
129 }
130 }
131 }
132}
133
134pub enum ChildPipe {
135 Pipe(PipeReader),
136 Tee(Box<dyn Read + Send + 'static>),
137}
138
139impl Debug for ChildPipe {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("ChildPipe").finish()
142 }
143}
144
145impl From<PipeReader> for ChildPipe {
146 fn from(pipe: PipeReader) -> Self {
147 Self::Pipe(pipe)
148 }
149}
150
151impl Read for ChildPipe {
152 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
153 match self {
154 ChildPipe::Pipe(pipe) => pipe.read(buf),
155 ChildPipe::Tee(tee) => tee.read(buf),
156 }
157 }
158}
159
160#[derive(Debug)]
161pub struct ChildProcess {
162 pub stdout: Option<ChildPipe>,
163 pub stderr: Option<ChildPipe>,
164 exit_status: ExitStatusFuture,
165 ignore_error: bool,
166 span: Span,
167}
168
169pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
170
171impl Debug for PostWaitCallback {
172 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173 write!(f, "<wait_callback>")
174 }
175}
176
177impl ChildProcess {
178 pub fn new(
179 mut child: ForegroundChild,
180 reader: Option<PipeReader>,
181 swap: bool,
182 span: Span,
183 callback: Option<PostWaitCallback>,
184 ) -> Result<Self, ShellError> {
185 let (stdout, stderr) = if let Some(combined) = reader {
186 (Some(combined), None)
187 } else {
188 let stdout = child.as_mut().stdout.take().map(convert_file);
189 let stderr = child.as_mut().stderr.take().map(convert_file);
190
191 if swap {
192 (stderr, stdout)
193 } else {
194 (stdout, stderr)
195 }
196 };
197
198 let (exit_status_sender, exit_status) = mpsc::channel();
200
201 thread::Builder::new()
202 .name("exit status waiter".into())
203 .spawn(move || {
204 let matched = match child.wait() {
205 Ok(wait_status) => {
213 let next = match &wait_status {
214 ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
215 ForegroundWaitStatus::Finished(exit_status) => *exit_status,
216 };
217
218 if let Some(callback) = callback {
219 (callback.0)(wait_status);
220 }
221
222 Ok(next)
223 }
224 Err(err) => Err(err),
225 };
226
227 exit_status_sender.send(matched)
228 })
229 .map_err(|err| {
230 IoError::new_with_additional_context(
231 err.kind(),
232 span,
233 None,
234 "Could now spawn exit status waiter",
235 )
236 })?;
237
238 Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
239 }
240
241 pub fn from_raw(
242 stdout: Option<PipeReader>,
243 stderr: Option<PipeReader>,
244 exit_status: Option<Receiver<io::Result<ExitStatus>>>,
245 span: Span,
246 ) -> Self {
247 Self {
248 stdout: stdout.map(Into::into),
249 stderr: stderr.map(Into::into),
250 exit_status: exit_status
251 .map(ExitStatusFuture::Running)
252 .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
253 ignore_error: false,
254 span,
255 }
256 }
257
258 pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
259 self.ignore_error = ignore;
260 self
261 }
262
263 pub fn span(&self) -> Span {
264 self.span
265 }
266
267 pub fn into_bytes(mut self) -> Result<Vec<u8>, ShellError> {
268 if self.stderr.is_some() {
269 debug_assert!(false, "stderr should not exist");
270 return Err(ShellError::GenericError {
271 error: "internal error".into(),
272 msg: "stderr should not exist".into(),
273 span: self.span.into(),
274 help: None,
275 inner: vec![],
276 });
277 }
278
279 let bytes = if let Some(stdout) = self.stdout {
280 collect_bytes(stdout).map_err(|err| IoError::new(err.kind(), self.span, None))?
281 } else {
282 Vec::new()
283 };
284
285 check_ok(
286 self.exit_status.wait(self.span)?,
287 self.ignore_error,
288 self.span,
289 )?;
290
291 Ok(bytes)
292 }
293
294 pub fn wait(mut self) -> Result<(), ShellError> {
295 let from_io_error = IoError::factory(self.span, None);
296 if let Some(stdout) = self.stdout.take() {
297 let stderr = self
298 .stderr
299 .take()
300 .map(|stderr| {
301 thread::Builder::new()
302 .name("stderr consumer".into())
303 .spawn(move || consume_pipe(stderr))
304 })
305 .transpose()
306 .map_err(&from_io_error)?;
307
308 let res = consume_pipe(stdout);
309
310 if let Some(handle) = stderr {
311 handle
312 .join()
313 .map_err(|e| match e.downcast::<io::Error>() {
314 Ok(io) => from_io_error(*io).into(),
315 Err(err) => ShellError::GenericError {
316 error: "Unknown error".into(),
317 msg: format!("{err:?}"),
318 span: Some(self.span),
319 help: None,
320 inner: Vec::new(),
321 },
322 })?
323 .map_err(&from_io_error)?;
324 }
325
326 res.map_err(&from_io_error)?;
327 } else if let Some(stderr) = self.stderr.take() {
328 consume_pipe(stderr).map_err(&from_io_error)?;
329 }
330
331 check_ok(
332 self.exit_status.wait(self.span)?,
333 self.ignore_error,
334 self.span,
335 )
336 }
337
338 pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
339 self.exit_status.try_wait(self.span)
340 }
341
342 pub fn wait_with_output(mut self) -> Result<ProcessOutput, ShellError> {
343 let from_io_error = IoError::factory(self.span, None);
344 let (stdout, stderr) = if let Some(stdout) = self.stdout {
345 let stderr = self
346 .stderr
347 .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
348 .transpose()
349 .map_err(&from_io_error)?;
350
351 let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
352
353 let stderr = stderr
354 .map(|handle| {
355 handle.join().map_err(|e| match e.downcast::<io::Error>() {
356 Ok(io) => from_io_error(*io).into(),
357 Err(err) => ShellError::GenericError {
358 error: "Unknown error".into(),
359 msg: format!("{err:?}"),
360 span: Some(self.span),
361 help: None,
362 inner: Vec::new(),
363 },
364 })
365 })
366 .transpose()?
367 .transpose()
368 .map_err(&from_io_error)?;
369
370 (Some(stdout), stderr)
371 } else {
372 let stderr = self
373 .stderr
374 .map(collect_bytes)
375 .transpose()
376 .map_err(&from_io_error)?;
377
378 (None, stderr)
379 };
380
381 let exit_status = self.exit_status.wait(self.span)?;
382
383 Ok(ProcessOutput {
384 stdout,
385 stderr,
386 exit_status,
387 })
388 }
389}
390
391fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
392 let mut buf = Vec::new();
393 match pipe {
394 ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
395 ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
396 }?;
397 Ok(buf)
398}
399
400fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
401 match pipe {
402 ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
403 ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
404 }?;
405 Ok(())
406}
407
408pub struct ProcessOutput {
409 pub stdout: Option<Vec<u8>>,
410 pub stderr: Option<Vec<u8>>,
411 pub exit_status: ExitStatus,
412}