futures_concurrency/stream/
wait_until.rs

1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use futures_core::stream::Stream;
6use pin_project::pin_project;
7
8/// Delay execution of a stream once for the specified duration.
9///
10/// This `struct` is created by the [`wait_until`] method on [`StreamExt`]. See its
11/// documentation for more.
12///
13/// [`wait_until`]: crate::stream::StreamExt::wait_until
14/// [`StreamExt`]: crate::stream::StreamExt
15#[derive(Debug)]
16#[must_use = "streams do nothing unless polled or .awaited"]
17#[pin_project]
18pub struct WaitUntil<S, D> {
19    #[pin]
20    stream: S,
21    #[pin]
22    deadline: D,
23    state: State,
24}
25
/// The phase a `WaitUntil` stream is currently in.
#[derive(Debug)]
enum State {
    /// The deadline future has not completed yet; the inner stream is not polled.
    Timer,
    /// The deadline future has completed; polls go straight to the inner stream.
    Streaming,
}
31
32impl<S, D> WaitUntil<S, D> {
33    pub(crate) fn new(stream: S, deadline: D) -> Self {
34        WaitUntil {
35            stream,
36            deadline,
37            state: State::Timer,
38        }
39    }
40}
41
42impl<S, D> Stream for WaitUntil<S, D>
43where
44    S: Stream,
45    D: Future,
46{
47    type Item = S::Item;
48
49    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        let this = self.project();
51
52        match this.state {
53            State::Timer => match this.deadline.poll(cx) {
54                Poll::Pending => Poll::Pending,
55                Poll::Ready(_) => {
56                    *this.state = State::Streaming;
57                    this.stream.poll_next(cx)
58                }
59            },
60            State::Streaming => this.stream.poll_next(cx),
61        }
62    }
63}