creche/
pipeline.rs

1use crate::*;
2use ioconfig::{interprocess_pipe, InterprocessPipeRead};
3use nix::sys::wait::WaitStatus;
4use std::{
5    fs::File,
6    os::fd::{AsRawFd, RawFd},
7};
8use utils::*;
9
10/// Constructs a pipeline that connects stdout of a child process to
11/// stdin of the next child process, except for stdin of the first
12/// process and stdout of the final process.
13///
14/// The [`Self::quiet()`] method may be used to suppress stderr output.
15///
16/// The general pattern for constructing pipelines is to build each
17/// child process first, configuring their environments and fd
18/// redirects to files and such first - everything except the
19/// interprocess pipe between stdout and stdin. Then add the
20/// `ChildBuilder`s to the pipeline.
21///
22/// Example:
23/// ```
24///    let mut ls_cmd = ChildBuilder::new("ls");
25///    ls_cmd.arg("-1");
26///    let mut sort_cmd = ChildBuilder::new("sort");
27///    sort_cmd.arg("-r");
28///    let mut tr_cmd = ChildBuilder::new("tr");
29///    tr_cmd.arg("[:lower:]");
30///    tr_cmd.arg("[:upper:]");
31///
32///    let mut pipeline = SimplePipelineBuilder::new();
33///    let mut children = pipeline
34///        .add_builder(ls_cmd)
35///        .add_builder(sort_cmd)
36///        .add_builder(tr_cmd)
37///        .quiet()
38///        .spawn();
39///
40///    println!("{:?}", children.wait());
41/// ```
42#[derive(Default)]
43pub struct SimplePipelineBuilder {
44    builders: Vec<ChildBuilder>,
45    devnull: Option<File>,
46    env: Option<envconfig::Environment>,
47}
48impl SimplePipelineBuilder {
49    pub fn new() -> Self {
50        Self::default()
51    }
52    /// Adds a [``ChildBuilder``] to the pipeline.
53    pub fn add_builder(&mut self, builder: ChildBuilder) -> &mut Self {
54        self.builders.push(builder);
55        self
56    }
57    /// Sets an environment for all child processes in the pipeline.
58    /// See [`envconfig::EnvironmentBuilder`].
59    pub fn set_env(&mut self, env: envconfig::Environment) -> &mut Self {
60        self.env = Some(env);
61        self
62    }
63    fn devnull(&mut self) -> RawFd {
64        let f = self.devnull.get_or_insert_with(|| {
65            let mut f_opts = File::options();
66            f_opts.write(true).read(false).create(false);
67            let f = f_opts
68                .open(DEVNULL)
69                .expect("Should have opened /dev/null for writing");
70            f
71        });
72        f.as_raw_fd()
73    }
74    /// Redirects stderr of all child processes to /dev/null. Affects
75    /// only the builders added to the pipeline prior to calling this
76    /// method. This is a convenience method that replaces configuring
77    /// each [`ChildBuilder`] to open /dev/null and redirecting the fd.
78    /// To economize file descriptors, it opens /dev/null only once.
79    pub fn quiet(&mut self) -> &mut Self {
80        let raw_fd = self.devnull();
81        for builder in self.builders.iter_mut() {
82            builder.config_io(ioconfig::Redirect::new_with_priority(2, raw_fd, 10));
83        }
84        self
85    }
86    /// Executes the pipeline. Interprocess pipes are created as needed
87    /// to link stdout of a child process to stdin of the next process.
88    /// The child processes are started in the reverse of the order in
89    /// which they were added to the builder.
90    ///
91    /// Returns a [`PipelineChildren`] value that may be `.wait()`ed
92    /// to get the exit status of each child process.
93    pub fn spawn(&mut self) -> PipelineChildren {
94        if self.builders.len() == 0 {
95            panic!("Pipeline has no processes to spawn");
96        }
97
98        // set up interprocess pipes
99        let mut pipe_read: Option<Box<InterprocessPipeRead>> = None;
100        let builders = HeadBodyTailIter::new(self.builders.iter_mut());
101        for ref mut builder in builders {
102            match builder {
103                HeadBodyTail::Only(_) => (),
104                HeadBodyTail::Head(x) => {
105                    let (r, w) = interprocess_pipe(0, 1);
106                    x.config_io(w);
107                    pipe_read = Some(r);
108                }
109                HeadBodyTail::Body(x) => {
110                    let (r, w) = interprocess_pipe(0, 1);
111                    x.config_io(w);
112                    if let Some(prev_r) = pipe_read.take() {
113                        x.config_io(prev_r);
114                    }
115                    pipe_read = Some(r);
116                }
117                HeadBodyTail::Tail(x) => {
118                    if let Some(prev_r) = pipe_read.take() {
119                        x.config_io(prev_r);
120                    }
121                }
122            }
123        }
124
125        // spawn the processes, last to first
126        self.builders.reverse();
127        let mut children = Vec::new();
128        while let Some(mut builder) = self.builders.pop() {
129            // set env
130            if let Some(env) = self.env.as_ref() {
131                builder.set_env(env.clone());
132            }
133            let child = builder.spawn();
134            children.push(child);
135        }
136        PipelineChildren { children }
137    }
138}
139
140/// Struct that contains the ``Child`` values generated by spawning the
141/// ``ChildBuilders`` in the pipeline. Its sole function is to wait on
142/// all of the children processes to terminate, and then return a
143/// ``Vec`` of exit ``WaitStatus``. This status vector is in the same
144/// order that the ``ChildBuilder``s were added to the pipeline
145/// builder.
146pub struct PipelineChildren {
147    children: Vec<Child>,
148}
149impl PipelineChildren {
150    pub fn wait(&mut self) -> Option<Vec<WaitStatus>> {
151        if self.children.len() == 0 {
152            return None;
153        }
154        self.children.reverse();
155        let mut wait_results = Vec::new();
156        while let Some(child) = self.children.pop() {
157            if let Ok(waitstatus) = child.wait() {
158                // child exit
159                wait_results.push(waitstatus);
160            } else {
161                // wait error
162                panic!("Child wait error");
163            }
164        }
165        Some(wait_results)
166    }
167    pub fn children(&self) -> &[Child] {
168        self.children.as_ref()
169    }
170}