1#![cfg(unix)]
2
3extern crate mio;
4use std::mem;
5use mio::*;
6use std::io;
7use std::process;
8use std::cmp;
9
10use mio::deprecated::{TryRead, TryWrite};
11use mio::deprecated::{PipeReader, PipeWriter};
12#[allow(unused_imports)]
13use std::process::{Command, Stdio, Child};
14
15
16struct SubprocessClient {
17 stdin: Option<PipeWriter>,
18 stdout: Option<PipeReader>,
19 stderr: Option<PipeReader>,
20 stdin_token : Token,
21 stdout_token : Token,
22 stderr_token : Token,
23 output : Vec<u8>,
24 output_stderr : Vec<u8>,
25 input : Vec<u8>,
26 input_offset : usize,
27 buf : [u8; 65536],
28 stdout_bound : Option<usize>,
29 stderr_bound : Option<usize>,
30 return_on_stdout_fill : bool,
31 has_shutdown : bool,
32 child_shutdown : bool,
33}
34
35
36impl SubprocessClient {
37 fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8],
38 stdout_bound : Option<usize>, stderr_bound : Option<usize>,
39 return_on_stdout_fill : bool) -> SubprocessClient {
40 SubprocessClient {
41 stdin: stdin,
42 stdout: stdout,
43 stderr: stderr,
44 stdin_token : Token(0),
45 stdout_token : Token(1),
46 stderr_token : Token(2),
47 output : Vec::<u8>::new(),
48 output_stderr : Vec::<u8>::new(),
49 buf : [0; 65536],
50 input : data.to_vec(),
51 input_offset : 0,
52 stdout_bound : stdout_bound,
53 stderr_bound : stderr_bound,
54 return_on_stdout_fill : return_on_stdout_fill,
55 has_shutdown : false,
56 child_shutdown : false,
57 }
58 }
59
60 fn readable(&mut self, poll: &mut Poll) -> io::Result<()> {
61 if self.has_shutdown {
62 return Ok(());
63 }
64 let mut eof = false;
65 let mut buf_bound : usize = cmp::min(self.stdout_bound.unwrap_or(self.buf.len()), self.buf.len());
66 if buf_bound == 0 {
67 buf_bound = self.buf.len(); }
69 match self.stdout {
70 None => unreachable!(),
71 Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..buf_bound]) {
72 Ok(Some(r)) => {
73 if r == 0 {
74 eof = true;
75 } else {
76 let do_extend : bool;
77 match self.stdout_bound {
78 None => do_extend = true,
79 Some(ref mut bound) => {
80 if *bound >= r {
81 *bound = *bound - r;
82 do_extend = true;
83 } else {
84 *bound = 0;
85 do_extend = false;
86 if self.return_on_stdout_fill || self.stderr.is_none() || self.stderr_bound.unwrap_or(1) == 0 {
87 match self.stderr {
88 Some(ref sub_stderr) =>
89 match poll.deregister(sub_stderr){
90 Err(e) => return Err(e),
91 _ => {},
92 },
93 _ => {},
94 }
95 drop(self.stderr.take());
96 eof = true;
97 }
98 }
99 },
100 }
101 if do_extend {
102 self.output.extend(&self.buf[0..r]);
103 }
104 }
105 },
106 Ok(None) => {},
107 Err(e) => {
108 return Err(e);
109 }
110 }
111 };
112 if eof {
113 match self.stdout {
114 Some(ref sub_stdout) =>
115 match poll.deregister(sub_stdout) {
116 Err(e) => return Err(e),
117 _ => {},
118 },
119 _ => {},
120 }
121 drop(self.stdout.take());
122 if self.stderr.is_none() {
123 self.has_shutdown = true;
124 self.child_shutdown = true;
125 }
126 }
127 return Ok(());
128 }
129
130 fn readable_stderr(&mut self, poll: &mut Poll) -> io::Result<()> {
131 if self.has_shutdown {
132 return Ok(());
133 }
134
135 let mut eof = false;
136 let mut buf_bound : usize = cmp::min(self.stderr_bound.unwrap_or(self.buf.len()), self.buf.len());
137 if buf_bound == 0 {
138 buf_bound = self.buf.len(); }
140 match self.stderr {
141 None => unreachable!(),
142 Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..buf_bound]) {
143 Ok(None) => {
144 }
145 Ok(Some(r)) => {
146 if r == 0 {
147 eof = true;
148 } else {
149 let do_extend : bool;
150 match self.stderr_bound {
151 None => do_extend = true,
152 Some(ref mut bound) => {
153 if *bound >= r {
154 *bound = *bound - r;
155 do_extend = true;
156 } else {
157 *bound = 0;
158 do_extend = false;
159 if self.stdout.is_none() || self.stdout_bound.unwrap_or(1) == 0 {
160 match self.stdout {
161 Some(ref sub_stdout) =>
162 match poll.deregister(sub_stdout){
163 Err(e) => return Err(e),
164 _ => {},
165 },
166 _ => {},
167 }
168 drop(self.stdout.take()); eof = true;
170 }
171 }
172 },
173 }
174 if do_extend {
175 self.output_stderr.extend(&self.buf[0..r]);
176 }
177 }
178 }
179 Err(e) => {
180 return Err(e);
181 }
182 }
183 };
184 if eof {
185 match self.stderr {
186 Some(ref sub_stderr) =>
187 match poll.deregister(sub_stderr){
188 Err(e) => return Err(e),
189 _ => {},
190 },
191 _ => {},
192 }
193 drop(self.stderr.take());
194 if self.stdout.is_none() {
195 self.has_shutdown = true;
196 self.child_shutdown = true;
197 }
198 }
199 return Ok(());
200 }
201
202 fn writable(&mut self, poll: &mut Poll) -> io::Result<()> {
203 if self.has_shutdown {
204 return Ok(());
205 }
206
207 let mut ok = true;
208 match self.stdin {
209 None => unreachable!(),
210 Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
211 Ok(None) => {
212 },
213 Ok(Some(r)) => {
214 if r == 0 {
215 ok = false;
216 } else {
217 self.input_offset += r;
218 }
219 },
220 Err(_e) => {
221 ok = false;
222 },
223 }
224 }
225 if self.input_offset == self.input.len() || !ok {
226 match self.stdin {
227 Some(ref sub_stdin) =>
228 match poll.deregister(sub_stdin) {
229 Err(e) => return Err(e),
230 _ => {},
231 },
232 _ => {},
233 }
234 drop(self.stdin.take());
235 match self.stderr {
236 None => match self.stdout {
237 None => {
238 self.has_shutdown = true;
239 self.child_shutdown = true
240 },
241 Some(_) => {},
242 },
243 Some(_) => {},
244 }
245 }
246 return Ok(());
247 }
248
249 fn ready(&mut self, poll: &mut Poll, token: Token,
250 _events: Ready) {
251 if token == self.stderr_token {
252 let _x = self.readable_stderr(poll);
253 } else {
254 let _x = self.readable(poll);
255 }
256 if token == self.stdin_token {
257 let _y = self.writable(poll);
258 }
259 }
260}
261
262pub fn from_stdin(mut stdin: Option<process::ChildStdin>) -> io::Result<Option<PipeWriter> > {
263 match stdin {
264 None => return Ok(None),
265 Some(_) => {},
266 }
267 Ok(Some(PipeWriter::from_stdin(stdin.take().unwrap()).unwrap()))
268}
269
270pub fn from_stdout(mut stdout: Option<process::ChildStdout>) -> io::Result<Option<PipeReader> > {
271 match stdout {
272 None => return Ok(None),
273 Some(_) => {},
274 }
275 Ok(Some(PipeReader::from_stdout(stdout.take().unwrap()).unwrap()))
276}
277
278
279pub fn from_stderr(mut stderr: Option<process::ChildStderr>) -> io::Result<Option<PipeReader> > {
280 match stderr {
281 None => return Ok(None),
282 Some(_) => {},
283 }
284 Ok(Some(PipeReader::from_stderr(stderr.take().unwrap()).unwrap()))
285}
286
287pub fn subprocess_communicate(process : &mut Child,
299 input : &[u8],
300 stdout_bound : Option<usize>,
301 stderr_bound : Option<usize>,
302 return_on_stdout_fill : bool) -> (Vec<u8>, Vec<u8>, io::Result<()>) {
303 let stdin : Option<PipeWriter>;
304 match from_stdin(process.stdin.take()) {
305 Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
306 Ok(pipe) => stdin = pipe,
307 }
308
309 let stdout : Option<PipeReader>;
310 match from_stdout(process.stdout.take()) {
311 Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
312 Ok(pipe) => stdout = pipe,
313 }
314
315 let stderr : Option<PipeReader>;
316 match from_stderr(process.stderr.take()) {
317 Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
318 Ok(pipe) => stderr = pipe,
319 }
320
321
322 let mut subprocess = SubprocessClient::new(stdin,
323 stdout,
324 stderr,
325 input,
326 stdout_bound,
327 stderr_bound,
328 return_on_stdout_fill);
329 let mut poll = Poll::new().unwrap();
330 match subprocess.stdout {
331 Some(ref sub_stdout) =>
332 match poll.register(sub_stdout, subprocess.stdout_token, Ready::readable(),
333 PollOpt::level()) {
334 Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
335 Ok(_) =>{},
336 },
337 None => {},
338 }
339
340 match subprocess.stderr {
341 Some(ref sub_stderr) => match poll.register(sub_stderr, subprocess.stderr_token, Ready::readable(),
342 PollOpt::level()) {
343 Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
344 Ok(_) => {},
345 },
346 None => {},
347 }
348
349 match subprocess.stdin {
351 Some (ref sub_stdin) => match poll.register(sub_stdin, subprocess.stdin_token, Ready::writable(),
352 PollOpt::level()) {
353 Err(e) => return (Vec::<u8>::new(), Vec::<u8>::new(), Err(e)),
354 Ok(_) => {},
355 },
356 None => {},
357 }
358 let mut events = Events::with_capacity(1024);
359 while !subprocess.child_shutdown {
360 poll.poll(&mut events, None).unwrap();
361 for event in events.iter() {
362 subprocess.ready(&mut poll, event.token(), event.kind())
363 }
364 }
365
366 let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
367 let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
368
369 return (ret_stdout, ret_stderr, Ok(()));
370}
371
/// Test payload: 4 MiB (1024 * 4096 bytes), every byte set to 42.
///
/// `dead_code` is allowed because the visible uses of this constant are all
/// inside `#[test]` functions.
#[allow(dead_code)]
const TEST_DATA: [u8; 4096 * 1024] = [42u8; 4096 * 1024];
374
/// Sends `TEST_DATA` through `/bin/cat` with no bounds and expects the same
/// bytes back on stdout and nothing on stderr.
#[test]
fn test_subprocess_pipe() {
    let mut child = Command::new("/bin/cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .unwrap();
    let (ret_stdout, ret_stderr, err) =
        subprocess_communicate(&mut child, &TEST_DATA[..], None, None, true);
    child.wait().unwrap();
    err.unwrap();
    assert_eq!(TEST_DATA.len(), ret_stdout.len());
    assert_eq!(0usize, ret_stderr.len());
    // Lengths were asserted equal above, so zipping visits every byte.
    for (expected, actual) in TEST_DATA.iter().zip(ret_stdout.iter()) {
        assert_eq!(*expected, *actual);
    }
}
394
395
/// Sends `TEST_DATA` through `/bin/cat` with stdout bounded to one byte less
/// than the input and expects exactly the bounded prefix back.
#[test]
fn test_subprocess_bounded_pipe() {
    let mut child = Command::new("/bin/cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .unwrap();
    let (ret_stdout, ret_stderr, err) = subprocess_communicate(
        &mut child,
        &TEST_DATA[..],
        Some(TEST_DATA.len() - 1),
        None,
        true,
    );
    child.wait().unwrap();
    err.unwrap();
    assert_eq!(TEST_DATA.len() - 1, ret_stdout.len());
    assert_eq!(0usize, ret_stderr.len());
    // Lengths were asserted equal above, so zipping visits every byte.
    let expected_prefix = &TEST_DATA[0..TEST_DATA.len() - 1];
    for (expected, actual) in expected_prefix.iter().zip(ret_stdout.iter()) {
        assert_eq!(*expected, *actual);
    }
}
415
/// Runs `/usr/bin/yes` with stdout bounded to 130000 bytes and stderr bounded
/// to zero, and expects exactly `bound` bytes of alternating `y` / newline.
#[test]
fn test_subprocess_bounded_yes_stderr0() {
    let mut process = Command::new("/usr/bin/yes")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .unwrap();
    let bound: usize = 130000;
    let (ret_stdout, ret_stderr, err) =
        subprocess_communicate(&mut process, &TEST_DATA[..], Some(bound), Some(0), false);
    // Reap the child like the `cat` tests do so no zombie is left behind.
    // Its pipes were closed when `subprocess_communicate` returned, which
    // makes `yes` fail on its next write and exit.
    process.wait().unwrap();
    err.unwrap();
    assert_eq!(bound, ret_stdout.len());
    assert_eq!(0usize, ret_stderr.len());
    for (index, byte) in ret_stdout.iter().enumerate() {
        let expected = if index & 1 == 1 { b'\n' } else { b'y' };
        assert_eq!(*byte, expected);
    }
}
441
/// Runs `/usr/bin/yes` with both streams bounded to 130000 bytes and
/// `return_on_stdout_fill` set, and expects exactly `bound` bytes of
/// alternating `y` / newline on stdout.
#[test]
fn test_subprocess_bounded_yes() {
    let mut process = Command::new("/usr/bin/yes")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .unwrap();
    let bound: usize = 130000;
    let (ret_stdout, ret_stderr, err) =
        subprocess_communicate(&mut process, &TEST_DATA[..], Some(bound), Some(bound), true);
    // Reap the child like the `cat` tests do so no zombie is left behind.
    // Its pipes were closed when `subprocess_communicate` returned, which
    // makes `yes` fail on its next write and exit.
    process.wait().unwrap();
    err.unwrap();
    assert_eq!(bound, ret_stdout.len());
    assert_eq!(0usize, ret_stderr.len());
    for (index, byte) in ret_stdout.iter().enumerate() {
        let expected = if index & 1 == 1 { b'\n' } else { b'y' };
        assert_eq!(*byte, expected);
    }
}
467
468
/// Runs `/usr/bin/yes` without a piped stderr and with stdout bounded to
/// 130000 bytes, and expects exactly `bound` bytes of alternating
/// `y` / newline on stdout.
#[test]
fn test_subprocess_bounded_yes_no_stderr() {
    let mut process = Command::new("/usr/bin/yes")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .unwrap();
    let bound: usize = 130000;
    let (ret_stdout, ret_stderr, err) =
        subprocess_communicate(&mut process, &TEST_DATA[..], Some(bound), None, false);
    // Reap the child like the `cat` tests do so no zombie is left behind.
    // Its pipes were closed when `subprocess_communicate` returned, which
    // makes `yes` fail on its next write and exit.
    process.wait().unwrap();
    err.unwrap();
    assert_eq!(bound, ret_stdout.len());
    assert_eq!(0usize, ret_stderr.len());
    for (index, byte) in ret_stdout.iter().enumerate() {
        let expected = if index & 1 == 1 { b'\n' } else { b'y' };
        assert_eq!(*byte, expected);
    }
}