1use crate::{
2 ShellError, Span,
3 byte_stream::convert_file,
4 engine::{EngineState, FrozenJob, Job},
5 shell_error::{generic::GenericError, io::IoError},
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11 fmt::Debug,
12 io::{self, Read},
13 sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14 sync::{Arc, Mutex},
15 thread,
16};
17
18#[cfg(feature = "os")]
22pub fn check_exit_status_future(
23 exit_status: Vec<Option<ExitStatusGuard>>,
24) -> Result<(), ShellError> {
25 for one_status in exit_status.into_iter().rev().flatten() {
26 check_exit_status_future_ok(one_status)?
27 }
28 Ok(())
29}
30
31fn check_exit_status_future_ok(exit_status_guard: ExitStatusGuard) -> Result<(), ShellError> {
32 let ignore_error = {
33 let guard = exit_status_guard
34 .ignore_error
35 .lock()
36 .expect("lock ignore_error should success");
37 *guard
38 };
39 let mut future = exit_status_guard
40 .exit_status_future
41 .lock()
42 .expect("lock exit_status_future should success");
43 let span = exit_status_guard.span.unwrap_or_default();
44 let exit_status = future.wait(span)?;
45 check_ok(exit_status, ignore_error, span)
46}
47
48pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
49 match status {
50 ExitStatus::Exited(exit_code) => {
51 if ignore_error {
52 Ok(())
53 } else if let Ok(exit_code) = exit_code.try_into() {
54 Err(ShellError::NonZeroExitCode { exit_code, span })
55 } else {
56 Ok(())
57 }
58 }
59 #[cfg(unix)]
60 ExitStatus::Signaled {
61 signal,
62 core_dumped,
63 } => {
64 use nix::sys::signal::Signal;
65
66 let sig = Signal::try_from(signal);
67
68 if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
69 Ok(())
71 } else {
72 let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
73 Err(if core_dumped {
74 ShellError::CoreDumped {
75 signal_name,
76 signal,
77 span,
78 }
79 } else {
80 ShellError::TerminatedBySignal {
81 signal_name,
82 signal,
83 span,
84 }
85 })
86 }
87 }
88 }
89}
90
91#[derive(Debug)]
97pub struct ExitStatusGuard {
98 pub exit_status_future: Arc<Mutex<ExitStatusFuture>>,
99 pub ignore_error: Arc<Mutex<bool>>,
100 pub span: Option<Span>,
101}
102
103impl ExitStatusGuard {
104 pub fn new(
105 exit_status_future: Arc<Mutex<ExitStatusFuture>>,
106 ignore_error: Arc<Mutex<bool>>,
107 ) -> Self {
108 Self {
109 exit_status_future,
110 ignore_error,
111 span: None,
112 }
113 }
114
115 pub fn with_span(self, span: Span) -> Self {
116 Self {
117 exit_status_future: self.exit_status_future,
118 ignore_error: self.ignore_error,
119 span: Some(span),
120 }
121 }
122}
123
124#[derive(Debug)]
125pub enum ExitStatusFuture {
126 Finished(Result<ExitStatus, Box<ShellError>>),
127 Running(Receiver<io::Result<ExitStatus>>),
128}
129
130impl ExitStatusFuture {
131 pub fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
132 match self {
133 ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
134 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
135 ExitStatusFuture::Running(receiver) => {
136 let code = match receiver.recv() {
137 #[cfg(unix)]
138 Ok(Ok(
139 status @ ExitStatus::Signaled {
140 core_dumped: true, ..
141 },
142 )) => {
143 check_ok(status, false, span)?;
144 Ok(status)
145 }
146 Ok(Ok(status)) => Ok(status),
147 Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
148 err,
149 span,
150 None,
151 "failed to get exit code",
152 ))),
153 Err(err @ RecvError) => Err(ShellError::Generic(GenericError::new(
154 err.to_string(),
155 "failed to get exit code",
156 span,
157 ))),
158 };
159
160 *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
161
162 code
163 }
164 }
165 }
166
167 fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
168 match self {
169 ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
170 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
171 ExitStatusFuture::Running(receiver) => {
172 let code = match receiver.try_recv() {
173 Ok(Ok(status)) => Ok(Some(status)),
174 Ok(Err(err)) => Err(ShellError::Generic(GenericError::new(
175 err.to_string(),
176 "failed to get exit code",
177 span,
178 ))),
179 Err(TryRecvError::Disconnected) => Err(ShellError::Generic(GenericError::new(
180 "receiver disconnected",
181 "failed to get exit code",
182 span,
183 ))),
184 Err(TryRecvError::Empty) => Ok(None),
185 };
186
187 if let Some(code) = code.clone().transpose() {
188 *self = ExitStatusFuture::Finished(code.map_err(Box::new));
189 }
190
191 code
192 }
193 }
194 }
195}
196
197#[derive(derive_more::Debug)]
198pub enum ChildPipe {
199 #[debug("ChildPipe::Pipe")]
200 Pipe(PipeReader),
201
202 #[debug("ChildPipe::Tee")]
203 Tee(Box<dyn Read + Send + 'static>),
204}
205
206impl From<PipeReader> for ChildPipe {
207 fn from(pipe: PipeReader) -> Self {
208 Self::Pipe(pipe)
209 }
210}
211
212impl Read for ChildPipe {
213 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
214 match self {
215 ChildPipe::Pipe(pipe) => pipe.read(buf),
216 ChildPipe::Tee(tee) => tee.read(buf),
217 }
218 }
219}
220
221#[derive(Debug)]
222pub struct ChildProcess {
223 pub stdout: Option<ChildPipe>,
224 pub stderr: Option<ChildPipe>,
225 exit_status: Arc<Mutex<ExitStatusFuture>>,
226 ignore_error: Arc<Mutex<bool>>,
227 span: Span,
228}
229
230#[derive(derive_more::Debug)]
232#[debug("<wait_callback>")]
233pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
234
235impl PostWaitCallback {
236 pub fn new<F>(f: F) -> Self
237 where
238 F: FnOnce(ForegroundWaitStatus) + Send + 'static,
239 {
240 PostWaitCallback(Box::new(f))
241 }
242
243 pub fn for_job_control(
251 engine_state: &EngineState,
252 child_pid: Option<u32>,
253 description: Option<String>,
254 ) -> Self {
255 let this_job = engine_state.current_thread_job().cloned();
256 let jobs = engine_state.jobs.clone();
257 let is_interactive = engine_state.is_interactive;
258
259 PostWaitCallback::new(move |status| {
260 if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
261 this_job.remove_pid(child_pid);
262 }
263
264 if let ForegroundWaitStatus::Frozen(unfreeze) = status {
265 let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
266
267 let job_id = jobs.add_job(Job::Frozen(FrozenJob {
268 unfreeze,
269 description,
270 }));
271
272 if is_interactive {
273 println!("\nJob {} is frozen", job_id.get());
274 }
275 }
276 })
277 }
278}
279
280impl ChildProcess {
281 pub fn new(
282 mut child: ForegroundChild,
283 reader: Option<PipeReader>,
284 swap: bool,
285 span: Span,
286 callback: Option<PostWaitCallback>,
287 ) -> Result<Self, ShellError> {
288 let (stdout, stderr) = match reader {
289 Some(combined) => (Some(combined), None),
290 None => {
291 let stdout = child.as_mut().stdout.take().map(convert_file);
292 let stderr = child.as_mut().stderr.take().map(convert_file);
293
294 if swap {
295 (stderr, stdout)
296 } else {
297 (stdout, stderr)
298 }
299 }
300 };
301
302 let (exit_status_sender, exit_status) = mpsc::channel();
304
305 thread::Builder::new()
306 .name("exit status waiter".into())
307 .spawn(move || {
308 let matched = match child.wait() {
309 Ok(wait_status) => {
317 let next = match &wait_status {
318 ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
319 ForegroundWaitStatus::Finished(exit_status) => *exit_status,
320 };
321
322 if let Some(callback) = callback {
323 (callback.0)(wait_status);
324 }
325
326 Ok(next)
327 }
328 Err(err) => Err(err),
329 };
330
331 exit_status_sender.send(matched)
332 })
333 .map_err(|err| {
334 IoError::new_with_additional_context(
335 err,
336 span,
337 None,
338 "Could now spawn exit status waiter",
339 )
340 })?;
341
342 Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
343 }
344
345 pub fn from_raw(
346 stdout: Option<PipeReader>,
347 stderr: Option<PipeReader>,
348 exit_status: Option<Receiver<io::Result<ExitStatus>>>,
349 span: Span,
350 ) -> Self {
351 Self {
352 stdout: stdout.map(Into::into),
353 stderr: stderr.map(Into::into),
354 exit_status: Arc::new(Mutex::new(
355 exit_status
356 .map(ExitStatusFuture::Running)
357 .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
358 )),
359 ignore_error: Arc::new(Mutex::new(false)),
360 span,
361 }
362 }
363
364 pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
365 {
366 let mut ignore_error = self.ignore_error.lock().expect("lock should success");
367 *ignore_error = ignore;
368 }
369 self
370 }
371
372 pub fn span(&self) -> Span {
373 self.span
374 }
375
376 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
377 if self.stderr.is_some() {
378 debug_assert!(false, "stderr should not exist");
379 return Err(ShellError::Generic(GenericError::new(
380 "internal error",
381 "stderr should not exist",
382 self.span,
383 )));
384 }
385
386 let bytes = (self.stdout)
387 .map(collect_bytes)
388 .transpose()
389 .map_err(|err| IoError::new(err, self.span, None))?
390 .unwrap_or_default();
391
392 let mut exit_status = self
393 .exit_status
394 .lock()
395 .expect("lock exit_status future should success");
396 let ignore_error = {
397 let guard = self
398 .ignore_error
399 .lock()
400 .expect("lock ignore error should success");
401 *guard
402 };
403 check_ok(exit_status.wait(self.span)?, ignore_error, self.span)?;
404
405 Ok(bytes)
406 }
407
408 pub fn wait(mut self) -> Result<(), ShellError> {
409 let from_io_error = IoError::factory(self.span, None);
410 if let Some(stdout) = self.stdout.take() {
411 let stderr = self
412 .stderr
413 .take()
414 .map(|stderr| {
415 thread::Builder::new()
416 .name("stderr consumer".into())
417 .spawn(move || consume_pipe(stderr))
418 })
419 .transpose()
420 .map_err(&from_io_error)?;
421
422 let res = consume_pipe(stdout);
423
424 if let Some(handle) = stderr {
425 handle
426 .join()
427 .map_err(|e| match e.downcast::<io::Error>() {
428 Ok(io) => from_io_error(*io).into(),
429 Err(err) => ShellError::Generic(GenericError::new(
430 "Unknown error",
431 format!("{err:?}"),
432 self.span,
433 )),
434 })?
435 .map_err(&from_io_error)?;
436 }
437
438 res.map_err(&from_io_error)?;
439 } else if let Some(stderr) = self.stderr.take() {
440 consume_pipe(stderr).map_err(&from_io_error)?;
441 }
442 let mut exit_status = self
443 .exit_status
444 .lock()
445 .expect("lock exit_status future should success");
446 let ignore_error = {
447 let guard = self
448 .ignore_error
449 .lock()
450 .expect("lock ignore error should success");
451 *guard
452 };
453 check_ok(exit_status.wait(self.span)?, ignore_error, self.span)
454 }
455
456 pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
457 let mut exit_status = self
458 .exit_status
459 .lock()
460 .expect("lock exit_status future should success");
461 exit_status.try_wait(self.span)
462 }
463
464 pub fn wait_with_output(self) -> Result<ProcessOutput, ShellError> {
465 let from_io_error = IoError::factory(self.span, None);
466
467 let (stdout, stderr) = match (self.stdout, self.stderr) {
468 (None, None) => (None, None),
469 (None, Some(stderr)) => (None, Some(collect_bytes(stderr).map_err(&from_io_error)?)),
470 (Some(stdout), None) => (Some(collect_bytes(stdout).map_err(&from_io_error)?), None),
471 (Some(stdout), Some(stderr)) => {
472 let stderr = thread::Builder::new()
473 .spawn(move || collect_bytes(stderr))
474 .map_err(&from_io_error)?;
475
476 let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
477
478 let stderr = stderr
479 .join()
480 .map_err(|e| match e.downcast::<io::Error>() {
481 Ok(io) => from_io_error(*io).into(),
482 Err(err) => ShellError::Generic(GenericError::new(
483 "Unknown error",
484 format!("{err:?}"),
485 self.span,
486 )),
487 })?
488 .map_err(&from_io_error)?;
489
490 (Some(stdout), Some(stderr))
491 }
492 };
493
494 let mut exit_status = self
495 .exit_status
496 .lock()
497 .expect("lock exit_status future should success");
498 let exit_status = exit_status.wait(self.span)?;
499
500 Ok(ProcessOutput {
501 stdout,
502 stderr,
503 exit_status,
504 })
505 }
506
507 pub fn clone_exit_status_future(&self) -> Arc<Mutex<ExitStatusFuture>> {
508 self.exit_status.clone()
509 }
510
511 pub fn clone_ignore_error(&self) -> Arc<Mutex<bool>> {
512 self.ignore_error.clone()
513 }
514}
515
516fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
517 let mut buf = Vec::new();
518 match pipe {
519 ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
520 ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
521 }?;
522 Ok(buf)
523}
524
525fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
526 match pipe {
527 ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
528 ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
529 }?;
530 Ok(())
531}
532
533pub struct ProcessOutput {
534 pub stdout: Option<Vec<u8>>,
535 pub stderr: Option<Vec<u8>>,
536 pub exit_status: ExitStatus,
537}