lightws/stream/
read.rs

1use std::io::{Read, Result};
2use std::task::Poll;
3
4use super::{Stream, RoleHelper, Guarded};
5use super::detail::read_some;
6
7impl<IO: Read, Role: RoleHelper> Read for Stream<IO, Role> {
8    /// Read some data from the underlying IO source,
9    /// returns `Ok(0)` until a complete frame head is present.
10    /// Caller should ensure the available buffer size is larger
11    /// than **14** before a read.
12    ///
13    /// Read a control frame(like Ping) returns `Ok(0)`,
14    /// which could be detected via [`Stream::is_pinged`].
15    ///
16    /// Any read after receiving a `Close` frame or reaching `EOF`
17    /// will return `Ok(0)`,
18    /// which could be checked via [`Stream::is_read_end`],
19    /// [`Stream::is_read_close`], [`Stream::is_read_eof`].
20    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
21        match read_some(self, |io, buf| io.read(buf).into(), buf) {
22            Poll::Ready(x) => x,
23            Poll::Pending => unreachable!(),
24        }
25    }
26
27    /// **This is NOT supported!**
28    fn read_to_end(&mut self, _: &mut Vec<u8>) -> Result<usize> {
29        panic!("Unsupported");
30    }
31
32    /// **This is NOT supported!**
33    fn read_exact(&mut self, _: &mut [u8]) -> Result<()> {
34        panic!("Unsupported");
35    }
36
37    /// **This is NOT supported!**
38    fn read_to_string(&mut self, _: &mut String) -> Result<usize> {
39        panic!("Unsupported");
40    }
41}
42
43impl<IO: Read, Role: RoleHelper> Read for Stream<IO, Role, Guarded> {
44    /// Wrap read in a loop.
45    /// Continue to read if frame head is not complete.
46    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
47        loop {
48            match read_some(self, |io, buf| io.read(buf).into(), buf) {
49                Poll::Ready(Ok(0)) if self.is_read_partial_head() || !self.is_read_end() => {
50                    continue
51                }
52                Poll::Ready(x) => return x,
53                Poll::Pending => unreachable!(),
54            }
55        }
56    }
57
58    /// Override default implement, extend reserved buffer size,
59    /// so that there is enough space to accommodate frame head.
60    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
61        use std::io::BorrowedBuf;
62        use std::io::ErrorKind;
63
64        let start_len = buf.len();
65        let start_cap = buf.capacity();
66
67        let mut initialized = 0; // Extra initialized bytes from previous loop iteration
68        loop {
69            if buf.len() < buf.capacity() + 14 {
70                buf.reserve(32); // buf is full, need more space
71            }
72
73            let mut read_buf: BorrowedBuf<'_> = buf.spare_capacity_mut().into();
74
75            // SAFETY: These bytes were initialized but not filled in the previous loop
76            unsafe {
77                read_buf.set_init(initialized);
78            }
79
80            let mut cursor = read_buf.unfilled();
81            match self.read_buf(cursor.reborrow()) {
82                Ok(()) => {}
83                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
84                Err(e) => return Err(e),
85            }
86
87            if cursor.written() == 0 {
88                return Ok(buf.len() - start_len);
89            }
90
91            // store how much was initialized but not filled
92            initialized = cursor.init_mut().len();
93
94            // SAFETY: BorrowedBuf's invariants mean this much memory is init
95            unsafe {
96                let new_len = read_buf.filled().len() + buf.len();
97                buf.set_len(new_len);
98            }
99
100            if buf.len() == buf.capacity() && buf.capacity() == start_cap {
101                // The buffer might be an exact fit. Let's read into a probe buffer
102                // and see if it returns `Ok(0)`. If so, we've avoided an
103                // unnecessary doubling of the capacity. But if not, append the
104                // probe buffer to the primary buffer and let its capacity grow.
105                let mut probe = [0u8; 32];
106
107                loop {
108                    match self.read(&mut probe) {
109                        Ok(0) => return Ok(buf.len() - start_len),
110                        Ok(n) => {
111                            buf.extend_from_slice(&probe[..n]);
112                            break;
113                        }
114                        Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
115                        Err(e) => return Err(e),
116                    }
117                }
118            }
119        }
120    }
121}
122
#[cfg(test)]
mod test {
    use std::io::Read;
    use super::*;
    use super::super::test::{LimitReadWriter, make_frame};
    use crate::frame::*;
    use crate::role::*;

