// async_speed_limit/clock.rs
#[cfg(feature = "standard-clock")]
6use futures_timer::Delay;
7#[cfg(feature = "standard-clock")]
8use std::time::Instant;
9use std::{
10 convert::TryInto,
11 fmt::Debug,
12 future::Future,
13 marker::Unpin,
14 mem,
15 ops::{Add, Sub},
16 pin::Pin,
17 sync::{
18 atomic::{AtomicU64, Ordering},
19 Arc, Mutex,
20 },
21 task::{Context, Poll, Waker},
22 time::Duration,
23};
24
/// A source of time that can report the current instant and create timer futures.
///
/// Every implementation must also be [`Default`] and [`Clone`].
pub trait Clock: Default + Clone {
    /// A point in time as reported by this clock.
    ///
    /// Subtracting one instant from another yields a [`Duration`].
    type Instant: Sub<Output = Duration> + Copy;

    /// The future produced by [`sleep`](Clock::sleep); it resolves to `()`.
    type Delay: Unpin + Future<Output = ()>;

    /// Returns the current instant according to this clock.
    fn now(&self) -> Self::Instant;

    /// Creates a future meant to complete after `dur` has passed on this clock.
    fn sleep(&self, dur: Duration) -> Self::Delay;
}
67
68pub trait BlockingClock: Clock {
70 fn blocking_sleep(&self, dur: Duration);
72}
73
/// Clock backed by [`std::time::Instant`] and `futures_timer::Delay`.
///
/// Only available when the `standard-clock` feature is enabled.
#[cfg(feature = "standard-clock")]
#[derive(Debug, Default, Clone, Copy)]
pub struct StandardClock;

#[cfg(feature = "standard-clock")]
impl Clock for StandardClock {
    type Instant = Instant;
    type Delay = Delay;

    /// Reads the current time via [`Instant::now`].
    fn now(&self) -> Instant {
        Instant::now()
    }

    /// Creates a `futures_timer` delay for the duration `dur`.
    fn sleep(&self, dur: Duration) -> Delay {
        Delay::new(dur)
    }
}

#[cfg(feature = "standard-clock")]
impl BlockingClock for StandardClock {
    /// Puts the calling thread to sleep using [`std::thread::sleep`].
    fn blocking_sleep(&self, dur: Duration) {
        use std::thread;

        thread::sleep(dur);
    }
}
104
/// A timestamp expressed as a whole number of nanoseconds.
///
/// The wrapped value is public, so a timestamp can be built directly, e.g.
/// `Nanoseconds(1_000_000_000)`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Nanoseconds(pub u64);

impl Sub for Nanoseconds {
    type Output = Duration;

    /// Returns the time elapsed from `other` to `self`.
    ///
    /// # Panics
    ///
    /// Panics if `other` is later than `self`. The subtraction is checked
    /// explicitly so that release builds (where plain `-` would wrap around to
    /// a huge bogus duration) behave the same as debug builds.
    fn sub(self, other: Self) -> Duration {
        let elapsed = self
            .0
            .checked_sub(other.0)
            .expect("cannot subtract a later timestamp from an earlier one");
        Duration::from_nanos(elapsed)
    }
}

impl Add<Duration> for Nanoseconds {
    type Output = Self;

