1use super::Command;
20use crate::job::add_job_if_suspended;
21use crate::trap::run_exit_trap;
22use enumset::EnumSet;
23use itertools::Itertools;
24use std::ops::ControlFlow::{Break, Continue};
25use std::rc::Rc;
26use yash_env::Env;
27use yash_env::System;
28use yash_env::io::Fd;
29use yash_env::job::Pid;
30use yash_env::option::Option::{Exec, Interactive, PipeFail};
31use yash_env::option::State::{Off, On};
32use yash_env::semantics::Divert;
33use yash_env::semantics::ExitStatus;
34use yash_env::semantics::Result;
35use yash_env::stack::Frame;
36use yash_env::subshell::JobControl;
37use yash_env::subshell::Subshell;
38use yash_env::system::Errno;
39use yash_syntax::syntax;
40
41impl Command for syntax::Pipeline {
79 async fn execute(&self, env: &mut Env) -> Result {
80 if env.options.get(Exec) == Off && env.options.get(Interactive) == Off {
81 return Continue(());
82 }
83
84 if !self.negation {
85 return execute_commands_in_pipeline(env, &self.commands).await;
86 }
87
88 let mut env = env.push_frame(Frame::Condition);
89 execute_commands_in_pipeline(&mut env, &self.commands).await?;
90 env.exit_status = if env.exit_status.is_successful() {
91 ExitStatus::FAILURE
92 } else {
93 ExitStatus::SUCCESS
94 };
95 Continue(())
96 }
97}
98
99async fn execute_commands_in_pipeline(env: &mut Env, commands: &[Rc<syntax::Command>]) -> Result {
100 match commands.len() {
101 0 => {
102 env.exit_status = ExitStatus::SUCCESS;
103 Continue(())
104 }
105
106 1 => commands[0].execute(env).await,
107
108 _ => {
109 if env.controls_jobs() {
110 execute_job_controlled_pipeline(env, commands).await?
111 } else {
112 execute_multi_command_pipeline(env, commands).await?
113 }
114 env.apply_errexit()
115 }
116 }
117}
118
119async fn execute_job_controlled_pipeline(
120 env: &mut Env,
121 commands: &[Rc<syntax::Command>],
122) -> Result {
123 let commands_2 = commands.to_vec();
124 let subshell = Subshell::new(|sub_env, _job_control| {
125 Box::pin(async move {
126 let result = execute_multi_command_pipeline(sub_env, &commands_2).await;
127 sub_env.apply_result(result);
128 run_exit_trap(sub_env).await;
129 })
130 })
131 .job_control(JobControl::Foreground);
132
133 match subshell.start_and_wait(env).await {
134 Ok((pid, result)) => {
135 env.exit_status = add_job_if_suspended(env, pid, result, || to_job_name(commands))?;
136 Continue(())
137 }
138 Err(errno) => {
139 let message = format!("cannot start a subshell in the pipeline: {errno}\n");
141 env.system.print_error(&message).await;
142 Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
143 }
144 }
145}
146
147fn to_job_name(commands: &[Rc<syntax::Command>]) -> String {
148 commands
149 .iter()
150 .format_with(" | ", |cmd, f| f(&format_args!("{cmd}")))
151 .to_string()
152}
153
154async fn execute_multi_command_pipeline(env: &mut Env, commands: &[Rc<syntax::Command>]) -> Result {
155 let mut commands = commands.iter().cloned();
157 let mut pipes = PipeSet::new();
158 let mut pids = Vec::new();
159 while let Some(command) = commands.next() {
160 let has_next = commands.len() > 0; shift_or_fail(env, &mut pipes, has_next).await?;
162
163 let pipes = pipes;
164 let subshell = Subshell::new(move |env, _job_control| {
165 Box::pin(async move {
166 let result = connect_pipe_and_execute_command(env, pipes, command).await;
167 env.apply_result(result);
168 run_exit_trap(env).await;
169 })
170 });
171 let start_result = subshell.start(env).await;
172 pids.push(pid_or_fail(env, start_result).await?);
173 }
174
175 shift_or_fail(env, &mut pipes, false).await?;
176
177 let mut final_exit_status = ExitStatus::SUCCESS;
179 let pipefail = env.options.get(PipeFail) == On;
180 for pid in pids {
181 let exit_status = env
182 .wait_for_subshell_to_finish(pid)
183 .await
184 .expect("cannot receive exit status of child process")
185 .1;
186 if !exit_status.is_successful() || !pipefail {
187 final_exit_status = exit_status;
188 }
189 }
190 env.exit_status = final_exit_status;
191
192 Continue(())
193}
194
195async fn shift_or_fail(env: &mut Env, pipes: &mut PipeSet, has_next: bool) -> Result {
196 match pipes.shift(env, has_next) {
197 Ok(()) => Continue(()),
198 Err(errno) => {
199 let message = format!("cannot connect pipes in the pipeline: {errno}\n");
201 env.system.print_error(&message).await;
202 Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
203 }
204 }
205}
206
207async fn connect_pipe_and_execute_command(
208 env: &mut Env,
209 pipes: PipeSet,
210 command: Rc<syntax::Command>,
211) -> Result {
212 match pipes.move_to_stdin_stdout(env) {
213 Ok(()) => (),
214 Err(errno) => {
215 let message = format!("cannot connect pipes in the pipeline: {errno}\n");
217 env.system.print_error(&message).await;
218 return Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)));
219 }
220 }
221
222 command.execute(env).await
223}
224
225async fn pid_or_fail(
226 env: &mut Env,
227 start_result: std::result::Result<(Pid, Option<JobControl>), Errno>,
228) -> Result<Pid> {
229 match start_result {
230 Ok((pid, job_control)) => {
231 debug_assert_eq!(job_control, None);
232 Continue(pid)
233 }
234 Err(errno) => {
235 env.system
237 .print_error(&format!(
238 "cannot start a subshell in the pipeline: {errno}\n"
239 ))
240 .await;
241 Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
242 }
243 }
244}
245
246#[derive(Clone, Copy, Default)]
248struct PipeSet {
249 read_previous: Option<Fd>,
250 next: Option<(Fd, Fd)>,
252}
253
254impl PipeSet {
255 fn new() -> Self {
256 Self::default()
257 }
258
259 fn shift(&mut self, env: &mut Env, has_next: bool) -> std::result::Result<(), Errno> {
264 if let Some(fd) = self.read_previous {
265 let _ = env.system.close(fd);
266 }
267
268 if let Some((reader, writer)) = self.next {
269 let _ = env.system.close(writer);
270 self.read_previous = Some(reader);
271 } else {
272 self.read_previous = None;
273 }
274
275 self.next = None;
276 if has_next {
277 self.next = Some(env.system.pipe()?);
278 }
279
280 Ok(())
281 }
282
283 fn move_to_stdin_stdout(mut self, env: &mut Env) -> std::result::Result<(), Errno> {
286 if let Some((reader, writer)) = self.next {
287 assert_ne!(reader, writer);
288 assert_ne!(self.read_previous, Some(reader));
289 assert_ne!(self.read_previous, Some(writer));
290
291 env.system.close(reader)?;
292 if writer != Fd::STDOUT {
293 if self.read_previous == Some(Fd::STDOUT) {
294 self.read_previous =
295 Some(env.system.dup(Fd::STDOUT, Fd(0), EnumSet::empty())?);
296 }
297 env.system.dup2(writer, Fd::STDOUT)?;
298 env.system.close(writer)?;
299 }
300 }
301 if let Some(reader) = self.read_previous {
302 if reader != Fd::STDIN {
303 env.system.dup2(reader, Fd::STDIN)?;
304 env.system.close(reader)?;
305 }
306 }
307 Ok(())
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use super::*;
314 use crate::tests::cat_builtin;
315 use crate::tests::return_builtin;
316 use crate::tests::suspend_builtin;
317 use assert_matches::assert_matches;
318 use futures_util::FutureExt;
319 use std::pin::Pin;
320 use std::rc::Rc;
321 use yash_env::VirtualSystem;
322 use yash_env::builtin::Builtin;
323 use yash_env::builtin::Type::Special;
324 use yash_env::job::ProcessResult;
325 use yash_env::job::ProcessState;
326 use yash_env::option::Option::{ErrExit, Monitor};
327 use yash_env::semantics::Field;
328 use yash_env::system::r#virtual::FileBody;
329 use yash_env::system::r#virtual::SIGSTOP;
330 use yash_env_test_helper::assert_stdout;
331 use yash_env_test_helper::in_virtual_system;
332 use yash_env_test_helper::stub_tty;
333
334 #[test]
335 fn empty_pipeline() {
336 let mut env = Env::new_virtual();
337 let pipeline = syntax::Pipeline {
338 commands: vec![],
339 negation: false,
340 };
341 let result = pipeline.execute(&mut env).now_or_never().unwrap();
342 assert_eq!(result, Continue(()));
343 assert_eq!(env.exit_status, ExitStatus(0));
344 }
345
346 #[test]
347 fn single_command_pipeline_returns_exit_status_intact_without_divert() {
348 let mut env = Env::new_virtual();
349 env.builtins.insert("return", return_builtin());
350 let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
351 let result = pipeline.execute(&mut env).now_or_never().unwrap();
352 assert_eq!(result, Continue(()));
353 assert_eq!(env.exit_status, ExitStatus(93));
354 }
355
356 #[test]
357 fn single_command_pipeline_returns_exit_status_intact_with_divert() {
358 let mut env = Env::new_virtual();
359 env.builtins.insert("return", return_builtin());
360 env.exit_status = ExitStatus(17);
361 let pipeline: syntax::Pipeline = "return 37".parse().unwrap();
362 let result = pipeline.execute(&mut env).now_or_never().unwrap();
363 assert_eq!(result, Break(Divert::Return(Some(ExitStatus(37)))));
364 assert_eq!(env.exit_status, ExitStatus(17));
365 }
366
367 #[test]
368 fn multi_command_pipeline_without_pipefail_returns_last_command_exit_status() {
369 in_virtual_system(|mut env, _state| async move {
370 env.builtins.insert("return", return_builtin());
371 env.options.set(PipeFail, Off);
372
373 let pipeline: syntax::Pipeline = "return -n 0 | return -n 0".parse().unwrap();
374 let result = pipeline.execute(&mut env).await;
375 assert_eq!(result, Continue(()));
376 assert_eq!(env.exit_status, ExitStatus(0));
377
378 let pipeline: syntax::Pipeline = "return -n 10 | return -n 20".parse().unwrap();
379 let result = pipeline.execute(&mut env).await;
380 assert_eq!(result, Continue(()));
381 assert_eq!(env.exit_status, ExitStatus(20));
382
383 let pipeline: syntax::Pipeline = "return -n 0 | return -n 20 | return -n 0 |\
384 return -n 30 | return -n 0 | return -n 0"
385 .parse()
386 .unwrap();
387 let result = pipeline.execute(&mut env).await;
388 assert_eq!(result, Continue(()));
389 assert_eq!(env.exit_status, ExitStatus(0));
390 });
391 }
392
393 #[test]
394 fn multi_command_pipeline_with_pipefail_returns_last_failed_command_exit_status() {
395 in_virtual_system(|mut env, _state| async move {
396 env.builtins.insert("return", return_builtin());
397 env.options.set(PipeFail, On);
398
399 let pipeline: syntax::Pipeline = "return -n 0 | return -n 0".parse().unwrap();
400 let result = pipeline.execute(&mut env).await;
401 assert_eq!(result, Continue(()));
402 assert_eq!(env.exit_status, ExitStatus(0));
403
404 let pipeline: syntax::Pipeline = "return -n 10 | return -n 20".parse().unwrap();
405 let result = pipeline.execute(&mut env).await;
406 assert_eq!(result, Continue(()));
407 assert_eq!(env.exit_status, ExitStatus(20));
408
409 let pipeline: syntax::Pipeline = "return -n 0 | return -n 20 | return -n 0 |\
410 return -n 30 | return -n 0 | return -n 0"
411 .parse()
412 .unwrap();
413 let result = pipeline.execute(&mut env).await;
414 assert_eq!(result, Continue(()));
415 assert_eq!(env.exit_status, ExitStatus(30));
416 });
417 }
418
419 #[test]
420 fn multi_command_pipeline_waits_for_all_child_commands() {
421 in_virtual_system(|mut env, state| async move {
422 env.builtins.insert("return", return_builtin());
423 let pipeline: syntax::Pipeline =
424 "return -n 1 | return -n 2 | return -n 3".parse().unwrap();
425 _ = pipeline.execute(&mut env).await;
426
427 for (pid, process) in &state.borrow().processes {
429 if *pid == env.main_pid {
430 assert_eq!(process.state(), ProcessState::Running);
431 } else {
432 assert_matches!(
433 process.state(),
434 ProcessState::Halted(ProcessResult::Exited(_))
435 );
436 }
437 }
438 });
439 }
440
441 #[test]
442 fn multi_command_pipeline_does_not_wait_for_unrelated_child() {
443 in_virtual_system(|mut env, state| async move {
444 env.builtins.insert("return", return_builtin());
445
446 let list: syntax::List = "return -n 7&".parse().unwrap();
447 _ = list.execute(&mut env).await;
448 let async_pid = {
449 let state = state.borrow();
450 let mut iter = state.processes.keys();
451 assert_eq!(iter.next(), Some(&env.main_pid));
452 let async_pid = *iter.next().unwrap();
453 assert_eq!(iter.next(), None);
454 async_pid
455 };
456
457 let pipeline: syntax::Pipeline =
458 "return -n 1 | return -n 2 | return -n 3".parse().unwrap();
459 _ = pipeline.execute(&mut env).await;
460
461 let state = state.borrow();
462 let process = &state.processes[&async_pid];
463 assert_eq!(process.state(), ProcessState::exited(7));
464 assert!(process.state_has_changed());
465 });
466 }
467
468 #[test]
469 fn pipe_connects_commands_in_pipeline() {
470 in_virtual_system(|mut env, state| async move {
471 {
472 let file = state.borrow().file_system.get("/dev/stdin").unwrap();
473 let mut file = file.borrow_mut();
474 file.body = FileBody::new(*b"ok\n");
475 }
476
477 env.builtins.insert("cat", cat_builtin());
478
479 let pipeline: syntax::Pipeline = "cat | cat | cat".parse().unwrap();
480 let result = pipeline.execute(&mut env).await;
481 assert_eq!(result, Continue(()));
482 assert_eq!(env.exit_status, ExitStatus::SUCCESS);
483 assert_stdout(&state, |stdout| assert_eq!(stdout, "ok\n"));
484 });
485 }
486
487 #[test]
488 fn pipeline_leaves_no_pipe_fds_leftover() {
489 in_virtual_system(|mut env, state| async move {
490 env.builtins.insert("cat", cat_builtin());
491 let pipeline: syntax::Pipeline = "cat | cat".parse().unwrap();
492 let _ = pipeline.execute(&mut env).await;
493
494 let state = state.borrow();
495 let fds = state.processes[&env.main_pid].fds();
496 for fd in 3..10 {
497 assert!(!fds.contains_key(&Fd(fd)), "fd={fd}");
498 }
499 });
500 }
501
502 #[test]
503 fn inverting_exit_status_to_0_without_divert() {
504 let mut env = Env::new_virtual();
505 env.builtins.insert("return", return_builtin());
506 let pipeline: syntax::Pipeline = "! return -n 42".parse().unwrap();
507 let result = pipeline.execute(&mut env).now_or_never().unwrap();
508 assert_eq!(result, Continue(()));
509 assert_eq!(env.exit_status, ExitStatus(0));
510 }
511
512 #[test]
513 fn inverting_exit_status_to_1_without_divert() {
514 let mut env = Env::new_virtual();
515 env.builtins.insert("return", return_builtin());
516 let pipeline: syntax::Pipeline = "! return -n 0".parse().unwrap();
517 let result = pipeline.execute(&mut env).now_or_never().unwrap();
518 assert_eq!(result, Continue(()));
519 assert_eq!(env.exit_status, ExitStatus(1));
520 }
521
522 #[test]
523 fn not_inverting_exit_status_with_divert() {
524 let mut env = Env::new_virtual();
525 env.builtins.insert("return", return_builtin());
526 env.exit_status = ExitStatus(3);
527 let pipeline: syntax::Pipeline = "! return 15".parse().unwrap();
528 let result = pipeline.execute(&mut env).now_or_never().unwrap();
529 assert_eq!(result, Break(Divert::Return(Some(ExitStatus(15)))));
530 assert_eq!(env.exit_status, ExitStatus(3));
531 }
532
533 #[test]
534 fn noexec_option() {
535 let mut env = Env::new_virtual();
536 env.builtins.insert("return", return_builtin());
537 env.options.set(Exec, Off);
538 let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
539 let result = pipeline.execute(&mut env).now_or_never().unwrap();
540 assert_eq!(result, Continue(()));
541 assert_eq!(env.exit_status, ExitStatus::SUCCESS);
542 }
543
544 #[test]
545 fn noexec_option_interactive() {
546 let mut env = Env::new_virtual();
547 env.builtins.insert("return", return_builtin());
548 env.options.set(Exec, Off);
549 env.options.set(Interactive, On);
550 let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
551 let result = pipeline.execute(&mut env).now_or_never().unwrap();
552 assert_eq!(result, Continue(()));
553 assert_eq!(env.exit_status, ExitStatus(93));
554 }
555
556 #[test]
557 fn errexit_option() {
558 in_virtual_system(|mut env, _state| async move {
559 env.builtins.insert("return", return_builtin());
560 env.options.set(ErrExit, On);
561
562 let pipeline: syntax::Pipeline = "return -n 0 | return -n 93".parse().unwrap();
563 let result = pipeline.execute(&mut env).await;
564
565 assert_eq!(result, Break(Divert::Exit(None)));
566 assert_eq!(env.exit_status, ExitStatus(93));
567 });
568 }
569
570 #[test]
571 fn stack_without_inversion() {
572 fn stub_builtin(
573 env: &mut Env,
574 _args: Vec<Field>,
575 ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
576 Box::pin(async move {
577 assert!(!env.stack.contains(&Frame::Condition), "{:?}", env.stack);
578 Default::default()
579 })
580 }
581
582 let mut env = Env::new_virtual();
583 env.builtins
584 .insert("foo", Builtin::new(Special, stub_builtin));
585 let pipeline: syntax::Pipeline = "foo".parse().unwrap();
586 let result = pipeline.execute(&mut env).now_or_never().unwrap();
587 assert_eq!(result, Continue(()));
588 }
589
590 #[test]
591 fn stack_with_inversion() {
592 fn stub_builtin(
593 env: &mut Env,
594 _args: Vec<Field>,
595 ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
596 Box::pin(async move {
597 assert_matches!(
598 env.stack.as_slice(),
599 [Frame::Condition, Frame::Builtin { .. }]
600 );
601 Default::default()
602 })
603 }
604
605 let mut env = Env::new_virtual();
606 env.builtins
607 .insert("foo", Builtin::new(Special, stub_builtin));
608 let pipeline: syntax::Pipeline = "! foo".parse().unwrap();
609 let result = pipeline.execute(&mut env).now_or_never().unwrap();
610 assert_eq!(result, Continue(()));
611 }
612
613 #[test]
614 fn process_group_id_of_job_controlled_pipeline() {
615 fn stub_builtin(
616 env: &mut Env,
617 _args: Vec<Field>,
618 ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
619 let pgid = env.system.getpgrp().0 as _;
620 Box::pin(async move { yash_env::builtin::Result::new(ExitStatus(pgid)) })
621 }
622
623 in_virtual_system(|mut env, state| async move {
624 env.builtins
625 .insert("foo", Builtin::new(Special, stub_builtin));
626 env.options.set(Monitor, On);
627 stub_tty(&state);
628
629 let pipeline: syntax::Pipeline = "foo | foo".parse().unwrap();
631 let result = pipeline.execute(&mut env).await;
632 assert_eq!(result, Continue(()));
633 assert_ne!(env.exit_status, ExitStatus(env.main_pgid.0 as _));
634
635 assert_eq!(state.borrow().foreground, Some(env.main_pgid));
637 })
638 }
639
640 #[test]
641 fn job_controlled_suspended_pipeline_in_job_list() {
642 in_virtual_system(|mut env, state| async move {
643 env.builtins.insert("return", return_builtin());
644 env.builtins.insert("suspend", suspend_builtin());
645 env.options.set(Monitor, On);
646 stub_tty(&state);
647
648 let pipeline: syntax::Pipeline = "return -n 0 | suspend x".parse().unwrap();
649 let result = pipeline.execute(&mut env).await;
650 assert_eq!(result, Continue(()));
651 assert_eq!(env.exit_status, ExitStatus::from(SIGSTOP));
652
653 assert_eq!(env.jobs.len(), 1);
654 let job = env.jobs.iter().next().unwrap().1;
655 assert!(job.job_controlled);
656 assert_eq!(job.state, ProcessState::stopped(SIGSTOP));
657 assert!(job.state_changed);
658 assert_eq!(job.name, "return -n 0 | suspend x");
659 })
660 }
661
662 #[test]
663 fn pipe_set_shift_to_first_command() {
664 let system = VirtualSystem::new();
665 let process_id = system.process_id;
666 let state = Rc::clone(&system.state);
667 let mut env = Env::with_system(Box::new(system));
668 let mut pipes = PipeSet::new();
669
670 let result = pipes.shift(&mut env, true);
671 assert_eq!(result, Ok(()));
672 assert_eq!(pipes.read_previous, None);
673 assert_eq!(pipes.next, Some((Fd(3), Fd(4))));
674 let state = state.borrow();
675 let process = &state.processes[&process_id];
676 assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
677 assert_eq!(process.fds().get(&Fd(4)).unwrap().flags, EnumSet::empty());
678 }
679
680 #[test]
681 fn pipe_set_shift_to_middle_command() {
682 let system = VirtualSystem::new();
683 let process_id = system.process_id;
684 let state = Rc::clone(&system.state);
685 let mut env = Env::with_system(Box::new(system));
686 let mut pipes = PipeSet::new();
687
688 let _ = pipes.shift(&mut env, true);
689 let result = pipes.shift(&mut env, true);
690 assert_eq!(result, Ok(()));
691 assert_eq!(pipes.read_previous, Some(Fd(3)));
692 assert_eq!(pipes.next, Some((Fd(4), Fd(5))));
693 let state = state.borrow();
694 let process = &state.processes[&process_id];
695 assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
696 assert_eq!(process.fds().get(&Fd(4)).unwrap().flags, EnumSet::empty());
697 assert_eq!(process.fds().get(&Fd(5)).unwrap().flags, EnumSet::empty());
698 }
699
700 #[test]
701 fn pipe_set_shift_to_last_command() {
702 let system = VirtualSystem::new();
703 let process_id = system.process_id;
704 let state = Rc::clone(&system.state);
705 let mut env = Env::with_system(Box::new(system));
706 let mut pipes = PipeSet::new();
707
708 let _ = pipes.shift(&mut env, true);
709 let result = pipes.shift(&mut env, false);
710 assert_eq!(result, Ok(()));
711 assert_eq!(pipes.read_previous, Some(Fd(3)));
712 assert_eq!(pipes.next, None);
713 let state = state.borrow();
714 let process = &state.processes[&process_id];
715 assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
716 }
717
718 }