yash_semantics/command/
pipeline.rs

1// This file is part of yash, an extended POSIX shell.
2// Copyright (C) 2021 WATANABE Yuki
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17//! Implementation of pipeline semantics.
18
19use super::Command;
20use crate::job::add_job_if_suspended;
21use crate::trap::run_exit_trap;
22use enumset::EnumSet;
23use itertools::Itertools;
24use std::ops::ControlFlow::{Break, Continue};
25use std::rc::Rc;
26use yash_env::Env;
27use yash_env::System;
28use yash_env::io::Fd;
29use yash_env::job::Pid;
30use yash_env::option::Option::{Exec, Interactive};
31use yash_env::option::State::Off;
32use yash_env::semantics::Divert;
33use yash_env::semantics::ExitStatus;
34use yash_env::semantics::Result;
35use yash_env::stack::Frame;
36use yash_env::subshell::JobControl;
37use yash_env::subshell::Subshell;
38use yash_env::system::Errno;
39use yash_syntax::syntax;
40
41/// Executes the pipeline.
42///
43/// # Executing commands
44///
45/// If this pipeline contains one command, it is executed in the current shell
46/// execution environment.
47///
48/// If the pipeline has more than one command, all the commands are executed
49/// concurrently. Every command is executed in a new subshell. The standard
50/// output of a command is connected to the standard input of the next command
51/// via a pipe, except for the standard output of the last command and the
52/// standard input of the first command, which are not modified.
53///
54/// If the pipeline has no command, it is a no-op.
55///
56/// # Exit status
57///
58/// The exit status of the pipeline is that of the last command (or zero if no
59/// command). If the pipeline starts with an `!`, the exit status is inverted:
60/// zero becomes one, and non-zero becomes zero.
61///
62/// In POSIX, the expected exit status is unclear when an inverted pipeline
63/// performs a jump as in `! return 42`. The behavior disagrees among existing
64/// shells. This implementation does not invert the exit status when the return
65/// value is `Err(Divert::...)`.
66///
67/// # `noexec` option
68///
69/// If the [`Exec`] and [`Interactive`] options are [`Off`] in `env.options`,
70/// the entire execution of the pipeline is skipped. (The `noexec` option is
71/// ignored if the shell is interactive, otherwise you cannot exit the shell
72/// in any way if the `ignoreeof` option is set.)
73///
74/// # Stack
75///
76/// if `self.negation` is true, [`Frame::Condition`] is pushed to the
77/// environment's stack while the pipeline is executed.
78impl Command for syntax::Pipeline {
79    async fn execute(&self, env: &mut Env) -> Result {
80        if env.options.get(Exec) == Off && env.options.get(Interactive) == Off {
81            return Continue(());
82        }
83
84        if !self.negation {
85            return execute_commands_in_pipeline(env, &self.commands).await;
86        }
87
88        let mut env = env.push_frame(Frame::Condition);
89        execute_commands_in_pipeline(&mut env, &self.commands).await?;
90        env.exit_status = if env.exit_status.is_successful() {
91            ExitStatus::FAILURE
92        } else {
93            ExitStatus::SUCCESS
94        };
95        Continue(())
96    }
97}
98
99async fn execute_commands_in_pipeline(env: &mut Env, commands: &[Rc<syntax::Command>]) -> Result {
100    match commands.len() {
101        0 => {
102            env.exit_status = ExitStatus::SUCCESS;
103            Continue(())
104        }
105
106        1 => commands[0].execute(env).await,
107
108        _ => {
109            if env.controls_jobs() {
110                execute_job_controlled_pipeline(env, commands).await?
111            } else {
112                execute_multi_command_pipeline(env, commands).await?
113            }
114            env.apply_errexit()
115        }
116    }
117}
118
119async fn execute_job_controlled_pipeline(
120    env: &mut Env,
121    commands: &[Rc<syntax::Command>],
122) -> Result {
123    let commands_2 = commands.to_vec();
124    let subshell = Subshell::new(|sub_env, _job_control| {
125        Box::pin(async move {
126            let result = execute_multi_command_pipeline(sub_env, &commands_2).await;
127            sub_env.apply_result(result);
128            run_exit_trap(sub_env).await;
129        })
130    })
131    .job_control(JobControl::Foreground);
132
133    match subshell.start_and_wait(env).await {
134        Ok((pid, result)) => {
135            env.exit_status = add_job_if_suspended(env, pid, result, || to_job_name(commands))?;
136            Continue(())
137        }
138        Err(errno) => {
139            // TODO print error location using yash_env::io::print_error
140            let message = format!("cannot start a subshell in the pipeline: {errno}\n");
141            env.system.print_error(&message).await;
142            Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
143        }
144    }
145}
146
147fn to_job_name(commands: &[Rc<syntax::Command>]) -> String {
148    commands
149        .iter()
150        .format_with(" | ", |cmd, f| f(&format_args!("{cmd}")))
151        .to_string()
152}
153
154async fn execute_multi_command_pipeline(env: &mut Env, commands: &[Rc<syntax::Command>]) -> Result {
155    // Start commands
156    let mut commands = commands.iter().cloned().peekable();
157    let mut pipes = PipeSet::new();
158    let mut pids = Vec::new();
159    while let Some(command) = commands.next() {
160        let has_next = commands.peek().is_some();
161        shift_or_fail(env, &mut pipes, has_next).await?;
162
163        let pipes = pipes;
164        let subshell = Subshell::new(move |env, _job_control| {
165            Box::pin(async move {
166                let result = connect_pipe_and_execute_command(env, pipes, command).await;
167                env.apply_result(result);
168                run_exit_trap(env).await;
169            })
170        });
171        let start_result = subshell.start(env).await;
172        pids.push(pid_or_fail(env, start_result).await?);
173    }
174
175    shift_or_fail(env, &mut pipes, false).await?;
176
177    // Await the last command
178    for pid in pids {
179        // TODO Report if the child was signaled and the shell is interactive
180        env.exit_status = env
181            .wait_for_subshell_to_finish(pid)
182            .await
183            .expect("cannot receive exit status of child process")
184            .1;
185    }
186    Continue(())
187}
188
189async fn shift_or_fail(env: &mut Env, pipes: &mut PipeSet, has_next: bool) -> Result {
190    match pipes.shift(env, has_next) {
191        Ok(()) => Continue(()),
192        Err(errno) => {
193            // TODO print error location using yash_env::io::print_error
194            let message = format!("cannot connect pipes in the pipeline: {errno}\n");
195            env.system.print_error(&message).await;
196            Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
197        }
198    }
199}
200
201async fn connect_pipe_and_execute_command(
202    env: &mut Env,
203    pipes: PipeSet,
204    command: Rc<syntax::Command>,
205) -> Result {
206    match pipes.move_to_stdin_stdout(env) {
207        Ok(()) => (),
208        Err(errno) => {
209            // TODO print error location using yash_env::io::print_error
210            let message = format!("cannot connect pipes in the pipeline: {errno}\n");
211            env.system.print_error(&message).await;
212            return Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)));
213        }
214    }
215
216    command.execute(env).await
217}
218
219async fn pid_or_fail(
220    env: &mut Env,
221    start_result: std::result::Result<(Pid, Option<JobControl>), Errno>,
222) -> Result<Pid> {
223    match start_result {
224        Ok((pid, job_control)) => {
225            debug_assert_eq!(job_control, None);
226            Continue(pid)
227        }
228        Err(errno) => {
229            // TODO print error location using yash_env::io::print_error
230            env.system
231                .print_error(&format!(
232                    "cannot start a subshell in the pipeline: {errno}\n"
233                ))
234                .await;
235            Break(Divert::Interrupt(Some(ExitStatus::NOEXEC)))
236        }
237    }
238}
239
240/// Set of pipe file descriptors that connect commands.
241#[derive(Clone, Copy, Default)]
242struct PipeSet {
243    read_previous: Option<Fd>,
244    /// Reader and writer to the next command.
245    next: Option<(Fd, Fd)>,
246}
247
248impl PipeSet {
249    fn new() -> Self {
250        Self::default()
251    }
252
253    /// Updates the pipe set for the next command.
254    ///
255    /// Closes FDs that are no longer necessary and opens a new pipe if there is
256    /// a next command.
257    fn shift(&mut self, env: &mut Env, has_next: bool) -> std::result::Result<(), Errno> {
258        if let Some(fd) = self.read_previous {
259            let _ = env.system.close(fd);
260        }
261
262        if let Some((reader, writer)) = self.next {
263            let _ = env.system.close(writer);
264            self.read_previous = Some(reader);
265        } else {
266            self.read_previous = None;
267        }
268
269        self.next = None;
270        if has_next {
271            self.next = Some(env.system.pipe()?);
272        }
273
274        Ok(())
275    }
276
277    /// Moves the pipe FDs to stdin/stdout and closes the FDs that are no longer
278    /// necessary.
279    fn move_to_stdin_stdout(mut self, env: &mut Env) -> std::result::Result<(), Errno> {
280        if let Some((reader, writer)) = self.next {
281            assert_ne!(reader, writer);
282            assert_ne!(self.read_previous, Some(reader));
283            assert_ne!(self.read_previous, Some(writer));
284
285            env.system.close(reader)?;
286            if writer != Fd::STDOUT {
287                if self.read_previous == Some(Fd::STDOUT) {
288                    self.read_previous =
289                        Some(env.system.dup(Fd::STDOUT, Fd(0), EnumSet::empty())?);
290                }
291                env.system.dup2(writer, Fd::STDOUT)?;
292                env.system.close(writer)?;
293            }
294        }
295        if let Some(reader) = self.read_previous {
296            if reader != Fd::STDIN {
297                env.system.dup2(reader, Fd::STDIN)?;
298                env.system.close(reader)?;
299            }
300        }
301        Ok(())
302    }
303}
304
305#[cfg(test)]
306mod tests {
307    use super::*;
308    use crate::tests::cat_builtin;
309    use crate::tests::return_builtin;
310    use crate::tests::suspend_builtin;
311    use assert_matches::assert_matches;
312    use futures_util::FutureExt;
313    use std::pin::Pin;
314    use std::rc::Rc;
315    use yash_env::VirtualSystem;
316    use yash_env::builtin::Builtin;
317    use yash_env::builtin::Type::Special;
318    use yash_env::job::ProcessResult;
319    use yash_env::job::ProcessState;
320    use yash_env::option::Option::ErrExit;
321    use yash_env::option::Option::Monitor;
322    use yash_env::option::State::On;
323    use yash_env::semantics::Field;
324    use yash_env::system::r#virtual::FileBody;
325    use yash_env::system::r#virtual::SIGSTOP;
326    use yash_env_test_helper::assert_stdout;
327    use yash_env_test_helper::in_virtual_system;
328    use yash_env_test_helper::stub_tty;
329
330    #[test]
331    fn empty_pipeline() {
332        let mut env = Env::new_virtual();
333        let pipeline = syntax::Pipeline {
334            commands: vec![],
335            negation: false,
336        };
337        let result = pipeline.execute(&mut env).now_or_never().unwrap();
338        assert_eq!(result, Continue(()));
339        assert_eq!(env.exit_status, ExitStatus(0));
340    }
341
342    #[test]
343    fn single_command_pipeline_returns_exit_status_intact_without_divert() {
344        let mut env = Env::new_virtual();
345        env.builtins.insert("return", return_builtin());
346        let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
347        let result = pipeline.execute(&mut env).now_or_never().unwrap();
348        assert_eq!(result, Continue(()));
349        assert_eq!(env.exit_status, ExitStatus(93));
350    }
351
352    #[test]
353    fn single_command_pipeline_returns_exit_status_intact_with_divert() {
354        let mut env = Env::new_virtual();
355        env.builtins.insert("return", return_builtin());
356        env.exit_status = ExitStatus(17);
357        let pipeline: syntax::Pipeline = "return 37".parse().unwrap();
358        let result = pipeline.execute(&mut env).now_or_never().unwrap();
359        assert_eq!(result, Break(Divert::Return(Some(ExitStatus(37)))));
360        assert_eq!(env.exit_status, ExitStatus(17));
361    }
362
363    #[test]
364    fn multi_command_pipeline_returns_last_command_exit_status() {
365        in_virtual_system(|mut env, _state| async move {
366            env.builtins.insert("return", return_builtin());
367            let pipeline: syntax::Pipeline = "return -n 10 | return -n 20".parse().unwrap();
368            let result = pipeline.execute(&mut env).await;
369            assert_eq!(result, Continue(()));
370            assert_eq!(env.exit_status, ExitStatus(20));
371        });
372    }
373
374    #[test]
375    fn multi_command_pipeline_waits_for_all_child_commands() {
376        in_virtual_system(|mut env, state| async move {
377            env.builtins.insert("return", return_builtin());
378            let pipeline: syntax::Pipeline =
379                "return -n 1 | return -n 2 | return -n 3".parse().unwrap();
380            _ = pipeline.execute(&mut env).await;
381
382            // Only the original process remains.
383            for (pid, process) in &state.borrow().processes {
384                if *pid == env.main_pid {
385                    assert_eq!(process.state(), ProcessState::Running);
386                } else {
387                    assert_matches!(
388                        process.state(),
389                        ProcessState::Halted(ProcessResult::Exited(_))
390                    );
391                }
392            }
393        });
394    }
395
396    #[test]
397    fn multi_command_pipeline_does_not_wait_for_unrelated_child() {
398        in_virtual_system(|mut env, state| async move {
399            env.builtins.insert("return", return_builtin());
400
401            let list: syntax::List = "return -n 7&".parse().unwrap();
402            _ = list.execute(&mut env).await;
403            let async_pid = {
404                let state = state.borrow();
405                let mut iter = state.processes.keys();
406                assert_eq!(iter.next(), Some(&env.main_pid));
407                let async_pid = *iter.next().unwrap();
408                assert_eq!(iter.next(), None);
409                async_pid
410            };
411
412            let pipeline: syntax::Pipeline =
413                "return -n 1 | return -n 2 | return -n 3".parse().unwrap();
414            _ = pipeline.execute(&mut env).await;
415
416            let state = state.borrow();
417            let process = &state.processes[&async_pid];
418            assert_eq!(process.state(), ProcessState::exited(7));
419            assert!(process.state_has_changed());
420        });
421    }
422
423    #[test]
424    fn pipe_connects_commands_in_pipeline() {
425        in_virtual_system(|mut env, state| async move {
426            {
427                let file = state.borrow().file_system.get("/dev/stdin").unwrap();
428                let mut file = file.borrow_mut();
429                file.body = FileBody::new(*b"ok\n");
430            }
431
432            env.builtins.insert("cat", cat_builtin());
433
434            let pipeline: syntax::Pipeline = "cat | cat | cat".parse().unwrap();
435            let result = pipeline.execute(&mut env).await;
436            assert_eq!(result, Continue(()));
437            assert_eq!(env.exit_status, ExitStatus::SUCCESS);
438            assert_stdout(&state, |stdout| assert_eq!(stdout, "ok\n"));
439        });
440    }
441
442    #[test]
443    fn pipeline_leaves_no_pipe_fds_leftover() {
444        in_virtual_system(|mut env, state| async move {
445            env.builtins.insert("cat", cat_builtin());
446            let pipeline: syntax::Pipeline = "cat | cat".parse().unwrap();
447            let _ = pipeline.execute(&mut env).await;
448
449            let state = state.borrow();
450            let fds = state.processes[&env.main_pid].fds();
451            for fd in 3..10 {
452                assert!(!fds.contains_key(&Fd(fd)), "fd={fd}");
453            }
454        });
455    }
456
457    #[test]
458    fn inverting_exit_status_to_0_without_divert() {
459        let mut env = Env::new_virtual();
460        env.builtins.insert("return", return_builtin());
461        let pipeline: syntax::Pipeline = "! return -n 42".parse().unwrap();
462        let result = pipeline.execute(&mut env).now_or_never().unwrap();
463        assert_eq!(result, Continue(()));
464        assert_eq!(env.exit_status, ExitStatus(0));
465    }
466
467    #[test]
468    fn inverting_exit_status_to_1_without_divert() {
469        let mut env = Env::new_virtual();
470        env.builtins.insert("return", return_builtin());
471        let pipeline: syntax::Pipeline = "! return -n 0".parse().unwrap();
472        let result = pipeline.execute(&mut env).now_or_never().unwrap();
473        assert_eq!(result, Continue(()));
474        assert_eq!(env.exit_status, ExitStatus(1));
475    }
476
477    #[test]
478    fn not_inverting_exit_status_with_divert() {
479        let mut env = Env::new_virtual();
480        env.builtins.insert("return", return_builtin());
481        env.exit_status = ExitStatus(3);
482        let pipeline: syntax::Pipeline = "! return 15".parse().unwrap();
483        let result = pipeline.execute(&mut env).now_or_never().unwrap();
484        assert_eq!(result, Break(Divert::Return(Some(ExitStatus(15)))));
485        assert_eq!(env.exit_status, ExitStatus(3));
486    }
487
488    #[test]
489    fn noexec_option() {
490        let mut env = Env::new_virtual();
491        env.builtins.insert("return", return_builtin());
492        env.options.set(Exec, Off);
493        let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
494        let result = pipeline.execute(&mut env).now_or_never().unwrap();
495        assert_eq!(result, Continue(()));
496        assert_eq!(env.exit_status, ExitStatus::SUCCESS);
497    }
498
499    #[test]
500    fn noexec_option_interactive() {
501        let mut env = Env::new_virtual();
502        env.builtins.insert("return", return_builtin());
503        env.options.set(Exec, Off);
504        env.options.set(Interactive, On);
505        let pipeline: syntax::Pipeline = "return -n 93".parse().unwrap();
506        let result = pipeline.execute(&mut env).now_or_never().unwrap();
507        assert_eq!(result, Continue(()));
508        assert_eq!(env.exit_status, ExitStatus(93));
509    }
510
511    #[test]
512    fn errexit_option() {
513        in_virtual_system(|mut env, _state| async move {
514            env.builtins.insert("return", return_builtin());
515            env.options.set(ErrExit, On);
516
517            let pipeline: syntax::Pipeline = "return -n 0 | return -n 93".parse().unwrap();
518            let result = pipeline.execute(&mut env).await;
519
520            assert_eq!(result, Break(Divert::Exit(None)));
521            assert_eq!(env.exit_status, ExitStatus(93));
522        });
523    }
524
525    #[test]
526    fn stack_without_inversion() {
527        fn stub_builtin(
528            env: &mut Env,
529            _args: Vec<Field>,
530        ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
531            Box::pin(async move {
532                assert!(!env.stack.contains(&Frame::Condition), "{:?}", env.stack);
533                Default::default()
534            })
535        }
536
537        let mut env = Env::new_virtual();
538        env.builtins
539            .insert("foo", Builtin::new(Special, stub_builtin));
540        let pipeline: syntax::Pipeline = "foo".parse().unwrap();
541        let result = pipeline.execute(&mut env).now_or_never().unwrap();
542        assert_eq!(result, Continue(()));
543    }
544
545    #[test]
546    fn stack_with_inversion() {
547        fn stub_builtin(
548            env: &mut Env,
549            _args: Vec<Field>,
550        ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
551            Box::pin(async move {
552                assert_matches!(
553                    env.stack.as_slice(),
554                    [Frame::Condition, Frame::Builtin { .. }]
555                );
556                Default::default()
557            })
558        }
559
560        let mut env = Env::new_virtual();
561        env.builtins
562            .insert("foo", Builtin::new(Special, stub_builtin));
563        let pipeline: syntax::Pipeline = "! foo".parse().unwrap();
564        let result = pipeline.execute(&mut env).now_or_never().unwrap();
565        assert_eq!(result, Continue(()));
566    }
567
568    #[test]
569    fn process_group_id_of_job_controlled_pipeline() {
570        fn stub_builtin(
571            env: &mut Env,
572            _args: Vec<Field>,
573        ) -> Pin<Box<dyn Future<Output = yash_env::builtin::Result> + '_>> {
574            let pgid = env.system.getpgrp().0 as _;
575            Box::pin(async move { yash_env::builtin::Result::new(ExitStatus(pgid)) })
576        }
577
578        in_virtual_system(|mut env, state| async move {
579            env.builtins
580                .insert("foo", Builtin::new(Special, stub_builtin));
581            env.options.set(Monitor, On);
582            stub_tty(&state);
583
584            // TODO Better test all pipeline component exit statuses
585            let pipeline: syntax::Pipeline = "foo | foo".parse().unwrap();
586            let result = pipeline.execute(&mut env).await;
587            assert_eq!(result, Continue(()));
588            assert_ne!(env.exit_status, ExitStatus(env.main_pgid.0 as _));
589
590            // The shell should come back to the foreground after running the pipeline
591            assert_eq!(state.borrow().foreground, Some(env.main_pgid));
592        })
593    }
594
595    #[test]
596    fn job_controlled_suspended_pipeline_in_job_list() {
597        in_virtual_system(|mut env, state| async move {
598            env.builtins.insert("return", return_builtin());
599            env.builtins.insert("suspend", suspend_builtin());
600            env.options.set(Monitor, On);
601            stub_tty(&state);
602
603            let pipeline: syntax::Pipeline = "return -n 0 | suspend x".parse().unwrap();
604            let result = pipeline.execute(&mut env).await;
605            assert_eq!(result, Continue(()));
606            assert_eq!(env.exit_status, ExitStatus::from(SIGSTOP));
607
608            assert_eq!(env.jobs.len(), 1);
609            let job = env.jobs.iter().next().unwrap().1;
610            assert!(job.job_controlled);
611            assert_eq!(job.state, ProcessState::stopped(SIGSTOP));
612            assert!(job.state_changed);
613            assert_eq!(job.name, "return -n 0 | suspend x");
614        })
615    }
616
617    #[test]
618    fn pipe_set_shift_to_first_command() {
619        let system = VirtualSystem::new();
620        let process_id = system.process_id;
621        let state = Rc::clone(&system.state);
622        let mut env = Env::with_system(Box::new(system));
623        let mut pipes = PipeSet::new();
624
625        let result = pipes.shift(&mut env, true);
626        assert_eq!(result, Ok(()));
627        assert_eq!(pipes.read_previous, None);
628        assert_eq!(pipes.next, Some((Fd(3), Fd(4))));
629        let state = state.borrow();
630        let process = &state.processes[&process_id];
631        assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
632        assert_eq!(process.fds().get(&Fd(4)).unwrap().flags, EnumSet::empty());
633    }
634
635    #[test]
636    fn pipe_set_shift_to_middle_command() {
637        let system = VirtualSystem::new();
638        let process_id = system.process_id;
639        let state = Rc::clone(&system.state);
640        let mut env = Env::with_system(Box::new(system));
641        let mut pipes = PipeSet::new();
642
643        let _ = pipes.shift(&mut env, true);
644        let result = pipes.shift(&mut env, true);
645        assert_eq!(result, Ok(()));
646        assert_eq!(pipes.read_previous, Some(Fd(3)));
647        assert_eq!(pipes.next, Some((Fd(4), Fd(5))));
648        let state = state.borrow();
649        let process = &state.processes[&process_id];
650        assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
651        assert_eq!(process.fds().get(&Fd(4)).unwrap().flags, EnumSet::empty());
652        assert_eq!(process.fds().get(&Fd(5)).unwrap().flags, EnumSet::empty());
653    }
654
655    #[test]
656    fn pipe_set_shift_to_last_command() {
657        let system = VirtualSystem::new();
658        let process_id = system.process_id;
659        let state = Rc::clone(&system.state);
660        let mut env = Env::with_system(Box::new(system));
661        let mut pipes = PipeSet::new();
662
663        let _ = pipes.shift(&mut env, true);
664        let result = pipes.shift(&mut env, false);
665        assert_eq!(result, Ok(()));
666        assert_eq!(pipes.read_previous, Some(Fd(3)));
667        assert_eq!(pipes.next, None);
668        let state = state.borrow();
669        let process = &state.processes[&process_id];
670        assert_eq!(process.fds().get(&Fd(3)).unwrap().flags, EnumSet::empty());
671    }
672
673    // TODO test PipeSet::move_to_stdin_stdout
674}