process_iterator/
lib.rs

1#[macro_use]
2extern crate log;
3
4use std::io;
5use std::io::prelude::*;
6use std::io::{BufReader};
7use std::fs::File;
8use std::process::{ChildStdout, Command, Stdio, ExitStatus};
9use std::sync::Mutex;
10use std::ops::DerefMut;
11
12use std::os::unix::io::FromRawFd;
13use std::os::unix::io::AsRawFd;
14use std::sync::mpsc;
15
16
17pub enum Output {
18    Parent,
19    Ignore,
20    // only works on Unix, the most efficient
21    ToFd(File),
22    /* The below would require additional error handling mechanisms
23    // Write to this file in a background thread
24    ToPath(PathBuf),
25    // Check for any output in a background thread
26    // Useful to assert that stderr has no output
27    FailOnOutput,
28    */
29}
30
31/*
32pub fn output_to_path(path: &str) -> Output {
33    Output::ToPath(PathBuf::from(path))
34}
35*/
36
37pub struct DealWithOutput { stderr: Output, stdout: Output }
38
39pub fn output() -> DealWithOutput {
40    DealWithOutput { stdout: Output::Parent, stderr: Output::Parent }
41}
42
43impl DealWithOutput {
44    pub fn stderr(&mut self, stderr: Output) -> &mut Self {
45        self.stderr = stderr;
46        self
47    }
48
49    pub fn stdout(&mut self, stdout: Output) -> &mut Self {
50        self.stdout = stdout;
51        self
52    }
53}
54
55
56// Use a process as a consumer of a Read interface.
57//
58// Start a process for the given exe and args.
59// Handle stderr and stdout in a non-blocking way according to the given DealWithOutput
60//
61// Feed the input as stdin to the process and then wait for the process to finish
62// Return the result of the process.
63pub fn process_read_consumer<R: Read>(
64    deal_with: &mut DealWithOutput,
65    mut input: R,
66    cmd_args: (String, Vec<String>))
67    -> io::Result<ExitStatus>
68{
69    let mut cmd = build_command(cmd_args);
70
71    cmd.stdin(Stdio::piped());
72    setup_stderr(&deal_with.stderr, &mut cmd)?;
73    setup_stdout(&deal_with.stdout, &mut cmd)?;
74
75    let mut process = cmd.spawn()?;
76
77    // Introducing this scope will drop stdin
78    // That will close the handle so the process can terminate
79    {
80        let mut stdin = process.stdin.take().expect("impossible! no stdin");
81
82        // output_optional_handle(&deal_with.stderr, &mut process.stderr)?;
83        // output_optional_handle(&stdout, &mut process.stdout)?;
84        io::copy(&mut input, &mut stdin)?;
85    }
86
87    let status = process.wait()?;
88    Ok(status)
89}
90
91
92// Stream data through a process using readers
93//
94// Start a process for the given exe and args.
95// Return a buffer of stdout or the error encountered when starting the process
96//
97// Feed input (if given) to its stdin (in a separate thread)
98// If no stdin is given you will need to give None::<File>
99//
100// Wait for the exit code on a separate thread
101// Handle stderr in a non-blocking way according to the stderr option
102//
103// If any of the threads have failures, including if the process has a non-zero exit code,
104// that will be reflected in `ChildStream.wait`
105pub fn process_as_reader<R>(
106	stdin_opt: Option<R>,
107    stderr: Output,
108	cmd_args: (String, Vec<String>)) -> io::Result<ChildStream>
109    where R: Read + Send + 'static,
110{
111    let mut cmd = build_command(cmd_args);
112
113    // setup stdout
114    cmd.stdout(Stdio::piped());
115    if let Some(_) = stdin_opt {
116        cmd.stdin(Stdio::piped());
117    }
118
119    setup_stderr(&stderr, &mut cmd)?;
120
121    let mut process = cmd.spawn()?;
122    let stdout = process.stdout.take().expect("impossible! no stdout");
123
124    // output_optional_handle(&stderr, &mut process.stderr)?;
125
126    let (send_result, receiver) = mpsc::channel();
127    // feed input to stdin
128    if let Some(input) = stdin_opt {
129        let mut stdin = process.stdin.take().expect("impossible! no stdin");
130        let input_mutex = Mutex::new(input);
131        let sender = send_result.clone();
132        let done_stdin = move |result| {
133            match result {
134                Err(err) => {
135                    send_or_log_result(sender, Err(ProcessAsyncError::StdinError(err)))
136                }
137                Ok(Err(err)) => {
138                    send_or_log_result(sender, Err(ProcessAsyncError::StdinError(err)))
139                }
140                // process wait returns the Ok
141                Ok(Ok(_)) => {}
142            }
143        };
144        concurrent::spawn_catch_panic(done_stdin, move || {
145            // TODO: convert error types
146            let mut inp = input_mutex.lock().expect("error locking stdin");
147            let _ = io::copy(inp.deref_mut(), &mut stdin)?;
148            Ok(())
149        });
150    }
151
152    // wait for the process to exit successfully
153    let sender = send_result.clone();
154    let done_wait = move |result| {
155        match result {
156            Err(err) => {
157                send_or_log_result(sender,
158                    Err(ProcessAsyncError::WaitError(err)))
159            }
160            Ok(Err(err)) => {
161                send_or_log_result(sender,
162                    Err(ProcessAsyncError::WaitError(err)))
163            }
164            Ok(Ok(status)) => {
165                send_or_log_result(sender, Ok(status));
166            }
167        }
168    };
169    concurrent::spawn_catch_panic(done_wait, move || {
170        // assert that the process exits successfully
171        let status = process.wait()?;
172        Ok(status)
173    });
174
175    Ok(ChildStream {
176      stdout: BufReader::new(stdout),
177      wait_result: FutureExitResult::new(receiver)
178    })
179}
180
181
182#[derive(Debug)]
183pub enum ProcessAsyncError {
184    RecvError(mpsc::RecvError),
185    WaitError(io::Error),
186    StdinError(io::Error),
187    ExitStatusError(Option<i32>),
188    AlreadyResolvedError,
189}
190
191type ProecessAsyncResult = Result<ExitStatus, ProcessAsyncError>;
192
193pub struct FutureExitResult {
194    recv: mpsc::Receiver<ProecessAsyncResult>,
195    // This future should only be resolved once
196    already: bool,
197}
198
199impl FutureExitResult {
200   fn new(receiver: mpsc::Receiver<ProecessAsyncResult>) -> Self {
201      FutureExitResult { recv: receiver, already: false }
202   }
203
204   fn exit_status(&mut self) -> ProecessAsyncResult {
205       if self.already { return Err(ProcessAsyncError::AlreadyResolvedError) }
206       self.already = true;
207       match self.recv.recv() {
208           Err(err) => {
209               Err(ProcessAsyncError::RecvError(err))
210           }
211           Ok(stream_status) => {
212               stream_status
213           }
214       }
215   }
216
217   pub fn wait(&mut self) -> Result<Option<i32>, ProcessAsyncError> {
218        let status = self.exit_status()?;
219        if status.success() {
220            Ok(status.code())
221        } else {
222            Err(ProcessAsyncError::ExitStatusError(status.code()))
223        }
224   }
225}
226
227
228pub struct ChildStream {
229    pub stdout: BufReader<ChildStdout>,
230    pub wait_result: FutureExitResult
231}
232impl ChildStream {
233   // Wait for the result of the process.
234   // This should only be called once.
235   // The second time it will return AlreadyResolvedError
236   pub fn wait(&mut self) -> Result<Option<i32>, ProcessAsyncError> {
237       self.wait_result.wait()
238   }
239}
240
241
242fn send_or_log_result<T>(sender: mpsc::Sender<Result<T, ProcessAsyncError>>, result: Result<T, ProcessAsyncError>){
243    match sender.send(result) {
244        Ok(_) => {}
245        Err(err) => error!("error sending done message: {}", err),
246    }
247}
248
249
250fn setup_stderr(deal_with_stderr: &Output, cmd: &mut Command) -> io::Result<()> {
251    // setup stderr
252    match deal_with_stderr {
253      &Output::Parent => {}
254      &Output::Ignore => {
255          cmd.stderr(Stdio::null());
256      }
257      &Output::ToFd(ref file) => {
258          unsafe {
259            cmd.stderr(Stdio::from_raw_fd(file.as_raw_fd()));
260          }
261      }
262      /*
263      &Output::FailOnOutput => {
264          cmd.stderr(Stdio::piped());
265      }
266      &Output::ToPath(_) => {
267          cmd.stderr(Stdio::piped());
268      }
269      */
270    }
271    Ok(())
272}
273
274
275fn setup_stdout(deal_with_stdout: &Output, cmd: &mut Command) -> io::Result<()> {
276    // setup stderr
277    match deal_with_stdout {
278      &Output::Parent => {}
279      &Output::Ignore => {
280          cmd.stdout(Stdio::null());
281      }
282      &Output::ToFd(ref file) => {
283          unsafe {
284            cmd.stdout(Stdio::from_raw_fd(file.as_raw_fd()));
285          }
286      }
287      /*
288      &Output::FailOnOutput => {
289          cmd.stdout(Stdio::piped());
290      }
291      &Output::ToPath(_) => {
292          cmd.stdout(Stdio::piped());
293      }
294      */
295    }
296    Ok(())
297}
298
299
300// Build up the command from the arguments
301fn build_command(cmd_args: (String, Vec<String>)) -> Command {
302    // Build up the command from the arguments
303    let (exe, args) = cmd_args;
304    let mut cmd = Command::new(exe);
305    for arg in args { cmd.arg(arg); }
306    return cmd
307}
308
309
310/*
311fn output_optional_handle<R: Read + Send + 'static>(deal_with_output: &Output, opt_handle: &mut Option<R>) -> io::Result<()> {
312    if let &Output::FailOnOutput = deal_with_output {
313        let handle = opt_handle.take().expect("impossible! no output handle");
314        let _ = thread::spawn(move || {
315            let mut output = Vec::new();
316            let size = handle.take(1).read_to_end(&mut output).expect("error reading output");
317            if size > 0 {
318              panic!("got unexpected output");
319            }
320        });
321    } else {
322        if let &Output::ToPath(ref path) = deal_with_output {
323            let mut handle = opt_handle.take().expect("impossible! no output handle");
324            let mut file = File::create(path)?;
325            let _ = thread::spawn(move || {
326                io::copy(&mut handle, &mut file)
327                  .expect("error writing output to a file");
328            });
329        }
330    }
331
332    Ok(())
333}
334*/
335
336
337mod concurrent {
338    use std::io;
339    use std::io::{Error, ErrorKind};
340    use std::panic;
341    use std::thread;
342    use std::thread::JoinHandle;
343    use std::any::Any;
344
345
346    pub fn caught_panic_to_io_error(err: Box<Any + Send + 'static>) -> io::Error {
347        let msg = match err.downcast_ref::<&'static str>() {
348            Some(s) => *s,
349            None => {
350                match err.downcast_ref::<String>() {
351                    Some(s) => &s[..],
352                    None => "Box<Any>",
353                }
354            }
355        };
356
357        Error::new(ErrorKind::Other, msg)
358    }
359
360
361    // Spawn a thread and catch any panic unwinds.
362    // Call a callback function with the result
363    // where Err is the message of the panic and
364    // Ok is the return of the function
365    pub fn spawn_catch_panic<Function, Returned, Finished>(done: Finished, f: Function) -> JoinHandle<()>
366        where Function: FnOnce() -> Returned,
367              Function: Send + 'static,
368              Function: panic::UnwindSafe,
369              Finished: FnOnce(io::Result<Returned>) -> (),
370              Finished: Send + 'static,
371    {
372        thread::spawn(move || {
373            let result = panic::catch_unwind(move || { f() });
374            match result {
375                Err(err) => { done(Err(caught_panic_to_io_error(err))) }
376                Ok(ok)   => { done(Ok(ok)) }
377            }
378        })
379    }
380}