futures_util/io/
read_to_end.rs1use std::io;
2use std::mem;
3use std::vec::Vec;
4
5use {Async, Poll, Future, task};
6
7use io::AsyncRead;
8
9#[derive(Debug)]
16pub struct ReadToEnd<A> {
17 state: State<A>,
18}
19
20#[derive(Debug)]
21enum State<A> {
22 Reading {
23 a: A,
24 buf: Vec<u8>,
25 },
26 Empty,
27}
28
29pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
30 where A: AsyncRead,
31{
32 ReadToEnd {
33 state: State::Reading {
34 a,
35 buf,
36 }
37 }
38}
39
40struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }
41
42impl<'a> Drop for Guard<'a> {
43 fn drop(&mut self) {
44 unsafe { self.buf.set_len(self.len); }
45 }
46}
47
48fn read_to_end_internal<R: AsyncRead + ?Sized>(r: &mut R, cx: &mut task::Context, buf: &mut Vec<u8>)
58 -> Poll<usize, io::Error>
59{
60 let start_len = buf.len();
61 let mut g = Guard { len: buf.len(), buf: buf };
62 let ret;
63 loop {
64 if g.len == g.buf.len() {
65 unsafe {
66 g.buf.reserve(32);
67 let capacity = g.buf.capacity();
68 g.buf.set_len(capacity);
69 r.initializer().initialize(&mut g.buf[g.len..]);
70 }
71 }
72
73 match r.poll_read(cx, &mut g.buf[g.len..]) {
74 Ok(Async::Ready(0)) => {
75 ret = Ok(Async::Ready(g.len - start_len));
76 break;
77 }
78 Ok(Async::Ready(n)) => g.len += n,
79 Ok(Async::Pending) => return Ok(Async::Pending),
80 Err(e) => {
81 ret = Err(e);
82 break;
83 }
84 }
85 }
86
87 ret
88}
89
90impl<A> Future for ReadToEnd<A>
91 where A: AsyncRead,
92{
93 type Item = (A, Vec<u8>);
94 type Error = io::Error;
95
96 fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, Vec<u8>), io::Error> {
97 match self.state {
98 State::Reading { ref mut a, ref mut buf } => {
99 try_ready!(read_to_end_internal(a, cx, buf));
103 },
104 State::Empty => panic!("poll ReadToEnd after it's done"),
105 }
106
107 match mem::replace(&mut self.state, State::Empty) {
108 State::Reading { a, buf } => Ok((a, buf).into()),
109 State::Empty => unreachable!(),
110 }
111 }
112}