1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#[cfg(unix)]
mod os {
    use crate::posix;
    use std::fs::File;
    use std::io::{Read, Write, Result as IoResult};
    use std::os::unix::io::AsRawFd;
    use std::cmp::min;

    /// Polls up to three optional streams in one `posix::poll` call (no
    /// timeout is passed) and reports which of them have events pending.
    ///
    /// `fin` is watched for `POLLOUT`, `fout` and `ferr` for `POLLIN`.  In
    /// the result a stream counts as ready when its requested event or
    /// `POLLHUP` is reported.  The flags are returned in argument order:
    /// `(fin, fout, ferr)`.
    ///
    /// An error from `posix::poll` is returned unchanged.
    fn poll3(fin: Option<&File>, fout: Option<&File>, ferr: Option<&File>)
             -> IoResult<(bool, bool, bool)> {
        // Builds the poll entry for one stream; `wants_write` selects
        // between waiting for POLLOUT and waiting for POLLIN.
        fn entry(file: Option<&File>, wants_write: bool) -> posix::PollFd {
            let events = if wants_write { posix::POLLOUT } else { posix::POLLIN };
            posix::PollFd::new(file.map(|f| f.as_raw_fd()), events)
        }

        let mut fds = [entry(fin, true), entry(fout, false), entry(ferr, false)];
        posix::poll(&mut fds, None)?;

        let in_ready = fds[0].test(posix::POLLOUT | posix::POLLHUP);
        let out_ready = fds[1].test(posix::POLLIN | posix::POLLHUP);
        let err_ready = fds[2].test(posix::POLLIN | posix::POLLHUP);
        Ok((in_ready, out_ready, err_ready))
    }

    /// Writes `input_data` to the stdin stream while collecting everything
    /// that arrives on the stdout and stderr streams, using `poll3` to
    /// decide which stream to service next.
    ///
    /// Streams given as `None` are skipped.  `stdin_ref` is reset to `None`
    /// (dropping, and thereby closing, the file) once all input has been
    /// written.  `stdout_ref` and `stderr_ref` are only read through shared
    /// references and are left in place.
    ///
    /// Returns the bytes read from stdout and from stderr, in that order.
    /// The first I/O error encountered is returned immediately.
    fn comm_poll(stdin_ref: &mut Option<File>,
                 stdout_ref: &mut Option<File>,
                 stderr_ref: &mut Option<File>,
                 mut input_data: &[u8])
                 -> IoResult<(Vec<u8>, Vec<u8>)> {
        // Note: the chunk size for writing must be smaller than the pipe
        // buffer size.  A large enough write to a blocking pipe deadlocks
        // despite the use of poll() to check that it's ok to write.
        const WRITE_SIZE: usize = 4096;

        /// Performs a single `read` on `*src` and appends the bytes to
        /// `sink`.  A zero-length read is treated as EOF and clears `*src`
        /// so the stream is no longer polled.
        fn read_some(src: &mut Option<&File>, sink: &mut Vec<u8>) -> IoResult<()> {
            let mut buf = [0u8; 4096];
            match src.unwrap().read(&mut buf)? {
                0 => *src = None,
                n => sink.extend_from_slice(&buf[..n]),
            }
            Ok(())
        }

        // Local views of the output streams; they are set to `None` at EOF
        // without touching the caller's options.
        let mut stdout_pipe = stdout_ref.as_ref();
        let mut stderr_pipe = stderr_ref.as_ref();

        let mut out: Vec<u8> = Vec::new();
        let mut err: Vec<u8> = Vec::new();

        loop {
            // With at most one stream left there is nothing to multiplex:
            // finish it with a plain blocking call and stop.
            match (stdin_ref.is_some(), stdout_pipe, stderr_pipe) {
                (false, None, None) => break,
                (true, None, None) => {
                    stdin_ref.as_ref().unwrap().write_all(input_data)?;
                    // Drop stdin once writing is done, so the child
                    // receives EOF.
                    *stdin_ref = None;
                    break;
                }
                (false, Some(mut pipe), None) => {
                    pipe.read_to_end(&mut out)?;
                    break;
                }
                (false, None, Some(mut pipe)) => {
                    pipe.read_to_end(&mut err)?;
                    break;
                }
                _ => {}
            }

            let (in_ready, out_ready, err_ready) =
                poll3(stdin_ref.as_ref(), stdout_pipe, stderr_pipe)?;

            if in_ready {
                let chunk_len = min(WRITE_SIZE, input_data.len());
                let written = stdin_ref.as_ref().unwrap()
                    .write(&input_data[..chunk_len])?;
                input_data = &input_data[written..];
                if input_data.is_empty() {
                    // Drop stdin once writing is done, so the child
                    // receives EOF.
                    *stdin_ref = None;
                }
            }
            if out_ready {
                read_some(&mut stdout_pipe, &mut out)?;
            }
            if err_ready {
                read_some(&mut stderr_pipe, &mut err)?;
            }
        }

        Ok((out, err))
    }

    /// Exchanges data with a child process through its redirected streams.
    ///
    /// `input_data` is written to `stdin_ref`, while the contents of
    /// `stdout_ref` and `stderr_ref` are collected via `comm_poll`.  Each
    /// element of the returned pair is `Some(bytes)` when the corresponding
    /// stream is present and `None` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `stdin_ref` holds a stream but `input_data` is `None`, or
    /// if `input_data` is provided while `stdin_ref` is `None`.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by `comm_poll`.
    pub fn communicate(stdin_ref: &mut Option<File>,
                       stdout_ref: &mut Option<File>,
                       stderr_ref: &mut Option<File>,
                       input_data: Option<&[u8]>)
                       -> IoResult<(Option<Vec<u8>>, Option<Vec<u8>>)> {
        // Input and a redirected stdin must come together or not at all.
        let payload: &[u8] = if stdin_ref.is_none() {
            assert!(input_data.is_none(),
                    "cannot provide input to non-redirected stdin");
            &[]
        } else {
            input_data.expect("must provide input to redirected stdin")
        };

        let (out, err) = comm_poll(stdin_ref, stdout_ref, stderr_ref, payload)?;

        // Only report output for the streams that are actually present.
        let captured_out = if stdout_ref.is_some() { Some(out) } else { None };
        let captured_err = if stderr_ref.is_some() { Some(err) } else { None };
        Ok((captured_out, captured_err))
    }
}

