async_shared_timeout/wrapper/
tokio_read_write.rs1use crate::runtime::Runtime;
2use core::{
3 pin::Pin,
4 task::{Context, Poll},
5};
6use std::io;
7use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
8
9use super::Wrapper;
10
11#[cfg_attr(docsrs, doc(cfg(all(feature = "tokio", feature = "read-write"))))]
12impl<R: Runtime, T: AsyncRead> AsyncRead for Wrapper<'_, R, T> {
13 fn poll_read(
14 self: Pin<&mut Self>,
15 cx: &mut Context<'_>,
16 buf: &mut tokio::io::ReadBuf<'_>,
17 ) -> Poll<io::Result<()>> {
18 let pinned = self.project();
19 match pinned.inner.poll_read(cx, buf) {
20 Poll::Ready(Ok(())) if !buf.filled().is_empty() => {
21 pinned.timeout.as_ref().reset();
22 Poll::Ready(Ok(()))
23 }
24 x => x,
25 }
26 }
27}
28
29#[cfg_attr(docsrs, doc(cfg(all(feature = "tokio", feature = "read-write"))))]
30impl<R: Runtime, T: AsyncWrite> AsyncWrite for Wrapper<'_, R, T> {
31 fn is_write_vectored(&self) -> bool {
32 self.inner.is_write_vectored()
33 }
34 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
35 self.project().inner.poll_flush(cx)
36 }
37 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
38 self.project().inner.poll_shutdown(cx)
39 }
40 fn poll_write(
41 self: Pin<&mut Self>,
42 cx: &mut Context<'_>,
43 buf: &[u8],
44 ) -> Poll<Result<usize, io::Error>> {
45 let pinned = self.project();
46 match pinned.inner.poll_write(cx, buf) {
47 Poll::Ready(Ok(written)) if written > 0 => {
48 pinned.timeout.as_ref().reset();
49 Poll::Ready(Ok(written))
50 }
51 x => x,
52 }
53 }
54 fn poll_write_vectored(
55 self: Pin<&mut Self>,
56 cx: &mut Context<'_>,
57 bufs: &[io::IoSlice<'_>],
58 ) -> Poll<Result<usize, io::Error>> {
59 let pinned = self.project();
60 match pinned.inner.poll_write_vectored(cx, bufs) {
61 Poll::Ready(Ok(written)) if written > 0 => {
62 pinned.timeout.as_ref().reset();
63 Poll::Ready(Ok(written))
64 }
65 x => x,
66 }
67 }
68}
69
70#[cfg_attr(docsrs, doc(cfg(all(feature = "tokio", feature = "read-write"))))]
71impl<R: Runtime, T: AsyncBufRead> AsyncBufRead for Wrapper<'_, R, T> {
72 fn consume(self: Pin<&mut Self>, amt: usize) {
73 self.project().inner.consume(amt);
74 }
75 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
76 let pinned = self.project();
77 match pinned.inner.poll_fill_buf(cx) {
78 Poll::Ready(Ok(bytes)) => {
79 if !bytes.is_empty() {
80 pinned.timeout.as_ref().reset();
81 }
82 Poll::Ready(Ok(bytes))
83 }
84 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
85 Poll::Pending => Poll::Pending,
86 }
87 }
88}
89
90#[cfg_attr(docsrs, doc(cfg(all(feature = "tokio", feature = "read-write"))))]
91impl<R: Runtime, T: AsyncSeek> AsyncSeek for Wrapper<'_, R, T> {
92 fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> {
93 self.project().inner.start_seek(position)
94 }
95 fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
96 let pinned = self.project();
97 match pinned.inner.poll_complete(cx) {
98 Poll::Ready(Ok(pos)) => {
99 pinned.timeout.as_ref().reset();
100 Poll::Ready(Ok(pos))
101 }
102 x => x,
103 }
104 }
105}