variable_len_reader/impls/
tokio.rs1use core::pin::Pin;
2use core::task::{Context, Poll, ready};
3use tokio::io::{AsyncRead, AsyncWrite, Error, ErrorKind};
4use crate::asynchronous::{AsyncVariableReadable, AsyncVariableWritable};
5use crate::asynchronous::reader::AsyncVariableReader;
6use crate::asynchronous::writer::AsyncVariableWriter;
7use crate::util::read_buf::ReadBuf;
8use crate::util::write_buf::WriteBuf;
9
10impl<R: AsyncRead + Unpin> AsyncVariableReadable for R {
11 type Error = Error;
12
13 fn poll_read_single(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut Option<u8>) -> Poll<Result<u8, Self::Error>> {
14 if let Some(b) = buf.as_ref() { return Poll::Ready(Ok(*b)); }
15 let mut b = [0];
16 ready!(R::poll_read(self, cx, &mut tokio::io::ReadBuf::new(&mut b)))?;
17 buf.replace(b[0]);
18 Poll::Ready(Ok(b[0]))
19 }
20
21 fn poll_read_more(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<Result<(), Self::Error>> {
22 let origin = buf.left();
23 if origin == 0 { return Poll::Ready(Ok(())); }
24 let mut tokio_buf = buf.into();
25 ready!(R::poll_read(self, cx, &mut tokio_buf))?;
26 let filled = tokio_buf.filled().len();
27 buf.set_position(filled);
28 let left = buf.left();
29 if left == 0 {
30 Poll::Ready(Ok(()))
31 } else if left == origin {
32 Poll::Ready(Err(Error::new(ErrorKind::UnexpectedEof, "read 0 byte")))
33 } else {
34 cx.waker().wake_by_ref();
35 Poll::Pending
36 }
37 }
38}
39
40impl<R: AsyncRead + Unpin> AsyncVariableReader for R {
41 #[inline]
42 fn read_bool_error(future_name: &'static str, byte: u8) -> Self::Error {
43 Error::new(ErrorKind::InvalidData, alloc::format!("Invalid bool. value {} at future {}.", byte, future_name))
44 }
45
46 #[cfg(feature = "async_bools")]
47 #[inline]
48 fn read_bools_error(future_name: &'static str, byte: u8) -> Self::Error {
49 Error::new(ErrorKind::InvalidData, alloc::format!("Invalid bools. value {} at future {}.", byte, future_name))
50 }
51
52 #[cfg(feature = "async_varint")]
53 #[inline]
54 fn read_varint_error(future_name: &'static str, value: u128) -> Self::Error {
55 Error::new(ErrorKind::InvalidData, alloc::format!("Too long varint value. {} at future {}.", value, future_name))
56 }
57
58 #[cfg(feature = "async_string")]
59 #[inline]
60 fn read_string_error(_future_name: &'static str, error: alloc::string::FromUtf8Error) -> Self::Error {
61 Error::new(ErrorKind::InvalidData, error)
62 }
63}
64
65impl<W: AsyncWrite + Unpin> AsyncVariableWritable for W {
66 type Error = Error;
67
68 fn poll_write_single(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut Option<u8>) -> Poll<Result<(), Self::Error>> {
69 match buf.as_ref() {
70 None => Poll::Ready(Ok(())),
71 Some(b) => W::poll_write(self, cx, &[*b]).map_ok(|_i| { *buf = None; () })
72 }
73 }
74
75 fn poll_write_more(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut WriteBuf<'_>) -> Poll<Result<(), Self::Error>> {
76 while buf.left() > 0 {
77 let position = buf.position();
78 let n = core::task::ready!(W::poll_write(self.as_mut(), cx, &buf.buf()[position..]))?;
79 if n == 0 {
80 return Poll::Ready(Err(Error::new(ErrorKind::WriteZero, "write 0 bytes")));
81 }
82 buf.skip(n);
83 }
84 Poll::Ready(Ok(()))
85 }
86}
87
88impl<W: AsyncWrite + Unpin> AsyncVariableWriter for W { }