// futures_util/io/read_to_end.rs

1use std::io;
2use std::mem;
3use std::vec::Vec;
4
5use {Async, Poll, Future, task};
6
7use io::AsyncRead;
8
/// A future which can be used to easily read the entire contents of a stream
/// into a vector.
///
/// Resolves to the reader and the filled buffer once the reader reports EOF
/// (`poll_read` returning `Ready(0)`).
///
/// Created by the [`read_to_end`] function.
///
/// [`read_to_end`]: fn.read_to_end.html
#[derive(Debug)]
pub struct ReadToEnd<A> {
    // Reading while in flight; Empty once the future has resolved.
    state: State<A>,
}
19
/// Internal state machine for `ReadToEnd`.
///
/// `Reading` holds the reader and the accumulation buffer; after the future
/// completes, the state is swapped to `Empty` so the reader and buffer can be
/// moved out and returned to the caller. Polling again after that panics.
#[derive(Debug)]
enum State<A> {
    Reading {
        // The underlying reader being drained.
        a: A,
        // Accumulated bytes read so far (appended to the caller's vector).
        buf: Vec<u8>,
    },
    Empty,
}
28
/// Creates a future which will read all the bytes associated with the I/O
/// object `a` into the buffer provided.
///
/// Bytes are appended to `buf`; any existing contents are preserved. The
/// returned future resolves to the reader and the (extended) buffer once the
/// reader reaches EOF.
pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
    where A: AsyncRead,
{
    ReadToEnd {
        state: State::Reading {
            a,
            buf,
        }
    }
}
39
/// Drop guard that truncates `buf` back to `len` — the count of bytes known
/// to be initialized — on every exit path (including panics and early
/// returns). This is what keeps the "grow with uninitialized capacity"
/// strategy in `read_to_end_internal` sound: the buffer's visible length
/// never exceeds the initialized prefix once the guard is dropped.
struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }

impl<'a> Drop for Guard<'a> {
    fn drop(&mut self) {
        // SAFETY: `len` only ever tracks bytes that have actually been
        // written into `buf` (it starts at `buf.len()` and is advanced only
        // by the number of bytes `poll_read` reports), and it never exceeds
        // the capacity, so shrinking the length to it is always valid.
        unsafe { self.buf.set_len(self.len); }
    }
}
47
48// This uses an adaptive system to extend the vector when it fills. We want to
49// avoid paying to allocate and zero a huge chunk of memory if the reader only
50// has 4 bytes while still making large reads if the reader does have a ton
51// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
52// time is 4,500 times (!) slower than this if the reader has a very small
53// amount of data to return.
54//
55// Because we're extending the buffer with uninitialized data for trusted
56// readers, we need to make sure to truncate that if any of this panics.
/// Core read loop: repeatedly grows `buf` and polls `r` until EOF, an error,
/// or `Pending`.
///
/// Returns `Ready(n)` with the number of bytes appended during this call
/// once the reader reports EOF, propagates I/O errors, and returns `Pending`
/// as-is. On `Pending`, bytes read so far remain in `buf` (the `Guard`'s
/// `Drop` sets the length to the initialized prefix), so the next poll
/// resumes where this one left off.
fn read_to_end_internal<R: AsyncRead + ?Sized>(r: &mut R, cx: &mut task::Context, buf: &mut Vec<u8>)
    -> Poll<usize, io::Error>
{
    // Remember where this call started so we can report only the bytes
    // appended here, not the buffer's total length.
    let start_len = buf.len();
    let mut g = Guard { len: buf.len(), buf: buf };
    let ret;
    loop {
        // All initialized bytes have been consumed as read destination;
        // grow the buffer before reading more.
        if g.len == g.buf.len() {
            unsafe {
                // `reserve(32)` amortizes: Vec's growth policy typically
                // doubles capacity, giving the adaptive behavior described
                // in the comment above.
                g.buf.reserve(32);
                let capacity = g.buf.capacity();
                // SAFETY: length is extended over uninitialized capacity,
                // but `initialize` below zeroes it for untrusted readers
                // (trusted readers promise not to read the buffer), and the
                // Guard truncates back to the initialized prefix `g.len` on
                // every exit path, so uninitialized bytes are never exposed.
                g.buf.set_len(capacity);
                r.initializer().initialize(&mut g.buf[g.len..]);
            }
        }

        // Read into the not-yet-filled tail of the buffer.
        match r.poll_read(cx, &mut g.buf[g.len..]) {
            Ok(Async::Ready(0)) => {
                // EOF: report bytes appended during this call.
                ret = Ok(Async::Ready(g.len - start_len));
                break;
            }
            Ok(Async::Ready(n)) => g.len += n,
            // Pending: Guard's Drop preserves the `g.len` initialized bytes.
            Ok(Async::Pending) => return Ok(Async::Pending),
            Err(e) => {
                ret = Err(e);
                break;
            }
        }
    }

    ret
}
89
90impl<A> Future for ReadToEnd<A>
91    where A: AsyncRead,
92{
93    type Item = (A, Vec<u8>);
94    type Error = io::Error;
95
96    fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, Vec<u8>), io::Error> {
97        match self.state {
98            State::Reading { ref mut a, ref mut buf } => {
99                // If we get `Ok`, then we know the stream hit EOF and we're done. If we
100                // hit "would block" then all the read data so far is in our buffer, and
101                // otherwise we propagate errors
102                try_ready!(read_to_end_internal(a, cx, buf));
103            },
104            State::Empty => panic!("poll ReadToEnd after it's done"),
105        }
106
107        match mem::replace(&mut self.state, State::Empty) {
108            State::Reading { a, buf } => Ok((a, buf).into()),
109            State::Empty => unreachable!(),
110        }
111    }
112}