1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::collections::VecDeque;
use std::pin::pin;
use std::time::Duration;

use async_stream::stream;
use futures::{Stream, StreamExt};
use tokio::time::{sleep, Instant};

pub async fn smooth_stream<S>(input: S, max_interval: Duration) -> impl Stream<Item = S::Item>
where
    S: Stream + Unpin + Send + 'static,
    S::Item: Send,
{
    stream! {
        let mut last_output_time = Instant::now();
        let mut last_input_time = Instant::now();
        let mut intervals = VecDeque::with_capacity(10);

        let mut input = pin!(input);
        while let Some(item) = input.next().await {
            let now = Instant::now();
            let input_interval = now.duration_since(last_input_time);
            intervals.push_back(input_interval);
            if intervals.len() > 10 {
                intervals.pop_front();
            }

            let avg_input_interval = intervals.iter().sum::<Duration>() / u32::try_from(intervals.len()).unwrap_or(u32::MAX);
            let interval = avg_input_interval.min(max_interval);

            let time_since_last_output = now.duration_since(last_output_time);
            if time_since_last_output < interval {
                sleep(interval - time_since_last_output).await;
            }

            yield item;
            last_output_time = Instant::now();
            last_input_time = now;
        }
    }
}