1use crate::{
2 ShellError, Span,
3 byte_stream::convert_file,
4 engine::{EngineState, FrozenJob, Job},
5 shell_error::io::IoError,
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11 fmt::Debug,
12 io::{self, Read},
13 sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14 thread,
15};
16
17pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
18 match status {
19 ExitStatus::Exited(exit_code) => {
20 if ignore_error {
21 Ok(())
22 } else if let Ok(exit_code) = exit_code.try_into() {
23 Err(ShellError::NonZeroExitCode { exit_code, span })
24 } else {
25 Ok(())
26 }
27 }
28 #[cfg(unix)]
29 ExitStatus::Signaled {
30 signal,
31 core_dumped,
32 } => {
33 use nix::sys::signal::Signal;
34
35 let sig = Signal::try_from(signal);
36
37 if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
38 Ok(())
40 } else {
41 let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
42 Err(if core_dumped {
43 ShellError::CoreDumped {
44 signal_name,
45 signal,
46 span,
47 }
48 } else {
49 ShellError::TerminatedBySignal {
50 signal_name,
51 signal,
52 span,
53 }
54 })
55 }
56 }
57 }
58}
59
60#[derive(Debug)]
61enum ExitStatusFuture {
62 Finished(Result<ExitStatus, Box<ShellError>>),
63 Running(Receiver<io::Result<ExitStatus>>),
64}
65
66impl ExitStatusFuture {
67 fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
68 match self {
69 ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
70 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
71 ExitStatusFuture::Running(receiver) => {
72 let code = match receiver.recv() {
73 #[cfg(unix)]
74 Ok(Ok(
75 status @ ExitStatus::Signaled {
76 core_dumped: true, ..
77 },
78 )) => {
79 check_ok(status, false, span)?;
80 Ok(status)
81 }
82 Ok(Ok(status)) => Ok(status),
83 Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
84 err,
85 span,
86 None,
87 "failed to get exit code",
88 ))),
89 Err(err @ RecvError) => Err(ShellError::GenericError {
90 error: err.to_string(),
91 msg: "failed to get exit code".into(),
92 span: span.into(),
93 help: None,
94 inner: vec![],
95 }),
96 };
97
98 *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
99
100 code
101 }
102 }
103 }
104
105 fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
106 match self {
107 ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
108 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
109 ExitStatusFuture::Running(receiver) => {
110 let code = match receiver.try_recv() {
111 Ok(Ok(status)) => Ok(Some(status)),
112 Ok(Err(err)) => Err(ShellError::GenericError {
113 error: err.to_string(),
114 msg: "failed to get exit code".to_string(),
115 span: span.into(),
116 help: None,
117 inner: vec![],
118 }),
119 Err(TryRecvError::Disconnected) => Err(ShellError::GenericError {
120 error: "receiver disconnected".to_string(),
121 msg: "failed to get exit code".into(),
122 span: span.into(),
123 help: None,
124 inner: vec![],
125 }),
126 Err(TryRecvError::Empty) => Ok(None),
127 };
128
129 if let Some(code) = code.clone().transpose() {
130 *self = ExitStatusFuture::Finished(code.map_err(Box::new));
131 }
132
133 code
134 }
135 }
136 }
137}
138
139pub enum ChildPipe {
140 Pipe(PipeReader),
141 Tee(Box<dyn Read + Send + 'static>),
142}
143
144impl Debug for ChildPipe {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct("ChildPipe").finish()
147 }
148}
149
150impl From<PipeReader> for ChildPipe {
151 fn from(pipe: PipeReader) -> Self {
152 Self::Pipe(pipe)
153 }
154}
155
156impl Read for ChildPipe {
157 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
158 match self {
159 ChildPipe::Pipe(pipe) => pipe.read(buf),
160 ChildPipe::Tee(tee) => tee.read(buf),
161 }
162 }
163}
164
165#[derive(Debug)]
166pub struct ChildProcess {
167 pub stdout: Option<ChildPipe>,
168 pub stderr: Option<ChildPipe>,
169 exit_status: ExitStatusFuture,
170 ignore_error: bool,
171 span: Span,
172}
173
174pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
176
177impl PostWaitCallback {
178 pub fn new<F>(f: F) -> Self
179 where
180 F: FnOnce(ForegroundWaitStatus) + Send + 'static,
181 {
182 PostWaitCallback(Box::new(f))
183 }
184
185 pub fn for_job_control(
193 engine_state: &EngineState,
194 child_pid: Option<u32>,
195 tag: Option<String>,
196 ) -> Self {
197 let this_job = engine_state.current_thread_job().cloned();
198 let jobs = engine_state.jobs.clone();
199 let is_interactive = engine_state.is_interactive;
200
201 PostWaitCallback::new(move |status| {
202 if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
203 this_job.remove_pid(child_pid);
204 }
205
206 if let ForegroundWaitStatus::Frozen(unfreeze) = status {
207 let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
208
209 let job_id = jobs.add_job(Job::Frozen(FrozenJob { unfreeze, tag }));
210
211 if is_interactive {
212 println!("\nJob {} is frozen", job_id.get());
213 }
214 }
215 })
216 }
217}
218
219impl Debug for PostWaitCallback {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 write!(f, "<wait_callback>")
222 }
223}
224
225impl ChildProcess {
226 pub fn new(
227 mut child: ForegroundChild,
228 reader: Option<PipeReader>,
229 swap: bool,
230 span: Span,
231 callback: Option<PostWaitCallback>,
232 ) -> Result<Self, ShellError> {
233 let (stdout, stderr) = if let Some(combined) = reader {
234 (Some(combined), None)
235 } else {
236 let stdout = child.as_mut().stdout.take().map(convert_file);
237 let stderr = child.as_mut().stderr.take().map(convert_file);
238
239 if swap {
240 (stderr, stdout)
241 } else {
242 (stdout, stderr)
243 }
244 };
245
246 let (exit_status_sender, exit_status) = mpsc::channel();
248
249 thread::Builder::new()
250 .name("exit status waiter".into())
251 .spawn(move || {
252 let matched = match child.wait() {
253 Ok(wait_status) => {
261 let next = match &wait_status {
262 ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
263 ForegroundWaitStatus::Finished(exit_status) => *exit_status,
264 };
265
266 if let Some(callback) = callback {
267 (callback.0)(wait_status);
268 }
269
270 Ok(next)
271 }
272 Err(err) => Err(err),
273 };
274
275 exit_status_sender.send(matched)
276 })
277 .map_err(|err| {
278 IoError::new_with_additional_context(
279 err,
280 span,
281 None,
282 "Could now spawn exit status waiter",
283 )
284 })?;
285
286 Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
287 }
288
289 pub fn from_raw(
290 stdout: Option<PipeReader>,
291 stderr: Option<PipeReader>,
292 exit_status: Option<Receiver<io::Result<ExitStatus>>>,
293 span: Span,
294 ) -> Self {
295 Self {
296 stdout: stdout.map(Into::into),
297 stderr: stderr.map(Into::into),
298 exit_status: exit_status
299 .map(ExitStatusFuture::Running)
300 .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
301 ignore_error: false,
302 span,
303 }
304 }
305
306 pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
307 self.ignore_error = ignore;
308 self
309 }
310
311 pub fn span(&self) -> Span {
312 self.span
313 }
314
315 pub fn into_bytes(mut self) -> Result<Vec<u8>, ShellError> {
316 if self.stderr.is_some() {
317 debug_assert!(false, "stderr should not exist");
318 return Err(ShellError::GenericError {
319 error: "internal error".into(),
320 msg: "stderr should not exist".into(),
321 span: self.span.into(),
322 help: None,
323 inner: vec![],
324 });
325 }
326
327 let bytes = if let Some(stdout) = self.stdout {
328 collect_bytes(stdout).map_err(|err| IoError::new(err, self.span, None))?
329 } else {
330 Vec::new()
331 };
332
333 check_ok(
334 self.exit_status.wait(self.span)?,
335 self.ignore_error,
336 self.span,
337 )?;
338
339 Ok(bytes)
340 }
341
342 pub fn wait(mut self) -> Result<(), ShellError> {
343 let from_io_error = IoError::factory(self.span, None);
344 if let Some(stdout) = self.stdout.take() {
345 let stderr = self
346 .stderr
347 .take()
348 .map(|stderr| {
349 thread::Builder::new()
350 .name("stderr consumer".into())
351 .spawn(move || consume_pipe(stderr))
352 })
353 .transpose()
354 .map_err(&from_io_error)?;
355
356 let res = consume_pipe(stdout);
357
358 if let Some(handle) = stderr {
359 handle
360 .join()
361 .map_err(|e| match e.downcast::<io::Error>() {
362 Ok(io) => from_io_error(*io).into(),
363 Err(err) => ShellError::GenericError {
364 error: "Unknown error".into(),
365 msg: format!("{err:?}"),
366 span: Some(self.span),
367 help: None,
368 inner: Vec::new(),
369 },
370 })?
371 .map_err(&from_io_error)?;
372 }
373
374 res.map_err(&from_io_error)?;
375 } else if let Some(stderr) = self.stderr.take() {
376 consume_pipe(stderr).map_err(&from_io_error)?;
377 }
378
379 check_ok(
380 self.exit_status.wait(self.span)?,
381 self.ignore_error,
382 self.span,
383 )
384 }
385
386 pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
387 self.exit_status.try_wait(self.span)
388 }
389
390 pub fn wait_with_output(mut self) -> Result<ProcessOutput, ShellError> {
391 let from_io_error = IoError::factory(self.span, None);
392 let (stdout, stderr) = if let Some(stdout) = self.stdout {
393 let stderr = self
394 .stderr
395 .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
396 .transpose()
397 .map_err(&from_io_error)?;
398
399 let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
400
401 let stderr = stderr
402 .map(|handle| {
403 handle.join().map_err(|e| match e.downcast::<io::Error>() {
404 Ok(io) => from_io_error(*io).into(),
405 Err(err) => ShellError::GenericError {
406 error: "Unknown error".into(),
407 msg: format!("{err:?}"),
408 span: Some(self.span),
409 help: None,
410 inner: Vec::new(),
411 },
412 })
413 })
414 .transpose()?
415 .transpose()
416 .map_err(&from_io_error)?;
417
418 (Some(stdout), stderr)
419 } else {
420 let stderr = self
421 .stderr
422 .map(collect_bytes)
423 .transpose()
424 .map_err(&from_io_error)?;
425
426 (None, stderr)
427 };
428
429 let exit_status = self.exit_status.wait(self.span)?;
430
431 Ok(ProcessOutput {
432 stdout,
433 stderr,
434 exit_status,
435 })
436 }
437}
438
439fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
440 let mut buf = Vec::new();
441 match pipe {
442 ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
443 ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
444 }?;
445 Ok(buf)
446}
447
448fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
449 match pipe {
450 ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
451 ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
452 }?;
453 Ok(())
454}
455
456pub struct ProcessOutput {
457 pub stdout: Option<Vec<u8>>,
458 pub stderr: Option<Vec<u8>>,
459 pub exit_status: ExitStatus,
460}