subprocess_communicate/
unix.rs

1#![cfg(unix)]
2
3extern crate mio;
4use std::mem;
5use mio::*;
6use std::io;
7use std::process;
8use std::cmp;
9
10use mio::deprecated::{TryRead, TryWrite};
11use mio::deprecated::{PipeReader, PipeWriter};
12#[allow(unused_imports)]
13use std::process::{Command, Stdio, Child};
14
15
16struct SubprocessClient {
17    stdin: Option<PipeWriter>,
18    stdout: Option<PipeReader>,
19    stderr: Option<PipeReader>,
20    stdin_token : Token,
21    stdout_token : Token,
22    stderr_token : Token,
23    output : Vec<u8>,
24    output_stderr : Vec<u8>,
25    input : Vec<u8>,
26    input_offset : usize,
27    buf : [u8; 65536],
28    stdout_bound : Option<usize>,
29    stderr_bound : Option<usize>,
30    return_on_stdout_fill : bool,
31    has_shutdown : bool,
32    child_shutdown : bool,
33}
34
35
36impl SubprocessClient {
37    fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8],
38           stdout_bound : Option<usize>, stderr_bound : Option<usize>,
39           return_on_stdout_fill : bool) -> SubprocessClient {
40        SubprocessClient {
41            stdin: stdin,
42            stdout: stdout,
43            stderr: stderr,
44            stdin_token : Token(0),
45            stdout_token : Token(1),
46            stderr_token : Token(2),
47            output : Vec::<u8>::new(),
48            output_stderr : Vec::<u8>::new(),
49            buf : [0; 65536],
50            input : data.to_vec(),
51            input_offset : 0,
52            stdout_bound : stdout_bound,
53            stderr_bound : stderr_bound,
54            return_on_stdout_fill : return_on_stdout_fill,
55            has_shutdown : false,
56            child_shutdown : false,
57        }
58    }
59
60    fn readable(&mut self, poll: &mut Poll) -> io::Result<()> {
61        if self.has_shutdown {
62            return Ok(());
63        }
64        let mut eof = false;
65        let mut buf_bound : usize = cmp::min(self.stdout_bound.unwrap_or(self.buf.len()), self.buf.len());
66        if buf_bound == 0 {
67            buf_bound = self.buf.len(); // if we ran out of space and our socket is readable, throw result out
68        }
69        match self.stdout {
70            None => unreachable!(),
71            Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..buf_bound]) {
72                Ok(Some(r)) => {
73                    if r == 0 {
74                        eof = true;
75                    } else {
76                        let do_extend : bool;
77                        match self.stdout_bound {
78                            None => do_extend = true,
79                            Some(ref mut bound) => {
80                               if *bound >= r {
81                                   *bound = *bound - r;
82                                  do_extend = true;
83                               } else {
84                                  *bound = 0;
85                                  do_extend = false;
86                                  if self.return_on_stdout_fill || self.stderr.is_none() || self.stderr_bound.unwrap_or(1) == 0 {
87                                      match self.stderr {
88                                          Some(ref sub_stderr) =>
89                                              match poll.deregister(sub_stderr){
90                                                Err(e) => return Err(e),
91                                                _ => {},
92                                          },
93                                          _ => {},
94                                      }
95                                      drop(self.stderr.take());
96                                      eof = true;
97                                  }
98                               }
99                            },
100                        }
101                        if do_extend {
102                            self.output.extend(&self.buf[0..r]);
103                        }
104                    }
105                },
106                Ok(None) => {},
107                Err(e) => {
108                    return Err(e);
109                }
110            }
111        };
112        if eof {
113            match self.stdout {
114               Some(ref sub_stdout) =>
115                   match poll.deregister(sub_stdout) {
116                      Err(e) => return Err(e),
117                      _ => {},
118                   },
119               _ => {},
120            }
121            drop(self.stdout.take());
122            if self.stderr.is_none() {
123                self.has_shutdown = true;
124                self.child_shutdown = true;
125            }
126        }
127        return Ok(());
128    }
129
130    fn readable_stderr(&mut self, poll: &mut Poll) -> io::Result<()> {
131        if self.has_shutdown {
132            return Ok(());
133        }
134
135        let mut eof = false;
136        let mut buf_bound : usize = cmp::min(self.stderr_bound.unwrap_or(self.buf.len()), self.buf.len());
137        if buf_bound == 0 {
138            buf_bound = self.buf.len(); // if we ran out of space and our socket is readable, throw result out
139        }
140        match self.stderr {
141            None => unreachable!(),
142            Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..buf_bound]) {
143                Ok(None) => {
144                }
145                Ok(Some(r)) => {
146                    if r == 0 {
147                        eof = true;
148                    } else {
149                        let do_extend : bool;
150                        match self.stderr_bound {
151                            None => do_extend = true,
152                            Some(ref mut bound) => {
153                               if *bound >= r {
154                                  *bound = *bound - r;
155                                  do_extend = true;
156                               } else {
157                                  *bound = 0;
158                                  do_extend = false;
159                                  if self.stdout.is_none() || self.stdout_bound.unwrap_or(1) == 0 {
160                                      match self.stdout {
161                                          Some(ref sub_stdout) =>
162                                              match poll.deregister(sub_stdout){
163                                                  Err(e) => return Err(e),
164                                                  _ => {},
165                                              },
166                                          _ => {},
167                                      }
168                                      drop(self.stdout.take()); // in case stdout had overrun bound
169                                      eof = true;
170                                  }
171                               }
172                            },
173                        }
174                        if do_extend {
175                            self.output_stderr.extend(&self.buf[0..r]);
176                        }
177                    }
178                }
179                Err(e) => {
180                    return Err(e);
181                }
182            }
183        };
184        if eof {
185            match self.stderr {
186               Some(ref sub_stderr) =>
187                   match poll.deregister(sub_stderr){
188                       Err(e) => return Err(e),
189                       _ => {},
190                   },
191               _ => {},
192            }
193            drop(self.stderr.take());
194            if self.stdout.is_none() {
195                self.has_shutdown = true;
196                self.child_shutdown = true;
197            }
198        }
199        return Ok(());
200    }
201
202    fn writable(&mut self, poll: &mut Poll) -> io::Result<()> {
203        if self.has_shutdown {
204            return Ok(());
205        }
206
207        let mut ok = true;
208        match self.stdin {
209            None => unreachable!(),
210            Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
211                Ok(None) => {
212                },
213                Ok(Some(r)) => {
214                    if r == 0 {
215                        ok = false;
216                    } else {
217                        self.input_offset += r;
218                    }
219                },
220                Err(_e) => {
221                    ok = false;
222                },
223            }
224        }
225        if self.input_offset == self.input.len() || !ok {
226            match self.stdin {
227                Some(ref sub_stdin) =>
228                    match poll.deregister(sub_stdin) {
229                       Err(e) => return Err(e),
230                       _ => {},
231                    },
232                _ => {},
233            }
234            drop(self.stdin.take());
235            match self.stderr {
236                None => match self.stdout {
237                            None => {
238                                self.has_shutdown = true;
239                                self.child_shutdown = true
240                            },
241                            Some(_) => {},
242                },
243                Some(_) => {},
244            }
245        }
246        return Ok(());
247    }
248
249    fn ready(&mut self, poll: &mut Poll, token: Token,
250             _events: Ready) {
251        if token == self.stderr_token {
252            let _x = self.readable_stderr(poll);
253        } else {
254            let _x = self.readable(poll);
255        }
256        if token == self.stdin_token {
257            let _y = self.writable(poll);
258        }
259    }
260}
261
262pub fn from_stdin(mut stdin: Option<process::ChildStdin>) -> io::Result<Option<PipeWriter> > {
263    match stdin {
264      None => return Ok(None),
265      Some(_) => {},
266    }
267    Ok(Some(PipeWriter::from_stdin(stdin.take().unwrap()).unwrap()))
268}
269
270pub fn from_stdout(mut stdout: Option<process::ChildStdout>) -> io::Result<Option<PipeReader> > {
271    match stdout {
272      None => return Ok(None),
273      Some(_) => {},
274    }
275    Ok(Some(PipeReader::from_stdout(stdout.take().unwrap()).unwrap()))
276}
277
278
279pub fn from_stderr(mut stderr: Option<process::ChildStderr>) -> io::Result<Option<PipeReader> > {
280    match stderr {
281      None => return Ok(None),
282      Some(_) => {},
283    }
284    Ok(Some(PipeReader::from_stderr(stderr.take().unwrap()).unwrap()))
285}
286
287/// Sends input to process and returns stdout and stderr
288/// up until stdout_bound or stderr_bound are reached
289/// If stdout_bound is reached and return_on_stdout_fill is true,
290/// the rest of stderr will not be awaited
291///
292/// Conversely, if stdout_bound is reached and return_on_stderr_fill is false
293/// Then if insufficient stderr is produced and that file descriptor is not closed by
294/// the callee, then the subprocess_communicate will hang until the child produces
295/// up to at least the stderr_bound or closes the stderr file descriptor
296/// This function may return errors if the stdin, stdout or stderr are unable to be set into nonblocking
297/// or if the event loop is unable to be created, otherwise the last return value will be Ok(())
298pub fn subprocess_communicate(process : &mut Child,
299                              input : &[u8],
300                              stdout_bound : Option<usize>,
301                              stderr_bound : Option<usize>,
302                              return_on_stdout_fill : bool) -> (Vec<u8>, Vec<u8>, io::Result<()>) {
303    let stdin : Option<PipeWriter>;
304    match from_stdin(process.stdin.take()) {
305        Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
306        Ok(pipe) => stdin = pipe,
307    }
308
309    let stdout : Option<PipeReader>;
310    match from_stdout(process.stdout.take()) {
311        Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
312        Ok(pipe) => stdout = pipe,
313    }
314
315    let stderr : Option<PipeReader>;
316    match from_stderr(process.stderr.take()) {
317        Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
318        Ok(pipe) => stderr = pipe,
319    }
320
321
322    let mut subprocess = SubprocessClient::new(stdin,
323                                               stdout,
324                                               stderr,
325                                               input,
326                                               stdout_bound,
327                                               stderr_bound,
328                                               return_on_stdout_fill);
329    let mut poll = Poll::new().unwrap();
330    match subprocess.stdout {
331       Some(ref sub_stdout) =>
332          match poll.register(sub_stdout, subprocess.stdout_token, Ready::readable(),
333                                                   PollOpt::level()) {
334            Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
335            Ok(_) =>{},
336          },
337       None => {},
338    }
339
340    match subprocess.stderr {
341        Some(ref sub_stderr) => match poll.register(sub_stderr, subprocess.stderr_token, Ready::readable(),
342                        PollOpt::level()) {
343           Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
344           Ok(_) => {},
345        },
346        None => {},
347    }
348
349    // Connect to the server
350    match subprocess.stdin {
351        Some (ref sub_stdin) => match poll.register(sub_stdin, subprocess.stdin_token, Ready::writable(),
352                        PollOpt::level()) {
353           Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
354           Ok(_) => {},
355         },
356         None => {},
357    }
358    let mut events = Events::with_capacity(1024);
359    while !subprocess.child_shutdown {
360        poll.poll(&mut events, None).unwrap();
361        for event in events.iter() {
362            subprocess.ready(&mut poll, event.token(), event.kind())
363        }
364    }
365
366    let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
367    let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
368
369    return (ret_stdout, ret_stderr, Ok(()));
370}
371
372#[allow(dead_code)]
373const TEST_DATA : [u8; 1024 * 4096] = [42; 1024 * 4096];
374
375#[test]
376fn test_subprocess_pipe() {
377    let mut process =
378           Command::new("/bin/cat")
379           .stdin(Stdio::piped())
380           .stdout(Stdio::piped())
381           .stderr(Stdio::piped())
382           .spawn().unwrap();
383     let (ret_stdout, ret_stderr, err) = subprocess_communicate(&mut process, &TEST_DATA[..], None, None, true);
384     process.wait().unwrap();
385     err.unwrap();
386     assert_eq!(TEST_DATA.len(), ret_stdout.len());
387     assert_eq!(0usize, ret_stderr.len());
388     let mut i : usize = 0;
389     for item in TEST_DATA.iter() {
390         assert_eq!(*item, ret_stdout[i]);
391         i += 1;
392     }
393}
394
395
396#[test]
397fn test_subprocess_bounded_pipe() {
398    let mut process =
399           Command::new("/bin/cat")
400           .stdin(Stdio::piped())
401           .stdout(Stdio::piped())
402           .stderr(Stdio::piped())
403           .spawn().unwrap();
404     let (ret_stdout, ret_stderr, err) = subprocess_communicate(&mut process, &TEST_DATA[..], Some(TEST_DATA.len() - 1), None, true);
405     process.wait().unwrap();
406     err.unwrap();
407     assert_eq!(TEST_DATA.len() - 1, ret_stdout.len());
408     assert_eq!(0usize, ret_stderr.len());
409     let mut i : usize = 0;
410     for item in TEST_DATA[0..TEST_DATA.len() - 1].iter() {
411         assert_eq!(*item, ret_stdout[i]);
412         i += 1;
413     }
414}
415
416#[test]
417fn test_subprocess_bounded_yes_stderr0() {
418    let mut process =
419           Command::new("/usr/bin/yes")
420           .stdin(Stdio::piped())
421           .stdout(Stdio::piped())
422           .stderr(Stdio::piped())
423           .spawn().unwrap();
424     let bound : usize = 130000;
425     let (ret_stdout, ret_stderr, err) = subprocess_communicate(&mut process, &TEST_DATA[..], Some(bound), Some(0), false);
426     err.unwrap();
427     assert_eq!(bound, ret_stdout.len());
428     assert_eq!(0usize, ret_stderr.len());
429     let mut i : usize = 0;
430     for item in ret_stdout.iter() {
431         let val : u8;
432         if (i & 1) == 1 {
433             val = '\n' as u8;
434         } else {
435             val = 'y' as u8;
436         }
437         assert_eq!(*item, val);
438         i += 1;
439     }
440}
441
442#[test]
443fn test_subprocess_bounded_yes() {
444    let mut process =
445           Command::new("/usr/bin/yes")
446           .stdin(Stdio::piped())
447           .stdout(Stdio::piped())
448           .stderr(Stdio::piped())
449           .spawn().unwrap();
450     let bound : usize = 130000;
451     let (ret_stdout, ret_stderr, err) = subprocess_communicate(&mut process, &TEST_DATA[..], Some(bound), Some(bound), true);
452     err.unwrap();
453     assert_eq!(bound, ret_stdout.len());
454     assert_eq!(0usize, ret_stderr.len());
455     let mut i : usize = 0;
456     for item in ret_stdout.iter() {
457         let val : u8;
458         if (i & 1) == 1 {
459             val = '\n' as u8;
460         } else {
461             val = 'y' as u8;
462         }
463         assert_eq!(*item, val);
464         i += 1;
465     }
466}
467
468
469#[test]
470fn test_subprocess_bounded_yes_no_stderr() {
471    let mut process =
472           Command::new("/usr/bin/yes")
473           .stdin(Stdio::piped())
474           .stdout(Stdio::piped())
475           .spawn().unwrap();
476     let bound : usize = 130000;
477     let (ret_stdout, ret_stderr, err) = subprocess_communicate(&mut process, &TEST_DATA[..], Some(bound), None, false);
478     err.unwrap();
479     assert_eq!(bound, ret_stdout.len());
480     assert_eq!(0usize, ret_stderr.len());
481     let mut i : usize = 0;
482     for item in ret_stdout.iter() {
483         let val : u8;
484         if (i & 1) == 1 {
485             val = '\n' as u8;
486         } else {
487             val = 'y' as u8;
488         }
489         assert_eq!(*item, val);
490         i += 1;
491     }
492}