use crate::io::util::timeout::get_deadline;
use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::io::ErrorKind;
use std::marker::PhantomPinned;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::time::Instant;
/// Creates a future that fills `buf` from `reader`, giving up once `timeout` has elapsed.
///
/// The timeout is turned into an absolute deadline by `get_deadline` at the
/// moment this function is called, not when the future is first polled.
///
/// * `reader` - source the returned future reads from.
/// * `buf` - destination slice; the future keeps reading until it is full.
/// * `timeout` - time budget handed to `get_deadline`.
pub(crate) fn read_exact_timeout<'a, A>(
    reader: &'a mut A,
    buf: &'a mut [u8],
    timeout: Duration,
) -> ReadExactTimeout<'a, A>
where
    A: AsyncRead + Unpin + ?Sized,
{
    ReadExactTimeout {
        // Listed first so the deadline is computed before anything else happens.
        deadline: get_deadline(timeout),
        reader,
        buf: ReadBuf::new(buf),
        _pin: PhantomPinned,
    }
}
pin_project! {
/// Future created by `read_exact_timeout`.
///
/// Polling it reads from `reader` into `buf` until the buffer has no
/// remaining space, the stored deadline has passed, or the reader fails.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadExactTimeout<'a, A: ?Sized> {
// Reader that `poll` pulls bytes from.
reader: &'a mut A,
// Wraps the caller's slice and tracks how much of it has been filled.
buf: ReadBuf<'a>,
// Instant after which `poll` stops reading.
deadline: Instant,
// `PhantomPinned` makes this future `!Unpin`.
#[pin]
_pin: PhantomPinned,
}
}
/// Builds the `UnexpectedEof` error used when a read makes no progress
/// although the buffer still has room.
fn eof() -> io::Error {
    let kind = ErrorKind::UnexpectedEof;
    let message = "early eof";
    io::Error::new(kind, message)
}
/// Drives the read until the buffer is full, the deadline passes, or the
/// reader fails.
///
/// # Output
///
/// * `Ok(n)` - `n` is the number of bytes actually written into the buffer.
///   It equals the buffer length when the read completed, and is smaller when
///   the deadline passed or the reader reported `ErrorKind::TimedOut` first.
/// * `Err(UnexpectedEof)` - the reader returned without adding any bytes
///   while the buffer still had room.
/// * `Err(e)` - any other error from the reader, passed through unchanged.
///
/// NOTE(review): no timer is registered here. The deadline is only evaluated
/// when the task is polled, so while the reader is pending the timeout is
/// observed only if something else wakes the task or the reader itself fails
/// with `TimedOut`.
impl<A> Future for ReadExactTimeout<'_, A>
where
    A: AsyncRead + Unpin + ?Sized,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        let me = self.project();
        loop {
            // Out of time: report what has been read so far, not the buffer size.
            if *me.deadline < Instant::now() {
                return Poll::Ready(Ok(me.buf.filled().len()));
            }

            let rem = me.buf.remaining();
            if rem == 0 {
                // Buffer is full; `filled().len()` equals the capacity here.
                return Poll::Ready(Ok(me.buf.filled().len()));
            }

            match ready!(Pin::new(&mut *me.reader).poll_read(cx, me.buf)) {
                Ok(()) => {}
                // A reader-level timeout is treated like the deadline passing.
                Err(e) if e.kind() == ErrorKind::TimedOut => {
                    return Poll::Ready(Ok(me.buf.filled().len()));
                }
                Err(e) => return Poll::Ready(Err(e)),
            }

            // A successful read that added nothing means the reader hit EOF
            // before the buffer could be filled.
            if me.buf.remaining() == rem {
                return Poll::Ready(Err(eof()));
            }
        }
    }
}