1use super::Command;
20use crate::job::add_job_if_suspended;
21use crate::trap::run_exit_trap;
22use enumset::EnumSet;
23use itertools::Itertools;
24use std::ops::ControlFlow::{Break, Continue};
25use std::rc::Rc;
26use yash_env::Env;
27use yash_env::System;
28use yash_env::io::Fd;
29use yash_env::job::Pid;
30use yash_env::option::Option::{Exec, Interactive};
31use yash_env::option::State::Off;
32use yash_env::semantics::Divert;
33use yash_env::semantics::ExitStatus;
34use yash_env::semantics::Result;
35use yash_env::stack::Frame;
36use yash_env::subshell::JobControl;
37use yash_env::subshell::Subshell;
38use yash_env::system::Errno;
39use yash_syntax::syntax;
40
41impl Command for syntax::Pipeline {
79 async fn execute(&self, env: &mut Env) -> Result {
80 if env.options.get(Exec) == Off && env.options.get(Interactive) == Off {
81 return Continue(());
82 }
83
84 if !self.negation {
85 return execute_commands_in_pipeline(env, &self.commands).await;
86 }
87
88 let mut env = env.push_frame(Frame::Condition);
89 execute_commands_in_pipeline(&mut env, &self.commands).await?;
90 env.exit_status = if env.exit_status.is_successful() {
91 ExitStatus::FAILURE
92 } else {
93 ExitStatus::SUCCESS
94 };
95 Continue(())
96 }
97}
98
99async fn execute_commands_in_pipeline(env: &mut Env, commands: &[Rc<syntax::Command>]) -> Result {
100 match commands.len() {
101 0 => {
102 env.exit_status = ExitStatus::SUCCESS;
103 Continue(())
104 }
105
106 1 => commands[0].execute(env).await,
107
108 _ => {
109 if env.controls_jobs() {
110 execute_job_controlled_pipeline(env, commands).await?
111 } else {
112 execute_multi_command_pipeline(env, commands).await?
113 }
114 env.apply_errexit()
115 }
116 }
117}
118
119async fn execute_job_controlled_pipeline(
120 env: &mut Env,
121 commands: &[Rc<syntax::Command>],
122) -> Result {
123 let commands_2 = commands.to_vec();
124 let subshell = Subshell::new(|sub_env, _job_control| {
125 Box::pin(async move {
126 let result = execute_multi_command_pipeline(sub_env, &commands_2).await;
127 sub_env.apply_result(result);
128 run_exit_trap(sub_env).await;
129 })
130 })
131 .job_control(JobControl::Foreground);
132
133 match subshell.start_and_wait(env).await {
134 Ok((pid, result)) => {
135 env.exit_status = add_job_if_suspended(env, pid, result, || to_job_name(commands))?;
136 Continue(())
137 }
138 Err(errno) => {
139 let message = format!("cannot start a subshell in the pipeline: {errno}\n");
141 env.system.print_error(&message).await;
142 Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
143 }
144 }
145}
146
147fn to_job_name(commands: &[Rc<syntax::Command>]) -> String {
148 commands
149 .iter()
150 .format_with(" | ", |cmd, f| f(&format_args!("{cmd}")))
151 .to_string()
152}
153
154async fn execute_multi_command_pipeline(env: &mut Env, commands: &[Rc<syntax::Command>]) -> Result {
155 let mut commands = commands.iter().cloned().peekable();
157 let mut pipes = PipeSet::new();
158 let mut pids = Vec::new();
159 while let Some(command) = commands.next() {
160 let has_next = commands.peek().is_some();
161 shift_or_fail(env, &mut pipes, has_next).await?;
162
163 let pipes = pipes;
164 let subshell = Subshell::new(move |env, _job_control| {
165 Box::pin(async move {
166 let result = connect_pipe_and_execute_command(env, pipes, command).await;
167 env.apply_result(result);
168 run_exit_trap(env).await;
169 })
170 });
171 let start_result = subshell.start(env).await;
172 pids.push(pid_or_fail(env, start_result).await?);
173 }
174
175 shift_or_fail(env, &mut pipes, false).await?;
176
177 for pid in pids {
179 env.exit_status = env
181 .wait_for_subshell_to_finish(pid)
182 .await
183 .expect("cannot receive exit status of child process")
184 .1;
185 }
186 Continue(())
187}
188
189async fn shift_or_fail(env: &mut Env, pipes: &mut PipeSet, has_next: bool) -> Result {
190 match pipes.shift(env, has_next) {
191 Ok(()) => Continue(()),
192 Err(errno) => {
193 let message = format!("cannot connect pipes in the pipeline: {errno}\n");
195 env.system.print_error(&message).await;
196 Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
197 }
198 }
199}
200
201async fn connect_pipe_and_execute_command(
202 env: &mut Env,
203 pipes: PipeSet,
204 command: Rc<syntax::Command>,
205) -> Result {
206 match pipes.move_to_stdin_stdout(env) {
207 Ok(()) => (),
208 Err(errno) => {
209 let message = format!("cannot connect pipes in the pipeline: {errno}\n");
211 env.system.print_error(&message).await;
212 return Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)));
213 }
214 }
215
216 command.execute(env).await
217}
218
219async fn pid_or_fail(
220 env: &mut Env,
221 start_result: std::result::Result<(Pid, Option<JobControl>), Errno>,
222) -> Result<Pid> {
223 match start_result {
224 Ok((pid, job_control)) => {
225 debug_assert_eq!(job_control, None);
226 Continue(pid)
227 }
228 Err(errno) => {
229 env.system
231 .print_error(&format!(
232 "cannot start a subshell in the pipeline: {errno}\n"
233 ))
234 .await;
235 Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
236 }
237 }
238}
239
240#[derive(Clone, Copy, Default)]
242struct PipeSet {
243 read_previous: Option<Fd>,
244 next: Option<(Fd, Fd)>,
246}
247
248impl PipeSet {
249 fn new() -> Self {
250 Self::default()
251 }
252
253 fn shift(&mut self, env: &mut Env, has_next: bool) -> std::result::Result<(), Errno> {
258 if let Some(fd) = self.read_previous {
259 let _ = env.system.close(fd);
260 }
261
262 if let Some((reader, writer)) = self.next {
263 let _ = env.system.close(writer);
264 self.read_previous = Some(reader);
265 } else {
266 self.read_previous = None;
267 }
268
269 self.next = None;
270 if has_next {
271 self.next = Some(env.system.pipe()?);
272 }
273
274 Ok(())
275 }
276
277 fn move_to_stdin_stdout(mut self, env: &mut Env) -> std::result::Result<(), Errno> {
280 if let Some((reader, writer)) = self.next {
281 assert_ne!(reader, writer);
282 assert_ne!(self.read_previous, Some(reader));
283 assert_ne!(self.read_previous, Some(writer));
284
285 env.system.close(reader)?;
286 if writer != Fd::STDOUT {
287 if self.read_previous == Some(Fd::STDOUT) {
288 self.read_previous =
289 Some(env.system.dup(Fd::STDOUT, Fd(0), EnumSet::empty())?);
290 }
291 env.system.dup2(writer, Fd::STDOUT)?;
292 env.system.close(writer)?;
293 }
294 }
295 if let Some(reader) = self.read_previous {
296 if reader != Fd::STDIN {
297 env.system.dup2(reader, Fd::STDIN)?;
298 env.system.close(reader)?;
299 }
300 }
301 Ok(())
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::*;
308 use crate::tests::cat_builtin;
309 use crate::tests::return_builtin;
310 use crate::tests::suspend_builtin;
311 use assert_matches::assert_matches;
312 use futures_util::FutureExt;
313 use std::pin::Pin;
314 use std::rc::Rc;
315 use yash_env::VirtualSystem;
316 use yash_env::builtin::Builtin;
317 use yash_env::builtin::Type::Special;
318 use yash_env::job::ProcessResult;
319 use yash_env::job::ProcessState;
320 use yash_env::option::Option::ErrExit;
321 use yash_env::option::Option::Monitor;
322 use yash_env::option::State::On;
323 use yash_env::semantics::Field;
324 use yash_env::system::r#virtual::FileBody;
325 use yash_env::system::r#virtual::SIGSTOP;
326 use yash_env_test_helper::assert_stdout;
327 use yash_env_test_helper::in_virtual_system;
328 use yash_env_test_helper::stub_tty;
329
330 #[test]
331 fn empty_pipeline() {
332 let mut env = Env::new_virtual();
333 let pipeline = syntax::Pipeline {
334 commands: vec![],
335 negation: false,
336 };
337 let result = pipeline.execute(&mut env).now_or_never().unwrap();
338 assert_eq!(result, Continue(()));
339 assert_eq!(env.exit_status, ExitStatus(0));
340 }
341
342 #[test]
343 fn single_command_pipeline_returns_exit_status_intact_without_divert() {
344 let mut env = Env::new_virtual();
345 env.builtins.insert("return", return_builtin());
346 let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
347 let result = pipeline.execute(&mut env).now_or_never().unwrap();
348 assert_eq!(result, Continue(()));
349 assert_eq!(env.exit_status, ExitStatus(93));
350 }
351
352 #[test]
353 fn single_command_pipeline_returns_exit_status_intact_with_divert() {
354 let mut env = Env::new_virtual();
355 env.builtins.insert("return", return_builtin());
356 env.exit_status = ExitStatus(17);
357 let pipeline: syntax::Pipeline = "return 37".parse().unwrap();
358 let result = pipeline.execute(&mut env).now_or_never().unwrap();
359 assert_eq!(result, Break(Divert::Return(Some(ExitStatus(37)))));
360 assert_eq!(env.exit_status, ExitStatus(17));
361 }
362
363 #[test]
364 fn multi_command_pipeline_returns_last_command_exit_status() {
365 in_virtual_system(|mut env, _state| async move {
366 env.builtins.insert("return", return_builtin());
367 let pipeline: syntax::Pipeline = "return -n 10 | return -n 20".parse().unwrap();
368 let result = pipeline.execute(&mut env).await;
369 assert_eq!(result, Continue(()));
370 assert_eq!(env.exit_status, ExitStatus(20));
371 });
372 }
373
374 #[test]
375 fn multi_command_pipeline_waits_for_all_child_commands() {
376 in_virtual_system(|mut env, state| async move {
377 env.builtins.insert("return", return_builtin());
378 let pipeline: syntax::Pipeline =
379 "return -n 1 | return -n 2 | return -n 3".parse().unwrap();
380 _ = pipeline.execute(&mut env).await;
381
382 for (pid, process) in &state.borrow().processes {
384 if *pid == env.main_pid {
385 assert_eq!(process.state(), ProcessState::Running);
386 } else {
387 assert_matches!(
388 process.state(),
389 ProcessState::Halted(ProcessResult::Exited(_))
390 );
391 }
392 }
393 });
394 }
395
396 #[test]
397 fn multi_command_pipeline_does_not_wait_for_unrelated_child() {
398 in_virtual_system(|mut env, state| async move {
399 env.builtins.insert("return", return_builtin());
400
401 let list: syntax::List = "return -n 7&".parse().unwrap();
402 _ = list.execute(&mut env).await;
403 let async_pid = {
404 let state = state.borrow();
405 let mut iter = state.processes.keys();
406 assert_eq!(iter.next(), Some(&env.main_pid));
407 let async_pid = *iter.next().unwrap();
408 assert_eq!(iter.next(), None);
409 async_pid
410 };
411
412 let pipeline: syntax::Pipeline =
413 "return -n 1 | return -n 2 | return -n 3".parse().unwrap();
414 _ = pipeline.execute(&mut env).await;
415
416 let state = state.borrow();
417 let process = &state.processes[&async_pid];
418 assert_eq!(process.state(), ProcessState::exited(7));
419 assert!(process.state_has_changed());
420 });
421 }
422
423 #[test]
424 fn pipe_connects_commands_in_pipeline() {
425 in_virtual_system(|mut env, state| async move {
426 {
427 let file = state.borrow().file_system.get("/dev/stdin").unwrap();
428 let mut file = file.borrow_mut();
429 file.body = FileBody::new(*b"ok\n");
430 }
431
432 env.builtins.insert("cat", cat_builtin());
433
434 let pipeline: syntax::Pipeline = "cat | cat | cat".parse().unwrap();
435 let result = pipeline.execute(&mut env).await;
436 assert_eq!(result, Continue(()));
437 assert_eq!(env.exit_status, ExitStatus::SUCCESS);
438 assert_stdout(&state, |stdout| assert_eq!(stdout, "ok\n"));
439 });
440 }
441
442 #[test]
443 fn pipeline_leaves_no_pipe_fds_leftover() {
444 in_virtual_system(|mut env, state| async move {
445 env.builtins.insert("cat", cat_builtin());
446 let pipeline: syntax::Pipeline = "cat | cat".parse().unwrap();
447 let _ = pipeline.execute(&mut env).await;
448
449 let state = state.borrow();
450 let fds = state.processes[&env.main_pid].fds();
451 for fd in 3..10 {
452 assert!(!fds.contains_key(&Fd(fd)), "fd={fd}");
453 }
454 });
455 }
456
457 #[test]
458 fn inverting_exit_status_to_0_without_divert() {
459 let mut env = Env::new_virtual();
460 env.builtins.insert("return", return_builtin());
461 let pipeline: syntax::Pipeline = "! return -n 42".parse().unwrap();
462 let result = pipeline.execute(&mut env).now_or_never().unwrap();
463 assert_eq!(result, Continue(()));
464 assert_eq!(env.exit_status, ExitStatus(0));
465 }
466
467 #[test]
468 fn inverting_exit_status_to_1_without_divert() {
469 let mut env = Env::new_virtual();
470 env.builtins.insert("return", return_builtin());
471 let pipeline: syntax::Pipeline = "! return -n 0".parse().unwrap();
472 let result = pipeline.execute(&mut env).now_or_never().unwrap();
473 assert_eq!(result, Continue(()));
474 assert_eq!(env.exit_status, ExitStatus(1));
475 }
476
477 #[test]
478 fn not_inverting_exit_status_with_divert() {
479 let mut env = Env::new_virtual();
480 env.builtins.insert("return", return_builtin());
481 env.exit_status = ExitStatus(3);
482 let pipeline: syntax::Pipeline = "! return 15".parse().unwrap();
483 let result = pipeline.execute(&mut env).now_or_never().unwrap();
484 assert_eq!(result, Break(Divert::Return(Some(ExitStatus(15)))));
485 assert_eq!(env.exit_status, ExitStatus(3));
486 }
487
488 #[test]
489 fn noexec_option() {
490 let mut env = Env::new_virtual();
491 env.builtins.insert("return", return_builtin());
492 env.options.set(Exec, Off);
493 let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
494 let result = pipeline.execute(&mut env).now_or_never().unwrap();
495 assert_eq!(result, Continue(()));
496 assert_eq!(env.exit_status, ExitStatus::SUCCESS);
497 }
498
499 #[test]
500 fn noexec_option_interactive() {
501 let mut env = Env::new_virtual();
502 env.builtins.insert("return", return_builtin());
503 env.options.set(Exec, Off);
504 env.options.set(Interactive, On);
505 let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
506 let result = pipeline.execute(&mut env).now_or_never().unwrap();
507 assert_eq!(result, Continue(()));
508 assert_eq!(env.exit_status, ExitStatus(93));
509 }
510
511 #[test]
512 fn errexit_option() {
513 in_virtual_system(|mut env, _state| async move {
514 env.builtins.insert("return", return_builtin());
515 env.options.set(ErrExit, On);
516
517 let pipeline: syntax::Pipeline = "return -n 0 | return -n 93".parse().unwrap();
518 let result = pipeline.execute(&mut env).await;
519
520 assert_eq!(result, Break(Divert::Exit(None)));
521 assert_eq!(env.exit_status, ExitStatus(93));
522 });
523 }
524
525 #[test]
526 fn stack_without_inversion() {
527 fn stub_builtin(
528 env: &mut Env,
529 _args: Vec<Field>,
530 ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
531 Box::pin(async move {
532 assert!(!env.stack.contains(&Frame::Condition), "{:?}", env.stack);
533 Default::default()
534 })
535 }
536
537 let mut env = Env::new_virtual();
538 env.builtins
539 .insert("foo", Builtin::new(Special, stub_builtin));
540 let pipeline: syntax::Pipeline = "foo".parse().unwrap();
541 let result = pipeline.execute(&mut env).now_or_never().unwrap();
542 assert_eq!(result, Continue(()));
543 }
544
545 #[test]
546 fn stack_with_inversion() {
547 fn stub_builtin(
548 env: &mut Env,
549 _args: Vec<Field>,
550 ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
551 Box::pin(async move {
552 assert_matches!(
553 env.stack.as_slice(),
554 [Frame::Condition, Frame::Builtin { .. }]
555 );
556 Default::default()
557 })
558 }
559
560 let mut env = Env::new_virtual();
561 env.builtins
562 .insert("foo", Builtin::new(Special, stub_builtin));
563 let pipeline: syntax::Pipeline = "! foo".parse().unwrap();
564 let result = pipeline.execute(&mut env).now_or_never().unwrap();
565 assert_eq!(result, Continue(()));
566 }
567
568 #[test]
569 fn process_group_id_of_job_controlled_pipeline() {
570 fn stub_builtin(
571 env: &mut Env,
572 _args: Vec<Field>,
573 ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
574 let pgid = env.system.getpgrp().0 as _;
575 Box::pin(async move { yash_env::builtin::Result::new(ExitStatus(pgid)) })
576 }
577
578 in_virtual_system(|mut env, state| async move {
579 env.builtins
580 .insert("foo", Builtin::new(Special, stub_builtin));
581 env.options.set(Monitor, On);
582 stub_tty(&state);
583
584 let pipeline: syntax::Pipeline = "foo | foo".parse().unwrap();
586 let result = pipeline.execute(&mut env).await;
587 assert_eq!(result, Continue(()));
588 assert_ne!(env.exit_status, ExitStatus(env.main_pgid.0 as _));
589
590 assert_eq!(state.borrow().foreground, Some(env.main_pgid));
592 })
593 }
594
595 #[test]
596 fn job_controlled_suspended_pipeline_in_job_list() {
597 in_virtual_system(|mut env, state| async move {
598 env.builtins.insert("return", return_builtin());
599 env.builtins.insert("suspend", suspend_builtin());
600 env.options.set(Monitor, On);
601 stub_tty(&state);
602
603 let pipeline: syntax::Pipeline = "return -n 0 | suspend x".parse().unwrap();
604 let result = pipeline.execute(&mut env).await;
605 assert_eq!(result, Continue(()));
606 assert_eq!(env.exit_status, ExitStatus::from(SIGSTOP));
607
608 assert_eq!(env.jobs.len(), 1);
609 let job = env.jobs.iter().next().unwrap().1;
610 assert!(job.job_controlled);
611 assert_eq!(job.state, ProcessState::stopped(SIGSTOP));
612 assert!(job.state_changed);
613 assert_eq!(job.name, "return -n 0 | suspend x");
614 })
615 }
616
617 #[test]
618 fn pipe_set_shift_to_first_command() {
619 let system = VirtualSystem::new();
620 let process_id = system.process_id;
621 let state = Rc::clone(&system.state);
622 let mut env = Env::with_system(Box::new(system));
623 let mut pipes = PipeSet::new();
624
625 let result = pipes.shift(&mut env, true);
626 assert_eq!(result, Ok(()));
627 assert_eq!(pipes.read_previous, None);
628 assert_eq!(pipes.next, Some((Fd(3), Fd(4))));
629 let state = state.borrow();
630 let process = &state.processes[&process_id];
631 assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
632 assert_eq!(process.fds().get(&Fd(4)).unwrap().flags, EnumSet::empty());
633 }
634
635 #[test]
636 fn pipe_set_shift_to_middle_command() {
637 let system = VirtualSystem::new();
638 let process_id = system.process_id;
639 let state = Rc::clone(&system.state);
640 let mut env = Env::with_system(Box::new(system));
641 let mut pipes = PipeSet::new();
642
643 let _ = pipes.shift(&mut env, true);
644 let result = pipes.shift(&mut env, true);
645 assert_eq!(result, Ok(()));
646 assert_eq!(pipes.read_previous, Some(Fd(3)));
647 assert_eq!(pipes.next, Some((Fd(4), Fd(5))));
648 let state = state.borrow();
649 let process = &state.processes[&process_id];
650 assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
651 assert_eq!(process.fds().get(&Fd(4)).unwrap().flags, EnumSet::empty());
652 assert_eq!(process.fds().get(&Fd(5)).unwrap().flags, EnumSet::empty());
653 }
654
655 #[test]
656 fn pipe_set_shift_to_last_command() {
657 let system = VirtualSystem::new();
658 let process_id = system.process_id;
659 let state = Rc::clone(&system.state);
660 let mut env = Env::with_system(Box::new(system));
661 let mut pipes = PipeSet::new();
662
663 let _ = pipes.shift(&mut env, true);
664 let result = pipes.shift(&mut env, false);
665 assert_eq!(result, Ok(()));
666 assert_eq!(pipes.read_previous, Some(Fd(3)));
667 assert_eq!(pipes.next, None);
668 let state = state.borrow();
669 let process = &state.processes[&process_id];
670 assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
671 }
672
673 }