creche/pipeline.rs
1use crate::*;
2use ioconfig::{interprocess_pipe, InterprocessPipeRead};
3use nix::sys::wait::WaitStatus;
4use std::{
5 fs::File,
6 os::fd::{AsRawFd, RawFd},
7};
8use utils::*;
9
10/// Constructs a pipeline that connects stdout of a child process to
11/// stdin of the next child process, except for stdin of the first
12/// process and stdout of the final process.
13///
14/// The [`Self::quiet()`] method may be used to suppress stderr output.
15///
16/// The general pattern for constructing pipelines is to build each
17/// child process first, configuring their environments and fd
18/// redirects to files and such first - everything except the
19/// interprocess pipe between stdout and stdin. Then add the
20/// `ChildBuilder`s to the pipeline.
21///
22/// Example:
23/// ```
24/// let mut ls_cmd = ChildBuilder::new("ls");
25/// ls_cmd.arg("-1");
26/// let mut sort_cmd = ChildBuilder::new("sort");
27/// sort_cmd.arg("-r");
28/// let mut tr_cmd = ChildBuilder::new("tr");
29/// tr_cmd.arg("[:lower:]");
30/// tr_cmd.arg("[:upper:]");
31///
32/// let mut pipeline = SimplePipelineBuilder::new();
33/// let mut children = pipeline
34/// .add_builder(ls_cmd)
35/// .add_builder(sort_cmd)
36/// .add_builder(tr_cmd)
37/// .quiet()
38/// .spawn();
39///
40/// println!("{:?}", children.wait());
41/// ```
42#[derive(Default)]
43pub struct SimplePipelineBuilder {
44 builders: Vec<ChildBuilder>,
45 devnull: Option<File>,
46 env: Option<envconfig::Environment>,
47}
48impl SimplePipelineBuilder {
49 pub fn new() -> Self {
50 Self::default()
51 }
52 /// Adds a [``ChildBuilder``] to the pipeline.
53 pub fn add_builder(&mut self, builder: ChildBuilder) -> &mut Self {
54 self.builders.push(builder);
55 self
56 }
57 /// Sets an environment for all child processes in the pipeline.
58 /// See [`envconfig::EnvironmentBuilder`].
59 pub fn set_env(&mut self, env: envconfig::Environment) -> &mut Self {
60 self.env = Some(env);
61 self
62 }
63 fn devnull(&mut self) -> RawFd {
64 let f = self.devnull.get_or_insert_with(|| {
65 let mut f_opts = File::options();
66 f_opts.write(true).read(false).create(false);
67 let f = f_opts
68 .open(DEVNULL)
69 .expect("Should have opened /dev/null for writing");
70 f
71 });
72 f.as_raw_fd()
73 }
74 /// Redirects stderr of all child processes to /dev/null. Affects
75 /// only the builders added to the pipeline prior to calling this
76 /// method. This is a convenience method that replaces configuring
77 /// each [`ChildBuilder`] to open /dev/null and redirecting the fd.
78 /// To economize file descriptors, it opens /dev/null only once.
79 pub fn quiet(&mut self) -> &mut Self {
80 let raw_fd = self.devnull();
81 for builder in self.builders.iter_mut() {
82 builder.config_io(ioconfig::Redirect::new_with_priority(2, raw_fd, 10));
83 }
84 self
85 }
86 /// Executes the pipeline. Interprocess pipes are created as needed
87 /// to link stdout of a child process to stdin of the next process.
88 /// The child processes are started in the reverse of the order in
89 /// which they were added to the builder.
90 ///
91 /// Returns a [`PipelineChildren`] value that may be `.wait()`ed
92 /// to get the exit status of each child process.
93 pub fn spawn(&mut self) -> PipelineChildren {
94 if self.builders.len() == 0 {
95 panic!("Pipeline has no processes to spawn");
96 }
97
98 // set up interprocess pipes
99 let mut pipe_read: Option<Box<InterprocessPipeRead>> = None;
100 let builders = HeadBodyTailIter::new(self.builders.iter_mut());
101 for ref mut builder in builders {
102 match builder {
103 HeadBodyTail::Only(_) => (),
104 HeadBodyTail::Head(x) => {
105 let (r, w) = interprocess_pipe(0, 1);
106 x.config_io(w);
107 pipe_read = Some(r);
108 }
109 HeadBodyTail::Body(x) => {
110 let (r, w) = interprocess_pipe(0, 1);
111 x.config_io(w);
112 if let Some(prev_r) = pipe_read.take() {
113 x.config_io(prev_r);
114 }
115 pipe_read = Some(r);
116 }
117 HeadBodyTail::Tail(x) => {
118 if let Some(prev_r) = pipe_read.take() {
119 x.config_io(prev_r);
120 }
121 }
122 }
123 }
124
125 // spawn the processes, last to first
126 self.builders.reverse();
127 let mut children = Vec::new();
128 while let Some(mut builder) = self.builders.pop() {
129 // set env
130 if let Some(env) = self.env.as_ref() {
131 builder.set_env(env.clone());
132 }
133 let child = builder.spawn();
134 children.push(child);
135 }
136 PipelineChildren { children }
137 }
138}
139
140/// Struct that contains the ``Child`` values generated by spawning the
141/// ``ChildBuilders`` in the pipeline. Its sole function is to wait on
142/// all of the children processes to terminate, and then return a
143/// ``Vec`` of exit ``WaitStatus``. This status vector is in the same
144/// order that the ``ChildBuilder``s were added to the pipeline
145/// builder.
146pub struct PipelineChildren {
147 children: Vec<Child>,
148}
149impl PipelineChildren {
150 pub fn wait(&mut self) -> Option<Vec<WaitStatus>> {
151 if self.children.len() == 0 {
152 return None;
153 }
154 self.children.reverse();
155 let mut wait_results = Vec::new();
156 while let Some(child) = self.children.pop() {
157 if let Ok(waitstatus) = child.wait() {
158 // child exit
159 wait_results.push(waitstatus);
160 } else {
161 // wait error
162 panic!("Child wait error");
163 }
164 }
165 Some(wait_results)
166 }
167 pub fn children(&self) -> &[Child] {
168 self.children.as_ref()
169 }
170}