1use std::collections::VecDeque;
2use std::pin::pin;
3use std::time::Duration;
4
5use async_stream::stream;
6use futures::{Stream, StreamExt};
7use tokio::time::{sleep, Instant};
8
9pub fn smooth_stream<S>(input: S, max_interval: Duration) -> impl Stream<Item = S::Item>
10where
11 S: Stream + Unpin + Send + 'static,
12 S::Item: Send,
13{
14 stream! {
15 let mut last_output_time = Instant::now();
16 let mut last_input_time = Instant::now();
17 let mut intervals = VecDeque::with_capacity(10);
18
19 let mut input = pin!(input);
20 while let Some(item) = input.next().await {
21 let now = Instant::now();
22 let input_interval = now.duration_since(last_input_time);
23 intervals.push_back(input_interval);
24 if intervals.len() > 10 {
25 intervals.pop_front();
26 }
27
28 let avg_input_interval = intervals.iter().sum::<Duration>() / u32::try_from(intervals.len()).unwrap_or(u32::MAX);
29 let interval = avg_input_interval.min(max_interval);
30
31 let time_since_last_output = now.duration_since(last_output_time);
32 if time_since_last_output < interval {
33 sleep(interval - time_since_last_output).await;
34 }
35
36 yield item;
37 last_output_time = Instant::now();
38 last_input_time = now;
39 }
40 }
41}