async_shared_timeout/wrapper/
tokio_read_write.rs

1use crate::runtime::Runtime;
2use core::{
3    pin::Pin,
4    task::{Context, Poll},
5};
6use std::io;
7use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
8
9use super::Wrapper;
10
11#[cfg_attr(docsrs, doc(cfg(all(feature = "tokio", feature = "read-write"))))]
12impl<R: Runtime, T: AsyncRead> AsyncRead for Wrapper<'_, R, T> {
13    fn poll_read(
14        self: Pin<&mut Self>,
15        cx: &mut Context<'_>,
16        buf: &mut tokio::io::ReadBuf<'_>,
17    ) -> Poll<io::Result<()>> {
18        let pinned = self.project();
19        match pinned.inner.poll_read(cx, buf) {
20            Poll::Ready(Ok(())) if !buf.filled().is_empty() => {
21                pinned.timeout.as_ref().reset();
22                Poll::Ready(Ok(()))
23            }
24            x => x,
25        }
26    }
27}
28
29#[cfg_attr(docsrs, doc(cfg(all(feature = "tokio", feature = "read-write"))))]
30impl<R: Runtime, T: AsyncWrite> AsyncWrite for Wrapper<'_, R, T> {
31    fn is_write_vectored(&self) -> bool {
32        self.inner.is_write_vectored()
33    }
34    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
35        self.project().inner.poll_flush(cx)
36    }
37    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
38        self.project().inner.poll_shutdown(cx)
39    }
40    fn poll_write(
41        self: Pin<&mut Self>,
42        cx: &mut Context<'_>,
43        buf: &[u8],
44    ) -> Poll<Result<usize, io::Error>> {
45        let pinned = self.project();
46        match pinned.inner.poll_write(cx, buf) {
47            Poll::Ready(Ok(written)) if written > 0 => {
48                pinned.timeout.as_ref().reset();
49                Poll::Ready(Ok(written))
50            }
51            x => x,
52        }
53    }
54    fn poll_write_vectored(
55        self: Pin<&mut Self>,
56        cx: &mut Context<'_>,
57        bufs: &[io::IoSlice<'_>],
58    ) -> Poll<Result<usize, io::Error>> {
59        let pinned = self.project();
60        match pinned.inner.poll_write_vectored(cx, bufs) {
61            Poll::Ready(Ok(written)) if written > 0 => {
62                pinned.timeout.as_ref().reset();
63                Poll::Ready(Ok(written))
64            }
65            x => x,
66        }
67    }
68}
69
70#[cfg_attr(docsrs, doc(cfg(all(feature = "tokio", feature = "read-write"))))]
71impl<R: Runtime, T: AsyncBufRead> AsyncBufRead for Wrapper<'_, R, T> {
72    fn consume(self: Pin<&mut Self>, amt: usize) {
73        self.project().inner.consume(amt);
74    }
75    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
76        let pinned = self.project();
77        match pinned.inner.poll_fill_buf(cx) {
78            Poll::Ready(Ok(bytes)) => {
79                if !bytes.is_empty() {
80                    pinned.timeout.as_ref().reset();
81                }
82                Poll::Ready(Ok(bytes))
83            }
84            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
85            Poll::Pending => Poll::Pending,
86        }
87    }
88}
89
90#[cfg_attr(docsrs, doc(cfg(all(feature = "tokio", feature = "read-write"))))]
91impl<R: Runtime, T: AsyncSeek> AsyncSeek for Wrapper<'_, R, T> {
92    fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> {
93        self.project().inner.start_seek(position)
94    }
95    fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
96        let pinned = self.project();
97        match pinned.inner.poll_complete(cx) {
98            Poll::Ready(Ok(pos)) => {
99                pinned.timeout.as_ref().reset();
100                Poll::Ready(Ok(pos))
101            }
102            x => x,
103        }
104    }
105}