async_speed_limit/
limiter.rs

1// Copyright 2019 TiKV Project Authors. Licensed under MIT or Apache-2.0.
2
3//! Speed limiter
4
5#[cfg(feature = "standard-clock")]
6use crate::clock::StandardClock;
7use crate::clock::{BlockingClock, Clock};
8use pin_project_lite::pin_project;
9use std::{
10    future::Future,
11    mem,
12    ops::Sub,
13    pin::Pin,
14    sync::Arc,
15    sync::{
16        atomic::{AtomicBool, AtomicUsize, Ordering},
17        Mutex,
18    },
19    task::{Context, Poll},
20    time::Duration,
21};
22
/// Token-bucket state shared by every clone of a limiter.
#[derive(Debug, Clone, Copy)]
struct Bucket<I> {
    /// Instant at which `value` was last brought up to date. The difference
    /// between this and the current instant determines how much is refilled.
    last_updated: I,
    /// The speed limit (unit: B/s).
    speed_limit: f64,
    /// Time needed to refill the entire bucket (unit: s).
    refill: f64,
    /// Number of bytes available in the bucket as of `last_updated`.
    /// This value can be negative when more bytes were consumed than available.
    value: f64,
    /// Base wait (unit: s) added to the recovery time whenever `consume`
    /// leaves `value` at or below zero. The builder defaults this to `refill`.
    min_wait: f64,
}

impl<I> Bucket<I> {
    /// Returns the maximum number of bytes the bucket can hold, i.e. the
    /// amount produced by one full refill period at the current speed.
    fn capacity(&self) -> f64 {
        self.speed_limit * self.refill
    }

    /// Takes `size` bytes out of the bucket.
    ///
    /// Returns how long the caller has to wait for the consumed bytes to
    /// recover: zero while the bucket stays strictly positive, otherwise
    /// `min_wait` plus the time needed to pay back the deficit.
    ///
    /// This method should only be called when the speed is finite.
    fn consume(&mut self, size: f64) -> Duration {
        self.value -= size;
        if self.value > 0.0 {
            return Duration::from_secs(0);
        }
        // `value` is not positive here, so this quotient is the (non-positive)
        // time balance; subtracting it extends the wait.
        let balance_secs = self.value / self.speed_limit;
        Duration::from_secs_f64(self.min_wait - balance_secs)
    }

    /// Puts `size` bytes back into the bucket, undoing an earlier consumption.
    ///
    /// This method should only be called when the speed is finite.
    fn unconsume(&mut self, size: f64) {
        self.value += size;
    }

    /// Changes the speed limit.
    ///
    /// When both the old and new capacities are finite, `value` is shifted by
    /// the capacity difference so the number of already-consumed bytes stays
    /// constant. When coming from an unlimited speed, the bucket starts full.
    /// When the new speed is not finite, `value` is left untouched.
    fn set_speed_limit(&mut self, new_speed_limit: f64) {
        let previous_capacity = self.capacity();
        self.speed_limit = new_speed_limit;
        if !new_speed_limit.is_finite() {
            return;
        }
        let capacity = self.capacity();
        self.value = if previous_capacity.is_finite() {
            self.value + (capacity - previous_capacity)
        } else {
            capacity
        };
    }
}

