Skip to main content

async_std/stream/stream/
throttle.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use pin_project_lite::pin_project;
6
7use crate::stream::Stream;
8use crate::task::{Context, Poll};
9use crate::utils::{timer_after, Timer};
10
11pin_project! {
12    /// A stream that only yields one element once every `duration`.
13    ///
14    /// This `struct` is created by the [`throttle`] method on [`Stream`]. See its
15    /// documentation for more.
16    ///
17    /// [`throttle`]: trait.Stream.html#method.throttle
18    /// [`Stream`]: trait.Stream.html
19    #[doc(hidden)]
20    #[allow(missing_debug_implementations)]
21    pub struct Throttle<S> {
22        #[pin]
23        stream: S,
24        duration: Duration,
25        #[pin]
26        blocked: bool,
27        #[pin]
28        delay: Timer,
29    }
30}
31
32impl<S: Stream> Throttle<S> {
33    pub(super) fn new(stream: S, duration: Duration) -> Self {
34        Self {
35            stream,
36            duration,
37            blocked: false,
38            delay: timer_after(Duration::default()),
39        }
40    }
41}
42
43impl<S: Stream> Stream for Throttle<S> {
44    type Item = S::Item;
45
46    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
47        let mut this = self.project();
48        if *this.blocked {
49            let d = this.delay.as_mut();
50            if d.poll(cx).is_ready() {
51                *this.blocked = false;
52            } else {
53                return Poll::Pending;
54            }
55        }
56
57        match this.stream.poll_next(cx) {
58            Poll::Pending => Poll::Pending,
59            Poll::Ready(None) => Poll::Ready(None),
60            Poll::Ready(Some(v)) => {
61                *this.blocked = true;
62                let _ = std::mem::replace(&mut *this.delay, timer_after(*this.duration));
63                Poll::Ready(Some(v))
64            }
65        }
66    }
67}