async_std/stream/stream/
throttle.rs1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use pin_project_lite::pin_project;
6
7use crate::stream::Stream;
8use crate::task::{Context, Poll};
9use crate::utils::{timer_after, Timer};
10
11pin_project! {
12 #[doc(hidden)]
20 #[allow(missing_debug_implementations)]
21 pub struct Throttle<S> {
22 #[pin]
23 stream: S,
24 duration: Duration,
25 #[pin]
26 blocked: bool,
27 #[pin]
28 delay: Timer,
29 }
30}
31
32impl<S: Stream> Throttle<S> {
33 pub(super) fn new(stream: S, duration: Duration) -> Self {
34 Self {
35 stream,
36 duration,
37 blocked: false,
38 delay: timer_after(Duration::default()),
39 }
40 }
41}
42
43impl<S: Stream> Stream for Throttle<S> {
44 type Item = S::Item;
45
46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
47 let mut this = self.project();
48 if *this.blocked {
49 let d = this.delay.as_mut();
50 if d.poll(cx).is_ready() {
51 *this.blocked = false;
52 } else {
53 return Poll::Pending;
54 }
55 }
56
57 match this.stream.poll_next(cx) {
58 Poll::Pending => Poll::Pending,
59 Poll::Ready(None) => Poll::Ready(None),
60 Poll::Ready(Some(v)) => {
61 *this.blocked = true;
62 let _ = std::mem::replace(&mut *this.delay, timer_after(*this.duration));
63 Poll::Ready(Some(v))
64 }
65 }
66 }
67}