Skip to main content

nu_protocol/process/
child.rs

1use crate::{
2    ShellError, Span,
3    byte_stream::convert_file,
4    engine::{EngineState, FrozenJob, Job},
5    shell_error::io::IoError,
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11    fmt::Debug,
12    io::{self, Read},
13    sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14    sync::{Arc, Mutex},
15    thread,
16};
17
/// Checks the exit status of every pipeline element, walking from the last
/// element back to the first.
///
/// Entries that are `None` are skipped. The first element whose status is an
/// error stops the walk and that error is returned.
///
/// This is used to implement pipefail.
#[cfg(feature = "os")]
pub fn check_exit_status_future(
    exit_status: Vec<Option<ExitStatusGuard>>,
) -> Result<(), ShellError> {
    exit_status
        .into_iter()
        .rev()
        .flatten()
        .try_for_each(check_exit_status_future_ok)
}
30
31fn check_exit_status_future_ok(exit_status_guard: ExitStatusGuard) -> Result<(), ShellError> {
32    let ignore_error = {
33        let guard = exit_status_guard
34            .ignore_error
35            .lock()
36            .expect("lock ignore_error should success");
37        *guard
38    };
39    let mut future = exit_status_guard
40        .exit_status_future
41        .lock()
42        .expect("lock exit_status_future should success");
43    let span = exit_status_guard.span.unwrap_or_default();
44    let exit_status = future.wait(span)?;
45    check_ok(exit_status, ignore_error, span)
46}
47
48pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
49    match status {
50        ExitStatus::Exited(exit_code) => {
51            if ignore_error {
52                Ok(())
53            } else if let Ok(exit_code) = exit_code.try_into() {
54                Err(ShellError::NonZeroExitCode { exit_code, span })
55            } else {
56                Ok(())
57            }
58        }
59        #[cfg(unix)]
60        ExitStatus::Signaled {
61            signal,
62            core_dumped,
63        } => {
64            use nix::sys::signal::Signal;
65
66            let sig = Signal::try_from(signal);
67
68            if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
69                // Processes often exit with SIGPIPE, but this is not an error condition.
70                Ok(())
71            } else {
72                let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
73                Err(if core_dumped {
74                    ShellError::CoreDumped {
75                        signal_name,
76                        signal,
77                        span,
78                    }
79                } else {
80                    ShellError::TerminatedBySignal {
81                        signal_name,
82                        signal,
83                        span,
84                    }
85                })
86            }
87        }
88    }
89}
90
91/// A wrapper for both `exit_status_future: Arc<Mutex<ExitStatusFuture>>`
92/// and `ignore_error: Arc<Mutex<bool>>`
93///
94/// It's useful for `pipefail` feature, which tracks exit status code with potentially
95/// ignore the error.
96#[derive(Debug)]
97pub struct ExitStatusGuard {
98    pub exit_status_future: Arc<Mutex<ExitStatusFuture>>,
99    pub ignore_error: Arc<Mutex<bool>>,
100    pub span: Option<Span>,
101}
102
103impl ExitStatusGuard {
104    pub fn new(
105        exit_status_future: Arc<Mutex<ExitStatusFuture>>,
106        ignore_error: Arc<Mutex<bool>>,
107    ) -> Self {
108        Self {
109            exit_status_future,
110            ignore_error,
111            span: None,
112        }
113    }
114
115    pub fn with_span(self, span: Span) -> Self {
116        Self {
117            exit_status_future: self.exit_status_future,
118            ignore_error: self.ignore_error,
119            span: Some(span),
120        }
121    }
122}
123
124#[derive(Debug)]
125pub enum ExitStatusFuture {
126    Finished(Result<ExitStatus, Box<ShellError>>),
127    Running(Receiver<io::Result<ExitStatus>>),
128}
129
130impl ExitStatusFuture {
131    pub fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
132        match self {
133            ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
134            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
135            ExitStatusFuture::Running(receiver) => {
136                let code = match receiver.recv() {
137                    #[cfg(unix)]
138                    Ok(Ok(
139                        status @ ExitStatus::Signaled {
140                            core_dumped: true, ..
141                        },
142                    )) => {
143                        check_ok(status, false, span)?;
144                        Ok(status)
145                    }
146                    Ok(Ok(status)) => Ok(status),
147                    Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
148                        err,
149                        span,
150                        None,
151                        "failed to get exit code",
152                    ))),
153                    Err(err @ RecvError) => Err(ShellError::GenericError {
154                        error: err.to_string(),
155                        msg: "failed to get exit code".into(),
156                        span: span.into(),
157                        help: None,
158                        inner: vec![],
159                    }),
160                };
161
162                *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
163
164                code
165            }
166        }
167    }
168
169    fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
170        match self {
171            ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
172            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
173            ExitStatusFuture::Running(receiver) => {
174                let code = match receiver.try_recv() {
175                    Ok(Ok(status)) => Ok(Some(status)),
176                    Ok(Err(err)) => Err(ShellError::GenericError {
177                        error: err.to_string(),
178                        msg: "failed to get exit code".to_string(),
179                        span: span.into(),
180                        help: None,
181                        inner: vec![],
182                    }),
183                    Err(TryRecvError::Disconnected) => Err(ShellError::GenericError {
184                        error: "receiver disconnected".to_string(),
185                        msg: "failed to get exit code".into(),
186                        span: span.into(),
187                        help: None,
188                        inner: vec![],
189                    }),
190                    Err(TryRecvError::Empty) => Ok(None),
191                };
192
193                if let Some(code) = code.clone().transpose() {
194                    *self = ExitStatusFuture::Finished(code.map_err(Box::new));
195                }
196
197                code
198            }
199        }
200    }
201}
202
203pub enum ChildPipe {
204    Pipe(PipeReader),
205    Tee(Box<dyn Read + Send + 'static>),
206}
207
208impl Debug for ChildPipe {
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        f.debug_struct("ChildPipe").finish()
211    }
212}
213
214impl From<PipeReader> for ChildPipe {
215    fn from(pipe: PipeReader) -> Self {
216        Self::Pipe(pipe)
217    }
218}
219
220impl Read for ChildPipe {
221    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
222        match self {
223            ChildPipe::Pipe(pipe) => pipe.read(buf),
224            ChildPipe::Tee(tee) => tee.read(buf),
225        }
226    }
227}
228
229#[derive(Debug)]
230pub struct ChildProcess {
231    pub stdout: Option<ChildPipe>,
232    pub stderr: Option<ChildPipe>,
233    exit_status: Arc<Mutex<ExitStatusFuture>>,
234    ignore_error: Arc<Mutex<bool>>,
235    span: Span,
236}
237
238/// A wrapper for a closure that runs once the shell finishes waiting on the process.
239pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
240
241impl PostWaitCallback {
242    pub fn new<F>(f: F) -> Self
243    where
244        F: FnOnce(ForegroundWaitStatus) + Send + 'static,
245    {
246        PostWaitCallback(Box::new(f))
247    }
248
249    /// Creates a PostWaitCallback that creates a frozen job in the job table
250    /// if the incoming wait status indicates that the job was frozen.
251    ///
252    /// If `child_pid` is provided, the returned callback will also remove
253    /// it from the pid list of the current running job.
254    ///
255    /// The given `tag` argument will be used as the tag for the newly created job table entry.
256    pub fn for_job_control(
257        engine_state: &EngineState,
258        child_pid: Option<u32>,
259        tag: Option<String>,
260    ) -> Self {
261        let this_job = engine_state.current_thread_job().cloned();
262        let jobs = engine_state.jobs.clone();
263        let is_interactive = engine_state.is_interactive;
264
265        PostWaitCallback::new(move |status| {
266            if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
267                this_job.remove_pid(child_pid);
268            }
269
270            if let ForegroundWaitStatus::Frozen(unfreeze) = status {
271                let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
272
273                let job_id = jobs.add_job(Job::Frozen(FrozenJob { unfreeze, tag }));
274
275                if is_interactive {
276                    println!("\nJob {} is frozen", job_id.get());
277                }
278            }
279        })
280    }
281}
282
283impl Debug for PostWaitCallback {
284    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285        write!(f, "<wait_callback>")
286    }
287}
288
289impl ChildProcess {
290    pub fn new(
291        mut child: ForegroundChild,
292        reader: Option<PipeReader>,
293        swap: bool,
294        span: Span,
295        callback: Option<PostWaitCallback>,
296    ) -> Result<Self, ShellError> {
297        let (stdout, stderr) = if let Some(combined) = reader {
298            (Some(combined), None)
299        } else {
300            let stdout = child.as_mut().stdout.take().map(convert_file);
301            let stderr = child.as_mut().stderr.take().map(convert_file);
302
303            if swap {
304                (stderr, stdout)
305            } else {
306                (stdout, stderr)
307            }
308        };
309
310        // Create a thread to wait for the exit status.
311        let (exit_status_sender, exit_status) = mpsc::channel();
312
313        thread::Builder::new()
314            .name("exit status waiter".into())
315            .spawn(move || {
316                let matched = match child.wait() {
317                    // there are two possible outcomes when we `wait` for a process to finish:
318                    // 1. the process finishes as usual
319                    // 2. (unix only) the process gets signaled with SIGTSTP
320                    //
321                    // in the second case, although the process may still be alive in a
322                    // cryonic state, we explicitly treat as it has finished with exit code 0
323                    // for the sake of the current pipeline
324                    Ok(wait_status) => {
325                        let next = match &wait_status {
326                            ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
327                            ForegroundWaitStatus::Finished(exit_status) => *exit_status,
328                        };
329
330                        if let Some(callback) = callback {
331                            (callback.0)(wait_status);
332                        }
333
334                        Ok(next)
335                    }
336                    Err(err) => Err(err),
337                };
338
339                exit_status_sender.send(matched)
340            })
341            .map_err(|err| {
342                IoError::new_with_additional_context(
343                    err,
344                    span,
345                    None,
346                    "Could now spawn exit status waiter",
347                )
348            })?;
349
350        Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
351    }
352
353    pub fn from_raw(
354        stdout: Option<PipeReader>,
355        stderr: Option<PipeReader>,
356        exit_status: Option<Receiver<io::Result<ExitStatus>>>,
357        span: Span,
358    ) -> Self {
359        Self {
360            stdout: stdout.map(Into::into),
361            stderr: stderr.map(Into::into),
362            exit_status: Arc::new(Mutex::new(
363                exit_status
364                    .map(ExitStatusFuture::Running)
365                    .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
366            )),
367            ignore_error: Arc::new(Mutex::new(false)),
368            span,
369        }
370    }
371
372    pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
373        {
374            let mut ignore_error = self.ignore_error.lock().expect("lock should success");
375            *ignore_error = ignore;
376        }
377        self
378    }
379
380    pub fn span(&self) -> Span {
381        self.span
382    }
383
384    pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
385        if self.stderr.is_some() {
386            debug_assert!(false, "stderr should not exist");
387            return Err(ShellError::GenericError {
388                error: "internal error".into(),
389                msg: "stderr should not exist".into(),
390                span: self.span.into(),
391                help: None,
392                inner: vec![],
393            });
394        }
395
396        let bytes = if let Some(stdout) = self.stdout {
397            collect_bytes(stdout).map_err(|err| IoError::new(err, self.span, None))?
398        } else {
399            Vec::new()
400        };
401
402        let mut exit_status = self
403            .exit_status
404            .lock()
405            .expect("lock exit_status future should success");
406        let ignore_error = {
407            let guard = self
408                .ignore_error
409                .lock()
410                .expect("lock ignore error should success");
411            *guard
412        };
413        check_ok(exit_status.wait(self.span)?, ignore_error, self.span)?;
414
415        Ok(bytes)
416    }
417
418    pub fn wait(mut self) -> Result<(), ShellError> {
419        let from_io_error = IoError::factory(self.span, None);
420        if let Some(stdout) = self.stdout.take() {
421            let stderr = self
422                .stderr
423                .take()
424                .map(|stderr| {
425                    thread::Builder::new()
426                        .name("stderr consumer".into())
427                        .spawn(move || consume_pipe(stderr))
428                })
429                .transpose()
430                .map_err(&from_io_error)?;
431
432            let res = consume_pipe(stdout);
433
434            if let Some(handle) = stderr {
435                handle
436                    .join()
437                    .map_err(|e| match e.downcast::<io::Error>() {
438                        Ok(io) => from_io_error(*io).into(),
439                        Err(err) => ShellError::GenericError {
440                            error: "Unknown error".into(),
441                            msg: format!("{err:?}"),
442                            span: Some(self.span),
443                            help: None,
444                            inner: Vec::new(),
445                        },
446                    })?
447                    .map_err(&from_io_error)?;
448            }
449
450            res.map_err(&from_io_error)?;
451        } else if let Some(stderr) = self.stderr.take() {
452            consume_pipe(stderr).map_err(&from_io_error)?;
453        }
454        let mut exit_status = self
455            .exit_status
456            .lock()
457            .expect("lock exit_status future should success");
458        let ignore_error = {
459            let guard = self
460                .ignore_error
461                .lock()
462                .expect("lock ignore error should success");
463            *guard
464        };
465        check_ok(exit_status.wait(self.span)?, ignore_error, self.span)
466    }
467
468    pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
469        let mut exit_status = self
470            .exit_status
471            .lock()
472            .expect("lock exit_status future should success");
473        exit_status.try_wait(self.span)
474    }
475
476    pub fn wait_with_output(self) -> Result<ProcessOutput, ShellError> {
477        let from_io_error = IoError::factory(self.span, None);
478        let (stdout, stderr) = if let Some(stdout) = self.stdout {
479            let stderr = self
480                .stderr
481                .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
482                .transpose()
483                .map_err(&from_io_error)?;
484
485            let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
486
487            let stderr = stderr
488                .map(|handle| {
489                    handle.join().map_err(|e| match e.downcast::<io::Error>() {
490                        Ok(io) => from_io_error(*io).into(),
491                        Err(err) => ShellError::GenericError {
492                            error: "Unknown error".into(),
493                            msg: format!("{err:?}"),
494                            span: Some(self.span),
495                            help: None,
496                            inner: Vec::new(),
497                        },
498                    })
499                })
500                .transpose()?
501                .transpose()
502                .map_err(&from_io_error)?;
503
504            (Some(stdout), stderr)
505        } else {
506            let stderr = self
507                .stderr
508                .map(collect_bytes)
509                .transpose()
510                .map_err(&from_io_error)?;
511
512            (None, stderr)
513        };
514
515        let mut exit_status = self
516            .exit_status
517            .lock()
518            .expect("lock exit_status future should success");
519        let exit_status = exit_status.wait(self.span)?;
520
521        Ok(ProcessOutput {
522            stdout,
523            stderr,
524            exit_status,
525        })
526    }
527
528    pub fn clone_exit_status_future(&self) -> Arc<Mutex<ExitStatusFuture>> {
529        self.exit_status.clone()
530    }
531
532    pub fn clone_ignore_error(&self) -> Arc<Mutex<bool>> {
533        self.ignore_error.clone()
534    }
535}
536
537fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
538    let mut buf = Vec::new();
539    match pipe {
540        ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
541        ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
542    }?;
543    Ok(buf)
544}
545
546fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
547    match pipe {
548        ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
549        ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
550    }?;
551    Ok(())
552}
553
554pub struct ProcessOutput {
555    pub stdout: Option<Vec<u8>>,
556    pub stderr: Option<Vec<u8>>,
557    pub exit_status: ExitStatus,
558}