Skip to main content

nu_protocol/process/
child.rs

1use crate::{
2    ShellError, Span,
3    byte_stream::convert_file,
4    engine::{EngineState, FrozenJob, Job},
5    shell_error::{generic::GenericError, io::IoError},
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11    fmt::Debug,
12    io::{self, Read},
13    sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14    sync::{Arc, Mutex},
15    thread,
16};
17
18/// Check the exit status of each pipeline element.
19///
20/// This is used to implement pipefail.
21#[cfg(feature = "os")]
22pub fn check_exit_status_future(
23    exit_status: Vec<Option<ExitStatusGuard>>,
24) -> Result<(), ShellError> {
25    for one_status in exit_status.into_iter().rev().flatten() {
26        check_exit_status_future_ok(one_status)?
27    }
28    Ok(())
29}
30
31fn check_exit_status_future_ok(exit_status_guard: ExitStatusGuard) -> Result<(), ShellError> {
32    let ignore_error = {
33        let guard = exit_status_guard
34            .ignore_error
35            .lock()
36            .expect("lock ignore_error should success");
37        *guard
38    };
39    let mut future = exit_status_guard
40        .exit_status_future
41        .lock()
42        .expect("lock exit_status_future should success");
43    let span = exit_status_guard.span.unwrap_or_default();
44    let exit_status = future.wait(span)?;
45    check_ok(exit_status, ignore_error, span)
46}
47
48pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
49    match status {
50        ExitStatus::Exited(exit_code) => {
51            if ignore_error {
52                Ok(())
53            } else if let Ok(exit_code) = exit_code.try_into() {
54                Err(ShellError::NonZeroExitCode { exit_code, span })
55            } else {
56                Ok(())
57            }
58        }
59        #[cfg(unix)]
60        ExitStatus::Signaled {
61            signal,
62            core_dumped,
63        } => {
64            use nix::sys::signal::Signal;
65
66            let sig = Signal::try_from(signal);
67
68            if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
69                // Processes often exit with SIGPIPE, but this is not an error condition.
70                Ok(())
71            } else {
72                let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
73                Err(if core_dumped {
74                    ShellError::CoreDumped {
75                        signal_name,
76                        signal,
77                        span,
78                    }
79                } else {
80                    ShellError::TerminatedBySignal {
81                        signal_name,
82                        signal,
83                        span,
84                    }
85                })
86            }
87        }
88    }
89}
90
91/// A wrapper for both `exit_status_future: Arc<Mutex<ExitStatusFuture>>`
92/// and `ignore_error: Arc<Mutex<bool>>`
93///
94/// It's useful for `pipefail` feature, which tracks exit status code with potentially
95/// ignore the error.
96#[derive(Debug)]
97pub struct ExitStatusGuard {
98    pub exit_status_future: Arc<Mutex<ExitStatusFuture>>,
99    pub ignore_error: Arc<Mutex<bool>>,
100    pub span: Option<Span>,
101}
102
103impl ExitStatusGuard {
104    pub fn new(
105        exit_status_future: Arc<Mutex<ExitStatusFuture>>,
106        ignore_error: Arc<Mutex<bool>>,
107    ) -> Self {
108        Self {
109            exit_status_future,
110            ignore_error,
111            span: None,
112        }
113    }
114
115    pub fn with_span(self, span: Span) -> Self {
116        Self {
117            exit_status_future: self.exit_status_future,
118            ignore_error: self.ignore_error,
119            span: Some(span),
120        }
121    }
122}
123
124#[derive(Debug)]
125pub enum ExitStatusFuture {
126    Finished(Result<ExitStatus, Box<ShellError>>),
127    Running(Receiver<io::Result<ExitStatus>>),
128}
129
130impl ExitStatusFuture {
131    pub fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
132        match self {
133            ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
134            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
135            ExitStatusFuture::Running(receiver) => {
136                let code = match receiver.recv() {
137                    #[cfg(unix)]
138                    Ok(Ok(
139                        status @ ExitStatus::Signaled {
140                            core_dumped: true, ..
141                        },
142                    )) => {
143                        check_ok(status, false, span)?;
144                        Ok(status)
145                    }
146                    Ok(Ok(status)) => Ok(status),
147                    Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
148                        err,
149                        span,
150                        None,
151                        "failed to get exit code",
152                    ))),
153                    Err(err @ RecvError) => Err(ShellError::Generic(GenericError::new(
154                        err.to_string(),
155                        "failed to get exit code",
156                        span,
157                    ))),
158                };
159
160                *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
161
162                code
163            }
164        }
165    }
166
167    fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
168        match self {
169            ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
170            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
171            ExitStatusFuture::Running(receiver) => {
172                let code = match receiver.try_recv() {
173                    Ok(Ok(status)) => Ok(Some(status)),
174                    Ok(Err(err)) => Err(ShellError::Generic(GenericError::new(
175                        err.to_string(),
176                        "failed to get exit code",
177                        span,
178                    ))),
179                    Err(TryRecvError::Disconnected) => Err(ShellError::Generic(GenericError::new(
180                        "receiver disconnected",
181                        "failed to get exit code",
182                        span,
183                    ))),
184                    Err(TryRecvError::Empty) => Ok(None),
185                };
186
187                if let Some(code) = code.clone().transpose() {
188                    *self = ExitStatusFuture::Finished(code.map_err(Box::new));
189                }
190
191                code
192            }
193        }
194    }
195}
196
197#[derive(derive_more::Debug)]
198pub enum ChildPipe {
199    #[debug("ChildPipe::Pipe")]
200    Pipe(PipeReader),
201
202    #[debug("ChildPipe::Tee")]
203    Tee(Box<dyn Read + Send + 'static>),
204}
205
206impl From<PipeReader> for ChildPipe {
207    fn from(pipe: PipeReader) -> Self {
208        Self::Pipe(pipe)
209    }
210}
211
212impl Read for ChildPipe {
213    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
214        match self {
215            ChildPipe::Pipe(pipe) => pipe.read(buf),
216            ChildPipe::Tee(tee) => tee.read(buf),
217        }
218    }
219}
220
221#[derive(Debug)]
222pub struct ChildProcess {
223    pub stdout: Option<ChildPipe>,
224    pub stderr: Option<ChildPipe>,
225    exit_status: Arc<Mutex<ExitStatusFuture>>,
226    ignore_error: Arc<Mutex<bool>>,
227    span: Span,
228}
229
230/// A wrapper for a closure that runs once the shell finishes waiting on the process.
231#[derive(derive_more::Debug)]
232#[debug("<wait_callback>")]
233pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
234
235impl PostWaitCallback {
236    pub fn new<F>(f: F) -> Self
237    where
238        F: FnOnce(ForegroundWaitStatus) + Send + 'static,
239    {
240        PostWaitCallback(Box::new(f))
241    }
242
243    /// Creates a PostWaitCallback that creates a frozen job in the job table
244    /// if the incoming wait status indicates that the job was frozen.
245    ///
246    /// If `child_pid` is provided, the returned callback will also remove
247    /// it from the pid list of the current running job.
248    ///
249    /// The given `description` argument will be used as the description for the newly created job table entry.
250    pub fn for_job_control(
251        engine_state: &EngineState,
252        child_pid: Option<u32>,
253        description: Option<String>,
254    ) -> Self {
255        let this_job = engine_state.current_thread_job().cloned();
256        let jobs = engine_state.jobs.clone();
257        let is_interactive = engine_state.is_interactive;
258
259        PostWaitCallback::new(move |status| {
260            if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
261                this_job.remove_pid(child_pid);
262            }
263
264            if let ForegroundWaitStatus::Frozen(unfreeze) = status {
265                let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
266
267                let job_id = jobs.add_job(Job::Frozen(FrozenJob {
268                    unfreeze,
269                    description,
270                }));
271
272                if is_interactive {
273                    println!("\nJob {} is frozen", job_id.get());
274                }
275            }
276        })
277    }
278}
279
280impl ChildProcess {
281    pub fn new(
282        mut child: ForegroundChild,
283        reader: Option<PipeReader>,
284        swap: bool,
285        span: Span,
286        callback: Option<PostWaitCallback>,
287    ) -> Result<Self, ShellError> {
288        let (stdout, stderr) = match reader {
289            Some(combined) => (Some(combined), None),
290            None => {
291                let stdout = child.as_mut().stdout.take().map(convert_file);
292                let stderr = child.as_mut().stderr.take().map(convert_file);
293
294                if swap {
295                    (stderr, stdout)
296                } else {
297                    (stdout, stderr)
298                }
299            }
300        };
301
302        // Create a thread to wait for the exit status.
303        let (exit_status_sender, exit_status) = mpsc::channel();
304
305        thread::Builder::new()
306            .name("exit status waiter".into())
307            .spawn(move || {
308                let matched = match child.wait() {
309                    // there are two possible outcomes when we `wait` for a process to finish:
310                    // 1. the process finishes as usual
311                    // 2. (unix only) the process gets signaled with SIGTSTP
312                    //
313                    // in the second case, although the process may still be alive in a
314                    // cryonic state, we explicitly treat as it has finished with exit code 0
315                    // for the sake of the current pipeline
316                    Ok(wait_status) => {
317                        let next = match &wait_status {
318                            ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
319                            ForegroundWaitStatus::Finished(exit_status) => *exit_status,
320                        };
321
322                        if let Some(callback) = callback {
323                            (callback.0)(wait_status);
324                        }
325
326                        Ok(next)
327                    }
328                    Err(err) => Err(err),
329                };
330
331                exit_status_sender.send(matched)
332            })
333            .map_err(|err| {
334                IoError::new_with_additional_context(
335                    err,
336                    span,
337                    None,
338                    "Could now spawn exit status waiter",
339                )
340            })?;
341
342        Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
343    }
344
345    pub fn from_raw(
346        stdout: Option<PipeReader>,
347        stderr: Option<PipeReader>,
348        exit_status: Option<Receiver<io::Result<ExitStatus>>>,
349        span: Span,
350    ) -> Self {
351        Self {
352            stdout: stdout.map(Into::into),
353            stderr: stderr.map(Into::into),
354            exit_status: Arc::new(Mutex::new(
355                exit_status
356                    .map(ExitStatusFuture::Running)
357                    .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
358            )),
359            ignore_error: Arc::new(Mutex::new(false)),
360            span,
361        }
362    }
363
364    pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
365        {
366            let mut ignore_error = self.ignore_error.lock().expect("lock should success");
367            *ignore_error = ignore;
368        }
369        self
370    }
371
372    pub fn span(&self) -> Span {
373        self.span
374    }
375
376    pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
377        if self.stderr.is_some() {
378            debug_assert!(false, "stderr should not exist");
379            return Err(ShellError::Generic(GenericError::new(
380                "internal error",
381                "stderr should not exist",
382                self.span,
383            )));
384        }
385
386        let bytes = (self.stdout)
387            .map(collect_bytes)
388            .transpose()
389            .map_err(|err| IoError::new(err, self.span, None))?
390            .unwrap_or_default();
391
392        let mut exit_status = self
393            .exit_status
394            .lock()
395            .expect("lock exit_status future should success");
396        let ignore_error = {
397            let guard = self
398                .ignore_error
399                .lock()
400                .expect("lock ignore error should success");
401            *guard
402        };
403        check_ok(exit_status.wait(self.span)?, ignore_error, self.span)?;
404
405        Ok(bytes)
406    }
407
408    pub fn wait(mut self) -> Result<(), ShellError> {
409        let from_io_error = IoError::factory(self.span, None);
410        if let Some(stdout) = self.stdout.take() {
411            let stderr = self
412                .stderr
413                .take()
414                .map(|stderr| {
415                    thread::Builder::new()
416                        .name("stderr consumer".into())
417                        .spawn(move || consume_pipe(stderr))
418                })
419                .transpose()
420                .map_err(&from_io_error)?;
421
422            let res = consume_pipe(stdout);
423
424            if let Some(handle) = stderr {
425                handle
426                    .join()
427                    .map_err(|e| match e.downcast::<io::Error>() {
428                        Ok(io) => from_io_error(*io).into(),
429                        Err(err) => ShellError::Generic(GenericError::new(
430                            "Unknown error",
431                            format!("{err:?}"),
432                            self.span,
433                        )),
434                    })?
435                    .map_err(&from_io_error)?;
436            }
437
438            res.map_err(&from_io_error)?;
439        } else if let Some(stderr) = self.stderr.take() {
440            consume_pipe(stderr).map_err(&from_io_error)?;
441        }
442        let mut exit_status = self
443            .exit_status
444            .lock()
445            .expect("lock exit_status future should success");
446        let ignore_error = {
447            let guard = self
448                .ignore_error
449                .lock()
450                .expect("lock ignore error should success");
451            *guard
452        };
453        check_ok(exit_status.wait(self.span)?, ignore_error, self.span)
454    }
455
456    pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
457        let mut exit_status = self
458            .exit_status
459            .lock()
460            .expect("lock exit_status future should success");
461        exit_status.try_wait(self.span)
462    }
463
464    pub fn wait_with_output(self) -> Result<ProcessOutput, ShellError> {
465        let from_io_error = IoError::factory(self.span, None);
466
467        let (stdout, stderr) = match (self.stdout, self.stderr) {
468            (None, None) => (None, None),
469            (None, Some(stderr)) => (None, Some(collect_bytes(stderr).map_err(&from_io_error)?)),
470            (Some(stdout), None) => (Some(collect_bytes(stdout).map_err(&from_io_error)?), None),
471            (Some(stdout), Some(stderr)) => {
472                let stderr = thread::Builder::new()
473                    .spawn(move || collect_bytes(stderr))
474                    .map_err(&from_io_error)?;
475
476                let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
477
478                let stderr = stderr
479                    .join()
480                    .map_err(|e| match e.downcast::<io::Error>() {
481                        Ok(io) => from_io_error(*io).into(),
482                        Err(err) => ShellError::Generic(GenericError::new(
483                            "Unknown error",
484                            format!("{err:?}"),
485                            self.span,
486                        )),
487                    })?
488                    .map_err(&from_io_error)?;
489
490                (Some(stdout), Some(stderr))
491            }
492        };
493
494        let mut exit_status = self
495            .exit_status
496            .lock()
497            .expect("lock exit_status future should success");
498        let exit_status = exit_status.wait(self.span)?;
499
500        Ok(ProcessOutput {
501            stdout,
502            stderr,
503            exit_status,
504        })
505    }
506
507    pub fn clone_exit_status_future(&self) -> Arc<Mutex<ExitStatusFuture>> {
508        self.exit_status.clone()
509    }
510
511    pub fn clone_ignore_error(&self) -> Arc<Mutex<bool>> {
512        self.ignore_error.clone()
513    }
514}
515
516fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
517    let mut buf = Vec::new();
518    match pipe {
519        ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
520        ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
521    }?;
522    Ok(buf)
523}
524
525fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
526    match pipe {
527        ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
528        ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
529    }?;
530    Ok(())
531}
532
533pub struct ProcessOutput {
534    pub stdout: Option<Vec<u8>>,
535    pub stderr: Option<Vec<u8>>,
536    pub exit_status: ExitStatus,
537}