smooth_stream/
lib.rs

1use std::collections::VecDeque;
2use std::pin::pin;
3use std::time::Duration;
4
5use async_stream::stream;
6use futures::{Stream, StreamExt};
7use tokio::time::{sleep, Instant};
8
9pub fn smooth_stream<S>(input: S, max_interval: Duration) -> impl Stream<Item = S::Item>
10where
11    S: Stream + Unpin + Send + 'static,
12    S::Item: Send,
13{
14    stream! {
15        let mut last_output_time = Instant::now();
16        let mut last_input_time = Instant::now();
17        let mut intervals = VecDeque::with_capacity(10);
18
19        let mut input = pin!(input);
20        while let Some(item) = input.next().await {
21            let now = Instant::now();
22            let input_interval = now.duration_since(last_input_time);
23            intervals.push_back(input_interval);
24            if intervals.len() > 10 {
25                intervals.pop_front();
26            }
27
28            let avg_input_interval = intervals.iter().sum::<Duration>() / u32::try_from(intervals.len()).unwrap_or(u32::MAX);
29            let interval = avg_input_interval.min(max_interval);
30
31            let time_since_last_output = now.duration_since(last_output_time);
32            if time_since_last_output < interval {
33                sleep(interval - time_since_last_output).await;
34            }
35
36            yield item;
37            last_output_time = Instant::now();
38            last_input_time = now;
39        }
40    }
41}