async_speed_limit/
clock.rs

1// Copyright 2019 TiKV Project Authors. Licensed under MIT or Apache-2.0.
2
3//! Clocks
4
5#[cfg(feature = "standard-clock")]
6use futures_timer::Delay;
7#[cfg(feature = "standard-clock")]
8use std::time::Instant;
9use std::{
10    convert::TryInto,
11    fmt::Debug,
12    future::Future,
13    marker::Unpin,
14    mem,
15    ops::{Add, Sub},
16    pin::Pin,
17    sync::{
18        atomic::{AtomicU64, Ordering},
19        Arc, Mutex,
20    },
21    task::{Context, Poll, Waker},
22    time::Duration,
23};
24
/// An abstraction over the passage of time.
///
/// [`Limiter`](crate::Limiter) imposes its speed limit by calling
/// [`sleep()`](Clock::sleep()), and it works out how long to sleep from the
/// current and past timestamps. This trait bundles both capabilities: reading
/// the time and waiting for it to pass.
///
/// # Implementing
///
/// The [`StandardClock`] should be enough in most situations. There are,
/// however, cases for a custom clock, e.g. using a coarse clock instead of the
/// standard high-precision clock, or using a specialized future associated
/// with an executor instead of the generic `futures-timer`.
///
/// Types implementing `Clock` must be cheap to clone (e.g. by keeping their
/// state behind an `Arc`), and the value produced by `Default` must be ready
/// to use.
pub trait Clock: Default + Clone {
    /// Type representing a point in time.
    ///
    /// Subtracting two instants should return the duration elapsed between
    /// them. The subtraction must never block, and must never panic when the
    /// two instants are properly ordered.
    type Instant: Sub<Output = Duration> + Copy;

    /// Future type returned by [`sleep()`](Clock::sleep()).
    type Delay: Unpin + Future<Output = ()>;

    /// Returns the current time instant.
    ///
    /// The returned value should be monotonically increasing, but it does not
    /// have to be high-precision or steady.
    ///
    /// This function must never block or panic.
    fn now(&self) -> Self::Instant;

    /// Asynchronously sleeps the current task for the given duration.
    ///
    /// The returned future should resolve to `()` once a duration of `dur`
    /// has passed. It must *not* block the current thread.
    ///
    /// `sleep()` is often called with a duration of 0 s, in which case the
    /// returned future should resolve immediately.
    fn sleep(&self, dur: Duration) -> Self::Delay;
}
67
/// A `BlockingClock` is a [`Clock`] which supports synchronous sleeping.
///
/// It extends [`Clock`] with one extra operation for callers that are not
/// running inside an asynchronous task and therefore cannot await the future
/// returned by [`Clock::sleep()`].
pub trait BlockingClock: Clock {
    /// Sleeps and blocks the current thread for the given duration.
    ///
    /// Unlike [`Clock::sleep()`], this method does not return a future; the
    /// wait happens inside the call itself.
    fn blocking_sleep(&self, dur: Duration);
}
73
/// The real-time clock, backed by [`std::time::Instant`].
///
/// Asynchronous sleeping is provided by [`futures-timer`], while blocking
/// sleep delegates to [`std::thread::sleep()`].
///
/// [`futures-timer`]: https://docs.rs/futures-timer/
#[cfg(feature = "standard-clock")]
#[derive(Clone, Copy, Debug, Default)]
pub struct StandardClock;

#[cfg(feature = "standard-clock")]
impl Clock for StandardClock {
    type Delay = Delay;
    type Instant = Instant;

    /// Returns the value of [`Instant::now()`].
    fn now(&self) -> Self::Instant {
        Instant::now()
    }

    /// Creates a `futures-timer` [`Delay`] lasting `dur`.
    fn sleep(&self, dur: Duration) -> Self::Delay {
        Delay::new(dur)
    }
}