    /// Returns the timestamp `other` after `self`.
    ///
    /// # Panics
    ///
    /// Panics if `other` does not fit in a `u64` number of nanoseconds, or if
    /// the resulting timestamp would exceed `u64::MAX` nanoseconds.
    fn add(self, other: Duration) -> Self {
        let dur: u64 = other
            .as_nanos()
            .try_into()
            .expect("cannot increase more than 2^64 ns");
        // Checked so the sum cannot silently wrap around in release builds.
        let sum = self
            .0
            .checked_add(dur)
            .expect("cannot increase more than 2^64 ns");
        Self(sum)
    }
}
128
129#[derive(Debug)]
131pub struct ManualDelay {
132 clock: Arc<ManualClockContent>,
133 timeout: Nanoseconds,
134}
135
136impl Future for ManualDelay {
137 type Output = ();
138
139 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
140 let now = self.clock.now();
141 if now >= self.timeout {
142 Poll::Ready(())
143 } else {
144 self.clock.register(cx);
145 Poll::Pending
146 }
147 }
148}
149
150#[derive(Default, Debug)]
153struct ManualClockContent {
154 now: AtomicU64,
155 wakers: Mutex<Vec<Waker>>,
156}
157
158impl ManualClockContent {
159 fn now(&self) -> Nanoseconds {
160 Nanoseconds(self.now.load(Ordering::SeqCst))
161 }
162
163 fn set_time(&self, time: u64) {
164 let old_time = self.now.swap(time, Ordering::SeqCst);
165 assert!(old_time <= time, "cannot move the time backwards");
166
167 let wakers = { mem::take(&mut *self.wakers.lock().unwrap()) };
168 wakers.into_iter().for_each(Waker::wake);
169 }
170
171 fn register(&self, cx: &mut Context<'_>) {
172 self.wakers.lock().unwrap().push(cx.waker().clone());
173 }
174}
175
176#[derive(Default, Debug, Clone)]
193pub struct ManualClock(Arc<ManualClockContent>);
194
195impl ManualClock {
196 pub fn new() -> Self {
198 Self::default()
199 }
200
201 pub fn set_time(&self, time: Nanoseconds) {
208 self.0.set_time(time.0);
209 }
210}
211
212impl Clock for ManualClock {
213 type Instant = Nanoseconds;
214 type Delay = ManualDelay;
215
216 fn now(&self) -> Self::Instant {
217 self.0.now()
218 }
219
220 fn sleep(&self, dur: Duration) -> Self::Delay {
221 ManualDelay {
222 timeout: self.0.now() + dur,
223 clock: self.0.clone(),
224 }
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use futures_executor::LocalPool;
    use futures_util::task::SpawnExt;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// The manual clock starts at zero and reports exactly what was last set.
    #[test]
    fn manual_clock_basics() {
        let clock = ManualClock::new();
        let start = clock.now();
        assert_eq!(start, Nanoseconds(0));

        clock.set_time(Nanoseconds(1_000_000_000));

        let after_one_second = clock.now();
        assert_eq!(after_one_second, Nanoseconds(1_000_000_000));
        assert_eq!(after_one_second - start, Duration::from_secs(1));

        clock.set_time(Nanoseconds(1_000_000_007));

        let seven_nanos_later = clock.now();
        assert_eq!(seven_nanos_later, Nanoseconds(1_000_000_007));
        assert_eq!(seven_nanos_later - after_one_second, Duration::from_nanos(7));
    }

    /// Two tasks sleep on the same manual clock; each adds a distinct power of
    /// four to the counter after every sleep, so the counter value identifies
    /// exactly which sleeps have completed.
    #[test]
    fn manual_clock_sleep() {
        const SECOND: u64 = 1_000_000_000;

        let counter = Arc::new(AtomicUsize::new(0));
        let clock = ManualClock::new();
        let mut pool = LocalPool::new();
        let spawner = pool.spawner();

        // First task: +1 after 2 s, then +4 after 3 more seconds.
        let (task_counter, task_clock) = (counter.clone(), clock.clone());
        spawner
            .spawn(async move {
                task_clock.sleep(Duration::from_secs(2)).await;
                task_counter.fetch_add(1, Ordering::Relaxed);
                task_clock.sleep(Duration::from_secs(3)).await;
                task_counter.fetch_add(4, Ordering::Relaxed);
            })
            .unwrap();

        // Second task: +16 after 3 s, then +64 after 1 more second.
        let (task_counter, task_clock) = (counter.clone(), clock.clone());
        spawner
            .spawn(async move {
                task_clock.sleep(Duration::from_secs(3)).await;
                task_counter.fetch_add(16, Ordering::Relaxed);
                task_clock.sleep(Duration::from_secs(1)).await;
                task_counter.fetch_add(64, Ordering::Relaxed);
            })
            .unwrap();

        // (clock time in seconds, counter value expected once the pool stalls)
        let timeline: [(u64, usize); 6] = [(0, 0), (1, 0), (2, 1), (3, 17), (4, 81), (5, 85)];
        for &(seconds, expected) in timeline.iter() {
            clock.set_time(Nanoseconds(seconds * SECOND));
            pool.run_until_stalled();
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }

        // Both tasks have finished, so there is nothing left to run.
        assert!(!pool.try_run_one());
    }

    /// Sleeping for one second on the standard clock takes roughly one second.
    #[test]
    #[cfg(feature = "standard-clock")]
    fn standard_clock() {
        let elapsed = futures_executor::block_on(async {
            let clock = StandardClock;
            let start = clock.now();
            clock.sleep(Duration::from_secs(1)).await;
            clock.now() - start
        });
        let acceptable = Duration::from_millis(900)..=Duration::from_millis(1100);
        assert!(
            acceptable.contains(&elapsed),
            "standard clock slept too long at {:?}",
            elapsed
        );
    }
}