1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::io;
use std::mem;
use std::vec::Vec;
use {Async, Poll, Future, task};
use io::AsyncRead;
#[derive(Debug)]
pub struct ReadToEnd<A> {
state: State<A>,
}
#[derive(Debug)]
enum State<A> {
Reading {
a: A,
buf: Vec<u8>,
},
Empty,
}
pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
where A: AsyncRead,
{
ReadToEnd {
state: State::Reading {
a,
buf,
}
}
}
struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
unsafe { self.buf.set_len(self.len); }
}
}
fn read_to_end_internal<R: AsyncRead + ?Sized>(r: &mut R, cx: &mut task::Context, buf: &mut Vec<u8>)
-> Poll<usize, io::Error>
{
let start_len = buf.len();
let mut g = Guard { len: buf.len(), buf: buf };
let ret;
loop {
if g.len == g.buf.len() {
unsafe {
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);
r.initializer().initialize(&mut g.buf[g.len..]);
}
}
match r.poll_read(cx, &mut g.buf[g.len..]) {
Ok(Async::Ready(0)) => {
ret = Ok(Async::Ready(g.len - start_len));
break;
}
Ok(Async::Ready(n)) => g.len += n,
Ok(Async::Pending) => return Ok(Async::Pending),
Err(e) => {
ret = Err(e);
break;
}
}
}
ret
}
impl<A> Future for ReadToEnd<A>
where A: AsyncRead,
{
type Item = (A, Vec<u8>);
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<(A, Vec<u8>), io::Error> {
match self.state {
State::Reading { ref mut a, ref mut buf } => {
try_ready!(read_to_end_internal(a, cx, buf));
},
State::Empty => panic!("poll ReadToEnd after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, buf } => Ok((a, buf).into()),
State::Empty => unreachable!(),
}
}
}