impl<I: Copy + Sub<Output = Duration>> Bucket<I> {
    /// Brings the bucket up to date with the instant `now`.
    ///
    /// Adds the bytes produced since `last_updated`, capped at the capacity,
    /// and records `now` as the new `last_updated`.
    ///
    /// This method should only be called when the speed is finite.
    fn refill(&mut self, now: I) {
        let elapsed_secs = (now - self.last_updated).as_secs_f64();
        let topped_up = self.value + self.speed_limit * elapsed_secs;
        self.value = self.capacity().min(topped_up);
        self.last_updated = now;
    }
}
99
100/// Builder for [`Limiter`].
101///
102/// # Examples
103///
104#[cfg_attr(feature = "standard-clock", doc = "```rust")]
105#[cfg_attr(not(feature = "standard-clock"), doc = "```ignore")]
106/// use async_speed_limit::Limiter;
107/// use std::time::Duration;
108///
109/// let limiter = <Limiter>::builder(1_048_576.0)
110///     .refill(Duration::from_millis(100))
111///     .build();
112/// # drop(limiter);
113/// ```
114#[derive(Debug)]
115pub struct Builder<C: Clock> {
116    clock: C,
117    bucket: Bucket<C::Instant>,
118    min_wait: Option<f64>,
119}
120
121impl<C: Clock> Builder<C> {
122    /// Creates a new limiter builder.
123    ///
124    /// Use [infinity](`std::f64::INFINITY`) to make the speed unlimited.
125    pub fn new(speed_limit: f64) -> Self {
126        let clock = C::default();
127        let mut result = Self {
128            bucket: Bucket {
129                last_updated: clock.now(),
130                speed_limit: 0.0,
131                refill: 0.1,
132                value: 0.0,
133                min_wait: 0.1,
134            },
135            clock,
136            min_wait: None,
137        };
138        result.speed_limit(speed_limit);
139        result
140    }
141
142    /// Sets the speed limit of the limiter.
143    ///
144    /// Use [infinity](`std::f64::INFINITY`) to make the speed unlimited.
145    ///
146    /// # Panics
147    ///
148    /// The speed limit must be positive. Panics if the speed limit is negative,
149    /// zero, or NaN.
150    pub fn speed_limit(&mut self, speed_limit: f64) -> &mut Self {
151        assert!(speed_limit > 0.0, "speed limit must be positive");
152        self.bucket.speed_limit = speed_limit;
153        self
154    }
155
156    /// Sets the refill period of the limiter.
157    ///
158    /// The default value is 0.1 s, which should be good for most use cases. The
159    /// refill period is ignored if the speed is [infinity](`std::f64::INFINITY`).
160    ///
161    /// # Panics
162    ///
163    /// The duration must not be zero, otherwise this method panics.
164    pub fn refill(&mut self, dur: Duration) -> &mut Self {
165        assert!(
166            dur > Duration::from_secs(0),
167            "refill duration must not be zero"
168        );
169        self.bucket.refill = dur.as_secs_f64();
170        self
171    }
172
173    /// Sets the minimum wait duration when the speed limit was exceeded.
174    ///
175    /// The default value is same as the refill period.
176    pub fn min_wait(&mut self, dur: Duration) -> &mut Self {
177        self.min_wait = Some(dur.as_secs_f64());
178        self
179    }
180
181    /// Sets the clock instance used by the limiter.
182    pub fn clock(&mut self, clock: C) -> &mut Self {
183        self.clock = clock;
184        self
185    }
186
187    /// Builds the limiter.
188    pub fn build(&mut self) -> Limiter<C> {
189        self.bucket.value = self.bucket.capacity();
190        self.bucket.last_updated = self.clock.now();
191        let is_unlimited = self.bucket.speed_limit.is_infinite();
192        let min_wait = self.min_wait.unwrap_or(self.bucket.refill);
193        self.bucket.min_wait = min_wait;
194        Limiter {
195            bucket: Arc::new(Mutex::new(self.bucket)),
196            clock: mem::take(&mut self.clock),
197            total_bytes_consumed: Arc::new(AtomicUsize::new(0)),
198            is_unlimited: Arc::new(AtomicBool::new(is_unlimited)),
199        }
200    }
201}
202
203macro_rules! declare_limiter {
204    ($($default_clock:tt)*) => {
205        /// A type to control the maximum speed limit of multiple streams.
206        ///
207        /// When a `Limiter` is cloned, the instances would share the same
208        /// queue. Multiple tasks can cooperatively respect a global speed limit
209        /// via clones. Cloning a `Limiter` is cheap (equals to cloning two
210        /// `Arc`s).
211        ///
212        /// The speed limit is imposed by awaiting
213        /// [`consume()`](Limiter::consume()). The method returns a future which
214        /// sleeps until rate falls below the limit.
215        ///
216        /// # Examples
217        ///
218        /// Upload some small files atomically in parallel, while maintaining a
219        /// global speed limit of 1 MiB/s.
220        ///
221        #[cfg_attr(feature = "standard-clock", doc = "```rust")]
222        #[cfg_attr(not(feature = "standard-clock"), doc = "```ignore")]
223        /// use async_speed_limit::Limiter;
224        /// use futures_util::future::try_join_all;
225        ///
226        /// # async {
227        /// # let files = &[""];
228        /// # async fn upload(file: &str) -> Result<(), ()> { Ok(()) }
229        /// let limiter = <Limiter>::new(1_048_576.0);
230        /// let processes = files
231        ///     .iter()
232        ///     .map(|file| {
233        ///         let limiter = limiter.clone();
234        ///         async move {
235        ///             limiter.consume(file.len()).await;
236        ///             upload(file).await?;
237        ///             Ok(())
238        ///         }
239        ///     });
240        /// try_join_all(processes).await?;
241        /// # Ok::<_, ()>(()) };
242        /// ```
243        #[derive(Debug, Clone)]
244        pub struct Limiter<C: Clock $($default_clock)*> {
245            /// State of the limiter.
246            // TODO avoid using Arc<Mutex>?
247            bucket: Arc<Mutex<Bucket<C::Instant>>>,
248            /// Clock used for time calculation.
249            clock: C,
250            /// Statistics of the number of bytes consumed for record. When this
251            /// number reaches `usize::MAX` it will wrap around.
252            total_bytes_consumed: Arc<AtomicUsize>,
253            /// A flag indicates unlimited speed.
254            is_unlimited: Arc<AtomicBool>,
255        }
256    }
257}
258
259#[cfg(feature = "standard-clock")]
260declare_limiter! { = StandardClock }
261
262#[cfg(not(feature = "standard-clock"))]
263declare_limiter! {}
264
265impl<C: Clock> Limiter<C> {
266    /// Creates a new speed limiter.
267    ///
268    /// Use [infinity](`std::f64::INFINITY`) to make the speed unlimited.
269    pub fn new(speed_limit: f64) -> Self {
270        Builder::new(speed_limit).build()
271    }
272
273    /// Makes a [`Builder`] for further configurating this limiter.
274    ///
275    /// Use [infinity](`std::f64::INFINITY`) to make the speed unlimited.
276    pub fn builder(speed_limit: f64) -> Builder<C> {
277        Builder::new(speed_limit)
278    }
279
280    /// Returns the clock associated with this limiter.
281    pub fn clock(&self) -> &C {
282        &self.clock
283    }
284
285    /// Dynamically changes the speed limit. The new limit applies to all clones
286    /// of this instance.
287    ///
288    /// Use [infinity](`std::f64::INFINITY`) to make the speed unlimited.
289    ///
290    /// This change will not affect any tasks scheduled _before_ this call.
291    pub fn set_speed_limit(&self, speed_limit: f64) {
292        debug_assert!(speed_limit > 0.0, "speed limit must be positive");
293        self.bucket.lock().unwrap().set_speed_limit(speed_limit);
294        self.is_unlimited
295            .store(speed_limit.is_infinite(), Ordering::Relaxed);
296    }
297
298    /// Returns the current speed limit.
299    ///
300    /// This method returns [infinity](`std::f64::INFINITY`) if the speed is
301    /// unlimited.
302    pub fn speed_limit(&self) -> f64 {
303        self.bucket.lock().unwrap().speed_limit
304    }
305
306    /// Obtains the total number of bytes consumed by this limiter so far.
307    ///
308    /// If more than `usize::MAX` bytes have been consumed, the count will wrap
309    /// around.
310    pub fn total_bytes_consumed(&self) -> usize {
311        self.total_bytes_consumed.load(Ordering::Relaxed)
312    }
313
314    /// Resets the total number of bytes consumed to 0.
315    pub fn reset_statistics(&self) {
316        self.total_bytes_consumed.store(0, Ordering::Relaxed);
317    }
318
319    /// Consumes several bytes from the speed limiter, returns the duration
320    /// needed to sleep to maintain the speed limit.
321    pub fn consume_duration(&self, byte_size: usize) -> Duration {
322        self.total_bytes_consumed
323            .fetch_add(byte_size, Ordering::Relaxed);
324
325        if self.is_unlimited.load(Ordering::Relaxed) {
326            return Duration::from_secs(0);
327        }
328
329        #[allow(clippy::cast_precision_loss)]
330        let size = byte_size as f64;
331
332        // Using a lock should be fine,
333        // as we're not blocking for a long time.
334        let mut bucket = self.bucket.lock().unwrap();
335        bucket.refill(self.clock.now());
336        bucket.consume(size)
337    }
338
339    /// Reverts the consumption of the given bytes size.
340    pub fn unconsume(&self, byte_size: usize) {
341        self.total_bytes_consumed
342            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |x| {
343                Some(x.saturating_sub(byte_size))
344            })
345            .unwrap();
346
347        if !self.is_unlimited.load(Ordering::Relaxed) {
348            #[allow(clippy::cast_precision_loss)]
349            let size = byte_size as f64;
350
351            let mut bucket = self.bucket.lock().unwrap();
352            bucket.unconsume(size);
353        }
354    }
355
356    /// Consumes several bytes from the speed limiter.
357    ///
358    /// The consumption happens at the beginning, *before* the speed limit is
359    /// applied. The returned future is fulfilled after the speed limit is
360    /// satified.
361    pub fn consume(&self, byte_size: usize) -> Consume<C, ()> {
362        let sleep_dur = self.consume_duration(byte_size);
363        // TODO use Duration::is_zero after `duration_zero` is stable.
364        let future = if sleep_dur == Duration::from_secs(0) {
365            None
366        } else {
367            Some(self.clock.sleep(sleep_dur))
368        };
369        Consume {
370            future,
371            result: Some(()),
372        }
373    }
374
375    /// Wraps a streaming resource with speed limiting. See documentation of
376    /// [`Resource`] for details.
377    ///
378    /// If you want to reuse the limiter after calling this function, `clone()`
379    /// the limiter first.
380    pub fn limit<R>(self, resource: R) -> Resource<R, C> {
381        Resource::new(self, resource)
382    }
383
384    /// Returns the number of active clones of this limiter.
385    ///
386    /// Currently only used for testing, and thus not exported.
387    #[cfg(test)]
388    fn shared_count(&self) -> usize {
389        Arc::strong_count(&self.bucket)
390    }
391}
392
393impl<C: BlockingClock> Limiter<C> {
394    /// Consumes several bytes, and sleeps the current thread to maintain the
395    /// speed limit.
396    ///
397    /// The consumption happens at the beginning, *before* the speed limit is
398    /// applied. This method blocks the current thread (e.g. using
399    /// [`std::thread::sleep()`] given a [`StandardClock`]), and *must not* be
400    /// used in `async` context.
401    ///
402    /// Prefer using this method instead of
403    /// [`futures_executor::block_on`]`(limiter.`[`consume`](Limiter::consume())`(size))`.
404    ///
405    /// [`futures_executor::block_on`]: https://docs.rs/futures-executor/0.3/futures_executor/fn.block_on.html
406    pub fn blocking_consume(&self, byte_size: usize) {
407        let sleep_dur = self.consume_duration(byte_size);
408        self.clock.blocking_sleep(sleep_dur);
409    }
410}
411
412/// The future returned by [`Limiter::consume()`].
413#[derive(Debug)]
414pub struct Consume<C: Clock, R> {
415    future: Option<C::Delay>,
416    result: Option<R>,
417}
418
419#[allow(clippy::use_self)] // https://github.com/rust-lang/rust-clippy/issues/3410
420impl<C: Clock, R> Consume<C, R> {
421    /// Replaces the return value of the future.
422    pub fn map<T, F: FnOnce(R) -> T>(self, f: F) -> Consume<C, T> {
423        Consume {
424            future: self.future,
425            result: self.result.map(f),
426        }
427    }
428}
429
430impl<C: Clock, R: Unpin> Future for Consume<C, R> {
431    type Output = R;
432
433    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
434        let this = self.get_mut();
435        let is_ready = match &mut this.future {
436            Some(future) => Pin::new(future).poll(cx).is_ready(),
437            None => true,
438        };
439        if is_ready {
440            if let Some(value) = this.result.take() {
441                return Poll::Ready(value);
442            }
443        }
444        Poll::Pending
445    }
446}
447
448#[cfg(feature = "fused-future")]
449impl<C: Clock, R: Unpin> futures_core::future::FusedFuture for Consume<C, R> {
450    fn is_terminated(&self) -> bool {
451        self.result.is_none()
452    }
453}
454
455pin_project! {
456    /// A speed-limited wrapper of a byte stream.
457    ///
458    /// The `Resource` can be used to limit speed of
459    ///
460    /// * [`AsyncRead`](futures_io::AsyncRead)
461    /// * [`AsyncWrite`](futures_io::AsyncWrite)
462    ///
463    /// Just like [`Limiter`], the delay is inserted *after* the data are sent
464    /// or received, in which we know the exact amount of bytes transferred to
465    /// give an accurate delay. The instantaneous speed can exceed the limit if
466    /// many read/write tasks are started simultaneously. Therefore, restricting
467    /// the concurrency is also important to avoid breaching the constraint.
468    pub struct Resource<R, C: Clock> {
469        limiter: Limiter<C>,
470        #[pin]
471        resource: R,
472        waiter: Option<Consume<C, ()>>,
473    }
474}
475
476impl<R, C: Clock> Resource<R, C> {
477    /// Creates a new speed-limited resource.
478    ///
479    /// To make the resouce have unlimited speed, set the speed of [`Limiter`]
480    /// to [infinity](`std::f64::INFINITY`).
481    pub fn new(limiter: Limiter<C>, resource: R) -> Self {
482        Self {
483            limiter,
484            resource,
485            waiter: None,
486        }
487    }
488
489    /// Unwraps this value, returns the underlying resource.
490    pub fn into_inner(self) -> R {
491        self.resource
492    }
493
494    /// Gets a reference to the underlying resource.
495    ///
496    /// It is inadvisable to directly operate the underlying resource.
497    pub fn get_ref(&self) -> &R {
498        &self.resource
499    }
500
501    /// Gets a mutable reference to the underlying resource.
502    ///
503    /// It is inadvisable to directly operate the underlying resource.
504    pub fn get_mut(&mut self) -> &mut R {
505        &mut self.resource
506    }
507
508    /// Gets a pinned reference to the underlying resource.
509    ///
510    /// It is inadvisable to directly operate the underlying resource.
511    pub fn get_pin_ref(self: Pin<&Self>) -> Pin<&R> {
512        self.project_ref().resource
513    }
514
515    /// Gets a pinned mutable reference to the underlying resource.
516    ///
517    /// It is inadvisable to directly operate the underlying resource.
518    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
519        self.project().resource
520    }
521}
522
523impl<R, C: Clock> Resource<R, C> {
524    /// Wraps a poll function with a delay after it.
525    ///
526    /// This method calls the given `poll` function until it is fulfilled. After
527    /// that, the result is saved into this `Resource` instance (therefore
528    /// different `poll_***` calls should not be interleaving), while returning
529    /// `Pending` until the limiter has completely consumed the result.
530    #[allow(dead_code)]
531    pub(crate) fn poll_limited<T, B>(
532        self: Pin<&mut Self>,
533        cx: &mut Context<'_>,
534        mut buf: B,
535        length: impl FnOnce(&T, &B) -> usize,
536        poll: impl FnOnce(Pin<&mut R>, &mut Context<'_>, &mut B) -> Poll<T>,
537    ) -> Poll<T> {
538        let this = self.project();
539
540        if let Some(waiter) = this.waiter {
541            let res = Pin::new(waiter).poll(cx);
542            if res.is_pending() {
543                return Poll::Pending;
544            }
545            *this.waiter = None;
546        }
547
548        let res = poll(this.resource, cx, &mut buf);
549        if let Poll::Ready(obj) = &res {
550            let len = length(obj, &buf);
551            if len > 0 {
552                *this.waiter = Some(this.limiter.consume(len));
553            }
554        }
555        res
556    }
557}
558
559//------------------------------------------------------------------------------
560
561#[cfg(test)]
562mod tests_with_manual_clock {
563    use super::*;
564    use crate::clock::{Clock, ManualClock, Nanoseconds};
565    use futures_executor::LocalPool;
566    use futures_util::task::SpawnExt;
567    use std::{future::Future, thread::panicking};
568
569    /// Part of the `Fixture` which is to be shared with the spawned tasks.
570    #[derive(Clone)]
571    struct SharedFixture {
572        limiter: Limiter<ManualClock>,
573    }
574
575    impl SharedFixture {
576        fn now(&self) -> u64 {
577            self.limiter.clock().now().0
578        }
579
580        fn sleep(&self, nanos: u64) -> impl Future<Output = ()> + '_ {
581            self.limiter.clock().sleep(Duration::from_nanos(nanos))
582        }
583
584        fn consume(&self, bytes: usize) -> impl Future<Output = ()> + '_ {
585            self.limiter.consume(bytes)
586        }
587
588        fn unconsume(&self, bytes: usize) {
589            self.limiter.unconsume(bytes);
590        }
591    }
592
593    /// The test fixture used by all test cases.
594    struct Fixture {
595        shared: SharedFixture,
596        pool: LocalPool,
597    }
598
599    impl Fixture {
600        fn new() -> Self {
601            Self::with_min_wait(Duration::from_secs(1))
602        }
603
604        fn with_min_wait(min_wait: Duration) -> Self {
605            Self {
606                shared: SharedFixture {
607                    limiter: Limiter::builder(512.0)
608                        .refill(Duration::from_secs(1))
609                        .min_wait(min_wait)
610                        .build(),
611                },
612                pool: LocalPool::new(),
613            }
614        }
615
616        fn spawn<F, G>(&self, f: F)
617        where
618            F: FnOnce(SharedFixture) -> G,
619            G: Future<Output = ()> + Send + 'static,
620        {
621            self.pool.spawner().spawn(f(self.shared.clone())).unwrap();
622        }
623
624        fn set_time(&mut self, time: u64) {
625            self.shared.limiter.clock().set_time(Nanoseconds(time));
626            self.pool.run_until_stalled();
627        }
628
629        fn set_speed_limit(&self, limit: f64) {
630            self.shared.limiter.set_speed_limit(limit);
631        }
632
633        fn total_bytes_consumed(&self) -> usize {
634            self.shared.limiter.total_bytes_consumed()
635        }
636    }
637
638    impl Drop for Fixture {
639        fn drop(&mut self) {
640            if !panicking() {
641                // the count is 1 only if all spawned futures are finished.
642                assert_eq!(self.shared.limiter.shared_count(), 1);
643            }
644        }
645    }
646
647    #[test]
648    fn under_limit_single_thread() {
649        let mut fx = Fixture::new();
650
651        fx.spawn(|sfx| async move {
652            sfx.consume(50).await;
653            assert_eq!(sfx.now(), 0);
654            sfx.consume(51).await;
655            assert_eq!(sfx.now(), 0);
656            sfx.consume(52).await;
657            assert_eq!(sfx.now(), 0);
658            sfx.consume(53).await;
659            assert_eq!(sfx.now(), 0);
660            sfx.consume(54).await;
661            assert_eq!(sfx.now(), 0);
662            sfx.consume(55).await;
663            assert_eq!(sfx.now(), 0);
664        });
665
666        fx.set_time(0);
667        assert_eq!(fx.total_bytes_consumed(), 315);
668    }
669
670    #[test]
671    fn over_limit_single_thread() {
672        let mut fx = Fixture::new();
673
674        fx.spawn(|sfx| {
675            async move {
676                sfx.consume(200).await;
677                assert_eq!(sfx.now(), 0);
678                sfx.consume(201).await;
679                assert_eq!(sfx.now(), 0);
680                sfx.consume(202).await;
681                assert_eq!(sfx.now(), 1_177_734_375);
682                // 1_177_734_375 ns = (200+201+202)/512 seconds
683
684                sfx.consume(203).await;
685                assert_eq!(sfx.now(), 1_177_734_375);
686                sfx.consume(204).await;
687                assert_eq!(sfx.now(), 1_177_734_375);
688                sfx.consume(205).await;
689                assert_eq!(sfx.now(), 2_373_046_875);
690            }
691        });
692
693        fx.set_time(0);
694        assert_eq!(fx.total_bytes_consumed(), 603);
695        fx.set_time(1_177_734_374);
696        assert_eq!(fx.total_bytes_consumed(), 603);
697        fx.set_time(1_177_734_375);
698        assert_eq!(fx.total_bytes_consumed(), 1215);
699        fx.set_time(2_373_046_874);
700        assert_eq!(fx.total_bytes_consumed(), 1215);
701        fx.set_time(2_373_046_875);
702        assert_eq!(fx.total_bytes_consumed(), 1215);
703    }
704
705    #[test]
706    fn over_limit_single_thread_with_min_wait() {
707        let mut fx = Fixture::with_min_wait(Duration::from_millis(100));
708
709        fx.spawn(|sfx| {
710            async move {
711                sfx.consume(200).await;
712                assert_eq!(sfx.now(), 0);
713                sfx.consume(201).await;
714                assert_eq!(sfx.now(), 0);
715                sfx.consume(202).await;
716                assert_eq!(sfx.now(), 277_734_375);
717                // 277_734_375 ns = 100_000_000 + (200+201+202-512)/512 seconds
718
719                sfx.consume(203).await;
720                assert_eq!(sfx.now(), 674_218_750);
721                sfx.consume(204).await;
722                assert_eq!(sfx.now(), 1_072_656_250);
723                sfx.consume(205).await;
724                assert_eq!(sfx.now(), 1_473_046_875);
725            }
726        });
727
728        fx.set_time(0);
729        assert_eq!(fx.total_bytes_consumed(), 603);
730        fx.set_time(277_734_374);
731        assert_eq!(fx.total_bytes_consumed(), 603);
732        fx.set_time(277_734_375);
733        assert_eq!(fx.total_bytes_consumed(), 806);
734        fx.set_time(674_218_750);
735        assert_eq!(fx.total_bytes_consumed(), 1010);
736        fx.set_time(1_072_656_250);
737        assert_eq!(fx.total_bytes_consumed(), 1215);
738        fx.set_time(1_473_046_875);
739        assert_eq!(fx.total_bytes_consumed(), 1215);
740    }
741
742    #[test]
743    fn over_limit_multi_thread() {
744        let mut fx = Fixture::new();
745
746        // Due to how LocalPool does scheduling, the first task is always polled
747        // before the second task. Nevertheless, the second task can still send
748        // stuff using the timing difference.
749
750        fx.spawn(|sfx| async move {
751            sfx.consume(200).await;
752            assert_eq!(sfx.now(), 0);
753            sfx.consume(202).await;
754            assert_eq!(sfx.now(), 0);
755            sfx.consume(204).await;
756            assert_eq!(sfx.now(), 1_183_593_750);
757            sfx.consume(206).await;
758            assert_eq!(sfx.now(), 1_183_593_750);
759            sfx.consume(208).await;
760            assert_eq!(sfx.now(), 2_384_765_625);
761        });
762        fx.spawn(|sfx| async move {
763            sfx.consume(201).await;
764            assert_eq!(sfx.now(), 1_576_171_875);
765            sfx.consume(203).await;
766            assert_eq!(sfx.now(), 2_781_250_000);
767            sfx.consume(205).await;
768            assert_eq!(sfx.now(), 2_781_250_000);
769            sfx.consume(207).await;
770            assert_eq!(sfx.now(), 2_781_250_000);
771            sfx.consume(209).await;
772            assert_eq!(sfx.now(), 3_994_140_625);
773        });
774
775        fx.set_time(0);
776        assert_eq!(fx.total_bytes_consumed(), 807);
777        fx.set_time(1_183_593_749);
778        assert_eq!(fx.total_bytes_consumed(), 807);
779        fx.set_time(1_183_593_750);
780        assert_eq!(fx.total_bytes_consumed(), 1221);
781        fx.set_time(1_576_171_874);
782        assert_eq!(fx.total_bytes_consumed(), 1221);
783        fx.set_time(1_576_171_875);
784        assert_eq!(fx.total_bytes_consumed(), 1424);
785        fx.set_time(2_384_765_624);
786        assert_eq!(fx.total_bytes_consumed(), 1424);
787        fx.set_time(2_384_765_625);
788        assert_eq!(fx.total_bytes_consumed(), 1424);
789        fx.set_time(2_781_249_999);
790        assert_eq!(fx.total_bytes_consumed(), 1424);
791        fx.set_time(2_781_250_000);
792        assert_eq!(fx.total_bytes_consumed(), 2045);
793        fx.set_time(3_994_140_624);
794        assert_eq!(fx.total_bytes_consumed(), 2045);
795        fx.set_time(3_994_140_625);
796        assert_eq!(fx.total_bytes_consumed(), 2045);
797    }
798
799    #[test]
800    fn over_limit_multi_thread_2() {
801        let mut fx = Fixture::new();
802
803        fx.spawn(|sfx| async move {
804            sfx.consume(300).await;
805            assert_eq!(sfx.now(), 0);
806            sfx.consume(301).await;
807            assert_eq!(sfx.now(), 1_173_828_125);
808            sfx.consume(302).await;
809            assert_eq!(sfx.now(), 1_173_828_125);
810            sfx.consume(303).await;
811            assert_eq!(sfx.now(), 2_550_781_250);
812            sfx.consume(304).await;
813            assert_eq!(sfx.now(), 2_550_781_250);
814        });
815        fx.spawn(|sfx| async move {
816            sfx.consume(100).await;
817            assert_eq!(sfx.now(), 1_369_140_625);
818            sfx.consume(101).await;
819            assert_eq!(sfx.now(), 2_748_046_875);
820            sfx.consume(102).await;
821            assert_eq!(sfx.now(), 2_748_046_875);
822            sfx.consume(103).await;
823            assert_eq!(sfx.now(), 2_748_046_875);
824            sfx.consume(104).await;
825            assert_eq!(sfx.now(), 3_945_312_500);
826        });
827
828        fx.set_time(0);
829        assert_eq!(fx.total_bytes_consumed(), 701);
830        fx.set_time(1_173_828_125);
831        assert_eq!(fx.total_bytes_consumed(), 1306);
832        fx.set_time(1_369_140_625);
833        assert_eq!(fx.total_bytes_consumed(), 1407);
834        fx.set_time(2_550_781_250);
835        assert_eq!(fx.total_bytes_consumed(), 1711);
836        fx.set_time(2_748_046_875);
837        assert_eq!(fx.total_bytes_consumed(), 2020);
838        fx.set_time(3_945_312_500);
839        assert_eq!(fx.total_bytes_consumed(), 2020);
840    }
841
842    #[test]
843    fn over_limit_multi_thread_yielded() {
844        let mut fx = Fixture::new();
845
846        // we're adding 1ns sleeps between each consume() to act as yield points,
847        // so the consume() are evenly distributed, and can take advantage of
848        // single bursting.
849
850        fx.spawn(|sfx| async move {
851            sfx.consume(300).await;
852            assert_eq!(sfx.now(), 0);
853            sfx.sleep(1).await;
854            sfx.consume(301).await;
855            assert_eq!(sfx.now(), 1_369_140_625);
856            sfx.sleep(1).await;
857            sfx.consume(302).await;
858            assert_eq!(sfx.now(), 1_369_140_626);
859            sfx.sleep(1).await;
860            sfx.consume(303).await;
861            assert_eq!(sfx.now(), 2_748_046_875);
862            sfx.sleep(1).await;
863            sfx.consume(304).await;
864            assert_eq!(sfx.now(), 2_748_046_876);
865        });
866        fx.spawn(|sfx| async move {
867            sfx.consume(100).await;
868            assert_eq!(sfx.now(), 0);
869            sfx.sleep(1).await;
870            sfx.consume(101).await;
871            assert_eq!(sfx.now(), 1_566_406_250);
872            sfx.sleep(1).await;
873            sfx.consume(102).await;
874            assert_eq!(sfx.now(), 2_947_265_625);
875            sfx.sleep(1).await;
876            sfx.consume(103).await;
877            assert_eq!(sfx.now(), 2_947_265_626);
878            sfx.sleep(1).await;
879            sfx.consume(104).await;
880            assert_eq!(sfx.now(), 2_947_265_627);
881        });
882
883        fx.set_time(0);
884        assert_eq!(fx.total_bytes_consumed(), 400);
885        fx.set_time(1);
886        assert_eq!(fx.total_bytes_consumed(), 802);
887        fx.set_time(1_369_140_625);
888        assert_eq!(fx.total_bytes_consumed(), 802);
889        fx.set_time(1_369_140_626);
890        assert_eq!(fx.total_bytes_consumed(), 1104);
891        fx.set_time(1_566_406_250);
892        assert_eq!(fx.total_bytes_consumed(), 1407);
893        fx.set_time(1_566_406_251);
894        assert_eq!(fx.total_bytes_consumed(), 1509);
895        fx.set_time(2_748_046_875);
896        assert_eq!(fx.total_bytes_consumed(), 1509);
897        fx.set_time(2_748_046_876);
898        assert_eq!(fx.total_bytes_consumed(), 1813);
899        fx.set_time(2_947_265_625);
900        assert_eq!(fx.total_bytes_consumed(), 1813);
901        fx.set_time(2_947_265_626);
902        assert_eq!(fx.total_bytes_consumed(), 1916);
903        fx.set_time(2_947_265_627);
904        assert_eq!(fx.total_bytes_consumed(), 2020);
905    }
906
907    #[test]
908    fn unconsume() {
909        let mut fx = Fixture::new();
910
911        fx.spawn(|sfx| async move {
912            sfx.consume(200).await;
913            assert_eq!(sfx.now(), 0);
914            sfx.consume(201).await;
915            assert_eq!(sfx.now(), 0);
916            sfx.unconsume(200);
917            sfx.consume(202).await;
918            assert_eq!(sfx.now(), 0);
919            sfx.consume(200).await;
920            assert_eq!(sfx.now(), 1_177_734_375);
921
922            sfx.consume(203).await;
923            assert_eq!(sfx.now(), 1_177_734_375);
924            sfx.consume(204).await;
925            assert_eq!(sfx.now(), 1_177_734_375);
926            sfx.consume(205).await;
927            assert_eq!(sfx.now(), 2_373_046_875);
928            sfx.unconsume(2000);
929        });
930
931        fx.set_time(0);
932        assert_eq!(fx.total_bytes_consumed(), 603);
933        fx.set_time(1_177_734_374);
934        assert_eq!(fx.total_bytes_consumed(), 603);
935        fx.set_time(1_177_734_375);
936        assert_eq!(fx.total_bytes_consumed(), 1215);
937        fx.set_time(2_373_046_874);
938        assert_eq!(fx.total_bytes_consumed(), 1215);
939        fx.set_time(2_373_046_875);
940        assert_eq!(fx.total_bytes_consumed(), 0);
941    }
942
943    /// Ensures the speed limiter won't forget to enforce until a long pause
944    /// i.e. we're observing the _maximum_ speed, not the _average_ speed.
945    #[test]
946    fn hiatus() {
947        let mut fx = Fixture::new();
948
949        fx.spawn(|sfx| async move {
950            sfx.consume(400).await;
951            assert_eq!(sfx.now(), 0);
952            sfx.consume(401).await;
953            assert_eq!(sfx.now(), 1_564_453_125);
954
955            sfx.sleep(10_000_000_000).await;
956            assert_eq!(sfx.now(), 11_564_453_125);
957
958            sfx.consume(402).await;
959            assert_eq!(sfx.now(), 11_564_453_125);
960            sfx.consume(403).await;
961            assert_eq!(sfx.now(), 13_136_718_750);
962        });
963
964        fx.set_time(0);
965        assert_eq!(fx.total_bytes_consumed(), 801);
966        fx.set_time(1_564_453_125);
967        assert_eq!(fx.total_bytes_consumed(), 801);
968        fx.set_time(11_564_453_125);
969        assert_eq!(fx.total_bytes_consumed(), 1606);
970        fx.set_time(13_136_718_750);
971        assert_eq!(fx.total_bytes_consumed(), 1606);
972    }
973
974    // Ensures we could still send something much higher than the speed limit
975    #[test]
976    fn burst() {
977        let mut fx = Fixture::new();
978
979        fx.spawn(|sfx| async move {
980            sfx.consume(5000).await;
981            assert_eq!(sfx.now(), 9_765_625_000);
982            sfx.consume(5001).await;
983            assert_eq!(sfx.now(), 19_533_203_125);
984            sfx.consume(5002).await;
985            assert_eq!(sfx.now(), 29_302_734_375);
986        });
987
988        fx.set_time(0);
989        assert_eq!(fx.total_bytes_consumed(), 5000);
990        fx.set_time(9_765_625_000);
991        assert_eq!(fx.total_bytes_consumed(), 10001);
992        fx.set_time(19_533_203_125);
993        assert_eq!(fx.total_bytes_consumed(), 15003);
994        fx.set_time(29_302_734_375);
995        assert_eq!(fx.total_bytes_consumed(), 15003);
996    }
997
998    #[test]
999    fn change_speed_limit() {
1000        let mut fx = Fixture::new();
1001
1002        // we try to send 5120 bytes at granularity of 256 bytes each.
1003        fx.spawn(|sfx| async move {
1004            for _ in 0..20 {
1005                sfx.consume(256).await;
1006            }
1007        });
1008
1009        // at first, we will send 512 B/s.
1010        fx.set_time(0);
1011        assert_eq!(fx.total_bytes_consumed(), 512);
1012        fx.set_time(500_000_000);
1013        assert_eq!(fx.total_bytes_consumed(), 512);
1014        fx.set_time(1_000_000_000);
1015        assert_eq!(fx.total_bytes_consumed(), 1024);
1016        fx.set_time(1_500_000_000);
1017        assert_eq!(fx.total_bytes_consumed(), 1024);
1018
1019        // decrease the speed to 256 B/s
1020        fx.set_speed_limit(256.0);
1021        fx.set_time(1_500_000_001);
1022        assert_eq!(fx.total_bytes_consumed(), 1024);
1023
1024        fx.set_time(2_000_000_000);
1025        assert_eq!(fx.total_bytes_consumed(), 1280);
1026        fx.set_time(2_500_000_000);
1027        assert_eq!(fx.total_bytes_consumed(), 1280);
1028        fx.set_time(3_000_000_000);
1029        assert_eq!(fx.total_bytes_consumed(), 1280);
1030        fx.set_time(3_500_000_000);
1031        assert_eq!(fx.total_bytes_consumed(), 1280);
1032        fx.set_time(4_000_000_000);
1033        assert_eq!(fx.total_bytes_consumed(), 1536);
1034        fx.set_time(4_500_000_000);
1035        assert_eq!(fx.total_bytes_consumed(), 1536);
1036
1037        // increase the speed to 1024 B/s
1038        fx.set_speed_limit(1024.0);
1039        fx.set_time(4_500_000_001);
1040        assert_eq!(fx.total_bytes_consumed(), 1536);
1041
1042        fx.set_time(5_000_000_000);
1043        assert_eq!(fx.total_bytes_consumed(), 2560);
1044        fx.set_time(5_500_000_000);
1045        assert_eq!(fx.total_bytes_consumed(), 2560);
1046        fx.set_time(6_000_000_000);
1047        assert_eq!(fx.total_bytes_consumed(), 3584);
1048        fx.set_time(6_500_000_000);
1049        assert_eq!(fx.total_bytes_consumed(), 3584);
1050        fx.set_time(7_000_000_000);
1051        assert_eq!(fx.total_bytes_consumed(), 4608);
1052        fx.set_time(7_500_000_000);
1053        assert_eq!(fx.total_bytes_consumed(), 4608);
1054        fx.set_time(8_000_000_000);
1055        assert_eq!(fx.total_bytes_consumed(), 5120);
1056    }
1057
1058    /// Ensures lots of small takes won't prevent scheduling of a large take.
1059    #[test]
1060    fn thousand_cuts() {
1061        let mut fx = Fixture::new();
1062
1063        fx.spawn(|sfx| async move {
1064            for _ in 0..64 {
1065                sfx.consume(16).await;
1066            }
1067        });
1068
1069        fx.spawn(|sfx| async move {
1070            sfx.consume(555).await;
1071            assert_eq!(sfx.now(), 2_083_984_375);
1072            sfx.consume(556).await;
1073            assert_eq!(sfx.now(), 3_201_171_875);
1074        });
1075
1076        fx.set_time(0);
1077        assert_eq!(fx.total_bytes_consumed(), 1067);
1078        fx.set_time(1_000_000_000);
1079        assert_eq!(fx.total_bytes_consumed(), 1083);
1080        fx.set_time(2_000_000_000);
1081        assert_eq!(fx.total_bytes_consumed(), 1083);
1082        fx.set_time(2_083_984_375);
1083        assert_eq!(fx.total_bytes_consumed(), 1639);
1084        fx.set_time(3_000_000_000);
1085        assert_eq!(fx.total_bytes_consumed(), 2055);
1086        fx.set_time(3_201_171_875);
1087        assert_eq!(fx.total_bytes_consumed(), 2055);
1088        fx.set_time(4_000_000_000);
1089        assert_eq!(fx.total_bytes_consumed(), 2055);
1090        fx.set_time(4_169_921_875);
1091        assert_eq!(fx.total_bytes_consumed(), 2135);
1092    }
1093
1094    #[test]
1095    fn set_infinite_speed_limit() {
1096        let mut fx = Fixture::new();
1097
1098        fx.spawn(|sfx| async move {
1099            for _ in 0..1000 {
1100                sfx.consume(512).await;
1101            }
1102            sfx.sleep(1).await;
1103            for _ in 0..1000 {
1104                sfx.consume(512).await;
1105            }
1106            sfx.sleep(1).await;
1107            sfx.consume(512).await;
1108            sfx.consume(512).await;
1109        });
1110
1111        fx.set_time(0);
1112        assert_eq!(fx.total_bytes_consumed(), 512);
1113        fx.set_time(1_000_000_000);
1114        assert_eq!(fx.total_bytes_consumed(), 1024);
1115
1116        // change speed limit to infinity...
1117        fx.set_speed_limit(std::f64::INFINITY);
1118
1119        // should not affect tasks still waiting
1120        fx.set_time(1_500_000_000);
1121        assert_eq!(fx.total_bytes_consumed(), 1024);
1122
1123        // but all future consumptions will be unlimited.
1124        fx.set_time(2_000_000_000);
1125        assert_eq!(fx.total_bytes_consumed(), 512_000);
1126
1127        // should act normal for keeping speed limit at infinity.
1128        fx.set_speed_limit(std::f64::INFINITY);
1129        fx.set_time(2_000_000_001);
1130        assert_eq!(fx.total_bytes_consumed(), 1_024_000);
1131
1132        // reducing speed limit to normal.
1133        fx.set_speed_limit(512.0);
1134        fx.set_time(2_000_000_002);
1135        assert_eq!(fx.total_bytes_consumed(), 1_024_512);
1136        fx.set_time(3_000_000_002);
1137        assert_eq!(fx.total_bytes_consumed(), 1_025_024);
1138        fx.set_time(4_000_000_002);
1139        assert_eq!(fx.total_bytes_consumed(), 1_025_024);
1140    }
1141}
1142
#[cfg(test)]
#[cfg(feature = "standard-clock")]
mod tests_with_standard_clock {
    use super::*;
    use futures_executor::LocalPool;
    use futures_util::{future::join_all, task::SpawnExt};
    use rand::{thread_rng, Rng};
    use std::time::Instant;

