lightws/stream/
async_read.rs

1use std::io::Result;
2use std::pin::Pin;
3use std::task::{Poll, Context};
4
5use tokio::io::AsyncRead;
6use tokio::io::ReadBuf;
7
8use super::{Stream, RoleHelper, Guarded};
9use super::detail::read_some;
10
11impl<IO, Role> AsyncRead for Stream<IO, Role>
12where
13    IO: AsyncRead + Unpin,
14    Stream<IO, Role>: Unpin,
15    Role: RoleHelper,
16{
17    /// Async version of `Stream::read`.
18    #[rustfmt::skip]
19    fn poll_read(
20        self: Pin<&mut Self>,
21        cx: &mut Context<'_>,
22        buf: &mut ReadBuf<'_>,
23    ) -> Poll<Result<()>> {
24        read_some(self.get_mut(), |io, buf| {
25                let mut buf = ReadBuf::new(buf);
26                Pin::new(io).poll_read(cx, &mut buf)
27                .map_ok(|_| buf.filled().len())
28            },
29            buf.initialize_unfilled(),
30        ).map_ok(|n| buf.advance(n))
31    }
32}
33
34impl<IO, Role> AsyncRead for Stream<IO, Role, Guarded>
35where
36    IO: AsyncRead + Unpin,
37    Stream<IO, Role, Guarded>: Unpin,
38    Role: RoleHelper,
39{
40    /// Async version of `Stream::read`.
41    /// Continue to read if frame head is not complete.
42    fn poll_read(
43        self: Pin<&mut Self>,
44        cx: &mut Context<'_>,
45        buf: &mut ReadBuf<'_>,
46    ) -> Poll<Result<()>> {
47        let this = self.get_mut();
48
49        loop {
50            match read_some(
51                this,
52                |io, buf| {
53                    let mut buf = ReadBuf::new(buf);
54                    Pin::new(io)
55                        .poll_read(cx, &mut buf)
56                        .map_ok(|_| buf.filled().len())
57                },
58                buf.initialize_unfilled(),
59            ) {
60                Poll::Ready(Ok(0)) if this.is_read_partial_head() || !this.is_read_end() => {
61                    continue
62                }
63                Poll::Ready(Ok(n)) => {
64                    buf.advance(n);
65                    return Poll::Ready(Ok(()));
66                }
67                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
68                Poll::Pending => return Poll::Pending,
69            }
70        }
71    }
72}