futures_concurrency/stream/
wait_until.rs1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4
5use futures_core::stream::Stream;
6use pin_project::pin_project;
7
8#[derive(Debug)]
16#[must_use = "streams do nothing unless polled or .awaited"]
17#[pin_project]
18pub struct WaitUntil<S, D> {
19 #[pin]
20 stream: S,
21 #[pin]
22 deadline: D,
23 state: State,
24}
25
/// Phase of a [`WaitUntil`] stream.
#[derive(Debug)]
enum State {
    /// The deadline future has not completed yet; it is what gets polled.
    Timer,
    /// The deadline has completed; polls go straight to the inner stream.
    Streaming,
}
31
32impl<S, D> WaitUntil<S, D> {
33 pub(crate) fn new(stream: S, deadline: D) -> Self {
34 WaitUntil {
35 stream,
36 deadline,
37 state: State::Timer,
38 }
39 }
40}
41
42impl<S, D> Stream for WaitUntil<S, D>
43where
44 S: Stream,
45 D: Future,
46{
47 type Item = S::Item;
48
49 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 let this = self.project();
51
52 match this.state {
53 State::Timer => match this.deadline.poll(cx) {
54 Poll::Pending => Poll::Pending,
55 Poll::Ready(_) => {
56 *this.state = State::Streaming;
57 this.stream.poll_next(cx)
58 }
59 },
60 State::Streaming => this.stream.poll_next(cx),
61 }
62 }
63}