nu_protocol/process/
child.rs

1use crate::{
2    ShellError, Span,
3    byte_stream::convert_file,
4    engine::{EngineState, FrozenJob, Job},
5    shell_error::io::IoError,
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11    fmt::Debug,
12    io::{self, Read},
13    sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14    thread,
15};
16
17pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
18    match status {
19        ExitStatus::Exited(exit_code) => {
20            if ignore_error {
21                Ok(())
22            } else if let Ok(exit_code) = exit_code.try_into() {
23                Err(ShellError::NonZeroExitCode { exit_code, span })
24            } else {
25                Ok(())
26            }
27        }
28        #[cfg(unix)]
29        ExitStatus::Signaled {
30            signal,
31            core_dumped,
32        } => {
33            use nix::sys::signal::Signal;
34
35            let sig = Signal::try_from(signal);
36
37            if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
38                // Processes often exit with SIGPIPE, but this is not an error condition.
39                Ok(())
40            } else {
41                let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
42                Err(if core_dumped {
43                    ShellError::CoreDumped {
44                        signal_name,
45                        signal,
46                        span,
47                    }
48                } else {
49                    ShellError::TerminatedBySignal {
50                        signal_name,
51                        signal,
52                        span,
53                    }
54                })
55            }
56        }
57    }
58}
59
60#[derive(Debug)]
61enum ExitStatusFuture {
62    Finished(Result<ExitStatus, Box<ShellError>>),
63    Running(Receiver<io::Result<ExitStatus>>),
64}
65
66impl ExitStatusFuture {
67    fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
68        match self {
69            ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
70            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
71            ExitStatusFuture::Running(receiver) => {
72                let code = match receiver.recv() {
73                    #[cfg(unix)]
74                    Ok(Ok(
75                        status @ ExitStatus::Signaled {
76                            core_dumped: true, ..
77                        },
78                    )) => {
79                        check_ok(status, false, span)?;
80                        Ok(status)
81                    }
82                    Ok(Ok(status)) => Ok(status),
83                    Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
84                        err,
85                        span,
86                        None,
87                        "failed to get exit code",
88                    ))),
89                    Err(err @ RecvError) => Err(ShellError::GenericError {
90                        error: err.to_string(),
91                        msg: "failed to get exit code".into(),
92                        span: span.into(),
93                        help: None,
94                        inner: vec![],
95                    }),
96                };
97
98                *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
99
100                code
101            }
102        }
103    }
104
105    fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
106        match self {
107            ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
108            ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
109            ExitStatusFuture::Running(receiver) => {
110                let code = match receiver.try_recv() {
111                    Ok(Ok(status)) => Ok(Some(status)),
112                    Ok(Err(err)) => Err(ShellError::GenericError {
113                        error: err.to_string(),
114                        msg: "failed to get exit code".to_string(),
115                        span: span.into(),
116                        help: None,
117                        inner: vec![],
118                    }),
119                    Err(TryRecvError::Disconnected) => Err(ShellError::GenericError {
120                        error: "receiver disconnected".to_string(),
121                        msg: "failed to get exit code".into(),
122                        span: span.into(),
123                        help: None,
124                        inner: vec![],
125                    }),
126                    Err(TryRecvError::Empty) => Ok(None),
127                };
128
129                if let Some(code) = code.clone().transpose() {
130                    *self = ExitStatusFuture::Finished(code.map_err(Box::new));
131                }
132
133                code
134            }
135        }
136    }
137}
138
139pub enum ChildPipe {
140    Pipe(PipeReader),
141    Tee(Box<dyn Read + Send + 'static>),
142}
143
144impl Debug for ChildPipe {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("ChildPipe").finish()
147    }
148}
149
150impl From<PipeReader> for ChildPipe {
151    fn from(pipe: PipeReader) -> Self {
152        Self::Pipe(pipe)
153    }
154}
155
156impl Read for ChildPipe {
157    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
158        match self {
159            ChildPipe::Pipe(pipe) => pipe.read(buf),
160            ChildPipe::Tee(tee) => tee.read(buf),
161        }
162    }
163}
164
165#[derive(Debug)]
166pub struct ChildProcess {
167    pub stdout: Option<ChildPipe>,
168    pub stderr: Option<ChildPipe>,
169    exit_status: ExitStatusFuture,
170    ignore_error: bool,
171    span: Span,
172}
173
174/// A wrapper for a closure that runs once the shell finishes waiting on the process.
175pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
176
177impl PostWaitCallback {
178    pub fn new<F>(f: F) -> Self
179    where
180        F: FnOnce(ForegroundWaitStatus) + Send + 'static,
181    {
182        PostWaitCallback(Box::new(f))
183    }
184
185    /// Creates a PostWaitCallback that creates a frozen job in the job table
186    /// if the incoming wait status indicates that the job was frozen.
187    ///
188    /// If `child_pid` is provided, the returned callback will also remove
189    /// it from the pid list of the current running job.
190    ///
191    /// The given `tag` argument will be used as the tag for the newly created job table entry.
192    pub fn for_job_control(
193        engine_state: &EngineState,
194        child_pid: Option<u32>,
195        tag: Option<String>,
196    ) -> Self {
197        let this_job = engine_state.current_thread_job().cloned();
198        let jobs = engine_state.jobs.clone();
199        let is_interactive = engine_state.is_interactive;
200
201        PostWaitCallback::new(move |status| {
202            if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
203                this_job.remove_pid(child_pid);
204            }
205
206            if let ForegroundWaitStatus::Frozen(unfreeze) = status {
207                let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
208
209                let job_id = jobs.add_job(Job::Frozen(FrozenJob { unfreeze, tag }));
210
211                if is_interactive {
212                    println!("\nJob {} is frozen", job_id.get());
213                }
214            }
215        })
216    }
217}
218
219impl Debug for PostWaitCallback {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        write!(f, "<wait_callback>")
222    }
223}
224
225impl ChildProcess {
226    pub fn new(
227        mut child: ForegroundChild,
228        reader: Option<PipeReader>,
229        swap: bool,
230        span: Span,
231        callback: Option<PostWaitCallback>,
232    ) -> Result<Self, ShellError> {
233        let (stdout, stderr) = if let Some(combined) = reader {
234            (Some(combined), None)
235        } else {
236            let stdout = child.as_mut().stdout.take().map(convert_file);
237            let stderr = child.as_mut().stderr.take().map(convert_file);
238
239            if swap {
240                (stderr, stdout)
241            } else {
242                (stdout, stderr)
243            }
244        };
245
246        // Create a thread to wait for the exit status.
247        let (exit_status_sender, exit_status) = mpsc::channel();
248
249        thread::Builder::new()
250            .name("exit status waiter".into())
251            .spawn(move || {
252                let matched = match child.wait() {
253                    // there are two possible outcomes when we `wait` for a process to finish:
254                    // 1. the process finishes as usual
255                    // 2. (unix only) the process gets signaled with SIGTSTP
256                    //
257                    // in the second case, although the process may still be alive in a
258                    // cryonic state, we explicitly treat as it has finished with exit code 0
259                    // for the sake of the current pipeline
260                    Ok(wait_status) => {
261                        let next = match &wait_status {
262                            ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
263                            ForegroundWaitStatus::Finished(exit_status) => *exit_status,
264                        };
265
266                        if let Some(callback) = callback {
267                            (callback.0)(wait_status);
268                        }
269
270                        Ok(next)
271                    }
272                    Err(err) => Err(err),
273                };
274
275                exit_status_sender.send(matched)
276            })
277            .map_err(|err| {
278                IoError::new_with_additional_context(
279                    err,
280                    span,
281                    None,
282                    "Could now spawn exit status waiter",
283                )
284            })?;
285
286        Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
287    }
288
289    pub fn from_raw(
290        stdout: Option<PipeReader>,
291        stderr: Option<PipeReader>,
292        exit_status: Option<Receiver<io::Result<ExitStatus>>>,
293        span: Span,
294    ) -> Self {
295        Self {
296            stdout: stdout.map(Into::into),
297            stderr: stderr.map(Into::into),
298            exit_status: exit_status
299                .map(ExitStatusFuture::Running)
300                .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
301            ignore_error: false,
302            span,
303        }
304    }
305
306    pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
307        self.ignore_error = ignore;
308        self
309    }
310
311    pub fn span(&self) -> Span {
312        self.span
313    }
314
315    pub fn into_bytes(mut self) -> Result<Vec<u8>, ShellError> {
316        if self.stderr.is_some() {
317            debug_assert!(false, "stderr should not exist");
318            return Err(ShellError::GenericError {
319                error: "internal error".into(),
320                msg: "stderr should not exist".into(),
321                span: self.span.into(),
322                help: None,
323                inner: vec![],
324            });
325        }
326
327        let bytes = if let Some(stdout) = self.stdout {
328            collect_bytes(stdout).map_err(|err| IoError::new(err, self.span, None))?
329        } else {
330            Vec::new()
331        };
332
333        check_ok(
334            self.exit_status.wait(self.span)?,
335            self.ignore_error,
336            self.span,
337        )?;
338
339        Ok(bytes)
340    }
341
342    pub fn wait(mut self) -> Result<(), ShellError> {
343        let from_io_error = IoError::factory(self.span, None);
344        if let Some(stdout) = self.stdout.take() {
345            let stderr = self
346                .stderr
347                .take()
348                .map(|stderr| {
349                    thread::Builder::new()
350                        .name("stderr consumer".into())
351                        .spawn(move || consume_pipe(stderr))
352                })
353                .transpose()
354                .map_err(&from_io_error)?;
355
356            let res = consume_pipe(stdout);
357
358            if let Some(handle) = stderr {
359                handle
360                    .join()
361                    .map_err(|e| match e.downcast::<io::Error>() {
362                        Ok(io) => from_io_error(*io).into(),
363                        Err(err) => ShellError::GenericError {
364                            error: "Unknown error".into(),
365                            msg: format!("{err:?}"),
366                            span: Some(self.span),
367                            help: None,
368                            inner: Vec::new(),
369                        },
370                    })?
371                    .map_err(&from_io_error)?;
372            }
373
374            res.map_err(&from_io_error)?;
375        } else if let Some(stderr) = self.stderr.take() {
376            consume_pipe(stderr).map_err(&from_io_error)?;
377        }
378
379        check_ok(
380            self.exit_status.wait(self.span)?,
381            self.ignore_error,
382            self.span,
383        )
384    }
385
386    pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
387        self.exit_status.try_wait(self.span)
388    }
389
390    pub fn wait_with_output(mut self) -> Result<ProcessOutput, ShellError> {
391        let from_io_error = IoError::factory(self.span, None);
392        let (stdout, stderr) = if let Some(stdout) = self.stdout {
393            let stderr = self
394                .stderr
395                .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
396                .transpose()
397                .map_err(&from_io_error)?;
398
399            let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
400
401            let stderr = stderr
402                .map(|handle| {
403                    handle.join().map_err(|e| match e.downcast::<io::Error>() {
404                        Ok(io) => from_io_error(*io).into(),
405                        Err(err) => ShellError::GenericError {
406                            error: "Unknown error".into(),
407                            msg: format!("{err:?}"),
408                            span: Some(self.span),
409                            help: None,
410                            inner: Vec::new(),
411                        },
412                    })
413                })
414                .transpose()?
415                .transpose()
416                .map_err(&from_io_error)?;
417
418            (Some(stdout), stderr)
419        } else {
420            let stderr = self
421                .stderr
422                .map(collect_bytes)
423                .transpose()
424                .map_err(&from_io_error)?;
425
426            (None, stderr)
427        };
428
429        let exit_status = self.exit_status.wait(self.span)?;
430
431        Ok(ProcessOutput {
432            stdout,
433            stderr,
434            exit_status,
435        })
436    }
437}
438
439fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
440    let mut buf = Vec::new();
441    match pipe {
442        ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
443        ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
444    }?;
445    Ok(buf)
446}
447
448fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
449    match pipe {
450        ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
451        ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
452    }?;
453    Ok(())
454}
455
456pub struct ProcessOutput {
457    pub stdout: Option<Vec<u8>>,
458    pub stderr: Option<Vec<u8>>,
459    pub exit_status: ExitStatus,
460}