1use core::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5 time::Duration,
6};
7use std::io::{Error as IoError, ErrorKind as IoErrorKind};
8
9use futures_util::io::{AsyncRead, AsyncWrite};
10
11use crate::{Sleepble, SleepbleWaitBoxFuture};
12
13pub trait AsyncReadWithTimeoutExt: AsyncRead {
17 fn read_with_timeout<'a, SLEEP: Sleepble>(
19 &'a mut self,
20 buf: &'a mut [u8],
21 dur: Duration,
22 ) -> ReadWithTimeout<'a, Self>
23 where
24 Self: Unpin,
25 {
26 ReadWithTimeout::new::<SLEEP>(self, buf, dur)
27 }
28}
29
30impl<R: AsyncRead + ?Sized> AsyncReadWithTimeoutExt for R {}
32
33pub struct ReadWithTimeout<'a, R: ?Sized> {
35 reader: &'a mut R,
36 buf: &'a mut [u8],
37 pub dur: Duration,
38 sleepble_wait_box_future: SleepbleWaitBoxFuture,
39}
40
41impl<R: ?Sized + core::fmt::Debug> core::fmt::Debug for ReadWithTimeout<'_, R> {
42 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
43 f.debug_struct("ReadWithTimeout")
44 .field("reader", &self.reader)
45 .field("buf", &self.buf)
46 .field("dur", &self.dur)
47 .finish()
48 }
49}
50
51impl<R: ?Sized + Unpin> Unpin for ReadWithTimeout<'_, R> {}
52
53impl<'a, R: AsyncRead + ?Sized + Unpin> ReadWithTimeout<'a, R> {
54 pub fn new<SLEEP: Sleepble>(reader: &'a mut R, buf: &'a mut [u8], dur: Duration) -> Self {
55 Self {
56 reader,
57 buf,
58 dur,
59 sleepble_wait_box_future: SLEEP::sleep(dur).wait(),
60 }
61 }
62}
63
64impl<R: AsyncRead + ?Sized + Unpin> Future for ReadWithTimeout<'_, R> {
65 type Output = Result<usize, IoError>;
66
67 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68 let this = &mut *self;
69
70 async_read_poll(
71 &mut this.reader,
72 this.buf,
73 &mut this.sleepble_wait_box_future,
74 cx,
75 )
76 }
77}
78
79pub trait AsyncWriteWithTimeoutExt: AsyncWrite {
83 fn write_with_timeout<'a, SLEEP: Sleepble>(
85 &'a mut self,
86 buf: &'a [u8],
87 dur: Duration,
88 ) -> WriteWithTimeout<'a, Self>
89 where
90 Self: Unpin,
91 {
92 WriteWithTimeout::new::<SLEEP>(self, buf, dur)
93 }
94}
95
96impl<W: AsyncWrite + ?Sized> AsyncWriteWithTimeoutExt for W {}
98
99pub struct WriteWithTimeout<'a, W: ?Sized> {
101 writer: &'a mut W,
102 buf: &'a [u8],
103 pub dur: Duration,
104 sleepble_wait_box_future: SleepbleWaitBoxFuture,
105}
106
107impl<W: ?Sized + core::fmt::Debug> core::fmt::Debug for WriteWithTimeout<'_, W> {
108 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
109 f.debug_struct("WriteWithTimeout")
110 .field("writer", &self.writer)
111 .field("buf", &self.buf)
112 .field("dur", &self.dur)
113 .finish()
114 }
115}
116
117impl<W: ?Sized + Unpin> Unpin for WriteWithTimeout<'_, W> {}
118
119impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteWithTimeout<'a, W> {
120 pub fn new<SLEEP: Sleepble>(writer: &'a mut W, buf: &'a [u8], dur: Duration) -> Self {
121 Self {
122 writer,
123 buf,
124 dur,
125 sleepble_wait_box_future: SLEEP::sleep(dur).wait(),
126 }
127 }
128}
129
130impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteWithTimeout<'_, W> {
131 type Output = Result<usize, IoError>;
132
133 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134 let this = &mut *self;
135
136 async_write_poll(
137 &mut this.writer,
138 this.buf,
139 &mut this.sleepble_wait_box_future,
140 cx,
141 )
142 }
143}
144
145pub fn async_read_poll<R: AsyncRead + ?Sized + Unpin>(
149 reader: &mut R,
150 buf: &mut [u8],
151 sleepble_wait_box_future: &mut SleepbleWaitBoxFuture,
152 cx: &mut Context<'_>,
153) -> Poll<Result<usize, IoError>> {
154 let poll_ret = Pin::new(reader).poll_read(cx, buf);
155
156 match poll_ret {
157 Poll::Ready(ret) => Poll::Ready(ret),
158 Poll::Pending => match sleepble_wait_box_future.as_mut().poll(cx) {
159 Poll::Ready(_) => Poll::Ready(Err(IoError::new(IoErrorKind::TimedOut, "read timeout"))),
160 Poll::Pending => Poll::Pending,
161 },
162 }
163}
164
165pub fn async_write_poll<W: AsyncWrite + ?Sized + Unpin>(
166 writer: &mut W,
167 buf: &[u8],
168 sleepble_wait_box_future: &mut SleepbleWaitBoxFuture,
169 cx: &mut Context<'_>,
170) -> Poll<Result<usize, IoError>> {
171 let poll_ret = Pin::new(writer).poll_write(cx, buf);
172
173 match poll_ret {
174 Poll::Ready(ret) => Poll::Ready(ret),
175 Poll::Pending => match sleepble_wait_box_future.as_mut().poll(cx) {
176 Poll::Ready(_) => {
177 Poll::Ready(Err(IoError::new(IoErrorKind::TimedOut, "write timeout")))
178 }
179 Poll::Pending => Poll::Pending,
180 },
181 }
182}