// futures_rx/stream_ext/timing.rs
use std::{
    pin::Pin,
    task::{Context, Poll},
    time::{Duration, Instant},
};

use futures::{
    stream::{Fuse, FusedStream},
    Stream, StreamExt,
};
use pin_project_lite::pin_project;
12
13pin_project! {
14 #[must_use = "streams do nothing unless polled"]
16 pub struct Timing<S: Stream> {
17 #[pin]
18 stream: Fuse<S>,
19 last_time: Option<Instant>,
20 }
21}
22
23impl<S: Stream> Timing<S> {
24 pub(crate) fn new(stream: S) -> Self {
25 Self {
26 stream: stream.fuse(),
27 last_time: None,
28 }
29 }
30}
31
32impl<S: Stream> FusedStream for Timing<S> {
33 fn is_terminated(&self) -> bool {
34 self.stream.is_terminated()
35 }
36}
37
38impl<S: Stream> Stream for Timing<S> {
39 type Item = Timed<S::Item>;
40
41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 let this = self.project();
43
44 match this.stream.poll_next(cx) {
45 Poll::Ready(Some(event)) => {
46 let timestamp = Instant::now();
47 let interval = this.last_time.map(|it| timestamp.duration_since(it));
48
49 *this.last_time = Some(timestamp);
50
51 Poll::Ready(Some(Timed {
52 event,
53 timestamp,
54 interval,
55 }))
56 }
57 Poll::Ready(None) => Poll::Ready(None),
58 Poll::Pending => Poll::Pending,
59 }
60 }
61
62 fn size_hint(&self) -> (usize, Option<usize>) {
63 self.stream.size_hint()
64 }
65}
66
/// An event paired with the moment it was observed.
///
/// The timing fields are plain `Copy` values, so the whole record is `Copy`,
/// comparable and hashable whenever the wrapped event type is.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Timed<T> {
    /// The wrapped event itself.
    pub event: T,
    /// The instant at which the event was observed.
    pub timestamp: Instant,
    /// Time elapsed since the previously observed event, or `None` when there
    /// was no previous event.
    pub interval: Option<Duration>,
}
73
#[cfg(test)]
mod test {
    use std::time::Instant;

    use futures::{executor::block_on, stream, Stream, StreamExt};
    use futures_time::{future::FutureExt, time::Duration};

    use crate::RxExt;

    /// Drives ten delayed events through `timing()` and checks both the
    /// absolute timestamps and the intervals between consecutive events.
    #[test]
    fn smoke() {
        block_on(async {
            let source = create_stream();
            let start = Instant::now();
            let all_events = source.timing().collect::<Vec<_>>().await;

            // Each event is preceded by a 50 ms delay, so the event at `index`
            // cannot have been observed earlier than `50 * index` ms after `start`.
            for (index, timed) in all_events.iter().enumerate() {
                let minimum_millis = u128::try_from(50 * index).unwrap();
                let since_start = timed.timestamp.duration_since(start);

                assert!(since_start.as_millis() >= minimum_millis);
            }

            // Only the first event lacks an interval; every later one must be
            // separated from its predecessor by at least the 50 ms delay.
            for (index, timed) in all_events.iter().enumerate() {
                match index {
                    0 => assert!(timed.interval.is_none()),
                    _ => {
                        assert!(timed.interval.expect("interval is None!").as_millis() >= 50)
                    }
                }
            }
        });
    }

    /// Builds a stream yielding `0..10`, waiting 50 ms before producing each item.
    fn create_stream() -> impl Stream<Item = usize> {
        stream::unfold(0, move |count| async move {
            if count >= 10 {
                return None;
            }

            async { true }.delay(Duration::from_millis(50)).await;

            Some((count, count + 1))
        })
    }
}