async_sleep/
rw.rs

1use core::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5    time::Duration,
6};
7use std::io::{Error as IoError, ErrorKind as IoErrorKind};
8
9use futures_util::io::{AsyncRead, AsyncWrite};
10
11use crate::{Sleepble, SleepbleWaitBoxFuture};
12
13//
14//
15//
16pub trait AsyncReadWithTimeoutExt: AsyncRead {
17    // ref https://github.com/rust-lang/futures-rs/blob/0.3.25/futures-util/src/io/mod.rs#L204
18    fn read_with_timeout<'a, SLEEP: Sleepble>(
19        &'a mut self,
20        buf: &'a mut [u8],
21        dur: Duration,
22    ) -> ReadWithTimeout<'a, Self>
23    where
24        Self: Unpin,
25    {
26        ReadWithTimeout::new::<SLEEP>(self, buf, dur)
27    }
28}
29
30// ref https://github.com/rust-lang/futures-rs/blob/0.3.25/futures-util/src/io/mod.rs#L398
31impl<R: AsyncRead + ?Sized> AsyncReadWithTimeoutExt for R {}
32
33// ref https://github.com/rust-lang/futures-rs/blob/0.3.25/futures-util/src/io/read.rs
34pub struct ReadWithTimeout<'a, R: ?Sized> {
35    reader: &'a mut R,
36    buf: &'a mut [u8],
37    pub dur: Duration,
38    sleepble_wait_box_future: SleepbleWaitBoxFuture,
39}
40
41impl<R: ?Sized + core::fmt::Debug> core::fmt::Debug for ReadWithTimeout<'_, R> {
42    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
43        f.debug_struct("ReadWithTimeout")
44            .field("reader", &self.reader)
45            .field("buf", &self.buf)
46            .field("dur", &self.dur)
47            .finish()
48    }
49}
50
51impl<R: ?Sized + Unpin> Unpin for ReadWithTimeout<'_, R> {}
52
53impl<'a, R: AsyncRead + ?Sized + Unpin> ReadWithTimeout<'a, R> {
54    pub fn new<SLEEP: Sleepble>(reader: &'a mut R, buf: &'a mut [u8], dur: Duration) -> Self {
55        Self {
56            reader,
57            buf,
58            dur,
59            sleepble_wait_box_future: SLEEP::sleep(dur).wait(),
60        }
61    }
62}
63
64impl<R: AsyncRead + ?Sized + Unpin> Future for ReadWithTimeout<'_, R> {
65    type Output = Result<usize, IoError>;
66
67    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68        let this = &mut *self;
69
70        async_read_poll(
71            &mut this.reader,
72            this.buf,
73            &mut this.sleepble_wait_box_future,
74            cx,
75        )
76    }
77}
78
79//
80//
81//
82pub trait AsyncWriteWithTimeoutExt: AsyncWrite {
83    // ref https://github.com/rust-lang/futures-rs/blob/0.3.25/futures-util/src/io/mod.rs#L443
84    fn write_with_timeout<'a, SLEEP: Sleepble>(
85        &'a mut self,
86        buf: &'a [u8],
87        dur: Duration,
88    ) -> WriteWithTimeout<'a, Self>
89    where
90        Self: Unpin,
91    {
92        WriteWithTimeout::new::<SLEEP>(self, buf, dur)
93    }
94}
95
96// ref https://github.com/rust-lang/futures-rs/blob/0.3.25/futures-util/src/io/mod.rs#L592
97impl<W: AsyncWrite + ?Sized> AsyncWriteWithTimeoutExt for W {}
98
99// ref https://github.com/rust-lang/futures-rs/blob/0.3.25/futures-util/src/io/write.rs
100pub struct WriteWithTimeout<'a, W: ?Sized> {
101    writer: &'a mut W,
102    buf: &'a [u8],
103    pub dur: Duration,
104    sleepble_wait_box_future: SleepbleWaitBoxFuture,
105}
106
107impl<W: ?Sized + core::fmt::Debug> core::fmt::Debug for WriteWithTimeout<'_, W> {
108    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
109        f.debug_struct("WriteWithTimeout")
110            .field("writer", &self.writer)
111            .field("buf", &self.buf)
112            .field("dur", &self.dur)
113            .finish()
114    }
115}
116
117impl<W: ?Sized + Unpin> Unpin for WriteWithTimeout<'_, W> {}
118
119impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteWithTimeout<'a, W> {
120    pub fn new<SLEEP: Sleepble>(writer: &'a mut W, buf: &'a [u8], dur: Duration) -> Self {
121        Self {
122            writer,
123            buf,
124            dur,
125            sleepble_wait_box_future: SLEEP::sleep(dur).wait(),
126        }
127    }
128}
129
130impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteWithTimeout<'_, W> {
131    type Output = Result<usize, IoError>;
132
133    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134        let this = &mut *self;
135
136        async_write_poll(
137            &mut this.writer,
138            this.buf,
139            &mut this.sleepble_wait_box_future,
140            cx,
141        )
142    }
143}
144
145//
146//
147//
148pub fn async_read_poll<R: AsyncRead + ?Sized + Unpin>(
149    reader: &mut R,
150    buf: &mut [u8],
151    sleepble_wait_box_future: &mut SleepbleWaitBoxFuture,
152    cx: &mut Context<'_>,
153) -> Poll<Result<usize, IoError>> {
154    let poll_ret = Pin::new(reader).poll_read(cx, buf);
155
156    match poll_ret {
157        Poll::Ready(ret) => Poll::Ready(ret),
158        Poll::Pending => match sleepble_wait_box_future.as_mut().poll(cx) {
159            Poll::Ready(_) => Poll::Ready(Err(IoError::new(IoErrorKind::TimedOut, "read timeout"))),
160            Poll::Pending => Poll::Pending,
161        },
162    }
163}
164
165pub fn async_write_poll<W: AsyncWrite + ?Sized + Unpin>(
166    writer: &mut W,
167    buf: &[u8],
168    sleepble_wait_box_future: &mut SleepbleWaitBoxFuture,
169    cx: &mut Context<'_>,
170) -> Poll<Result<usize, IoError>> {
171    let poll_ret = Pin::new(writer).poll_write(cx, buf);
172
173    match poll_ret {
174        Poll::Ready(ret) => Poll::Ready(ret),
175        Poll::Pending => match sleepble_wait_box_future.as_mut().poll(cx) {
176            Poll::Ready(_) => {
177                Poll::Ready(Err(IoError::new(IoErrorKind::TimedOut, "write timeout")))
178            }
179            Poll::Pending => Poll::Pending,
180        },
181    }
182}