1use crate::{
2 ShellError, Span,
3 byte_stream::convert_file,
4 engine::{EngineState, FrozenJob, Job},
5 shell_error::io::IoError,
6};
7use nu_system::{ExitStatus, ForegroundChild, ForegroundWaitStatus};
8
9use os_pipe::PipeReader;
10use std::{
11 fmt::Debug,
12 io::{self, Read},
13 sync::mpsc::{self, Receiver, RecvError, TryRecvError},
14 sync::{Arc, Mutex},
15 thread,
16};
17
/// Checks every recorded exit status, starting from the last entry.
///
/// `None` entries are skipped. Checking stops at the first failing entry and
/// that error is returned; the remaining entries are not examined.
#[cfg(feature = "os")]
pub fn check_exit_status_future(
    exit_status: Vec<Option<ExitStatusGuard>>,
) -> Result<(), ShellError> {
    exit_status
        .into_iter()
        .rev()
        .flatten()
        .try_for_each(check_exit_status_future_ok)
}
30
31fn check_exit_status_future_ok(exit_status_guard: ExitStatusGuard) -> Result<(), ShellError> {
32 let ignore_error = {
33 let guard = exit_status_guard
34 .ignore_error
35 .lock()
36 .expect("lock ignore_error should success");
37 *guard
38 };
39 let mut future = exit_status_guard
40 .exit_status_future
41 .lock()
42 .expect("lock exit_status_future should success");
43 let span = exit_status_guard.span.unwrap_or_default();
44 let exit_status = future.wait(span)?;
45 check_ok(exit_status, ignore_error, span)
46}
47
48pub fn check_ok(status: ExitStatus, ignore_error: bool, span: Span) -> Result<(), ShellError> {
49 match status {
50 ExitStatus::Exited(exit_code) => {
51 if ignore_error {
52 Ok(())
53 } else if let Ok(exit_code) = exit_code.try_into() {
54 Err(ShellError::NonZeroExitCode { exit_code, span })
55 } else {
56 Ok(())
57 }
58 }
59 #[cfg(unix)]
60 ExitStatus::Signaled {
61 signal,
62 core_dumped,
63 } => {
64 use nix::sys::signal::Signal;
65
66 let sig = Signal::try_from(signal);
67
68 if sig == Ok(Signal::SIGPIPE) || (ignore_error && !core_dumped) {
69 Ok(())
71 } else {
72 let signal_name = sig.map(Signal::as_str).unwrap_or("unknown signal").into();
73 Err(if core_dumped {
74 ShellError::CoreDumped {
75 signal_name,
76 signal,
77 span,
78 }
79 } else {
80 ShellError::TerminatedBySignal {
81 signal_name,
82 signal,
83 span,
84 }
85 })
86 }
87 }
88 }
89}
90
91#[derive(Debug)]
97pub struct ExitStatusGuard {
98 pub exit_status_future: Arc<Mutex<ExitStatusFuture>>,
99 pub ignore_error: Arc<Mutex<bool>>,
100 pub span: Option<Span>,
101}
102
103impl ExitStatusGuard {
104 pub fn new(
105 exit_status_future: Arc<Mutex<ExitStatusFuture>>,
106 ignore_error: Arc<Mutex<bool>>,
107 ) -> Self {
108 Self {
109 exit_status_future,
110 ignore_error,
111 span: None,
112 }
113 }
114
115 pub fn with_span(self, span: Span) -> Self {
116 Self {
117 exit_status_future: self.exit_status_future,
118 ignore_error: self.ignore_error,
119 span: Some(span),
120 }
121 }
122}
123
124#[derive(Debug)]
125pub enum ExitStatusFuture {
126 Finished(Result<ExitStatus, Box<ShellError>>),
127 Running(Receiver<io::Result<ExitStatus>>),
128}
129
130impl ExitStatusFuture {
131 pub fn wait(&mut self, span: Span) -> Result<ExitStatus, ShellError> {
132 match self {
133 ExitStatusFuture::Finished(Ok(status)) => Ok(*status),
134 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
135 ExitStatusFuture::Running(receiver) => {
136 let code = match receiver.recv() {
137 #[cfg(unix)]
138 Ok(Ok(
139 status @ ExitStatus::Signaled {
140 core_dumped: true, ..
141 },
142 )) => {
143 check_ok(status, false, span)?;
144 Ok(status)
145 }
146 Ok(Ok(status)) => Ok(status),
147 Ok(Err(err)) => Err(ShellError::Io(IoError::new_with_additional_context(
148 err,
149 span,
150 None,
151 "failed to get exit code",
152 ))),
153 Err(err @ RecvError) => Err(ShellError::GenericError {
154 error: err.to_string(),
155 msg: "failed to get exit code".into(),
156 span: span.into(),
157 help: None,
158 inner: vec![],
159 }),
160 };
161
162 *self = ExitStatusFuture::Finished(code.clone().map_err(Box::new));
163
164 code
165 }
166 }
167 }
168
169 fn try_wait(&mut self, span: Span) -> Result<Option<ExitStatus>, ShellError> {
170 match self {
171 ExitStatusFuture::Finished(Ok(code)) => Ok(Some(*code)),
172 ExitStatusFuture::Finished(Err(err)) => Err(err.as_ref().clone()),
173 ExitStatusFuture::Running(receiver) => {
174 let code = match receiver.try_recv() {
175 Ok(Ok(status)) => Ok(Some(status)),
176 Ok(Err(err)) => Err(ShellError::GenericError {
177 error: err.to_string(),
178 msg: "failed to get exit code".to_string(),
179 span: span.into(),
180 help: None,
181 inner: vec![],
182 }),
183 Err(TryRecvError::Disconnected) => Err(ShellError::GenericError {
184 error: "receiver disconnected".to_string(),
185 msg: "failed to get exit code".into(),
186 span: span.into(),
187 help: None,
188 inner: vec![],
189 }),
190 Err(TryRecvError::Empty) => Ok(None),
191 };
192
193 if let Some(code) = code.clone().transpose() {
194 *self = ExitStatusFuture::Finished(code.map_err(Box::new));
195 }
196
197 code
198 }
199 }
200 }
201}
202
203pub enum ChildPipe {
204 Pipe(PipeReader),
205 Tee(Box<dyn Read + Send + 'static>),
206}
207
208impl Debug for ChildPipe {
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("ChildPipe").finish()
211 }
212}
213
214impl From<PipeReader> for ChildPipe {
215 fn from(pipe: PipeReader) -> Self {
216 Self::Pipe(pipe)
217 }
218}
219
220impl Read for ChildPipe {
221 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
222 match self {
223 ChildPipe::Pipe(pipe) => pipe.read(buf),
224 ChildPipe::Tee(tee) => tee.read(buf),
225 }
226 }
227}
228
229#[derive(Debug)]
230pub struct ChildProcess {
231 pub stdout: Option<ChildPipe>,
232 pub stderr: Option<ChildPipe>,
233 exit_status: Arc<Mutex<ExitStatusFuture>>,
234 ignore_error: Arc<Mutex<bool>>,
235 span: Span,
236}
237
238pub struct PostWaitCallback(pub Box<dyn FnOnce(ForegroundWaitStatus) + Send>);
240
241impl PostWaitCallback {
242 pub fn new<F>(f: F) -> Self
243 where
244 F: FnOnce(ForegroundWaitStatus) + Send + 'static,
245 {
246 PostWaitCallback(Box::new(f))
247 }
248
249 pub fn for_job_control(
257 engine_state: &EngineState,
258 child_pid: Option<u32>,
259 tag: Option<String>,
260 ) -> Self {
261 let this_job = engine_state.current_thread_job().cloned();
262 let jobs = engine_state.jobs.clone();
263 let is_interactive = engine_state.is_interactive;
264
265 PostWaitCallback::new(move |status| {
266 if let (Some(this_job), Some(child_pid)) = (this_job, child_pid) {
267 this_job.remove_pid(child_pid);
268 }
269
270 if let ForegroundWaitStatus::Frozen(unfreeze) = status {
271 let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
272
273 let job_id = jobs.add_job(Job::Frozen(FrozenJob { unfreeze, tag }));
274
275 if is_interactive {
276 println!("\nJob {} is frozen", job_id.get());
277 }
278 }
279 })
280 }
281}
282
283impl Debug for PostWaitCallback {
284 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285 write!(f, "<wait_callback>")
286 }
287}
288
289impl ChildProcess {
290 pub fn new(
291 mut child: ForegroundChild,
292 reader: Option<PipeReader>,
293 swap: bool,
294 span: Span,
295 callback: Option<PostWaitCallback>,
296 ) -> Result<Self, ShellError> {
297 let (stdout, stderr) = if let Some(combined) = reader {
298 (Some(combined), None)
299 } else {
300 let stdout = child.as_mut().stdout.take().map(convert_file);
301 let stderr = child.as_mut().stderr.take().map(convert_file);
302
303 if swap {
304 (stderr, stdout)
305 } else {
306 (stdout, stderr)
307 }
308 };
309
310 let (exit_status_sender, exit_status) = mpsc::channel();
312
313 thread::Builder::new()
314 .name("exit status waiter".into())
315 .spawn(move || {
316 let matched = match child.wait() {
317 Ok(wait_status) => {
325 let next = match &wait_status {
326 ForegroundWaitStatus::Frozen(_) => ExitStatus::Exited(0),
327 ForegroundWaitStatus::Finished(exit_status) => *exit_status,
328 };
329
330 if let Some(callback) = callback {
331 (callback.0)(wait_status);
332 }
333
334 Ok(next)
335 }
336 Err(err) => Err(err),
337 };
338
339 exit_status_sender.send(matched)
340 })
341 .map_err(|err| {
342 IoError::new_with_additional_context(
343 err,
344 span,
345 None,
346 "Could now spawn exit status waiter",
347 )
348 })?;
349
350 Ok(Self::from_raw(stdout, stderr, Some(exit_status), span))
351 }
352
353 pub fn from_raw(
354 stdout: Option<PipeReader>,
355 stderr: Option<PipeReader>,
356 exit_status: Option<Receiver<io::Result<ExitStatus>>>,
357 span: Span,
358 ) -> Self {
359 Self {
360 stdout: stdout.map(Into::into),
361 stderr: stderr.map(Into::into),
362 exit_status: Arc::new(Mutex::new(
363 exit_status
364 .map(ExitStatusFuture::Running)
365 .unwrap_or(ExitStatusFuture::Finished(Ok(ExitStatus::Exited(0)))),
366 )),
367 ignore_error: Arc::new(Mutex::new(false)),
368 span,
369 }
370 }
371
372 pub fn ignore_error(&mut self, ignore: bool) -> &mut Self {
373 {
374 let mut ignore_error = self.ignore_error.lock().expect("lock should success");
375 *ignore_error = ignore;
376 }
377 self
378 }
379
380 pub fn span(&self) -> Span {
381 self.span
382 }
383
384 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
385 if self.stderr.is_some() {
386 debug_assert!(false, "stderr should not exist");
387 return Err(ShellError::GenericError {
388 error: "internal error".into(),
389 msg: "stderr should not exist".into(),
390 span: self.span.into(),
391 help: None,
392 inner: vec![],
393 });
394 }
395
396 let bytes = if let Some(stdout) = self.stdout {
397 collect_bytes(stdout).map_err(|err| IoError::new(err, self.span, None))?
398 } else {
399 Vec::new()
400 };
401
402 let mut exit_status = self
403 .exit_status
404 .lock()
405 .expect("lock exit_status future should success");
406 let ignore_error = {
407 let guard = self
408 .ignore_error
409 .lock()
410 .expect("lock ignore error should success");
411 *guard
412 };
413 check_ok(exit_status.wait(self.span)?, ignore_error, self.span)?;
414
415 Ok(bytes)
416 }
417
418 pub fn wait(mut self) -> Result<(), ShellError> {
419 let from_io_error = IoError::factory(self.span, None);
420 if let Some(stdout) = self.stdout.take() {
421 let stderr = self
422 .stderr
423 .take()
424 .map(|stderr| {
425 thread::Builder::new()
426 .name("stderr consumer".into())
427 .spawn(move || consume_pipe(stderr))
428 })
429 .transpose()
430 .map_err(&from_io_error)?;
431
432 let res = consume_pipe(stdout);
433
434 if let Some(handle) = stderr {
435 handle
436 .join()
437 .map_err(|e| match e.downcast::<io::Error>() {
438 Ok(io) => from_io_error(*io).into(),
439 Err(err) => ShellError::GenericError {
440 error: "Unknown error".into(),
441 msg: format!("{err:?}"),
442 span: Some(self.span),
443 help: None,
444 inner: Vec::new(),
445 },
446 })?
447 .map_err(&from_io_error)?;
448 }
449
450 res.map_err(&from_io_error)?;
451 } else if let Some(stderr) = self.stderr.take() {
452 consume_pipe(stderr).map_err(&from_io_error)?;
453 }
454 let mut exit_status = self
455 .exit_status
456 .lock()
457 .expect("lock exit_status future should success");
458 let ignore_error = {
459 let guard = self
460 .ignore_error
461 .lock()
462 .expect("lock ignore error should success");
463 *guard
464 };
465 check_ok(exit_status.wait(self.span)?, ignore_error, self.span)
466 }
467
468 pub fn try_wait(&mut self) -> Result<Option<ExitStatus>, ShellError> {
469 let mut exit_status = self
470 .exit_status
471 .lock()
472 .expect("lock exit_status future should success");
473 exit_status.try_wait(self.span)
474 }
475
476 pub fn wait_with_output(self) -> Result<ProcessOutput, ShellError> {
477 let from_io_error = IoError::factory(self.span, None);
478 let (stdout, stderr) = if let Some(stdout) = self.stdout {
479 let stderr = self
480 .stderr
481 .map(|stderr| thread::Builder::new().spawn(move || collect_bytes(stderr)))
482 .transpose()
483 .map_err(&from_io_error)?;
484
485 let stdout = collect_bytes(stdout).map_err(&from_io_error)?;
486
487 let stderr = stderr
488 .map(|handle| {
489 handle.join().map_err(|e| match e.downcast::<io::Error>() {
490 Ok(io) => from_io_error(*io).into(),
491 Err(err) => ShellError::GenericError {
492 error: "Unknown error".into(),
493 msg: format!("{err:?}"),
494 span: Some(self.span),
495 help: None,
496 inner: Vec::new(),
497 },
498 })
499 })
500 .transpose()?
501 .transpose()
502 .map_err(&from_io_error)?;
503
504 (Some(stdout), stderr)
505 } else {
506 let stderr = self
507 .stderr
508 .map(collect_bytes)
509 .transpose()
510 .map_err(&from_io_error)?;
511
512 (None, stderr)
513 };
514
515 let mut exit_status = self
516 .exit_status
517 .lock()
518 .expect("lock exit_status future should success");
519 let exit_status = exit_status.wait(self.span)?;
520
521 Ok(ProcessOutput {
522 stdout,
523 stderr,
524 exit_status,
525 })
526 }
527
528 pub fn clone_exit_status_future(&self) -> Arc<Mutex<ExitStatusFuture>> {
529 self.exit_status.clone()
530 }
531
532 pub fn clone_ignore_error(&self) -> Arc<Mutex<bool>> {
533 self.ignore_error.clone()
534 }
535}
536
537fn collect_bytes(pipe: ChildPipe) -> io::Result<Vec<u8>> {
538 let mut buf = Vec::new();
539 match pipe {
540 ChildPipe::Pipe(mut pipe) => pipe.read_to_end(&mut buf),
541 ChildPipe::Tee(mut tee) => tee.read_to_end(&mut buf),
542 }?;
543 Ok(buf)
544}
545
546fn consume_pipe(pipe: ChildPipe) -> io::Result<()> {
547 match pipe {
548 ChildPipe::Pipe(mut pipe) => io::copy(&mut pipe, &mut io::sink()),
549 ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
550 }?;
551 Ok(())
552}
553
554pub struct ProcessOutput {
555 pub stdout: Option<Vec<u8>>,
556 pub stderr: Option<Vec<u8>>,
557 pub exit_status: ExitStatus,
558}