async_interval/
impl_tokio.rs1use alloc::boxed::Box;
2use core::{future::Future, pin::Pin, time::Duration};
3
4use futures_util::FutureExt as _;
5pub use tokio::time::{Interval, Interval as TokioTimeInterval};
6
7use crate::Intervalable;
8
9impl Intervalable for Interval {
11 fn interval(dur: Duration) -> Self {
12 tokio::time::interval(tokio::time::Duration::from_micros(dur.as_micros() as u64))
13 }
14
15 fn wait<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
16 self.reset();
17 Box::pin(self.tick().map(|_| ()))
18 }
19
20 #[cfg(feature = "std")]
21 fn wait_for_std<'a>(
22 &'a mut self,
23 ) -> Pin<Box<dyn Future<Output = Option<std::time::Instant>> + Send + 'a>> {
24 self.reset();
25 Box::pin(self.tick().map(|x| Some(x.into_std())))
26 }
27}
28
29#[cfg(test)]
30mod tests {
31 #[allow(unused_imports)]
32 use super::*;
33
34 #[cfg(feature = "std")]
35 #[tokio::test]
36 async fn test_impl() {
37 #[cfg(feature = "std")]
38 let now = std::time::Instant::now();
39
40 let mut interval = <Interval as Intervalable>::interval(Duration::from_millis(100));
41
42 interval.wait().await;
44
45 #[cfg(feature = "std")]
46 {
47 let elapsed_dur = now.elapsed();
48 assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
49 }
50
51 interval.wait().await;
53
54 #[cfg(feature = "std")]
55 {
56 let elapsed_dur = now.elapsed();
57 assert!(elapsed_dur.as_millis() >= 200 && elapsed_dur.as_millis() <= 210);
58 }
59
60 #[cfg(feature = "std")]
61 {
62 assert!(interval.wait_for_std().await.is_some());
63
64 let elapsed_dur = now.elapsed();
65 assert!(elapsed_dur.as_millis() >= 300 && elapsed_dur.as_millis() <= 315);
66 }
67 }
68
69 #[cfg(all(feature = "std", feature = "stream"))]
70 #[tokio::test]
71 async fn test_intervalable_iter_stream() {
72 use alloc::{vec, vec::Vec};
73
74 use futures_util::StreamExt as _;
75
76 let st = crate::intervalable_iter_stream(
78 0..=2,
79 <Interval as Intervalable>::interval(Duration::from_millis(100)),
80 );
81
82 #[cfg(feature = "std")]
83 let now = std::time::Instant::now();
84
85 assert_eq!(st.collect::<Vec<_>>().await, vec![0, 1, 2]);
86
87 #[cfg(feature = "std")]
88 {
89 let elapsed_dur = now.elapsed();
90 assert!(elapsed_dur.as_millis() >= 300 && elapsed_dur.as_millis() <= 310);
91 }
92 }
93
94 #[cfg(all(feature = "std", feature = "stream"))]
95 #[tokio::test]
96 async fn test_intervalable_repeat_stream() {
97 use alloc::{vec, vec::Vec};
98
99 use futures_util::StreamExt as _;
100
101 let st = crate::intervalable_repeat_stream(
103 0,
104 <Interval as Intervalable>::interval(Duration::from_millis(100)),
105 );
106
107 #[cfg(feature = "std")]
108 let now = std::time::Instant::now();
109
110 assert_eq!(st.take(3).collect::<Vec<_>>().await, vec![0, 0, 0]);
111
112 #[cfg(feature = "std")]
113 {
114 let elapsed_dur = now.elapsed();
115 assert!(elapsed_dur.as_millis() >= 300 && elapsed_dur.as_millis() <= 310);
116 }
117 }
118
119 #[cfg(all(feature = "std", feature = "stream"))]
120 #[tokio::test]
121 async fn test_intervalable_repeat_with_stream() {
122 use alloc::{vec, vec::Vec};
123
124 use futures_util::StreamExt as _;
125
126 let st = crate::intervalable_repeat_with_stream(
128 || 0,
129 <Interval as Intervalable>::interval(Duration::from_millis(100)),
130 );
131
132 #[cfg(feature = "std")]
133 let now = std::time::Instant::now();
134
135 assert_eq!(st.take(3).collect::<Vec<_>>().await, vec![0, 0, 0]);
136
137 #[cfg(feature = "std")]
138 {
139 let elapsed_dur = now.elapsed();
140 assert!(elapsed_dur.as_millis() >= 300 && elapsed_dur.as_millis() <= 310);
141 }
142 }
143}