    /// Consumes from async tasks spawned on a `LocalPool` and compares the
    /// measured speed with the configured speed limit.
    ///
    /// Ten (task count, speed limit) combinations are exercised for 2 seconds
    /// each. This test case is ported from RocksDB.
    #[test]
    fn rate() {
        eprintln!("tests_with_standard_clock::rate() will run for 20 seconds, please be patient");

        let mut pool = LocalPool::new();
        let spawner = pool.spawner();

        for &task_count in &[1, 2, 4, 8, 16] {
            let base_speed = task_count * 10_240;
            let limiter = <Limiter>::new(base_speed as f64);

            for &speed_limit in &[base_speed, base_speed * 2] {
                limiter.reset_statistics();
                limiter.set_speed_limit(speed_limit as f64);
                let started_at = Instant::now();

                // The iterator is lazy: the tasks get spawned only when
                // `join_all` walks through it.
                let tasks = (0..task_count).map(|_| {
                    let limiter = limiter.clone();
                    let task = async move {
                        // keep consuming random sizes for 2 seconds.
                        let deadline = Instant::now() + Duration::from_secs(2);
                        while Instant::now() < deadline {
                            let size = thread_rng().gen_range(1..=base_speed / 10);
                            limiter.consume(size).await;
                        }
                    };
                    spawner.spawn_with_handle(task).unwrap()
                });

                pool.run_until(join_all(tasks));
                // every clone handed to a task must have been dropped by now.
                assert_eq!(limiter.shared_count(), 1);

                let took = started_at.elapsed();
                let actual_speed = limiter.total_bytes_consumed() as f64 / took.as_secs_f64();
                let ratio = actual_speed / speed_limit as f64;
                eprintln!(
                    "rate: {} threads, expected speed {} B/s, actual speed {:.0} B/s, elapsed {:?}",
                    task_count, speed_limit, actual_speed, took
                );
                assert!((0.80..=1.25).contains(&ratio));
                assert!(took <= Duration::from_secs(4));
            }
        }
    }

