async_shared_timeout/wrapper/
futures_io_read_write.rs

1use crate::runtime::Runtime;
2use core::{
3    pin::Pin,
4    task::{Context, Poll},
5};
6use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
7
8use super::Wrapper;
9
10#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-io", feature = "read-write"))))]
11impl<R: Runtime, T: AsyncRead> AsyncRead for Wrapper<'_, R, T> {
12    fn poll_read(
13        self: Pin<&mut Self>,
14        cx: &mut Context<'_>,
15        buf: &mut [u8],
16    ) -> Poll<futures_io::Result<usize>> {
17        let pinned = self.project();
18        match pinned.inner.poll_read(cx, buf) {
19            Poll::Ready(Ok(read)) if read > 0 => {
20                pinned.timeout.as_ref().reset();
21                Poll::Ready(Ok(read))
22            }
23            x => x,
24        }
25    }
26}
27
28#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-io", feature = "read-write"))))]
29impl<R: Runtime, T: AsyncWrite> AsyncWrite for Wrapper<'_, R, T> {
30    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
31        self.project().inner.poll_flush(cx)
32    }
33    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
34        self.project().inner.poll_close(cx)
35    }
36    fn poll_write(
37        self: Pin<&mut Self>,
38        cx: &mut Context<'_>,
39        buf: &[u8],
40    ) -> Poll<futures_io::Result<usize>> {
41        let pinned = self.project();
42        match pinned.inner.poll_write(cx, buf) {
43            Poll::Ready(Ok(written)) if written > 0 => {
44                pinned.timeout.as_ref().reset();
45                Poll::Ready(Ok(written))
46            }
47            x => x,
48        }
49    }
50    fn poll_write_vectored(
51        self: Pin<&mut Self>,
52        cx: &mut Context<'_>,
53        bufs: &[futures_io::IoSlice<'_>],
54    ) -> Poll<futures_io::Result<usize>> {
55        let pinned = self.project();
56        match pinned.inner.poll_write_vectored(cx, bufs) {
57            Poll::Ready(Ok(written)) if written > 0 => {
58                pinned.timeout.as_ref().reset();
59                Poll::Ready(Ok(written))
60            }
61            x => x,
62        }
63    }
64}
65
66#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-io", feature = "read-write"))))]
67impl<R: Runtime, T: AsyncBufRead> AsyncBufRead for Wrapper<'_, R, T> {
68    fn consume(self: Pin<&mut Self>, amt: usize) {
69        self.project().inner.consume(amt);
70    }
71    fn poll_fill_buf(
72        self: Pin<&mut Self>,
73        cx: &mut Context<'_>,
74    ) -> Poll<futures_io::Result<&[u8]>> {
75        let pinned = self.project();
76        match pinned.inner.poll_fill_buf(cx) {
77            Poll::Ready(Ok(bytes)) => {
78                if !bytes.is_empty() {
79                    pinned.timeout.as_ref().reset();
80                }
81                Poll::Ready(Ok(bytes))
82            }
83            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
84            Poll::Pending => Poll::Pending,
85        }
86    }
87}
88
89#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-io", feature = "read-write"))))]
90impl<R: Runtime, T: AsyncSeek> AsyncSeek for Wrapper<'_, R, T> {
91    fn poll_seek(
92        self: Pin<&mut Self>,
93        cx: &mut Context<'_>,
94        pos: futures_io::SeekFrom,
95    ) -> Poll<futures_io::Result<u64>> {
96        let pinned = self.project();
97        match pinned.inner.poll_seek(cx, pos) {
98            Poll::Ready(Ok(pos)) => {
99                pinned.timeout.as_ref().reset();
100                Poll::Ready(Ok(pos))
101            }
102            x => x,
103        }
104    }
105}