1use crate::{deadline::TimedOutError, Deadline};
4use core::future::Future;
5use core::pin::Pin;
6
7use futures_core::Stream;
8use pin_project_lite::pin_project;
9use std::task::{Context, Poll};
10
11pub trait StreamExt: Stream {
13 fn timeout_at<T>(self, target: T) -> TimeoutAt<Self>
16 where
17 Self: Sized,
18 T: Into<Deadline>,
19 {
20 TimeoutAt {
21 stream: self,
22 deadline: target.into(),
23 }
24 }
25}
26
27impl<S: Stream> StreamExt for S {}
28
29pin_project! {
30 #[must_use = "Futures do nothing unless polled or .awaited"]
34 #[derive(Debug)]
35 pub struct TimeoutAt<S> {
36 #[pin]
37 stream: S,
38 #[pin]
39 deadline: Deadline,
40 }
41}
42
43impl<S> TimeoutAt<S> {
44 pub fn into_inner(self) -> S {
46 self.stream
47 }
48}
49
50impl<S> Stream for TimeoutAt<S>
51where
52 S: Stream,
53{
54 type Item = Result<S::Item, TimedOutError>;
55
56 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 let this = self.project();
58 if let Poll::Ready(()) = this.deadline.poll(cx) {
59 return Poll::Ready(Some(Err(TimedOutError::new())));
60 }
61 this.stream.poll_next(cx).map(|el| el.map(|el| Ok(el)))
62 }
63}