1use crate::{
2 ShellError, Span,
3 byte_stream::convert_file,
4 engine::{EngineState, FrozenJob, Job},
5 shell_error::io::IoError,
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11 fmt::Debug,
12 io::{self, Read},
13 sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14 sync::{Arc, Mutex},
15 thread,
16};
17
18#[cfg(feature = "os")]
22pub fn check_exit_status_future(
23 exit_status: Vec<Option<(Arc<Mutex<ExitStatusFuture>>, Span)>>,
24) -> Result<(), ShellError> {
25 for (future, span) in exit_status.into_iter().rev().flatten() {
26 check_exit_status_future_ok(future, span)?
27 }
28 Ok(())
29}
30
31fn check_exit_status_future_ok(
32 exit_status_future: Arc<Mutex<ExitStatusFuture>>,
33 span: Span,
34) -> Result<(), ShellError> {
35 let mut future = exit_status_future
36 .lock()
37 .expect("lock exit_status_future should success");
38 let exit_status = future.wait(span)?;
39 check_ok(exit_status, false, span)
40}
41
42pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
43 match status {
44 ExitStatus::Exited(exit_code) => {
45 if ignore_error {
46 Ok(())
47 } else if let Ok(exit_code) = exit_code.try_into() {
48 Err(ShellError::NonZeroExitCode { exit_code, span })
49 } else {
50 Ok(())
51 }
52 }
53 #[cfg(unix)]
54 ExitStatus::Signaled {
55 signal,
56 core_dumped,
57 } => {
58 use nix::sys::signal::Signal;
59
60 let sig = Signal::try_from(signal);
61
62 if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
63 Ok(())
65 } else {
66 let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
67 Err(if core_dumped {
68 ShellError::CoreDumped {
69 signal_name,
70 signal,
71 span,
72 }
73 } else {
74 ShellError::TerminatedBySignal {
75 signal_name,
76 signal,
77 span,
78 }
79 })
80 }
81 }
82 }
83}
84
85#[derive(Debug)]
86pub enum ExitStatusFuture {
87 Finished(Result<ExitStatus, Box<ShellError>>),
88 Running(Receiver<io::Result<ExitStatus>>),
89}
90
91impl ExitStatusFuture {
92 pub fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
93 match self {
94 ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
95 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
96 ExitStatusFuture::Running(receiver) => {
97 let code = match receiver.recv() {
98 #[cfg(unix)]
99 Ok(Ok(
100 status @ ExitStatus::Signaled {
101 core_dumped: true, ..
102 },
103 )) => {
104 check_ok(status, false, span)?;
105 Ok(status)
106 }
107 Ok(Ok(status)) => Ok(status),
108 Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
109 err,
110 span,
111 None,
112 "failed to get exit code",
113 ))),
114 Err(err @ RecvError) => Err(ShellError::GenericError {
115 error: err.to_string(),
116 msg: "failed to get exit code".into(),
117 span: span.into(),
118 help: None,
119 inner: vec![],
120 }),
121 };
122
123 *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
124
125 code
126 }
127 }
128 }
129
130 fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
131 match self {
132 ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
133 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
134 ExitStatusFuture::Running(receiver) => {
135 let code = match receiver.try_recv() {
136 Ok(Ok(status)) => Ok(Some(status)),
137 Ok(Err(err)) => Err(ShellError::GenericError {
138 error: err.to_string(),
139 msg: "failed to get exit code".to_string(),
140 span: span.into(),
141 help: None,
142 inner: vec![],
143 }),
144 Err(TryRecvError::Disconnected) => Err(ShellError::GenericError {
145 error: "receiver disconnected".to_string(),
146 msg: "failed to get exit code".into(),
147 span: span.into(),
148 help: None,
149 inner: vec![],
150 }),
151 Err(TryRecvError::Empty) => Ok(None),
152 };
153
154 if let Some(code) = code.clone().transpose() {
155 *self = ExitStatusFuture::Finished(code.map_err(Box::new));
156 }
157
158 code
159 }
160 }
161 }
162}
163
164pub enum ChildPipe {
165 Pipe(PipeReader),
166 Tee(Box<dyn Read + Send + 'static>),
167}
168
169impl Debug for ChildPipe {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("ChildPipe").finish()
172 }
173}
174
175impl From<PipeReader> for ChildPipe {
176 fn from(pipe: PipeReader) -> Self {
177 Self::Pipe(pipe)
178 }
179}
180
181impl Read for ChildPipe {
182 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
183 match self {
184 ChildPipe::Pipe(pipe) => pipe.read(buf),
185 ChildPipe::Tee(tee) => tee.read(buf),
186 }
187 }
188}
189
190#[derive(Debug)]
191pub struct ChildProcess {
192 pub stdout: Option<ChildPipe>,
193 pub stderr: Option<ChildPipe>,
194 exit_status: Arc<Mutex<ExitStatusFuture>>,
195 ignore_error: bool,
196 span: Span,
197}
198
199pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
201
202impl PostWaitCallback {
203 pub fn new<F>(f: F) -> Self
204 where
205 F: FnOnce(ForegroundWaitStatus) + Send + 'static,
206 {
207 PostWaitCallback(Box::new(f))
208 }
209
210 pub fn for_job_control(
218 engine_state: &EngineState,
219 child_pid: Option<u32>,
220 tag: Option<String>,
221 ) -> Self {
222 let this_job = engine_state.current_thread_job().cloned();
223 let jobs = engine_state.jobs.clone();
224 let is_interactive = engine_state.is_interactive;
225
226 PostWaitCallback::new(move |status| {
227 if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
228 this_job.remove_pid(child_pid);
229 }
230
231 if let ForegroundWaitStatus::Frozen(unfreeze) = status {
232 let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
233
234 let job_id = jobs.add_job(Job::Frozen(FrozenJob { unfreeze, tag }));
235
236 if is_interactive {
237 println!("\nJob {} is frozen", job_id.get());
238 }
239 }
240 })
241 }
242}
243
244impl Debug for PostWaitCallback {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 write!(f, "<wait_callback>")
247 }
248}
249
250impl ChildProcess {
251 pub fn new(
252 mut child: ForegroundChild,
253 reader: Option<PipeReader>,
254 swap: bool,
255 span: Span,
256 callback: Option<PostWaitCallback>,
257 ) -> Result<Self, ShellError> {
258 let (stdout, stderr) = if let Some(combined) = reader {
259 (Some(combined), None)
260 } else {
261 let stdout = child.as_mut().stdout.take().map(convert_file);
262 let stderr = child.as_mut().stderr.take().map(convert_file);
263
264 if swap {
265 (stderr, stdout)
266 } else {
267 (stdout, stderr)
268 }
269 };
270
271 let (exit_status_sender, exit_status) = mpsc::channel();
273
274 thread::Builder::new()
275 .name("exit status waiter".into())
276 .spawn(move || {
277 let matched = match child.wait() {
278 Ok(wait_status) => {
286 let next = match &wait_status {
287 ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
288 ForegroundWaitStatus::Finished(exit_status) => *exit_status,
289 };
290
291 if let Some(callback) = callback {
292 (callback.0)(wait_status);
293 }
294
295 Ok(next)
296 }
297 Err(err) => Err(err),
298 };
299
300 exit_status_sender.send(matched)
301 })
302 .map_err(|err| {
303 IoError::new_with_additional_context(
304 err,
305 span,
306 None,
307 "Could now spawn exit status waiter",
308 )
309 })?;
310
311 Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
312 }
313
314 pub fn from_raw(
315 stdout: Option<PipeReader>,
316 stderr: Option<PipeReader>,
317 exit_status: Option<Receiver<io::Result<ExitStatus>>>,
318 span: Span,
319 ) -> Self {
320 Self {
321 stdout: stdout.map(Into::into),
322 stderr: stderr.map(Into::into),
323 exit_status: Arc::new(Mutex::new(
324 exit_status
325 .map(ExitStatusFuture::Running)
326 .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
327 )),
328 ignore_error: false,
329 span,
330 }
331 }
332
333 pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
334 self.ignore_error = ignore;
335 self
336 }
337
338 pub fn span(&self) -> Span {
339 self.span
340 }
341
342 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
343 if self.stderr.is_some() {
344 debug_assert!(false, "stderr should not exist");
345 return Err(ShellError::GenericError {
346 error: "internal error".into(),
347 msg: "stderr should not exist".into(),
348 span: self.span.into(),
349 help: None,
350 inner: vec![],
351 });
352 }
353
354 let bytes = if let Some(stdout) = self.stdout {
355 collect_bytes(stdout).map_err(|err| IoError::new(err, self.span, None))?
356 } else {
357 Vec::new()
358 };
359
360 let mut exit_status = self
361 .exit_status
362 .lock()
363 .expect("lock exit_status future should success");
364 check_ok(exit_status.wait(self.span)?, self.ignore_error, self.span)?;
365
366 Ok(bytes)
367 }
368
369 pub fn wait(mut self) -> Result<(), ShellError> {
370 let from_io_error = IoError::factory(self.span, None);
371 if let Some(stdout) = self.stdout.take() {
372 let stderr = self
373 .stderr
374 .take()
375 .map(|stderr| {
376 thread::Builder::new()
377 .name("stderr consumer".into())
378 .spawn(move || consume_pipe(stderr))
379 })
380 .transpose()
381 .map_err(&from_io_error)?;
382
383 let res = consume_pipe(stdout);
384
385 if let Some(handle) = stderr {
386 handle
387 .join()
388 .map_err(|e| match e.downcast::<io::Error>() {
389 Ok(io) => from_io_error(*io).into(),
390 Err(err) => ShellError::GenericError {
391 error: "Unknown error".into(),
392 msg: format!("{err:?}"),
393 span: Some(self.span),
394 help: None,
395 inner: Vec::new(),
396 },
397 })?
398 .map_err(&from_io_error)?;
399 }
400
401 res.map_err(&from_io_error)?;
402 } else if let Some(stderr) = self.stderr.take() {
403 consume_pipe(stderr).map_err(&from_io_error)?;
404 }
405 let mut exit_status = self
406 .exit_status
407 .lock()
408 .expect("lock exit_status future should success");
409 check_ok(exit_status.wait(self.span)?, self.ignore_error, self.span)
410 }
411
412 pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
413 let mut exit_status = self
414 .exit_status
415 .lock()
416 .expect("lock exit_status future should success");
417 exit_status.try_wait(self.span)
418 }
419
420 pub fn wait_with_output(self) -> Result<ProcessOutput, ShellError> {
421 let from_io_error = IoError::factory(self.span, None);
422 let (stdout, stderr) = if let Some(stdout) = self.stdout {
423 let stderr = self
424 .stderr
425 .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
426 .transpose()
427 .map_err(&from_io_error)?;
428
429 let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
430
431 let stderr = stderr
432 .map(|handle| {
433 handle.join().map_err(|e| match e.downcast::<io::Error>() {
434 Ok(io) => from_io_error(*io).into(),
435 Err(err) => ShellError::GenericError {
436 error: "Unknown error".into(),
437 msg: format!("{err:?}"),
438 span: Some(self.span),
439 help: None,
440 inner: Vec::new(),
441 },
442 })
443 })
444 .transpose()?
445 .transpose()
446 .map_err(&from_io_error)?;
447
448 (Some(stdout), stderr)
449 } else {
450 let stderr = self
451 .stderr
452 .map(collect_bytes)
453 .transpose()
454 .map_err(&from_io_error)?;
455
456 (None, stderr)
457 };
458
459 let mut exit_status = self
460 .exit_status
461 .lock()
462 .expect("lock exit_status future should success");
463 let exit_status = exit_status.wait(self.span)?;
464
465 Ok(ProcessOutput {
466 stdout,
467 stderr,
468 exit_status,
469 })
470 }
471
472 pub fn clone_exit_status_future(&self) -> Arc<Mutex<ExitStatusFuture>> {
473 self.exit_status.clone()
474 }
475}
476
477fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
478 let mut buf = Vec::new();
479 match pipe {
480 ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
481 ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
482 }?;
483 Ok(buf)
484}
485
486fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
487 match pipe {
488 ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
489 ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
490 }?;
491 Ok(())
492}
493
494pub struct ProcessOutput {
495 pub stdout: Option<Vec<u8>>,
496 pub stderr: Option<Vec<u8>>,
497 pub exit_status: ExitStatus,
498}