    /// Wraps `bytes` in a [`LimitReadWriter`] with the given read limit;
    /// the write limit and cursor start at zero.
    // NOTE(review): `rlimit` presumably caps the bytes handed out per read
    // call — confirm against `LimitReadWriter`.
    fn limited(bytes: Vec<u8>, rlimit: usize) -> LimitReadWriter {
        LimitReadWriter {
            buf: bytes,
            rlimit,
            wlimit: 0,
            cursor: 0,
        }
    }

    /// A single binary frame read straight from a slice comes back whole.
    #[test]
    fn read_from_stream() {
        fn check<Tx: RoleHelper, Rx: RoleHelper>(payload_len: usize) {
            let (wire, payload) = make_frame::<Tx>(OpCode::Binary, payload_len);
            let mut ws = Stream::new(wire.as_slice(), Rx::new());

            // payload plus room for the frame head
            let mut out = vec![0; payload_len + 14];
            let got = ws.read(&mut out).unwrap();

            assert_eq!(got, payload_len);
            assert_eq!(&out[..payload_len], &payload);
        }

        for len in (0..=0x2000).chain([65536, 65537, 100000]) {
            check::<Client, Server>(len);
            check::<Server, Client>(len);
        }
    }

    /// A guarded stream collects a whole binary frame via `read_to_end`
    /// for every tested read limit.
    #[test]
    fn read_from_limit_stream() {
        fn check<Tx: RoleHelper, Rx: RoleHelper>(payload_len: usize, limit: usize) {
            let (wire, payload) = make_frame::<Tx>(OpCode::Binary, payload_len);

            let mut ws = Stream::new(limited(wire, limit), Rx::new()).guard();
            let mut out = Vec::new();

            let got = ws.read_to_end(&mut out).unwrap();

            assert_eq!(got, payload_len);
            assert_eq!(&out[..payload_len], &payload);
        }

        // Runs both role pairings for every limit in `1..=max_limit`.
        fn sweep(payload_len: usize, max_limit: usize) {
            for limit in 1..=max_limit {
                check::<Client, Server>(payload_len, limit);
                check::<Server, Client>(payload_len, limit);
            }
        }

        for len in 0..=256 {
            sweep(len, 300);
        }

        for len in [65536, 65537, 100000] {
            sweep(len, 1024);
        }
    }

    /// A source with a read limit of zero is reported as EOF, both on the
    /// plain stream and after switching to the guarded one.
    #[test]
    fn read_eof_from_stream() {
        fn check<R: RoleHelper>() {
            let source = limited(b"EOFFFF:)".to_vec(), 0);
            let mut ws = Stream::new(source, R::new());
            let mut out = vec![0; 32];

            assert_eq!(ws.read(&mut out).unwrap(), 0);
            assert!(ws.is_read_end());
            assert!(ws.is_read_eof());

            let mut ws = ws.guard();

            assert_eq!(ws.read_to_end(&mut out).unwrap(), 0);
            assert!(ws.is_read_end());
            assert!(ws.is_read_eof());
        }

        check::<Client>();
        check::<Server>();
    }

