use std::{ io, time::Duration,ops::{Deref, DerefMut},task::{ Context, Poll },pin::Pin, };
use futures::{ future::{FusedFuture,FutureExt}, stream::{Stream,FusedStream}};
use pin_project_lite::pin_project;
use futures_timer::Delay;
/// Extension trait providing the `timeout` adaptor.
///
/// The provided method consumes the implementor and hands back a
/// [`Timeout`] wrapper around it.
pub trait Timeout限时特征: Sized {
    /// Wraps `self` in a [`Timeout`] limited to `duration`.
    ///
    /// The [`Delay`] is constructed here, when `timeout` is called, and the
    /// original `duration` is stored alongside it in the wrapper.
    fn timeout(self, duration: Duration) -> Timeout<Self> {
        let timer = Some(Delay::new(duration));
        Timeout {
            inner: self,
            timer,
            duration,
        }
    }
}
/// Blanket implementation: every `Future` gets the `timeout` adaptor.
impl<F> Timeout限时特征 for F where F: Future {}
pin_project! {
    /// Wrapper that pairs a value with a timer.
    ///
    /// The `Future` and `Stream` implementations in this file poll `inner`
    /// and fall back to `timer` when `inner` is not ready.
    #[derive(Debug)]
    pub struct Timeout<T> {
        // The wrapped value; structurally pinned so it can be polled in place.
        #[pin]
        inner: T,
        // The running timer. `None` is what the fused implementations in this
        // file report as "terminated".
        timer: Option<Delay>,
        // The configured period; the stream implementation uses it to reset
        // the timer after each item.
        duration: Duration,
    }
}
/// Drives the wrapped future, failing with `io::ErrorKind::TimedOut` if the
/// timer fires first.
impl<T: Future> Future for Timeout<T> {
    type Output = io::Result<T::Output>;

    /// Polls the inner future, then the timer.
    ///
    /// * `Ready(Ok(value))` — the inner future completed.
    /// * `Ready(Err(TimedOut))` — the timer elapsed while the inner future
    ///   was still pending, or this wrapper was polled again after it had
    ///   already terminated.
    /// * `Pending` — neither the inner future nor the timer is ready.
    ///
    /// The timer is cleared on *both* terminal outcomes so that
    /// `FusedFuture::is_terminated` (which checks `timer.is_none()`) reports
    /// `true` after a successful completion as well, and the finished inner
    /// future is never polled again.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // No timer means we already produced a result; never touch `inner`.
        let Some(timer) = this.timer.as_mut() else {
            return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
        };

        // The inner future goes first: a value that is ready wins over an
        // expired timer.
        if let Poll::Ready(value) = this.inner.poll(cx) {
            // Mark the wrapper as terminated.
            *this.timer = None;
            return Poll::Ready(Ok(value));
        }

        futures::ready!(timer.poll_unpin(cx));
        *this.timer = None;
        Poll::Ready(Err(io::ErrorKind::TimedOut.into()))
    }
}
impl<T: Future> FusedFuture for Timeout<T> {
    /// Returns `true` once the `timer` field is `None`.
    fn is_terminated(&self) -> bool {
        let armed = self.timer.is_some();
        !armed
    }
}
/// Yields the inner stream's items wrapped in `Ok`, with a per-item timeout.
///
/// If the timer fires while the inner stream is pending, a single
/// `Err(io::ErrorKind::TimedOut)` item is produced and the stream then ends.
impl<T: Stream> Stream for Timeout<T> {
    type Item = io::Result<T::Item>;

    /// Polls the inner stream, then the timer.
    ///
    /// * An item from the inner stream resets the timer to the configured
    ///   duration and is yielded as `Some(Ok(item))`.
    /// * End of the inner stream clears the timer and yields `None`.
    /// * A fired timer clears the timer and yields `Some(Err(TimedOut))`.
    /// * Once the timer has been cleared, every poll yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        // No timer means the stream has already terminated.
        let Some(timer) = this.timer.as_mut() else {
            return Poll::Ready(None);
        };

        match this.inner.poll_next(cx) {
            Poll::Ready(Some(value)) => {
                // Re-arm the timeout for the next item.
                timer.reset(*this.duration);
                return Poll::Ready(Some(Ok(value)));
            }
            Poll::Ready(None) => {
                this.timer.take();
                return Poll::Ready(None);
            }
            Poll::Pending => {}
        }

        futures::ready!(timer.poll_unpin(cx));
        this.timer.take();
        Poll::Ready(Some(Err(io::ErrorKind::TimedOut.into())))
    }

    /// Returns bounds on the number of remaining items.
    ///
    /// The inner hint cannot be forwarded unchanged:
    /// * after termination nothing more is yielded, so the hint is
    ///   `(0, Some(0))`;
    /// * a timeout ends the stream early with one `Err` item, so at most one
    ///   item is guaranteed even when the inner lower bound is larger;
    /// * the timeout `Err` may follow the inner stream's last item, so the
    ///   upper bound is one more than the inner one (`None` on overflow).
    fn size_hint(&self) -> (usize, Option<usize>) {
        if self.timer.is_none() {
            return (0, Some(0));
        }
        let (lower, upper) = self.inner.size_hint();
        (lower.min(1), upper.and_then(|n| n.checked_add(1)))
    }
}
impl<T: Stream> FusedStream for Timeout<T> {
    /// Returns `true` once the `timer` field is `None`.
    fn is_terminated(&self) -> bool {
        match self.timer {
            Some(_) => false,
            None => true,
        }
    }
}
impl<T> Timeout<T> {
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T> Deref for Timeout<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> DerefMut for Timeout<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}