#[cfg(feature = "standard-clock")]
impl BlockingClock for StandardClock {
    /// Blocks the calling thread for `dur` using [`std::thread::sleep()`].
    fn blocking_sleep(&self, dur: Duration) {
        std::thread::sleep(dur);
    }
}
104
/// Number of nanoseconds since an arbitrary epoch.
///
/// This is the instant type of [`ManualClock`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Nanoseconds(pub u64);

impl Sub for Nanoseconds {
    type Output = Duration;

    /// Returns the duration elapsed from `other` to `self`.
    ///
    /// # Panics
    ///
    /// Panics if `other` is later than `self`. The check is explicit so that
    /// release builds (where plain `-` on integers wraps around) report the
    /// misuse instead of silently returning a huge, meaningless duration.
    fn sub(self, other: Self) -> Duration {
        let elapsed = self
            .0
            .checked_sub(other.0)
            .expect("cannot subtract a later instant from an earlier one");
        Duration::from_nanos(elapsed)
    }
}

impl Add<Duration> for Nanoseconds {
    type Output = Self;

    /// Returns the instant that lies `other` after `self`.
    ///
    /// # Panics
    ///
    /// Panics if `other` does not fit into 2<sup>64</sup> ns, or if the
    /// resulting instant would exceed 2<sup>64</sup> ns. The second check is
    /// explicit so that release builds do not wrap around to an instant in
    /// the past.
    fn add(self, other: Duration) -> Self {
        let dur: u64 = other
            .as_nanos()
            .try_into()
            .expect("cannot increase more than 2^64 ns");
        let sum = self
            .0
            .checked_add(dur)
            .expect("cannot increase more than 2^64 ns");
        Self(sum)
    }
}
128
129/// The future returned by [`ManualClock`]`::sleep()`.
130#[derive(Debug)]
131pub struct ManualDelay {
132    clock: Arc<ManualClockContent>,
133    timeout: Nanoseconds,
134}
135
136impl Future for ManualDelay {
137    type Output = ();
138
139    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
140        let now = self.clock.now();
141        if now >= self.timeout {
142            Poll::Ready(())
143        } else {
144            self.clock.register(cx);
145            Poll::Pending
146        }
147    }
148}
149
150/// Internal, shared part of [`ManualClock`]. `ManualClock` itself is an `Arc`
151/// of `ManualClockContent`.
152#[derive(Default, Debug)]
153struct ManualClockContent {
154    now: AtomicU64,
155    wakers: Mutex<Vec<Waker>>,
156}
157
158impl ManualClockContent {
159    fn now(&self) -> Nanoseconds {
160        Nanoseconds(self.now.load(Ordering::SeqCst))
161    }
162
163    fn set_time(&self, time: u64) {
164        let old_time = self.now.swap(time, Ordering::SeqCst);
165        assert!(old_time <= time, "cannot move the time backwards");
166
167        let wakers = { mem::take(&mut *self.wakers.lock().unwrap()) };
168        wakers.into_iter().for_each(Waker::wake);
169    }
170
171    fn register(&self, cx: &mut Context<'_>) {
172        self.wakers.lock().unwrap().push(cx.waker().clone());
173    }
174}
175
176/// A [`Clock`] where the passage of time can be manually controlled.
177///
178/// This type is mainly used for testing behavior of speed limiter only.
179///
180/// This clock only supports up to 2<sup>64</sup> ns (about 584.5 years).
181///
182/// # Examples
183///
184/// ```rust
185/// use async_speed_limit::clock::{Clock, ManualClock, Nanoseconds};
186///
187/// let clock = ManualClock::new();
188/// assert_eq!(clock.now(), Nanoseconds(0));
189/// clock.set_time(Nanoseconds(1_000_000_000));
190/// assert_eq!(clock.now(), Nanoseconds(1_000_000_000));
191/// ```
192#[derive(Default, Debug, Clone)]
193pub struct ManualClock(Arc<ManualClockContent>);
194
195impl ManualClock {
196    /// Creates a new clock with time set to 0.
197    pub fn new() -> Self {
198        Self::default()
199    }
200
201    /// Set the current time of this clock to the given value.
202    ///
203    /// # Panics
204    ///
205    /// Since [`now()`](Clock::now()) must be monotonically increasing, if the
206    /// new time is less than the previous time, this function will panic.
207    pub fn set_time(&self, time: Nanoseconds) {
208        self.0.set_time(time.0);
209    }
210}
211
212impl Clock for ManualClock {
213    type Instant = Nanoseconds;
214    type Delay = ManualDelay;
215
216    fn now(&self) -> Self::Instant {
217        self.0.now()
218    }
219
220    fn sleep(&self, dur: Duration) -> Self::Delay {
221        ManualDelay {
222            timeout: self.0.now() + dur,
223            clock: self.0.clone(),
224        }
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use futures_executor::LocalPool;
    use futures_util::task::SpawnExt;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// `now()` must reflect the last `set_time()`, and instants must subtract
    /// to the elapsed duration.
    #[test]
    fn manual_clock_basics() {
        let clock = ManualClock::new();
        let t1 = clock.now();
        assert_eq!(t1, Nanoseconds(0));

        clock.set_time(Nanoseconds(1_000_000_000));

        let t2 = clock.now();
        assert_eq!(t2, Nanoseconds(1_000_000_000));
        assert_eq!(t2 - t1, Duration::from_secs(1));

        clock.set_time(Nanoseconds(1_000_000_007));

        let t3 = clock.now();
        assert_eq!(t3, Nanoseconds(1_000_000_007));
        assert_eq!(t3 - t2, Duration::from_nanos(7));
    }

    /// Two tasks sleep on the same manual clock; each adds a distinct power
    /// of four to the counter when it wakes, so the counter value identifies
    /// exactly which sleeps have completed.
    #[test]
    fn manual_clock_sleep() {
        const SECOND: u64 = 1_000_000_000;

        let counter = Arc::new(AtomicUsize::new(0));
        let clock = ManualClock::new();
        let mut pool = LocalPool::new();
        let sp = pool.spawner();

        // expected sequence:
        //
        //   t=0    .     .
        //   t=1    .     .
        //   t=2    +1    .
        //   t=3    .     +16
        //   t=4    .     +64
        //   t=5    +4    .

        sp.spawn({
            let counter = counter.clone();
            let clock = clock.clone();
            async move {
                clock.sleep(Duration::from_secs(2)).await;
                counter.fetch_add(1, Ordering::Relaxed);
                clock.sleep(Duration::from_secs(3)).await;
                counter.fetch_add(4, Ordering::Relaxed);
            }
        })
        .unwrap();

        sp.spawn({
            let counter = counter.clone();
            let clock = clock.clone();
            async move {
                clock.sleep(Duration::from_secs(3)).await;
                counter.fetch_add(16, Ordering::Relaxed);
                clock.sleep(Duration::from_secs(1)).await;
                counter.fetch_add(64, Ordering::Relaxed);
            }
        })
        .unwrap();

        // Counter value expected after the clock reaches t = 0, 1, 2, ... s.
        let expected_counters = [0, 0, 1, 17, 81, 85];
        for (secs, &expected) in (0u64..).zip(expected_counters.iter()) {
            clock.set_time(Nanoseconds(secs * SECOND));
            pool.run_until_stalled();
            assert_eq!(
                counter.load(Ordering::Relaxed),
                expected,
                "unexpected counter value at t={}s",
                secs
            );
        }

        // all futures should be exhausted.
        assert!(!pool.try_run_one());
    }

    /// A one-second sleep on the standard clock should take about one second
    /// as measured by the same clock.
    #[test]
    #[cfg(feature = "standard-clock")]
    fn standard_clock() {
        let res = futures_executor::block_on(async {
            let init = StandardClock.now();
            StandardClock.sleep(Duration::from_secs(1)).await;
            StandardClock.now() - init
        });
        // The bound is two-sided, so the message must not claim a direction.
        assert!(
            Duration::from_millis(900) <= res && res <= Duration::from_millis(1100),
            "standard clock slept for {:?}, expected about 1 s",
            res
        );
    }
}