1#[macro_use]
2extern crate log;
3
4use std::io;
5use std::io::prelude::*;
6use std::io::{BufReader};
7use std::fs::File;
8use std::process::{ChildStdout, Command, Stdio, ExitStatus};
9use std::sync::Mutex;
10use std::ops::DerefMut;
11
12use std::os::unix::io::FromRawFd;
13use std::os::unix::io::AsRawFd;
14use std::sync::mpsc;
15
16
17pub enum Output {
18 Parent,
19 Ignore,
20 ToFd(File),
22 }
30
31pub struct DealWithOutput { stderr: Output, stdout: Output }
38
39pub fn output() -> DealWithOutput {
40 DealWithOutput { stdout: Output::Parent, stderr: Output::Parent }
41}
42
43impl DealWithOutput {
44 pub fn stderr(&mut self, stderr: Output) -> &mut Self {
45 self.stderr = stderr;
46 self
47 }
48
49 pub fn stdout(&mut self, stdout: Output) -> &mut Self {
50 self.stdout = stdout;
51 self
52 }
53}
54
55
56pub fn process_read_consumer<R: Read>(
64 deal_with: &mut DealWithOutput,
65 mut input: R,
66 cmd_args: (String, Vec<String>))
67 -> io::Result<ExitStatus>
68{
69 let mut cmd = build_command(cmd_args);
70
71 cmd.stdin(Stdio::piped());
72 setup_stderr(&deal_with.stderr, &mut cmd)?;
73 setup_stdout(&deal_with.stdout, &mut cmd)?;
74
75 let mut process = cmd.spawn()?;
76
77 {
80 let mut stdin = process.stdin.take().expect("impossible! no stdin");
81
82 io::copy(&mut input, &mut stdin)?;
85 }
86
87 let status = process.wait()?;
88 Ok(status)
89}
90
91
92pub fn process_as_reader<R>(
106 stdin_opt: Option<R>,
107 stderr: Output,
108 cmd_args: (String, Vec<String>)) -> io::Result<ChildStream>
109 where R: Read + Send + 'static,
110{
111 let mut cmd = build_command(cmd_args);
112
113 cmd.stdout(Stdio::piped());
115 if let Some(_) = stdin_opt {
116 cmd.stdin(Stdio::piped());
117 }
118
119 setup_stderr(&stderr, &mut cmd)?;
120
121 let mut process = cmd.spawn()?;
122 let stdout = process.stdout.take().expect("impossible! no stdout");
123
124 let (send_result, receiver) = mpsc::channel();
127 if let Some(input) = stdin_opt {
129 let mut stdin = process.stdin.take().expect("impossible! no stdin");
130 let input_mutex = Mutex::new(input);
131 let sender = send_result.clone();
132 let done_stdin = move |result| {
133 match result {
134 Err(err) => {
135 send_or_log_result(sender, Err(ProcessAsyncError::StdinError(err)))
136 }
137 Ok(Err(err)) => {
138 send_or_log_result(sender, Err(ProcessAsyncError::StdinError(err)))
139 }
140 Ok(Ok(_)) => {}
142 }
143 };
144 concurrent::spawn_catch_panic(done_stdin, move || {
145 let mut inp = input_mutex.lock().expect("error locking stdin");
147 let _ = io::copy(inp.deref_mut(), &mut stdin)?;
148 Ok(())
149 });
150 }
151
152 let sender = send_result.clone();
154 let done_wait = move |result| {
155 match result {
156 Err(err) => {
157 send_or_log_result(sender,
158 Err(ProcessAsyncError::WaitError(err)))
159 }
160 Ok(Err(err)) => {
161 send_or_log_result(sender,
162 Err(ProcessAsyncError::WaitError(err)))
163 }
164 Ok(Ok(status)) => {
165 send_or_log_result(sender, Ok(status));
166 }
167 }
168 };
169 concurrent::spawn_catch_panic(done_wait, move || {
170 let status = process.wait()?;
172 Ok(status)
173 });
174
175 Ok(ChildStream {
176 stdout: BufReader::new(stdout),
177 wait_result: FutureExitResult::new(receiver)
178 })
179}
180
181
182#[derive(Debug)]
183pub enum ProcessAsyncError {
184 RecvError(mpsc::RecvError),
185 WaitError(io::Error),
186 StdinError(io::Error),
187 ExitStatusError(Option<i32>),
188 AlreadyResolvedError,
189}
190
191type ProecessAsyncResult = Result<ExitStatus, ProcessAsyncError>;
192
193pub struct FutureExitResult {
194 recv: mpsc::Receiver<ProecessAsyncResult>,
195 already: bool,
197}
198
199impl FutureExitResult {
200 fn new(receiver: mpsc::Receiver<ProecessAsyncResult>) -> Self {
201 FutureExitResult { recv: receiver, already: false }
202 }
203
204 fn exit_status(&mut self) -> ProecessAsyncResult {
205 if self.already { return Err(ProcessAsyncError::AlreadyResolvedError) }
206 self.already = true;
207 match self.recv.recv() {
208 Err(err) => {
209 Err(ProcessAsyncError::RecvError(err))
210 }
211 Ok(stream_status) => {
212 stream_status
213 }
214 }
215 }
216
217 pub fn wait(&mut self) -> Result<Option<i32>, ProcessAsyncError> {
218 let status = self.exit_status()?;
219 if status.success() {
220 Ok(status.code())
221 } else {
222 Err(ProcessAsyncError::ExitStatusError(status.code()))
223 }
224 }
225}
226
227
228pub struct ChildStream {
229 pub stdout: BufReader<ChildStdout>,
230 pub wait_result: FutureExitResult
231}
232impl ChildStream {
233 pub fn wait(&mut self) -> Result<Option<i32>, ProcessAsyncError> {
237 self.wait_result.wait()
238 }
239}
240
241
242fn send_or_log_result<T>(sender: mpsc::Sender<Result<T, ProcessAsyncError>>, result: Result<T, ProcessAsyncError>){
243 match sender.send(result) {
244 Ok(_) => {}
245 Err(err) => error!("error sending done message: {}", err),
246 }
247}
248
249
250fn setup_stderr(deal_with_stderr: &Output, cmd: &mut Command) -> io::Result<()> {
251 match deal_with_stderr {
253 &Output::Parent => {}
254 &Output::Ignore => {
255 cmd.stderr(Stdio::null());
256 }
257 &Output::ToFd(ref file) => {
258 unsafe {
259 cmd.stderr(Stdio::from_raw_fd(file.as_raw_fd()));
260 }
261 }
262 }
271 Ok(())
272}
273
274
275fn setup_stdout(deal_with_stdout: &Output, cmd: &mut Command) -> io::Result<()> {
276 match deal_with_stdout {
278 &Output::Parent => {}
279 &Output::Ignore => {
280 cmd.stdout(Stdio::null());
281 }
282 &Output::ToFd(ref file) => {
283 unsafe {
284 cmd.stdout(Stdio::from_raw_fd(file.as_raw_fd()));
285 }
286 }
287 }
296 Ok(())
297}
298
299
300fn build_command(cmd_args: (String, Vec<String>)) -> Command {
302 let (exe, args) = cmd_args;
304 let mut cmd = Command::new(exe);
305 for arg in args { cmd.arg(arg); }
306 return cmd
307}
308
309
310mod concurrent {
338 use std::io;
339 use std::io::{Error, ErrorKind};
340 use std::panic;
341 use std::thread;
342 use std::thread::JoinHandle;
343 use std::any::Any;
344
345
346 pub fn caught_panic_to_io_error(err: Box<Any + Send + 'static>) -> io::Error {
347 let msg = match err.downcast_ref::<&'static str>() {
348 Some(s) => *s,
349 None => {
350 match err.downcast_ref::<String>() {
351 Some(s) => &s[..],
352 None => "Box<Any>",
353 }
354 }
355 };
356
357 Error::new(ErrorKind::Other, msg)
358 }
359
360
361 pub fn spawn_catch_panic<Function, Returned, Finished>(done: Finished, f: Function) -> JoinHandle<()>
366 where Function: FnOnce() -> Returned,
367 Function: Send + 'static,
368 Function: panic::UnwindSafe,
369 Finished: FnOnce(io::Result<Returned>) -> (),
370 Finished: Send + 'static,
371 {
372 thread::spawn(move || {
373 let result = panic::catch_unwind(move || { f() });
374 match result {
375 Err(err) => { done(Err(caught_panic_to_io_error(err))) }
376 Ok(ok) => { done(Ok(ok)) }
377 }
378 })
379 }
380}