nu_protocol/process/
child.rs

1use crate::{
2    ShellError, Span,
3    byte_stream::convert_file,
4    engine::{EngineState, FrozenJob, Job},
5    shell_error::io::IoError,
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11    fmt::Debug,
12    io::{self, Read},
13    sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14    sync::{Arc, Mutex},
15    thread,
16};
17
18/// Check the exit status of each pipeline element.
19///
20/// This is used to implement pipefail.
21#[cfg(feature = "os")]
22pub fn check_exit_status_future(
23    exit_status: Vec<Option<(Arc<Mutex<ExitStatusFuture>>, Span)>>,
24) -> Result<(), ShellError> {
25    for (future, span) in exit_status.into_iter().rev().flatten() {
26        check_exit_status_future_ok(future, span)?
27    }
28    Ok(())
29}
30
31fn check_exit_status_future_ok(
32    exit_status_future: Arc<Mutex<ExitStatusFuture>>,
33    span: Span,
34) -> Result<(), ShellError> {
35    let mut future = exit_status_future
36        .lock()
37        .expect("lock exit_status_future should success");
38    let exit_status = future.wait(span)?;
39    check_ok(exit_status, false, span)
40}
41
42pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
43    match status {
44        ExitStatus::Exited(exit_code) => {
45            if ignore_error {
46                Ok(())
47            } else if let Ok(exit_code) = exit_code.try_into() {
48                Err(ShellError::NonZeroExitCode { exit_code, span })
49            } else {
50                Ok(())
51            }
52        }
53        #[cfg(unix)]
54        ExitStatus::Signaled {
55            signal,
56            core_dumped,
57        } => {
58            use nix::sys::signal::Signal;
59
60            let sig = Signal::try_from(signal);
61
62            if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
63                // Processes often exit with SIGPIPE, but this is not an error condition.
64                Ok(())
65            } else {
66                let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
67                Err(if core_dumped {
68                    ShellError::CoreDumped {
69                        signal_name,
70                        signal,
71                        span,
72                    }
73                } else {
74                    ShellError::TerminatedBySignal {
75                        signal_name,
76                        signal,
77                        span,
78                    }
79                })
80            }
81        }
82    }
83}
84
85#[derive(Debug)]
86pub enum ExitStatusFuture {
87    Finished(Result<ExitStatus, Box<ShellError>>),
88    Running(Receiver<io::Result<ExitStatus>>),
89}
90
91impl ExitStatusFuture {
92    pub fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
93        match self {
94            ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
95            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
96            ExitStatusFuture::Running(receiver) => {
97                let code = match receiver.recv() {
98                    #[cfg(unix)]
99                    Ok(Ok(
100                        status @ ExitStatus::Signaled {
101                            core_dumped: true, ..
102                        },
103                    )) => {
104                        check_ok(status, false, span)?;
105                        Ok(status)
106                    }
107                    Ok(Ok(status)) => Ok(status),
108                    Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
109                        err,
110                        span,
111                        None,
112                        "failed to get exit code",
113                    ))),
114                    Err(err @ RecvError) => Err(ShellError::GenericError {
115                        error: err.to_string(),
116                        msg: "failed to get exit code".into(),
117                        span: span.into(),
118                        help: None,
119                        inner: vec![],
120                    }),
121                };
122
123                *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
124
125                code
126            }
127        }
128    }
129
130    fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
131        match self {
132            ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
133            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
134            ExitStatusFuture::Running(receiver) => {
135                let code = match receiver.try_recv() {
136                    Ok(Ok(status)) => Ok(Some(status)),
137                    Ok(Err(err)) => Err(ShellError::GenericError {
138                        error: err.to_string(),
139                        msg: "failed to get exit code".to_string(),
140                        span: span.into(),
141                        help: None,
142                        inner: vec![],
143                    }),
144                    Err(TryRecvError::Disconnected) => Err(ShellError::GenericError {
145                        error: "receiver disconnected".to_string(),
146                        msg: "failed to get exit code".into(),
147                        span: span.into(),
148                        help: None,
149                        inner: vec![],
150                    }),
151                    Err(TryRecvError::Empty) => Ok(None),
152                };
153
154                if let Some(code) = code.clone().transpose() {
155                    *self = ExitStatusFuture::Finished(code.map_err(Box::new));
156                }
157
158                code
159            }
160        }
161    }
162}
163
164pub enum ChildPipe {
165    Pipe(PipeReader),
166    Tee(Box<dyn Read + Send + 'static>),
167}
168
169impl Debug for ChildPipe {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("ChildPipe").finish()
172    }
173}
174
175impl From<PipeReader> for ChildPipe {
176    fn from(pipe: PipeReader) -> Self {
177        Self::Pipe(pipe)
178    }
179}
180
181impl Read for ChildPipe {
182    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
183        match self {
184            ChildPipe::Pipe(pipe) => pipe.read(buf),
185            ChildPipe::Tee(tee) => tee.read(buf),
186        }
187    }
188}
189
190#[derive(Debug)]
191pub struct ChildProcess {
192    pub stdout: Option<ChildPipe>,
193    pub stderr: Option<ChildPipe>,
194    exit_status: Arc<Mutex<ExitStatusFuture>>,
195    ignore_error: bool,
196    span: Span,
197}
198
199/// A wrapper for a closure that runs once the shell finishes waiting on the process.
200pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
201
202impl PostWaitCallback {
203    pub fn new<F>(f: F) -> Self
204    where
205        F: FnOnce(ForegroundWaitStatus) + Send + 'static,
206    {
207        PostWaitCallback(Box::new(f))
208    }
209
210    /// Creates a PostWaitCallback that creates a frozen job in the job table
211    /// if the incoming wait status indicates that the job was frozen.
212    ///
213    /// If `child_pid` is provided, the returned callback will also remove
214    /// it from the pid list of the current running job.
215    ///
216    /// The given `tag` argument will be used as the tag for the newly created job table entry.
217    pub fn for_job_control(
218        engine_state: &EngineState,
219        child_pid: Option<u32>,
220        tag: Option<String>,
221    ) -> Self {
222        let this_job = engine_state.current_thread_job().cloned();
223        let jobs = engine_state.jobs.clone();
224        let is_interactive = engine_state.is_interactive;
225
226        PostWaitCallback::new(move |status| {
227            if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
228                this_job.remove_pid(child_pid);
229            }
230
231            if let ForegroundWaitStatus::Frozen(unfreeze) = status {
232                let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
233
234                let job_id = jobs.add_job(Job::Frozen(FrozenJob { unfreeze, tag }));
235
236                if is_interactive {
237                    println!("\nJob {} is frozen", job_id.get());
238                }
239            }
240        })
241    }
242}
243
244impl Debug for PostWaitCallback {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        write!(f, "<wait_callback>")
247    }
248}
249
250impl ChildProcess {
251    pub fn new(
252        mut child: ForegroundChild,
253        reader: Option<PipeReader>,
254        swap: bool,
255        span: Span,
256        callback: Option<PostWaitCallback>,
257    ) -> Result<Self, ShellError> {
258        let (stdout, stderr) = if let Some(combined) = reader {
259            (Some(combined), None)
260        } else {
261            let stdout = child.as_mut().stdout.take().map(convert_file);
262            let stderr = child.as_mut().stderr.take().map(convert_file);
263
264            if swap {
265                (stderr, stdout)
266            } else {
267                (stdout, stderr)
268            }
269        };
270
271        // Create a thread to wait for the exit status.
272        let (exit_status_sender, exit_status) = mpsc::channel();
273
274        thread::Builder::new()
275            .name("exit status waiter".into())
276            .spawn(move || {
277                let matched = match child.wait() {
278                    // there are two possible outcomes when we `wait` for a process to finish:
279                    // 1. the process finishes as usual
280                    // 2. (unix only) the process gets signaled with SIGTSTP
281                    //
282                    // in the second case, although the process may still be alive in a
283                    // cryonic state, we explicitly treat as it has finished with exit code 0
284                    // for the sake of the current pipeline
285                    Ok(wait_status) => {
286                        let next = match &wait_status {
287                            ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
288                            ForegroundWaitStatus::Finished(exit_status) => *exit_status,
289                        };
290
291                        if let Some(callback) = callback {
292                            (callback.0)(wait_status);
293                        }
294
295                        Ok(next)
296                    }
297                    Err(err) => Err(err),
298                };
299
300                exit_status_sender.send(matched)
301            })
302            .map_err(|err| {
303                IoError::new_with_additional_context(
304                    err,
305                    span,
306                    None,
307                    "Could now spawn exit status waiter",
308                )
309            })?;
310
311        Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
312    }
313
314    pub fn from_raw(
315        stdout: Option<PipeReader>,
316        stderr: Option<PipeReader>,
317        exit_status: Option<Receiver<io::Result<ExitStatus>>>,
318        span: Span,
319    ) -> Self {
320        Self {
321            stdout: stdout.map(Into::into),
322            stderr: stderr.map(Into::into),
323            exit_status: Arc::new(Mutex::new(
324                exit_status
325                    .map(ExitStatusFuture::Running)
326                    .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
327            )),
328            ignore_error: false,
329            span,
330        }
331    }
332
333    pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
334        self.ignore_error = ignore;
335        self
336    }
337
338    pub fn span(&self) -> Span {
339        self.span
340    }
341
342    pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
343        if self.stderr.is_some() {
344            debug_assert!(false, "stderr should not exist");
345            return Err(ShellError::GenericError {
346                error: "internal error".into(),
347                msg: "stderr should not exist".into(),
348                span: self.span.into(),
349                help: None,
350                inner: vec![],
351            });
352        }
353
354        let bytes = if let Some(stdout) = self.stdout {
355            collect_bytes(stdout).map_err(|err| IoError::new(err, self.span, None))?
356        } else {
357            Vec::new()
358        };
359
360        let mut exit_status = self
361            .exit_status
362            .lock()
363            .expect("lock exit_status future should success");
364        check_ok(exit_status.wait(self.span)?, self.ignore_error, self.span)?;
365
366        Ok(bytes)
367    }
368
369    pub fn wait(mut self) -> Result<(), ShellError> {
370        let from_io_error = IoError::factory(self.span, None);
371        if let Some(stdout) = self.stdout.take() {
372            let stderr = self
373                .stderr
374                .take()
375                .map(|stderr| {
376                    thread::Builder::new()
377                        .name("stderr consumer".into())
378                        .spawn(move || consume_pipe(stderr))
379                })
380                .transpose()
381                .map_err(&from_io_error)?;
382
383            let res = consume_pipe(stdout);
384
385            if let Some(handle) = stderr {
386                handle
387                    .join()
388                    .map_err(|e| match e.downcast::<io::Error>() {
389                        Ok(io) => from_io_error(*io).into(),
390                        Err(err) => ShellError::GenericError {
391                            error: "Unknown error".into(),
392                            msg: format!("{err:?}"),
393                            span: Some(self.span),
394                            help: None,
395                            inner: Vec::new(),
396                        },
397                    })?
398                    .map_err(&from_io_error)?;
399            }
400
401            res.map_err(&from_io_error)?;
402        } else if let Some(stderr) = self.stderr.take() {
403            consume_pipe(stderr).map_err(&from_io_error)?;
404        }
405        let mut exit_status = self
406            .exit_status
407            .lock()
408            .expect("lock exit_status future should success");
409        check_ok(exit_status.wait(self.span)?, self.ignore_error, self.span)
410    }
411
412    pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
413        let mut exit_status = self
414            .exit_status
415            .lock()
416            .expect("lock exit_status future should success");
417        exit_status.try_wait(self.span)
418    }
419
420    pub fn wait_with_output(self) -> Result<ProcessOutput, ShellError> {
421        let from_io_error = IoError::factory(self.span, None);
422        let (stdout, stderr) = if let Some(stdout) = self.stdout {
423            let stderr = self
424                .stderr
425                .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
426                .transpose()
427                .map_err(&from_io_error)?;
428
429            let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
430
431            let stderr = stderr
432                .map(|handle| {
433                    handle.join().map_err(|e| match e.downcast::<io::Error>() {
434                        Ok(io) => from_io_error(*io).into(),
435                        Err(err) => ShellError::GenericError {
436                            error: "Unknown error".into(),
437                            msg: format!("{err:?}"),
438                            span: Some(self.span),
439                            help: None,
440                            inner: Vec::new(),
441                        },
442                    })
443                })
444                .transpose()?
445                .transpose()
446                .map_err(&from_io_error)?;
447
448            (Some(stdout), stderr)
449        } else {
450            let stderr = self
451                .stderr
452                .map(collect_bytes)
453                .transpose()
454                .map_err(&from_io_error)?;
455
456            (None, stderr)
457        };
458
459        let mut exit_status = self
460            .exit_status
461            .lock()
462            .expect("lock exit_status future should success");
463        let exit_status = exit_status.wait(self.span)?;
464
465        Ok(ProcessOutput {
466            stdout,
467            stderr,
468            exit_status,
469        })
470    }
471
472    pub fn clone_exit_status_future(&self) -> Arc<Mutex<ExitStatusFuture>> {
473        self.exit_status.clone()
474    }
475}
476
477fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
478    let mut buf = Vec::new();
479    match pipe {
480        ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
481        ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
482    }?;
483    Ok(buf)
484}
485
486fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
487    match pipe {
488        ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
489        ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
490    }?;
491    Ok(())
492}
493
494pub struct ProcessOutput {
495    pub stdout: Option<Vec<u8>>,
496    pub stderr: Option<Vec<u8>>,
497    pub exit_status: ExitStatus,
498}