lightws/stream/
async_read.rs1use std::io::Result;
2use std::pin::Pin;
3use std::task::{Poll, Context};
4
5use tokio::io::AsyncRead;
6use tokio::io::ReadBuf;
7
8use super::{Stream, RoleHelper, Guarded};
9use super::detail::read_some;
10
11impl<IO, Role> AsyncRead for Stream<IO, Role>
12where
13 IO: AsyncRead + Unpin,
14 Stream<IO, Role>: Unpin,
15 Role: RoleHelper,
16{
17 #[rustfmt::skip]
19 fn poll_read(
20 self: Pin<&mut Self>,
21 cx: &mut Context<'_>,
22 buf: &mut ReadBuf<'_>,
23 ) -> Poll<Result<()>> {
24 read_some(self.get_mut(), |io, buf| {
25 let mut buf = ReadBuf::new(buf);
26 Pin::new(io).poll_read(cx, &mut buf)
27 .map_ok(|_| buf.filled().len())
28 },
29 buf.initialize_unfilled(),
30 ).map_ok(|n| buf.advance(n))
31 }
32}
33
34impl<IO, Role> AsyncRead for Stream<IO, Role, Guarded>
35where
36 IO: AsyncRead + Unpin,
37 Stream<IO, Role, Guarded>: Unpin,
38 Role: RoleHelper,
39{
40 fn poll_read(
43 self: Pin<&mut Self>,
44 cx: &mut Context<'_>,
45 buf: &mut ReadBuf<'_>,
46 ) -> Poll<Result<()>> {
47 let this = self.get_mut();
48
49 loop {
50 match read_some(
51 this,
52 |io, buf| {
53 let mut buf = ReadBuf::new(buf);
54 Pin::new(io)
55 .poll_read(cx, &mut buf)
56 .map_ok(|_| buf.filled().len())
57 },
58 buf.initialize_unfilled(),
59 ) {
60 Poll::Ready(Ok(0)) if this.is_read_partial_head() || !this.is_read_end() => {
61 continue
62 }
63 Poll::Ready(Ok(n)) => {
64 buf.advance(n);
65 return Poll::Ready(Ok(()));
66 }
67 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
68 Poll::Pending => return Poll::Pending,
69 }
70 }
71 }
72}