1use crate::{
2 ShellError, Span,
3 byte_stream::convert_file,
4 engine::{EngineState, FrozenJob, Job},
5 shell_error::{generic::GenericError, io::IoError},
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11 fmt::Debug,
12 io::{self, Read},
13 sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14 sync::{Arc, Mutex},
15 thread,
16};
17
18#[cfg(feature = "os")]
22pub fn check_exit_status_future(
23 exit_status: Vec<Option<ExitStatusGuard>>,
24) -> Result<(), ShellError> {
25 for one_status in exit_status.into_iter().rev().flatten() {
26 check_exit_status_future_ok(one_status)?
27 }
28 Ok(())
29}
30
31fn check_exit_status_future_ok(exit_status_guard: ExitStatusGuard) -> Result<(), ShellError> {
32 let ignore_error = {
33 let guard = exit_status_guard
34 .ignore_error
35 .lock()
36 .expect("lock ignore_error should success");
37 *guard
38 };
39 let mut future = exit_status_guard
40 .exit_status_future
41 .lock()
42 .expect("lock exit_status_future should success");
43 let span = exit_status_guard.span.unwrap_or_default();
44 let exit_status = future.wait(span)?;
45 check_ok(exit_status, ignore_error, span)
46}
47
48pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
49 match status {
50 ExitStatus::Exited(exit_code) => {
51 if ignore_error {
52 Ok(())
53 } else if let Ok(exit_code) = exit_code.try_into() {
54 Err(ShellError::NonZeroExitCode { exit_code, span })
55 } else {
56 Ok(())
57 }
58 }
59 #[cfg(unix)]
60 ExitStatus::Signaled {
61 signal,
62 core_dumped,
63 } => {
64 use nix::sys::signal::Signal;
65
66 let sig = Signal::try_from(signal);
67
68 if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
69 Ok(())
71 } else {
72 let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
73 Err(if core_dumped {
74 ShellError::CoreDumped {
75 signal_name,
76 signal,
77 span,
78 }
79 } else {
80 ShellError::TerminatedBySignal {
81 signal_name,
82 signal,
83 span,
84 }
85 })
86 }
87 }
88 }
89}
90
91#[derive(Debug)]
97pub struct ExitStatusGuard {
98 pub exit_status_future: Arc<Mutex<ExitStatusFuture>>,
99 pub ignore_error: Arc<Mutex<bool>>,
100 pub span: Option<Span>,
101}
102
103impl ExitStatusGuard {
104 pub fn new(
105 exit_status_future: Arc<Mutex<ExitStatusFuture>>,
106 ignore_error: Arc<Mutex<bool>>,
107 ) -> Self {
108 Self {
109 exit_status_future,
110 ignore_error,
111 span: None,
112 }
113 }
114
115 pub fn with_span(self, span: Span) -> Self {
116 Self {
117 exit_status_future: self.exit_status_future,
118 ignore_error: self.ignore_error,
119 span: Some(span),
120 }
121 }
122}
123
124#[derive(Debug)]
125pub enum ExitStatusFuture {
126 Finished(Result<ExitStatus, Box<ShellError>>),
127 Running(Receiver<io::Result<ExitStatus>>),
128}
129
130impl ExitStatusFuture {
131 pub fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
132 match self {
133 ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
134 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
135 ExitStatusFuture::Running(receiver) => {
136 let code = match receiver.recv() {
137 #[cfg(unix)]
138 Ok(Ok(
139 status @ ExitStatus::Signaled {
140 core_dumped: true, ..
141 },
142 )) => {
143 check_ok(status, false, span)?;
144 Ok(status)
145 }
146 Ok(Ok(status)) => Ok(status),
147 Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
148 err,
149 span,
150 None,
151 "failed to get exit code",
152 ))),
153 Err(err @ RecvError) => Err(ShellError::Generic(GenericError::new(
154 err.to_string(),
155 "failed to get exit code",
156 span,
157 ))),
158 };
159
160 *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
161
162 code
163 }
164 }
165 }
166
167 fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
168 match self {
169 ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
170 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
171 ExitStatusFuture::Running(receiver) => {
172 let code = match receiver.try_recv() {
173 Ok(Ok(status)) => Ok(Some(status)),
174 Ok(Err(err)) => Err(ShellError::Generic(GenericError::new(
175 err.to_string(),
176 "failed to get exit code",
177 span,
178 ))),
179 Err(TryRecvError::Disconnected) => Err(ShellError::Generic(GenericError::new(
180 "receiver disconnected",
181 "failed to get exit code",
182 span,
183 ))),
184 Err(TryRecvError::Empty) => Ok(None),
185 };
186
187 if let Some(code) = code.clone().transpose() {
188 *self = ExitStatusFuture::Finished(code.map_err(Box::new));
189 }
190
191 code
192 }
193 }
194 }
195}
196
197pub enum ChildPipe {
198 Pipe(PipeReader),
199 Tee(Box<dyn Read + Send + 'static>),
200}
201
202impl Debug for ChildPipe {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("ChildPipe").finish()
205 }
206}
207
208impl From<PipeReader> for ChildPipe {
209 fn from(pipe: PipeReader) -> Self {
210 Self::Pipe(pipe)
211 }
212}
213
214impl Read for ChildPipe {
215 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
216 match self {
217 ChildPipe::Pipe(pipe) => pipe.read(buf),
218 ChildPipe::Tee(tee) => tee.read(buf),
219 }
220 }
221}
222
223#[derive(Debug)]
224pub struct ChildProcess {
225 pub stdout: Option<ChildPipe>,
226 pub stderr: Option<ChildPipe>,
227 exit_status: Arc<Mutex<ExitStatusFuture>>,
228 ignore_error: Arc<Mutex<bool>>,
229 span: Span,
230}
231
232pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
234
235impl PostWaitCallback {
236 pub fn new<F>(f: F) -> Self
237 where
238 F: FnOnce(ForegroundWaitStatus) + Send + 'static,
239 {
240 PostWaitCallback(Box::new(f))
241 }
242
243 pub fn for_job_control(
251 engine_state: &EngineState,
252 child_pid: Option<u32>,
253 description: Option<String>,
254 ) -> Self {
255 let this_job = engine_state.current_thread_job().cloned();
256 let jobs = engine_state.jobs.clone();
257 let is_interactive = engine_state.is_interactive;
258
259 PostWaitCallback::new(move |status| {
260 if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
261 this_job.remove_pid(child_pid);
262 }
263
264 if let ForegroundWaitStatus::Frozen(unfreeze) = status {
265 let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
266
267 let job_id = jobs.add_job(Job::Frozen(FrozenJob {
268 unfreeze,
269 description,
270 }));
271
272 if is_interactive {
273 println!("\nJob {} is frozen", job_id.get());
274 }
275 }
276 })
277 }
278}
279
280impl Debug for PostWaitCallback {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 write!(f, "<wait_callback>")
283 }
284}
285
286impl ChildProcess {
287 pub fn new(
288 mut child: ForegroundChild,
289 reader: Option<PipeReader>,
290 swap: bool,
291 span: Span,
292 callback: Option<PostWaitCallback>,
293 ) -> Result<Self, ShellError> {
294 let (stdout, stderr) = if let Some(combined) = reader {
295 (Some(combined), None)
296 } else {
297 let stdout = child.as_mut().stdout.take().map(convert_file);
298 let stderr = child.as_mut().stderr.take().map(convert_file);
299
300 if swap {
301 (stderr, stdout)
302 } else {
303 (stdout, stderr)
304 }
305 };
306
307 let (exit_status_sender, exit_status) = mpsc::channel();
309
310 thread::Builder::new()
311 .name("exit status waiter".into())
312 .spawn(move || {
313 let matched = match child.wait() {
314 Ok(wait_status) => {
322 let next = match &wait_status {
323 ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
324 ForegroundWaitStatus::Finished(exit_status) => *exit_status,
325 };
326
327 if let Some(callback) = callback {
328 (callback.0)(wait_status);
329 }
330
331 Ok(next)
332 }
333 Err(err) => Err(err),
334 };
335
336 exit_status_sender.send(matched)
337 })
338 .map_err(|err| {
339 IoError::new_with_additional_context(
340 err,
341 span,
342 None,
343 "Could now spawn exit status waiter",
344 )
345 })?;
346
347 Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
348 }
349
350 pub fn from_raw(
351 stdout: Option<PipeReader>,
352 stderr: Option<PipeReader>,
353 exit_status: Option<Receiver<io::Result<ExitStatus>>>,
354 span: Span,
355 ) -> Self {
356 Self {
357 stdout: stdout.map(Into::into),
358 stderr: stderr.map(Into::into),
359 exit_status: Arc::new(Mutex::new(
360 exit_status
361 .map(ExitStatusFuture::Running)
362 .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
363 )),
364 ignore_error: Arc::new(Mutex::new(false)),
365 span,
366 }
367 }
368
369 pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
370 {
371 let mut ignore_error = self.ignore_error.lock().expect("lock should success");
372 *ignore_error = ignore;
373 }
374 self
375 }
376
377 pub fn span(&self) -> Span {
378 self.span
379 }
380
381 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
382 if self.stderr.is_some() {
383 debug_assert!(false, "stderr should not exist");
384 return Err(ShellError::Generic(GenericError::new(
385 "internal error",
386 "stderr should not exist",
387 self.span,
388 )));
389 }
390
391 let bytes = if let Some(stdout) = self.stdout {
392 collect_bytes(stdout).map_err(|err| IoError::new(err, self.span, None))?
393 } else {
394 Vec::new()
395 };
396
397 let mut exit_status = self
398 .exit_status
399 .lock()
400 .expect("lock exit_status future should success");
401 let ignore_error = {
402 let guard = self
403 .ignore_error
404 .lock()
405 .expect("lock ignore error should success");
406 *guard
407 };
408 check_ok(exit_status.wait(self.span)?, ignore_error, self.span)?;
409
410 Ok(bytes)
411 }
412
413 pub fn wait(mut self) -> Result<(), ShellError> {
414 let from_io_error = IoError::factory(self.span, None);
415 if let Some(stdout) = self.stdout.take() {
416 let stderr = self
417 .stderr
418 .take()
419 .map(|stderr| {
420 thread::Builder::new()
421 .name("stderr consumer".into())
422 .spawn(move || consume_pipe(stderr))
423 })
424 .transpose()
425 .map_err(&from_io_error)?;
426
427 let res = consume_pipe(stdout);
428
429 if let Some(handle) = stderr {
430 handle
431 .join()
432 .map_err(|e| match e.downcast::<io::Error>() {
433 Ok(io) => from_io_error(*io).into(),
434 Err(err) => ShellError::Generic(GenericError::new(
435 "Unknown error",
436 format!("{err:?}"),
437 self.span,
438 )),
439 })?
440 .map_err(&from_io_error)?;
441 }
442
443 res.map_err(&from_io_error)?;
444 } else if let Some(stderr) = self.stderr.take() {
445 consume_pipe(stderr).map_err(&from_io_error)?;
446 }
447 let mut exit_status = self
448 .exit_status
449 .lock()
450 .expect("lock exit_status future should success");
451 let ignore_error = {
452 let guard = self
453 .ignore_error
454 .lock()
455 .expect("lock ignore error should success");
456 *guard
457 };
458 check_ok(exit_status.wait(self.span)?, ignore_error, self.span)
459 }
460
461 pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
462 let mut exit_status = self
463 .exit_status
464 .lock()
465 .expect("lock exit_status future should success");
466 exit_status.try_wait(self.span)
467 }
468
469 pub fn wait_with_output(self) -> Result<ProcessOutput, ShellError> {
470 let from_io_error = IoError::factory(self.span, None);
471 let (stdout, stderr) = if let Some(stdout) = self.stdout {
472 let stderr = self
473 .stderr
474 .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
475 .transpose()
476 .map_err(&from_io_error)?;
477
478 let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
479
480 let stderr = stderr
481 .map(|handle| {
482 handle.join().map_err(|e| match e.downcast::<io::Error>() {
483 Ok(io) => from_io_error(*io).into(),
484 Err(err) => ShellError::Generic(GenericError::new(
485 "Unknown error",
486 format!("{err:?}"),
487 self.span,
488 )),
489 })
490 })
491 .transpose()?
492 .transpose()
493 .map_err(&from_io_error)?;
494
495 (Some(stdout), stderr)
496 } else {
497 let stderr = self
498 .stderr
499 .map(collect_bytes)
500 .transpose()
501 .map_err(&from_io_error)?;
502
503 (None, stderr)
504 };
505
506 let mut exit_status = self
507 .exit_status
508 .lock()
509 .expect("lock exit_status future should success");
510 let exit_status = exit_status.wait(self.span)?;
511
512 Ok(ProcessOutput {
513 stdout,
514 stderr,
515 exit_status,
516 })
517 }
518
519 pub fn clone_exit_status_future(&self) -> Arc<Mutex<ExitStatusFuture>> {
520 self.exit_status.clone()
521 }
522
523 pub fn clone_ignore_error(&self) -> Arc<Mutex<bool>> {
524 self.ignore_error.clone()
525 }
526}
527
528fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
529 let mut buf = Vec::new();
530 match pipe {
531 ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
532 ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
533 }?;
534 Ok(buf)
535}
536
537fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
538 match pipe {
539 ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
540 ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
541 }?;
542 Ok(())
543}
544
545pub struct ProcessOutput {
546 pub stdout: Option<Vec<u8>>,
547 pub stderr: Option<Vec<u8>>,
548 pub exit_status: ExitStatus,
549}