kayrx_timer/
throttle.rs

1//! Slow down a stream by enforcing a delay between items.
2
3use futures_core::Stream;
4use crate::{Delay, Duration, Instant};
5
6use std::future::Future;
7use std::marker::Unpin;
8use std::pin::Pin;
9use std::task::{self, Poll};
10
11use pin_project_lite::pin_project;
12
13macro_rules! ready {
14    ($e:expr $(,)?) => {
15        match $e {
16            std::task::Poll::Ready(t) => t,
17            std::task::Poll::Pending => return std::task::Poll::Pending,
18        }
19    };
20}
21
22/// Slow down a stream by enforcing a delay between items.
23/// They will be produced not more often than the specified interval.
24///
25/// # Example
26///
27/// Create a throttled stream.
28/// ```rust,norun
29/// use std::time::Duration;
30/// use futures_util::stream::StreamExt;
31/// use kayrx_timer::throttle;
32///
33/// # async fn dox() {
34/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one"));
35///
36/// loop {
37///     // The string will be produced at most every 2 seconds
38///     println!("{:?}", item_stream.next().await);
39/// }
40/// # }
41/// ```
42pub fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
43where
44    T: Stream,
45{
46    let delay = if duration == Duration::from_millis(0) {
47        None
48    } else {
49        Some(Delay::new_timeout(Instant::now() + duration, duration))
50    };
51
52    Throttle {
53        delay,
54        duration,
55        has_delayed: true,
56        stream,
57    }
58}
59
60pin_project! {
61    /// Stream for the [`throttle`](throttle) function.
62    #[derive(Debug)]
63    #[must_use = "streams do nothing unless polled"]
64    pub struct Throttle<T> {
65        // `None` when duration is zero.
66        delay: Option<Delay>,
67        duration: Duration,
68
69        // Set to true when `delay` has returned ready, but `stream` hasn't.
70        has_delayed: bool,
71
72        // The stream to throttle
73        #[pin]
74        stream: T,
75    }
76}
77
78// XXX: are these safe if `T: !Unpin`?
79impl<T: Unpin> Throttle<T> {
80    /// Acquires a reference to the underlying stream that this combinator is
81    /// pulling from.
82    pub fn get_ref(&self) -> &T {
83        &self.stream
84    }
85
86    /// Acquires a mutable reference to the underlying stream that this combinator
87    /// is pulling from.
88    ///
89    /// Note that care must be taken to avoid tampering with the state of the stream
90    /// which may otherwise confuse this combinator.
91    pub fn get_mut(&mut self) -> &mut T {
92        &mut self.stream
93    }
94
95    /// Consumes this combinator, returning the underlying stream.
96    ///
97    /// Note that this may discard intermediate state of this combinator, so care
98    /// should be taken to avoid losing resources when this is called.
99    pub fn into_inner(self) -> T {
100        self.stream
101    }
102}
103
104impl<T: Stream> Stream for Throttle<T> {
105    type Item = T::Item;
106
107    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
108        if !self.has_delayed && self.delay.is_some() {
109            ready!(Pin::new(self.as_mut().project().delay.as_mut().unwrap()).poll(cx));
110            *self.as_mut().project().has_delayed = true;
111        }
112
113        let value = ready!(self.as_mut().project().stream.poll_next(cx));
114
115        if value.is_some() {
116            let dur = self.duration;
117            if let Some(ref mut delay) = self.as_mut().project().delay {
118                delay.reset(Instant::now() + dur);
119            }
120
121            *self.as_mut().project().has_delayed = false;
122        }
123
124        Poll::Ready(value)
125    }
126}