futures_rx/stream_ext/
timing.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4    time::{Duration, Instant},
5};
6
7use futures::{
8    stream::{Fuse, FusedStream},
9    Stream, StreamExt,
10};
11use pin_project_lite::pin_project;
12
13pin_project! {
14    /// Stream for the [`timing`](RxStreamExt::timing) method.
15    #[must_use = "streams do nothing unless polled"]
16    pub struct Timing<S: Stream> {
17        #[pin]
18        stream: Fuse<S>,
19        last_time: Option<Instant>,
20    }
21}
22
23impl<S: Stream> Timing<S> {
24    pub(crate) fn new(stream: S) -> Self {
25        Self {
26            stream: stream.fuse(),
27            last_time: None,
28        }
29    }
30}
31
32impl<S: Stream> FusedStream for Timing<S> {
33    fn is_terminated(&self) -> bool {
34        self.stream.is_terminated()
35    }
36}
37
38impl<S: Stream> Stream for Timing<S> {
39    type Item = Timed<S::Item>;
40
41    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        let this = self.project();
43
44        match this.stream.poll_next(cx) {
45            Poll::Ready(Some(event)) => {
46                let timestamp = Instant::now();
47                let interval = this.last_time.map(|it| timestamp.duration_since(it));
48
49                *this.last_time = Some(timestamp);
50
51                Poll::Ready(Some(Timed {
52                    event,
53                    timestamp,
54                    interval,
55                }))
56            }
57            Poll::Ready(None) => Poll::Ready(None),
58            Poll::Pending => Poll::Pending,
59        }
60    }
61
62    fn size_hint(&self) -> (usize, Option<usize>) {
63        self.stream.size_hint()
64    }
65}
66
/// An event paired with timing information about when it was observed.
///
/// Equality and hashing take every field into account, including the
/// timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Timed<T> {
    /// The event that was observed.
    pub event: T,
    /// The moment at which the event was observed.
    pub timestamp: Instant,
    /// Time elapsed since the previously observed event, or `None` when
    /// there was no previous event.
    pub interval: Option<Duration>,
}
73
#[cfg(test)]
mod test {
    use std::time::Instant;

    use futures::{executor::block_on, stream, Stream, StreamExt};
    use futures_time::{future::FutureExt, time::Duration};

    use crate::RxExt;

    /// Runs a ten-event stream through `timing()` and checks that timestamps
    /// and intervals reflect the 50 ms delay before each event.
    #[test]
    fn smoke() {
        block_on(async {
            let source = create_stream();
            let start = Instant::now();
            let all_events = source.timing().collect::<Vec<_>>().await;

            // Each timestamp must be at least `50 * index` ms after the start.
            for (index, timed) in all_events.iter().enumerate() {
                let minimum_ms: u128 = (50 * index).try_into().unwrap();

                assert!(timed.timestamp.duration_since(start).as_millis() >= minimum_ms);
            }

            // Only the first event has no interval; later ones are at least 50 ms apart.
            for (index, timed) in all_events.iter().enumerate() {
                match index {
                    0 => assert!(timed.interval.is_none()),
                    _ => assert!(timed.interval.expect("interval is None!").as_millis() >= 50),
                }
            }
        });
    }

    /// Produces the numbers `0..10`, waiting 50 ms before emitting each one.
    fn create_stream() -> impl Stream<Item = usize> {
        stream::unfold(0, move |count| async move {
            if count >= 10 {
                return None;
            }

            async { true }.delay(Duration::from_millis(50)).await;

            Some((count, count + 1))
        })
    }
}