variable_len_reader/impls/
tokio.rs

1use core::pin::Pin;
2use core::task::{Context, Poll, ready};
3use tokio::io::{AsyncRead, AsyncWrite, Error, ErrorKind};
4use crate::asynchronous::{AsyncVariableReadable, AsyncVariableWritable};
5use crate::asynchronous::reader::AsyncVariableReader;
6use crate::asynchronous::writer::AsyncVariableWriter;
7use crate::util::read_buf::ReadBuf;
8use crate::util::write_buf::WriteBuf;
9
10impl<R: AsyncRead + Unpin> AsyncVariableReadable for R {
11    type Error = Error;
12
13    fn poll_read_single(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut Option<u8>) -> Poll<Result<u8, Self::Error>> {
14        if let Some(b) = buf.as_ref() { return Poll::Ready(Ok(*b)); }
15        let mut b = [0];
16        ready!(R::poll_read(self, cx, &mut tokio::io::ReadBuf::new(&mut b)))?;
17        buf.replace(b[0]);
18        Poll::Ready(Ok(b[0]))
19    }
20
21    fn poll_read_more(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<Result<(), Self::Error>> {
22        let origin = buf.left();
23        if origin == 0 { return Poll::Ready(Ok(())); }
24        let mut tokio_buf = buf.into();
25        ready!(R::poll_read(self, cx, &mut tokio_buf))?;
26        let filled = tokio_buf.filled().len();
27        buf.set_position(filled);
28        let left = buf.left();
29        if left == 0 {
30            Poll::Ready(Ok(()))
31        } else if left == origin {
32            Poll::Ready(Err(Error::new(ErrorKind::UnexpectedEof, "read 0 byte")))
33        } else {
34            cx.waker().wake_by_ref();
35            Poll::Pending
36        }
37    }
38}
39
40impl<R: AsyncRead + Unpin> AsyncVariableReader for R {
41    #[inline]
42    fn read_bool_error(future_name: &'static str, byte: u8) -> Self::Error {
43        Error::new(ErrorKind::InvalidData, alloc::format!("Invalid bool. value {} at future {}.", byte, future_name))
44    }
45
46    #[cfg(feature = "async_bools")]
47    #[inline]
48    fn read_bools_error(future_name: &'static str, byte: u8) -> Self::Error {
49        Error::new(ErrorKind::InvalidData, alloc::format!("Invalid bools. value {} at future {}.", byte, future_name))
50    }
51
52    #[cfg(feature = "async_varint")]
53    #[inline]
54    fn read_varint_error(future_name: &'static str, value: u128) -> Self::Error {
55        Error::new(ErrorKind::InvalidData, alloc::format!("Too long varint value. {} at future {}.", value, future_name))
56    }
57
58    #[cfg(feature = "async_string")]
59    #[inline]
60    fn read_string_error(_future_name: &'static str, error: alloc::string::FromUtf8Error) -> Self::Error {
61        Error::new(ErrorKind::InvalidData, error)
62    }
63}
64
65impl<W: AsyncWrite + Unpin> AsyncVariableWritable for W {
66    type Error = Error;
67
68    fn poll_write_single(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut Option<u8>) -> Poll<Result<(), Self::Error>> {
69        match buf.as_ref() {
70            None => Poll::Ready(Ok(())),
71            Some(b) => W::poll_write(self, cx, &[*b]).map_ok(|_i| { *buf = None; () })
72        }
73    }
74
75    fn poll_write_more(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut WriteBuf<'_>) -> Poll<Result<(), Self::Error>> {
76        while buf.left() > 0 {
77            let position = buf.position();
78            let n = core::task::ready!(W::poll_write(self.as_mut(), cx, &buf.buf()[position..]))?;
79            if n == 0 {
80                return Poll::Ready(Err(Error::new(ErrorKind::WriteZero, "write 0 bytes")));
81            }
82            buf.skip(n);
83        }
84        Poll::Ready(Ok(()))
85    }
86}
87
88impl<W: AsyncWrite + Unpin> AsyncVariableWriter for W { }