    /// Same scenario as `rate()`, but consuming through `blocking_consume()`
    /// from OS threads instead of async tasks.
    #[test]
    fn block() {
        eprintln!("tests_with_standard_clock::block() will run for 20 seconds, please be patient");

        for &thread_count in &[1, 2, 4, 8, 16] {
            let base_speed = thread_count * 10_240;
            let limiter = <Limiter>::new(base_speed as f64);

            for &speed_limit in &[base_speed, base_speed * 2] {
                limiter.reset_statistics();
                limiter.set_speed_limit(speed_limit as f64);
                let started_at = Instant::now();

                // Collect eagerly so that all threads are running before the
                // first one is joined.
                let workers: Vec<_> = (0..thread_count)
                    .map(|_| {
                        let limiter = limiter.clone();
                        std::thread::spawn(move || {
                            // keep consuming random sizes for 2 seconds.
                            let deadline = Instant::now() + Duration::from_secs(2);
                            while Instant::now() < deadline {
                                let size = thread_rng().gen_range(1..=base_speed / 10);
                                limiter.blocking_consume(size);
                            }
                        })
                    })
                    .collect();

                workers
                    .into_iter()
                    .for_each(|worker| worker.join().unwrap());

                // every clone handed to a thread must have been dropped by now.
                assert_eq!(limiter.shared_count(), 1);

                let took = started_at.elapsed();
                let actual_speed = limiter.total_bytes_consumed() as f64 / took.as_secs_f64();
                let ratio = actual_speed / speed_limit as f64;
                eprintln!(
                    "block: {} threads, expected speed {} B/s, actual speed {:.0} B/s, elapsed {:?}",
                    thread_count, speed_limit, actual_speed, took
                );
                assert!((0.80..=1.25).contains(&ratio));
                assert!(took <= Duration::from_secs(4));
            }
        }
    }
}