nu_protocol/process/
child.rs

1use crate::{byte_stream::convert_file, shell_error::io::IoError, ShellError, Span};
2use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
3
4use os_pipe::PipeReader;
5use std::{
6    fmt::Debug,
7    io::{self, Read},
8    sync::mpsc::{self, Receiver, RecvError, TryRecvError},
9    thread,
10};
11
12pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
13    match status {
14        ExitStatus::Exited(exit_code) => {
15            if ignore_error {
16                Ok(())
17            } else if let Ok(exit_code) = exit_code.try_into() {
18                Err(ShellError::NonZeroExitCode { exit_code, span })
19            } else {
20                Ok(())
21            }
22        }
23        #[cfg(unix)]
24        ExitStatus::Signaled {
25            signal,
26            core_dumped,
27        } => {
28            use nix::sys::signal::Signal;
29
30            let sig = Signal::try_from(signal);
31
32            if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
33                // Processes often exit with SIGPIPE, but this is not an error condition.
34                Ok(())
35            } else {
36                let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
37                Err(if core_dumped {
38                    ShellError::CoreDumped {
39                        signal_name,
40                        signal,
41                        span,
42                    }
43                } else {
44                    ShellError::TerminatedBySignal {
45                        signal_name,
46                        signal,
47                        span,
48                    }
49                })
50            }
51        }
52    }
53}
54
55#[derive(Debug)]
56enum ExitStatusFuture {
57    Finished(Result<ExitStatus, Box<ShellError>>),
58    Running(Receiver<io::Result<ExitStatus>>),
59}
60
61impl ExitStatusFuture {
62    fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
63        match self {
64            ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
65            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
66            ExitStatusFuture::Running(receiver) => {
67                let code = match receiver.recv() {
68                    #[cfg(unix)]
69                    Ok(Ok(
70                        status @ ExitStatus::Signaled {
71                            core_dumped: true, ..
72                        },
73                    )) => {
74                        check_ok(status, false, span)?;
75                        Ok(status)
76                    }
77                    Ok(Ok(status)) => Ok(status),
78                    Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
79                        err.kind(),
80                        span,
81                        None,
82                        "failed to get exit code",
83                    ))),
84                    Err(err @ RecvError) => Err(ShellError::GenericError {
85                        error: err.to_string(),
86                        msg: "failed to get exit code".into(),
87                        span: span.into(),
88                        help: None,
89                        inner: vec![],
90                    }),
91                };
92
93                *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
94
95                code
96            }
97        }
98    }
99
100    fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
101        match self {
102            ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
103            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
104            ExitStatusFuture::Running(receiver) => {
105                let code = match receiver.try_recv() {
106                    Ok(Ok(status)) => Ok(Some(status)),
107                    Ok(Err(err)) => Err(ShellError::GenericError {
108                        error: err.to_string(),
109                        msg: "failed to get exit code".to_string(),
110                        span: span.into(),
111                        help: None,
112                        inner: vec![],
113                    }),
114                    Err(TryRecvError::Disconnected) => Err(ShellError::GenericError {
115                        error: "receiver disconnected".to_string(),
116                        msg: "failed to get exit code".into(),
117                        span: span.into(),
118                        help: None,
119                        inner: vec![],
120                    }),
121                    Err(TryRecvError::Empty) => Ok(None),
122                };
123
124                if let Some(code) = code.clone().transpose() {
125                    *self = ExitStatusFuture::Finished(code.map_err(Box::new));
126                }
127
128                code
129            }
130        }
131    }
132}
133
134pub enum ChildPipe {
135    Pipe(PipeReader),
136    Tee(Box<dyn Read + Send + 'static>),
137}
138
139impl Debug for ChildPipe {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("ChildPipe").finish()
142    }
143}
144
145impl From<PipeReader> for ChildPipe {
146    fn from(pipe: PipeReader) -> Self {
147        Self::Pipe(pipe)
148    }
149}
150
151impl Read for ChildPipe {
152    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
153        match self {
154            ChildPipe::Pipe(pipe) => pipe.read(buf),
155            ChildPipe::Tee(tee) => tee.read(buf),
156        }
157    }
158}
159
160#[derive(Debug)]
161pub struct ChildProcess {
162    pub stdout: Option<ChildPipe>,
163    pub stderr: Option<ChildPipe>,
164    exit_status: ExitStatusFuture,
165    ignore_error: bool,
166    span: Span,
167}
168
169pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
170
171impl Debug for PostWaitCallback {
172    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173        write!(f, "<wait_callback>")
174    }
175}
176
177impl ChildProcess {
178    pub fn new(
179        mut child: ForegroundChild,
180        reader: Option<PipeReader>,
181        swap: bool,
182        span: Span,
183        callback: Option<PostWaitCallback>,
184    ) -> Result<Self, ShellError> {
185        let (stdout, stderr) = if let Some(combined) = reader {
186            (Some(combined), None)
187        } else {
188            let stdout = child.as_mut().stdout.take().map(convert_file);
189            let stderr = child.as_mut().stderr.take().map(convert_file);
190
191            if swap {
192                (stderr, stdout)
193            } else {
194                (stdout, stderr)
195            }
196        };
197
198        // Create a thread to wait for the exit status.
199        let (exit_status_sender, exit_status) = mpsc::channel();
200
201        thread::Builder::new()
202            .name("exit status waiter".into())
203            .spawn(move || {
204                let matched = match child.wait() {
205                    // there are two possible outcomes when we `wait` for a process to finish:
206                    // 1. the process finishes as usual
207                    // 2. (unix only) the process gets signaled with SIGTSTP
208                    //
209                    // in the second case, although the process may still be alive in a
210                    // cryonic state, we explicitly treat as it has finished with exit code 0
211                    // for the sake of the current pipeline
212                    Ok(wait_status) => {
213                        let next = match &wait_status {
214                            ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
215                            ForegroundWaitStatus::Finished(exit_status) => *exit_status,
216                        };
217
218                        if let Some(callback) = callback {
219                            (callback.0)(wait_status);
220                        }
221
222                        Ok(next)
223                    }
224                    Err(err) => Err(err),
225                };
226
227                exit_status_sender.send(matched)
228            })
229            .map_err(|err| {
230                IoError::new_with_additional_context(
231                    err.kind(),
232                    span,
233                    None,
234                    "Could now spawn exit status waiter",
235                )
236            })?;
237
238        Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
239    }
240
241    pub fn from_raw(
242        stdout: Option<PipeReader>,
243        stderr: Option<PipeReader>,
244        exit_status: Option<Receiver<io::Result<ExitStatus>>>,
245        span: Span,
246    ) -> Self {
247        Self {
248            stdout: stdout.map(Into::into),
249            stderr: stderr.map(Into::into),
250            exit_status: exit_status
251                .map(ExitStatusFuture::Running)
252                .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
253            ignore_error: false,
254            span,
255        }
256    }
257
258    pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
259        self.ignore_error = ignore;
260        self
261    }
262
263    pub fn span(&self) -> Span {
264        self.span
265    }
266
267    pub fn into_bytes(mut self) -> Result<Vec<u8>, ShellError> {
268        if self.stderr.is_some() {
269            debug_assert!(false, "stderr should not exist");
270            return Err(ShellError::GenericError {
271                error: "internal error".into(),
272                msg: "stderr should not exist".into(),
273                span: self.span.into(),
274                help: None,
275                inner: vec![],
276            });
277        }
278
279        let bytes = if let Some(stdout) = self.stdout {
280            collect_bytes(stdout).map_err(|err| IoError::new(err.kind(), self.span, None))?
281        } else {
282            Vec::new()
283        };
284
285        check_ok(
286            self.exit_status.wait(self.span)?,
287            self.ignore_error,
288            self.span,
289        )?;
290
291        Ok(bytes)
292    }
293
294    pub fn wait(mut self) -> Result<(), ShellError> {
295        let from_io_error = IoError::factory(self.span, None);
296        if let Some(stdout) = self.stdout.take() {
297            let stderr = self
298                .stderr
299                .take()
300                .map(|stderr| {
301                    thread::Builder::new()
302                        .name("stderr consumer".into())
303                        .spawn(move || consume_pipe(stderr))
304                })
305                .transpose()
306                .map_err(&from_io_error)?;
307
308            let res = consume_pipe(stdout);
309
310            if let Some(handle) = stderr {
311                handle
312                    .join()
313                    .map_err(|e| match e.downcast::<io::Error>() {
314                        Ok(io) => from_io_error(*io).into(),
315                        Err(err) => ShellError::GenericError {
316                            error: "Unknown error".into(),
317                            msg: format!("{err:?}"),
318                            span: Some(self.span),
319                            help: None,
320                            inner: Vec::new(),
321                        },
322                    })?
323                    .map_err(&from_io_error)?;
324            }
325
326            res.map_err(&from_io_error)?;
327        } else if let Some(stderr) = self.stderr.take() {
328            consume_pipe(stderr).map_err(&from_io_error)?;
329        }
330
331        check_ok(
332            self.exit_status.wait(self.span)?,
333            self.ignore_error,
334            self.span,
335        )
336    }
337
338    pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
339        self.exit_status.try_wait(self.span)
340    }
341
342    pub fn wait_with_output(mut self) -> Result<ProcessOutput, ShellError> {
343        let from_io_error = IoError::factory(self.span, None);
344        let (stdout, stderr) = if let Some(stdout) = self.stdout {
345            let stderr = self
346                .stderr
347                .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
348                .transpose()
349                .map_err(&from_io_error)?;
350
351            let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
352
353            let stderr = stderr
354                .map(|handle| {
355                    handle.join().map_err(|e| match e.downcast::<io::Error>() {
356                        Ok(io) => from_io_error(*io).into(),
357                        Err(err) => ShellError::GenericError {
358                            error: "Unknown error".into(),
359                            msg: format!("{err:?}"),
360                            span: Some(self.span),
361                            help: None,
362                            inner: Vec::new(),
363                        },
364                    })
365                })
366                .transpose()?
367                .transpose()
368                .map_err(&from_io_error)?;
369
370            (Some(stdout), stderr)
371        } else {
372            let stderr = self
373                .stderr
374                .map(collect_bytes)
375                .transpose()
376                .map_err(&from_io_error)?;
377
378            (None, stderr)
379        };
380
381        let exit_status = self.exit_status.wait(self.span)?;
382
383        Ok(ProcessOutput {
384            stdout,
385            stderr,
386            exit_status,
387        })
388    }
389}
390
391fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
392    let mut buf = Vec::new();
393    match pipe {
394        ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
395        ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
396    }?;
397    Ok(buf)
398}
399
400fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
401    match pipe {
402        ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
403        ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
404    }?;
405    Ok(())
406}
407
408pub struct ProcessOutput {
409    pub stdout: Option<Vec<u8>>,
410    pub stderr: Option<Vec<u8>>,
411    pub exit_status: ExitStatus,
412}