lwleen-rpc 1.3.3

RPC (信令路由), 组件间数据通信
Documentation
//! ## 来自开源库 futures-timeout  `Apache-2.0 OR MIT`
//! - <https://crates.io/crates/futures-timeout>  0.1.3
//! - 代码 <https://docs.rs/futures-timeout/latest/src/futures_timeout/lib.rs.html>
//! 


use std::{ io, time::Duration,ops::{Deref, DerefMut},task::{ Context, Poll },pin::Pin,  };
use futures::{ future::{FusedFuture,FutureExt}, stream::{Stream,FusedStream}};
use pin_project_lite::pin_project;

use futures_timer::Delay;




pub trait Timeout限时特征: Sized {
    /// Requires a [`Future`] or [`Stream`] to complete before the specific duration has elapsed.
    ///
    /// **Note: If a [`Stream`] returns an item, the timer will reset until `Poll::Ready(None)` is returned**
    fn timeout(self, duration: Duration) -> Timeout<Self> {
        Timeout {
            inner: self,         //外来的异步函数
            timer: Some(Delay::new(duration)),
            duration,
        }
    }
}
  
impl<T: Sized,R> Timeout限时特征 for T where T: Future<Output = R>{}



pin_project!{
    #[derive(Debug)]
    pub struct Timeout<T> {
        #[pin]
        inner: T,                    //外来的异步函数
        timer: Option<Delay>,
        duration: Duration,
    }
}

impl<T: Future> Future for Timeout<T> {
type Output = io::Result<T::Output>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let this = self.project();      

    let Some(timer) = this.timer.as_mut() else {
        return Poll::Ready(Err(io::ErrorKind::TimedOut.into()));
    };

    match this.inner.poll(cx) {                       //启动  外来的异步函数
        Poll::Ready(value) => return Poll::Ready(Ok(value)),   
        Poll::Pending => {}
    }
    
    futures::ready!(timer.poll_unpin(cx));            //启动 定时器  只返回 pendding 情况    
    this.timer.take();                                //运行到这里,定时器超时了    
    Poll::Ready(Err(io::ErrorKind::TimedOut.into()))
}
}

impl<T: Future> FusedFuture for Timeout<T> {
fn is_terminated(&self) -> bool {
    self.timer.is_none()
}
}

impl<T: Stream> Stream for Timeout<T> {
type Item = io::Result<T::Item>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let this = self.project();

    let Some(timer) = this.timer.as_mut() else {
        return Poll::Ready(None);
    };

    match this.inner.poll_next(cx) {
        Poll::Ready(Some(value)) => {
            timer.reset(*this.duration);
            return Poll::Ready(Some(Ok(value)));
        }
        Poll::Ready(None) => {
            this.timer.take();
            return Poll::Ready(None);
        }
        Poll::Pending => {}
    }

    futures::ready!(timer.poll_unpin(cx));
    this.timer.take();
    Poll::Ready(Some(Err(io::ErrorKind::TimedOut.into())))
}

fn size_hint(&self) -> (usize, Option<usize>) {
    self.inner.size_hint()
}
}

impl<T: Stream> FusedStream for Timeout<T> {
    fn is_terminated(&self) -> bool {
        self.timer.is_none()
    }
}
impl<T> Timeout<T> {
    /// Consumes Timeout and returns the inner value
    pub fn into_inner(self) -> T {
        self.inner
    }
}
impl<T> Deref for Timeout<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
impl<T> DerefMut for Timeout<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}