    /// A lone `Close` frame produces no payload and leaves the stream in
    /// the read-closed state.
    #[test]
    fn read_close_from_stream() {
        fn check<Tx: RoleHelper, Rx: RoleHelper>(limit: usize) {
            let (wire, _) = make_frame::<Tx>(OpCode::Close, 1);
            let mut ws = Stream::new(limited(wire, limit), Rx::new());
            let mut out = vec![0; 32];

            assert_eq!(ws.read(&mut out).unwrap(), 0);

            let mut ws = ws.guard();

            assert_eq!(ws.read_to_end(&mut out).unwrap(), 0);
            assert!(ws.is_read_end());
            assert!(ws.is_read_close());
        }

        for limit in 1..=32 {
            check::<Client, Server>(limit);
            check::<Server, Client>(limit);
        }
    }

    /// A ping frame contributes no payload bytes; its data is exposed via
    /// `ping_data`.
    #[test]
    fn read_ping_from_stream() {
        fn check<Tx: RoleHelper, Rx: RoleHelper>(payload_len: usize, limit: usize) {
            let (wire, payload) = make_frame::<Tx>(OpCode::Ping, payload_len);

            let mut ws = Stream::new(limited(wire, limit), Rx::new()).guard();
            let mut out = Vec::new();

            let got = ws.read_to_end(&mut out).unwrap();

            assert_eq!(got, 0);
            assert_eq!(ws.ping_data(), &payload);
        }

        for len in 0..=125 {
            for limit in 1..=128 {
                check::<Client, Server>(len, limit);
                check::<Server, Client>(len, limit);
            }
        }
    }

    /// Several binary frames of growing size followed by a `Close` frame
    /// are concatenated by `read_to_end`.
    #[test]
    fn read_multi_frame_from_stream() {
        fn check<Tx: RoleHelper, Rx: RoleHelper>(n: usize, step: usize, limit: usize) {
            let mut total = 0;
            let mut wire: Vec<u8> = Vec::new();
            let mut expected: Vec<u8> = Vec::new();

            // frame `i` carries `(i + 1) * step` payload bytes
            for i in 0..n {
                let (f, d) = make_frame::<Tx>(OpCode::Binary, (i + 1) * step);
                total += d.len();
                wire.extend(f);
                expected.extend(d);
                assert_eq!(total, (i + 1) * (i + 2) * step / 2);
            }

            let (close, _) = make_frame::<Tx>(OpCode::Close, 1);
            wire.extend(close);

            let mut ws = Stream::new(limited(wire, limit), Rx::new()).guard();
            let mut out = Vec::new();

            let got = ws.read_to_end(&mut out).unwrap();

            assert!(ws.is_read_end());
            assert!(ws.is_read_close());
            assert_eq!(got, total);
            assert_eq!(&out[..total], &expected);
        }

        for n in 1..=20 {
            for step in [1, 10, 100, 1000, 10000] {
                for limit in [1, 10, 100, 1000, 10000, usize::MAX] {
                    check::<Client, Server>(n, step, limit);
                    check::<Server, Client>(n, step, limit);
                }
            }
        }
    }

    /// With several ping frames in a row, no payload is produced and
    /// `ping_data` holds the data of the last one.
    #[test]
    fn read_multi_ping_from_stream() {
        fn check<Tx: RoleHelper, Rx: RoleHelper>(n: usize, step: usize, limit: usize) {
            let mut total = 0;
            let mut wire: Vec<u8> = Vec::new();
            let mut last: Vec<u8> = Vec::new();

            for i in 0..n {
                let (f, d) = make_frame::<Tx>(OpCode::Ping, (i + 1) * step);
                total += d.len();
                wire.extend(f);
                last = d; // only the most recent ping payload is compared
                assert_eq!(total, (i + 1) * (i + 2) * step / 2);
            }

            let mut ws = Stream::new(limited(wire, limit), Rx::new()).guard();
            let mut out = Vec::new();

            let got = ws.read_to_end(&mut out).unwrap();

            assert_eq!(got, 0);
            assert_eq!(ws.ping_data(), &last);
        }

        for n in 1..=125 {
            for limit in 1..=128 {
                check::<Client, Server>(n, 1, limit);
                check::<Server, Client>(n, 1, limit);
            }
        }
    }
}