kayrx_timer/throttle.rs
1//! Slow down a stream by enforcing a delay between items.
2
3use futures_core::Stream;
4use crate::{Delay, Duration, Instant};
5
6use std::future::Future;
7use std::marker::Unpin;
8use std::pin::Pin;
9use std::task::{self, Poll};
10
11use pin_project_lite::pin_project;
12
// Unwraps the value of a `Poll::Ready`, or returns `Poll::Pending` from the
// enclosing function when the polled expression is not ready yet.
macro_rules! ready {
    ($poll:expr $(,)?) => {
        match $poll {
            std::task::Poll::Pending => return std::task::Poll::Pending,
            std::task::Poll::Ready(value) => value,
        }
    };
}
21
22/// Slow down a stream by enforcing a delay between items.
23/// They will be produced not more often than the specified interval.
24///
25/// # Example
26///
27/// Create a throttled stream.
28/// ```rust,norun
29/// use std::time::Duration;
30/// use futures_util::stream::StreamExt;
31/// use kayrx_timer::throttle;
32///
33/// # async fn dox() {
34/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one"));
35///
36/// loop {
37/// // The string will be produced at most every 2 seconds
38/// println!("{:?}", item_stream.next().await);
39/// }
40/// # }
41/// ```
42pub fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
43where
44 T: Stream,
45{
46 let delay = if duration == Duration::from_millis(0) {
47 None
48 } else {
49 Some(Delay::new_timeout(Instant::now() + duration, duration))
50 };
51
52 Throttle {
53 delay,
54 duration,
55 has_delayed: true,
56 stream,
57 }
58}
59
60pin_project! {
61 /// Stream for the [`throttle`](throttle) function.
62 #[derive(Debug)]
63 #[must_use = "streams do nothing unless polled"]
64 pub struct Throttle<T> {
65 // `None` when duration is zero.
66 delay: Option<Delay>,
67 duration: Duration,
68
69 // Set to true when `delay` has returned ready, but `stream` hasn't.
70 has_delayed: bool,
71
72 // The stream to throttle
73 #[pin]
74 stream: T,
75 }
76}
77
78// XXX: are these safe if `T: !Unpin`?
79impl<T: Unpin> Throttle<T> {
80 /// Acquires a reference to the underlying stream that this combinator is
81 /// pulling from.
82 pub fn get_ref(&self) -> &T {
83 &self.stream
84 }
85
86 /// Acquires a mutable reference to the underlying stream that this combinator
87 /// is pulling from.
88 ///
89 /// Note that care must be taken to avoid tampering with the state of the stream
90 /// which may otherwise confuse this combinator.
91 pub fn get_mut(&mut self) -> &mut T {
92 &mut self.stream
93 }
94
95 /// Consumes this combinator, returning the underlying stream.
96 ///
97 /// Note that this may discard intermediate state of this combinator, so care
98 /// should be taken to avoid losing resources when this is called.
99 pub fn into_inner(self) -> T {
100 self.stream
101 }
102}
103
104impl<T: Stream> Stream for Throttle<T> {
105 type Item = T::Item;
106
107 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
108 if !self.has_delayed && self.delay.is_some() {
109 ready!(Pin::new(self.as_mut().project().delay.as_mut().unwrap()).poll(cx));
110 *self.as_mut().project().has_delayed = true;
111 }
112
113 let value = ready!(self.as_mut().project().stream.poll_next(cx));
114
115 if value.is_some() {
116 let dur = self.duration;
117 if let Some(ref mut delay) = self.as_mut().project().delay {
118 delay.reset(Instant::now() + dur);
119 }
120
121 *self.as_mut().project().has_delayed = false;
122 }
123
124 Poll::Ready(value)
125 }
126}