1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use core::pin::Pin;
use core::task::{Context, Poll, ready};
use tokio::io::{AsyncRead, AsyncWrite, Error, ErrorKind};
use crate::asynchronous::{AsyncVariableReadable, AsyncVariableWritable};
use crate::asynchronous::reader::AsyncVariableReader;
use crate::asynchronous::writer::AsyncVariableWriter;
use crate::util::read_buf::ReadBuf;
use crate::util::write_buf::WriteBuf;

impl<R: AsyncRead + Unpin> AsyncVariableReadable for R {
    type Error = Error;

    fn poll_read_single(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut Option<u8>) -> Poll<Result<u8, Self::Error>> {
        if let Some(b) = buf.as_ref() { return Poll::Ready(Ok(*b)); }
        let mut b = [0];
        ready!(R::poll_read(self, cx, &mut tokio::io::ReadBuf::new(&mut b)))?;
        buf.replace(b[0]);
        Poll::Ready(Ok(b[0]))
    }

    fn poll_read_more(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<Result<(), Self::Error>> {
        let origin = buf.left();
        if origin == 0 { return Poll::Ready(Ok(())); }
        let mut tokio_buf = buf.into();
        ready!(R::poll_read(self, cx, &mut tokio_buf))?;
        let filled = tokio_buf.filled().len();
        buf.set_position(filled);
        let left = buf.left();
        if left == 0 {
            Poll::Ready(Ok(()))
        } else if left == origin {
            Poll::Ready(Err(Error::new(ErrorKind::UnexpectedEof, "read 0 byte")))
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

impl<R: AsyncRead + Unpin> AsyncVariableReader for R {
    #[inline]
    fn read_bool_error(future_name: &'static str, byte: u8) -> Self::Error {
        Error::new(ErrorKind::InvalidData, alloc::format!("Invalid bool. value {} at future {}.", byte, future_name))
    }

    #[cfg(feature = "async_bools")]
    #[inline]
    fn read_bools_error(future_name: &'static str, byte: u8) -> Self::Error {
        Error::new(ErrorKind::InvalidData, alloc::format!("Invalid bools. value {} at future {}.", byte, future_name))
    }

    #[cfg(feature = "async_varint")]
    #[inline]
    fn read_varint_error(future_name: &'static str, value: u128) -> Self::Error {
        Error::new(ErrorKind::InvalidData, alloc::format!("Too long varint value. {} at future {}.", value, future_name))
    }

    #[cfg(feature = "async_string")]
    #[inline]
    fn read_string_error(_future_name: &'static str, error: alloc::string::FromUtf8Error) -> Self::Error {
        Error::new(ErrorKind::InvalidData, error)
    }
}

impl<W: AsyncWrite + Unpin> AsyncVariableWritable for W {
    type Error = Error;

    fn poll_write_single(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut Option<u8>) -> Poll<Result<(), Self::Error>> {
        match buf.as_ref() {
            None => Poll::Ready(Ok(())),
            Some(b) => W::poll_write(self, cx, &[*b]).map_ok(|_i| { *buf = None; () })
        }
    }

    fn poll_write_more(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut WriteBuf<'_>) -> Poll<Result<(), Self::Error>> {
        while buf.left() > 0 {
            let position = buf.position();
            let n = core::task::ready!(W::poll_write(self.as_mut(), cx, &buf.buf()[position..]))?;
            if n == 0 {
                return Poll::Ready(Err(Error::new(ErrorKind::WriteZero, "write 0 bytes")));
            }
            buf.skip(n);
        }
        Poll::Ready(Ok(()))
    }
}

impl<W: AsyncWrite + Unpin> AsyncVariableWriter for W { }