1use std::io::{Read, Result};
2use std::task::Poll;
3
4use super::{Stream, RoleHelper, Guarded};
5use super::detail::read_some;
6
7impl<IO: Read, Role: RoleHelper> Read for Stream<IO, Role> {
8 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
21 match read_some(self, |io, buf| io.read(buf).into(), buf) {
22 Poll::Ready(x) => x,
23 Poll::Pending => unreachable!(),
24 }
25 }
26
27 fn read_to_end(&mut self, _: &mut Vec<u8>) -> Result<usize> {
29 panic!("Unsupported");
30 }
31
32 fn read_exact(&mut self, _: &mut [u8]) -> Result<()> {
34 panic!("Unsupported");
35 }
36
37 fn read_to_string(&mut self, _: &mut String) -> Result<usize> {
39 panic!("Unsupported");
40 }
41}
42
43impl<IO: Read, Role: RoleHelper> Read for Stream<IO, Role, Guarded> {
44 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
47 loop {
48 match read_some(self, |io, buf| io.read(buf).into(), buf) {
49 Poll::Ready(Ok(0)) if self.is_read_partial_head() || !self.is_read_end() => {
50 continue
51 }
52 Poll::Ready(x) => return x,
53 Poll::Pending => unreachable!(),
54 }
55 }
56 }
57
58 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
61 use std::io::BorrowedBuf;
62 use std::io::ErrorKind;
63
64 let start_len = buf.len();
65 let start_cap = buf.capacity();
66
67 let mut initialized = 0; loop {
69 if buf.len() < buf.capacity() + 14 {
70 buf.reserve(32); }
72
73 let mut read_buf: BorrowedBuf<'_> = buf.spare_capacity_mut().into();
74
75 unsafe {
77 read_buf.set_init(initialized);
78 }
79
80 let mut cursor = read_buf.unfilled();
81 match self.read_buf(cursor.reborrow()) {
82 Ok(()) => {}
83 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
84 Err(e) => return Err(e),
85 }
86
87 if cursor.written() == 0 {
88 return Ok(buf.len() - start_len);
89 }
90
91 initialized = cursor.init_mut().len();
93
94 unsafe {
96 let new_len = read_buf.filled().len() + buf.len();
97 buf.set_len(new_len);
98 }
99
100 if buf.len() == buf.capacity() && buf.capacity() == start_cap {
101 let mut probe = [0u8; 32];
106
107 loop {
108 match self.read(&mut probe) {
109 Ok(0) => return Ok(buf.len() - start_len),
110 Ok(n) => {
111 buf.extend_from_slice(&probe[..n]);
112 break;
113 }
114 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
115 Err(e) => return Err(e),
116 }
117 }
118 }
119 }
120 }
121}
122
123#[cfg(test)]
124mod test {
125 use std::io::Read;
126 use super::*;
127 use super::super::test::{LimitReadWriter, make_frame};
128 use crate::frame::*;
129 use crate::role::*;
130
131 #[test]
132 fn read_from_stream() {
133 fn read<R1: RoleHelper, R2: RoleHelper>(n: usize) {
134 let (frame, data) = make_frame::<R1>(OpCode::Binary, n);
135
136 let mut stream = Stream::new(frame.as_slice(), R2::new());
137
138 let mut buf = vec![0; n + 14];
139 let read_n = stream.read(&mut buf).unwrap();
140
141 assert_eq!(read_n, n);
142 assert_eq!(&buf[..n], &data);
143 }
144
145 for i in 0..=0x2000 {
146 read::<Client, Server>(i);
147 read::<Server, Client>(i);
148 }
149
150 for i in [65536, 65537, 100000] {
151 read::<Client, Server>(i);
152 read::<Server, Client>(i);
153 }
154 }
155
156 #[test]
157 fn read_from_limit_stream() {
158 fn read<R1: RoleHelper, R2: RoleHelper>(n: usize, limit: usize) {
159 let (frame, data) = make_frame::<R1>(OpCode::Binary, n);
160
161 let io = LimitReadWriter {
162 buf: frame,
163 rlimit: limit,
164 wlimit: 0,
165 cursor: 0,
166 };
167
168 let mut buf = Vec::new();
169 let mut stream = Stream::new(io, R2::new()).guard();
170
171 let read_n = stream.read_to_end(&mut buf).unwrap();
172
173 assert_eq!(read_n, n);
174 assert_eq!(&buf[..n], &data);
175 }
176
177 for i in 0..=256 {
178 for limit in 1..=300 {
179 read::<Client, Server>(i, limit);
180 read::<Server, Client>(i, limit);
181 }
182 }
183
184 for i in [65536, 65537, 100000] {
185 for limit in 1..=1024 {
186 read::<Client, Server>(i, limit);
187 read::<Server, Client>(i, limit);
188 }
189 }
190 }
191
192 #[test]
193 fn read_eof_from_stream() {
194 fn read<R: RoleHelper>() {
195 let io = LimitReadWriter {
196 buf: b"EOFFFF:)".to_vec(),
197 rlimit: 0,
198 wlimit: 0,
199 cursor: 0,
200 };
201 let mut stream = Stream::new(io, R::new());
202 let mut buf = vec![0; 32];
203 let n = stream.read(&mut buf).unwrap();
204 assert_eq!(n, 0);
205 assert!(stream.is_read_end());
206 assert!(stream.is_read_eof());
207
208 let mut stream = stream.guard();
209
210 let n = stream.read_to_end(&mut buf).unwrap();
211 assert_eq!(n, 0);
212 assert!(stream.is_read_end());
213 assert!(stream.is_read_eof());
214 }
215 read::<Client>();
216 read::<Server>();
217 }
218
219 #[test]
220 fn read_close_from_stream() {
221 fn read<R1: RoleHelper, R2: RoleHelper>(limit: usize) {
222 let (frame, _) = make_frame::<R1>(OpCode::Close, 1);
223 let io = LimitReadWriter {
224 buf: frame,
225 rlimit: limit,
226 wlimit: 0,
227 cursor: 0,
228 };
229
230 let mut stream = Stream::new(io, R2::new());
231
232 let mut buf = vec![0; 32];
233
234 let n = stream.read(&mut buf).unwrap();
235 assert_eq!(n, 0);
236
237 let mut stream = stream.guard();
238
239 let n = stream.read_to_end(&mut buf).unwrap();
240 assert_eq!(n, 0);
241 assert!(stream.is_read_end());
242 assert!(stream.is_read_close());
243 }
244
245 for i in 1..=32 {
246 read::<Client, Server>(i);
247 read::<Server, Client>(i);
248 }
249 }
250
251 #[test]
252 fn read_ping_from_stream() {
253 fn read<R1: RoleHelper, R2: RoleHelper>(n: usize, limit: usize) {
254 let (frame, data) = make_frame::<R1>(OpCode::Ping, n);
255
256 let io = LimitReadWriter {
257 buf: frame,
258 rlimit: limit,
259 wlimit: 0,
260 cursor: 0,
261 };
262
263 let mut buf = Vec::new();
264 let mut stream = Stream::new(io, R2::new()).guard();
265
266 let read_n = stream.read_to_end(&mut buf).unwrap();
267
268 assert_eq!(read_n, 0);
269 assert_eq!(stream.ping_data(), &data);
270 }
271
272 for i in 0..=125 {
273 for limit in 1..=128 {
274 read::<Client, Server>(i, limit);
275 read::<Server, Client>(i, limit);
276 }
277 }
278 }
279
280 #[test]
281 fn read_multi_frame_from_stream() {
282 fn read<R1: RoleHelper, R2: RoleHelper>(n: usize, step: usize, limit: usize) {
283 let mut len = 0;
284 let mut frame = Vec::new();
285 let mut data = Vec::new();
286
287 for i in 0..n {
288 let (mut f, mut d) = make_frame::<R1>(OpCode::Binary, step + i * step);
289 len += d.len();
290 frame.append(&mut f);
291 data.append(&mut d);
292 assert_eq!(len, (i + 1) * (i + 2) * step / 2);
293 }
294
295 let (mut close, _) = make_frame::<R1>(OpCode::Close, 1);
296 frame.append(&mut close);
297
298 let io = LimitReadWriter {
299 buf: frame,
300 rlimit: limit,
301 wlimit: 0,
302 cursor: 0,
303 };
304
305 let mut buf = Vec::new();
306 let mut stream = Stream::new(io, R2::new()).guard();
307
308 let read_n = stream.read_to_end(&mut buf).unwrap();
309
310 assert!(stream.is_read_end());
311 assert!(stream.is_read_close());
312 assert_eq!(read_n, len);
313 assert_eq!(&buf[..len], &data);
314 }
315
316 for n in 1..=20 {
317 for step in [1, 10, 100, 1000, 10000] {
318 for limit in [1, 10, 100, 1000, 10000, usize::MAX] {
319 read::<Client, Server>(n, step, limit);
320 read::<Server, Client>(n, step, limit);
321 }
322 }
323 }
324 }
325
326 #[test]
327 fn read_multi_ping_from_stream() {
328 fn read<R1: RoleHelper, R2: RoleHelper>(n: usize, step: usize, limit: usize) {
329 let mut len = 0;
330 let mut frame = Vec::new();
331 let mut data = Vec::new();
332
333 for i in 0..n {
334 let (mut f, d) = make_frame::<R1>(OpCode::Ping, step + i * step);
335 len += d.len();
336 frame.append(&mut f);
337 data = d;
338 assert_eq!(len, (i + 1) * (i + 2) * step / 2);
339 }
340
341 let io = LimitReadWriter {
342 buf: frame,
343 rlimit: limit,
344 wlimit: 0,
345 cursor: 0,
346 };
347
348 let mut buf = Vec::new();
349 let mut stream = Stream::new(io, R2::new()).guard();
350
351 let read_n = stream.read_to_end(&mut buf).unwrap();
352
353 assert_eq!(read_n, 0);
354 assert_eq!(stream.ping_data(), &data);
355 }
356
357 for n in 1..=125 {
358 for limit in 1..=128 {
359 read::<Client, Server>(n, 1, limit);
360 read::<Server, Client>(n, 1, limit);
361 }
362 }
363 }
364}