#[cfg(windows)]
mod os {
    use std::fs::File;
    use std::io::{Read, Write, Result as IoResult};

    /// Reads `outfile` until EOF and returns everything that was read.
    ///
    /// The file is taken by value, so it is dropped (and thereby closed)
    /// when this function returns.
    fn comm_read(mut outfile: File) -> IoResult<Vec<u8>> {
        let mut contents = vec![];
        outfile.read_to_end(&mut contents).map(|_| contents)
    }

    /// Writes all of `input_data` to `infile`.
    ///
    /// The file is taken by value, so it is dropped (and thereby closed)
    /// when this function returns.  Callers hand over the stdin stream with
    /// `take()`, which ensures stdin is closed when done writing, so the
    /// child receives EOF.
    fn comm_write(mut infile: File, input_data: &[u8]) -> IoResult<()> {
        infile.write_all(input_data)
    }

    /// Services several streams at once by reading stdout and stderr on
    /// helper threads while the calling thread writes `input_data` to stdin.
    ///
    /// Every stream that is present is taken out of its option and closed
    /// when its transfer finishes.  Each element of the returned pair is
    /// `Some(bytes)` when the corresponding output stream was present and
    /// `None` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `stdin_ref` holds a stream but `input_data` is `None`, or
    /// if `input_data` is provided while `stdin_ref` is `None`.  Also panics
    /// if one of the reader threads panicked.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error reported while writing the input or
    /// reading either output stream.
    pub fn comm_threaded(stdin_ref: &mut Option<File>,
                         stdout_ref: &mut Option<File>,
                         stderr_ref: &mut Option<File>,
                         input_data: Option<&[u8]>)
                         -> IoResult<(Option<Vec<u8>>, Option<Vec<u8>>)> {
        // Check the stdin/input contract before any thread is started.
        // `scope` joins all spawned threads before it returns, so a panic
        // raised after the readers are running would first have to wait for
        // them -- and they may never finish while the child's stdin is
        // still open.
        if stdin_ref.is_some() {
            input_data.expect("must provide input to redirected stdin");
        } else {
            assert!(input_data.is_none(),
                    "cannot provide input to non-redirected stdin");
        }

        crossbeam_utils::thread::scope(move |scope| {
            // One reader thread per output stream that is present.
            let (mut out_thr, mut err_thr) = (None, None);
            if stdout_ref.is_some() {
                out_thr = Some(scope.spawn(
                    move |_| comm_read(stdout_ref.take().unwrap())))
            }
            if stderr_ref.is_some() {
                err_thr = Some(scope.spawn(
                    move |_| comm_read(stderr_ref.take().unwrap())))
            }
            // Writing happens on the calling thread.  take() hands the
            // stream to comm_write, which closes it when done so the child
            // receives EOF.
            if stdin_ref.is_some() {
                let input_data = input_data.expect(
                    "must provide input to redirected stdin");
                comm_write(stdin_ref.take().unwrap(), input_data)?;
            }
            // Collect the readers' results, propagating their I/O errors.
            Ok((if let Some(out_thr) = out_thr
                { Some(out_thr.join().unwrap()?) } else { None },
                if let Some(err_thr) = err_thr
                { Some(err_thr.join().unwrap()?) } else { None }))
        }).unwrap()
    }

    /// Exchanges data with a child process through its redirected streams.
    ///
    /// When exactly one stream is present it is serviced directly on the
    /// calling thread; every other combination is handed to
    /// `comm_threaded`.  In the single-stream cases the stream is taken out
    /// of its option and closed once the transfer is complete.
    ///
    /// Each element of the returned pair is `Some(bytes)` for an output
    /// stream that was read and `None` otherwise.
    ///
    /// # Panics
    ///
    /// In the single-stream cases, panics if only `stdin` is present and
    /// `input_data` is `None`, or if only an output stream is present and
    /// `input_data` is provided.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while reading or writing.
    pub fn communicate(stdin: &mut Option<File>,
                       stdout: &mut Option<File>,
                       stderr: &mut Option<File>,
                       input_data: Option<&[u8]>)
                       -> IoResult<(Option<Vec<u8>>, Option<Vec<u8>>)> {
        match (stdin.is_some(), stdout.is_some(), stderr.is_some()) {
            // Only stdin: write the input and we are done.
            (true, false, false) => {
                let payload = input_data.expect(
                    "must provide input to redirected stdin");
                comm_write(stdin.take().unwrap(), payload)?;
                Ok((None, None))
            }
            // Only stdout: read it to the end.
            (false, true, false) => {
                assert!(input_data.is_none(),
                        "cannot provide input to non-redirected stdin");
                comm_read(stdout.take().unwrap()).map(|out| (Some(out), None))
            }
            // Only stderr: read it to the end.
            (false, false, true) => {
                assert!(input_data.is_none(),
                        "cannot provide input to non-redirected stdin");
                comm_read(stderr.take().unwrap()).map(|err| (None, Some(err)))
            }
            // Anything else goes through the threaded implementation.
            _ => comm_threaded(stdin, stdout, stderr, input_data),
        }
    }
}

pub use self::os::communicate;