stop_token/
stream.rs

1//! Extension methods and types for the `Stream` trait.
2
3use crate::{deadline::TimedOutError, Deadline};
4use core::future::Future;
5use core::pin::Pin;
6
7use futures_core::Stream;
8use pin_project_lite::pin_project;
9use std::task::{Context, Poll};
10
11/// Extend the `Stream` trait with the `until` method.
12pub trait StreamExt: Stream {
13    /// Applies the token to the `stream`, such that the resulting stream
14    /// produces no more items once the token becomes cancelled.
15    fn timeout_at<T>(self, target: T) -> TimeoutAt<Self>
16    where
17        Self: Sized,
18        T: Into<Deadline>,
19    {
20        TimeoutAt {
21            stream: self,
22            deadline: target.into(),
23        }
24    }
25}
26
27impl<S: Stream> StreamExt for S {}
28
29pin_project! {
30    /// Run a future until it resolves, or until a deadline is hit.
31    ///
32    /// This method is returned by [`FutureExt::deadline`].
33    #[must_use = "Futures do nothing unless polled or .awaited"]
34    #[derive(Debug)]
35    pub struct TimeoutAt<S> {
36        #[pin]
37        stream: S,
38        #[pin]
39        deadline: Deadline,
40    }
41}
42
43impl<S> TimeoutAt<S> {
44    /// Unwraps this `Stop` stream, returning the underlying stream.
45    pub fn into_inner(self) -> S {
46        self.stream
47    }
48}
49
50impl<S> Stream for TimeoutAt<S>
51where
52    S: Stream,
53{
54    type Item = Result<S::Item, TimedOutError>;
55
56    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57        let this = self.project();
58        if let Poll::Ready(()) = this.deadline.poll(cx) {
59            return Poll::Ready(Some(Err(TimedOutError::new())));
60        }
61        this.stream.poll_next(cx).map(|el| el.map(|el| Ok(el)))
62    }
63}