1#[cfg(feature = "standard-clock")]
6use crate::clock::StandardClock;
7use crate::clock::{BlockingClock, Clock};
8use pin_project_lite::pin_project;
9use std::{
10 future::Future,
11 mem,
12 ops::Sub,
13 pin::Pin,
14 sync::Arc,
15 sync::{
16 atomic::{AtomicBool, AtomicUsize, Ordering},
17 Mutex,
18 },
19 task::{Context, Poll},
20 time::Duration,
21};
22
23#[derive(Debug, Clone, Copy)]
25struct Bucket<I> {
26 last_updated: I,
29 speed_limit: f64,
31 refill: f64,
33 value: f64,
36 min_wait: f64,
40}
41
42impl<I> Bucket<I> {
43 fn capacity(&self) -> f64 {
45 self.speed_limit * self.refill
46 }
47
48 fn consume(&mut self, size: f64) -> Duration {
54 self.value -= size;
55 if self.value > 0.0 {
56 Duration::from_secs(0)
57 } else {
58 let sleep_secs = self.min_wait - self.value / self.speed_limit;
59 Duration::from_secs_f64(sleep_secs)
60 }
61 }
62
63 fn unconsume(&mut self, size: f64) {
67 self.value += size;
68 }
69
70 fn set_speed_limit(&mut self, new_speed_limit: f64) {
75 let old_capacity = self.capacity();
76 self.speed_limit = new_speed_limit;
77 if new_speed_limit.is_finite() {
78 let new_capacity = self.capacity();
79 if old_capacity.is_finite() {
80 self.value += new_capacity - old_capacity;
81 } else {
82 self.value = new_capacity;
83 }
84 }
85 }
86}
87
88impl<I: Copy + Sub<Output = Duration>> Bucket<I> {
89 fn refill(&mut self, now: I) {
93 let elapsed = (now - self.last_updated).as_secs_f64();
94 let refilled = self.speed_limit * elapsed;
95 self.value = self.capacity().min(self.value + refilled);
96 self.last_updated = now;
97 }
98}
99
100#[cfg_attr(feature = "standard-clock", doc = "```rust")]
105#[cfg_attr(not(feature = "standard-clock"), doc = "```ignore")]
106#[derive(Debug)]
115pub struct Builder<C: Clock> {
116 clock: C,
117 bucket: Bucket<C::Instant>,
118 min_wait: Option<f64>,
119}
120
121impl<C: Clock> Builder<C> {
122 pub fn new(speed_limit: f64) -> Self {
126 let clock = C::default();
127 let mut result = Self {
128 bucket: Bucket {
129 last_updated: clock.now(),
130 speed_limit: 0.0,
131 refill: 0.1,
132 value: 0.0,
133 min_wait: 0.1,
134 },
135 clock,
136 min_wait: None,
137 };
138 result.speed_limit(speed_limit);
139 result
140 }
141
142 pub fn speed_limit(&mut self, speed_limit: f64) -> &mut Self {
151 assert!(speed_limit > 0.0, "speed limit must be positive");
152 self.bucket.speed_limit = speed_limit;
153 self
154 }
155
156 pub fn refill(&mut self, dur: Duration) -> &mut Self {
165 assert!(
166 dur > Duration::from_secs(0),
167 "refill duration must not be zero"
168 );
169 self.bucket.refill = dur.as_secs_f64();
170 self
171 }
172
173 pub fn min_wait(&mut self, dur: Duration) -> &mut Self {
177 self.min_wait = Some(dur.as_secs_f64());
178 self
179 }
180
181 pub fn clock(&mut self, clock: C) -> &mut Self {
183 self.clock = clock;
184 self
185 }
186
187 pub fn build(&mut self) -> Limiter<C> {
189 self.bucket.value = self.bucket.capacity();
190 self.bucket.last_updated = self.clock.now();
191 let is_unlimited = self.bucket.speed_limit.is_infinite();
192 let min_wait = self.min_wait.unwrap_or(self.bucket.refill);
193 self.bucket.min_wait = min_wait;
194 Limiter {
195 bucket: Arc::new(Mutex::new(self.bucket)),
196 clock: mem::take(&mut self.clock),
197 total_bytes_consumed: Arc::new(AtomicUsize::new(0)),
198 is_unlimited: Arc::new(AtomicBool::new(is_unlimited)),
199 }
200 }
201}
202
203macro_rules! declare_limiter {
204 ($($default_clock:tt)*) => {
205 #[cfg_attr(feature = "standard-clock", doc = "```rust")]
222 #[cfg_attr(not(feature = "standard-clock"), doc = "```ignore")]
223 #[derive(Debug, Clone)]
244 pub struct Limiter<C: Clock $($default_clock)*> {
245 bucket: Arc<Mutex<Bucket<C::Instant>>>,
248 clock: C,
250 total_bytes_consumed: Arc<AtomicUsize>,
253 is_unlimited: Arc<AtomicBool>,
255 }
256 }
257}
258
259#[cfg(feature = "standard-clock")]
260declare_limiter! { = StandardClock }
261
262#[cfg(not(feature = "standard-clock"))]
263declare_limiter! {}
264
265impl<C: Clock> Limiter<C> {
266 pub fn new(speed_limit: f64) -> Self {
270 Builder::new(speed_limit).build()
271 }
272
273 pub fn builder(speed_limit: f64) -> Builder<C> {
277 Builder::new(speed_limit)
278 }
279
280 pub fn clock(&self) -> &C {
282 &self.clock
283 }
284
285 pub fn set_speed_limit(&self, speed_limit: f64) {
292 debug_assert!(speed_limit > 0.0, "speed limit must be positive");
293 self.bucket.lock().unwrap().set_speed_limit(speed_limit);
294 self.is_unlimited
295 .store(speed_limit.is_infinite(), Ordering::Relaxed);
296 }
297
298 pub fn speed_limit(&self) -> f64 {
303 self.bucket.lock().unwrap().speed_limit
304 }
305
306 pub fn total_bytes_consumed(&self) -> usize {
311 self.total_bytes_consumed.load(Ordering::Relaxed)
312 }
313
314 pub fn reset_statistics(&self) {
316 self.total_bytes_consumed.store(0, Ordering::Relaxed);
317 }
318
319 pub fn consume_duration(&self, byte_size: usize) -> Duration {
322 self.total_bytes_consumed
323 .fetch_add(byte_size, Ordering::Relaxed);
324
325 if self.is_unlimited.load(Ordering::Relaxed) {
326 return Duration::from_secs(0);
327 }
328
329 #[allow(clippy::cast_precision_loss)]
330 let size = byte_size as f64;
331
332 let mut bucket = self.bucket.lock().unwrap();
335 bucket.refill(self.clock.now());
336 bucket.consume(size)
337 }
338
339 pub fn unconsume(&self, byte_size: usize) {
341 self.total_bytes_consumed
342 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |x| {
343 Some(x.saturating_sub(byte_size))
344 })
345 .unwrap();
346
347 if !self.is_unlimited.load(Ordering::Relaxed) {
348 #[allow(clippy::cast_precision_loss)]
349 let size = byte_size as f64;
350
351 let mut bucket = self.bucket.lock().unwrap();
352 bucket.unconsume(size);
353 }
354 }
355
356 pub fn consume(&self, byte_size: usize) -> Consume<C, ()> {
362 let sleep_dur = self.consume_duration(byte_size);
363 let future = if sleep_dur == Duration::from_secs(0) {
365 None
366 } else {
367 Some(self.clock.sleep(sleep_dur))
368 };
369 Consume {
370 future,
371 result: Some(()),
372 }
373 }
374
375 pub fn limit<R>(self, resource: R) -> Resource<R, C> {
381 Resource::new(self, resource)
382 }
383
384 #[cfg(test)]
388 fn shared_count(&self) -> usize {
389 Arc::strong_count(&self.bucket)
390 }
391}
392
393impl<C: BlockingClock> Limiter<C> {
394 pub fn blocking_consume(&self, byte_size: usize) {
407 let sleep_dur = self.consume_duration(byte_size);
408 self.clock.blocking_sleep(sleep_dur);
409 }
410}
411
412#[derive(Debug)]
414pub struct Consume<C: Clock, R> {
415 future: Option<C::Delay>,
416 result: Option<R>,
417}
418
419#[allow(clippy::use_self)] impl<C: Clock, R> Consume<C, R> {
421 pub fn map<T, F: FnOnce(R) -> T>(self, f: F) -> Consume<C, T> {
423 Consume {
424 future: self.future,
425 result: self.result.map(f),
426 }
427 }
428}
429
430impl<C: Clock, R: Unpin> Future for Consume<C, R> {
431 type Output = R;
432
433 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
434 let this = self.get_mut();
435 let is_ready = match &mut this.future {
436 Some(future) => Pin::new(future).poll(cx).is_ready(),
437 None => true,
438 };
439 if is_ready {
440 if let Some(value) = this.result.take() {
441 return Poll::Ready(value);
442 }
443 }
444 Poll::Pending
445 }
446}
447
448#[cfg(feature = "fused-future")]
449impl<C: Clock, R: Unpin> futures_core::future::FusedFuture for Consume<C, R> {
450 fn is_terminated(&self) -> bool {
451 self.result.is_none()
452 }
453}
454
455pin_project! {
456 pub struct Resource<R, C: Clock> {
469 limiter: Limiter<C>,
470 #[pin]
471 resource: R,
472 waiter: Option<Consume<C, ()>>,
473 }
474}
475
476impl<R, C: Clock> Resource<R, C> {
477 pub fn new(limiter: Limiter<C>, resource: R) -> Self {
482 Self {
483 limiter,
484 resource,
485 waiter: None,
486 }
487 }
488
489 pub fn into_inner(self) -> R {
491 self.resource
492 }
493
494 pub fn get_ref(&self) -> &R {
498 &self.resource
499 }
500
501 pub fn get_mut(&mut self) -> &mut R {
505 &mut self.resource
506 }
507
508 pub fn get_pin_ref(self: Pin<&Self>) -> Pin<&R> {
512 self.project_ref().resource
513 }
514
515 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
519 self.project().resource
520 }
521}
522
523impl<R, C: Clock> Resource<R, C> {
524 #[allow(dead_code)]
531 pub(crate) fn poll_limited<T, B>(
532 self: Pin<&mut Self>,
533 cx: &mut Context<'_>,
534 mut buf: B,
535 length: impl FnOnce(&T, &B) -> usize,
536 poll: impl FnOnce(Pin<&mut R>, &mut Context<'_>, &mut B) -> Poll<T>,
537 ) -> Poll<T> {
538 let this = self.project();
539
540 if let Some(waiter) = this.waiter {
541 let res = Pin::new(waiter).poll(cx);
542 if res.is_pending() {
543 return Poll::Pending;
544 }
545 *this.waiter = None;
546 }
547
548 let res = poll(this.resource, cx, &mut buf);
549 if let Poll::Ready(obj) = &res {
550 let len = length(obj, &buf);
551 if len > 0 {
552 *this.waiter = Some(this.limiter.consume(len));
553 }
554 }
555 res
556 }
557}
558
559#[cfg(test)]
562mod tests_with_manual_clock {
563 use super::*;
564 use crate::clock::{Clock, ManualClock, Nanoseconds};
565 use futures_executor::LocalPool;
566 use futures_util::task::SpawnExt;
567 use std::{future::Future, thread::panicking};
568
569 #[derive(Clone)]
571 struct SharedFixture {
572 limiter: Limiter<ManualClock>,
573 }
574
575 impl SharedFixture {
576 fn now(&self) -> u64 {
577 self.limiter.clock().now().0
578 }
579
580 fn sleep(&self, nanos: u64) -> impl Future<Output = ()> + '_ {
581 self.limiter.clock().sleep(Duration::from_nanos(nanos))
582 }
583
584 fn consume(&self, bytes: usize) -> impl Future<Output = ()> + '_ {
585 self.limiter.consume(bytes)
586 }
587
588 fn unconsume(&self, bytes: usize) {
589 self.limiter.unconsume(bytes);
590 }
591 }
592
593 struct Fixture {
595 shared: SharedFixture,
596 pool: LocalPool,
597 }
598
599 impl Fixture {
600 fn new() -> Self {
601 Self::with_min_wait(Duration::from_secs(1))
602 }
603
604 fn with_min_wait(min_wait: Duration) -> Self {
605 Self {
606 shared: SharedFixture {
607 limiter: Limiter::builder(512.0)
608 .refill(Duration::from_secs(1))
609 .min_wait(min_wait)
610 .build(),
611 },
612 pool: LocalPool::new(),
613 }
614 }
615
616 fn spawn<F, G>(&self, f: F)
617 where
618 F: FnOnce(SharedFixture) -> G,
619 G: Future<Output = ()> + Send + 'static,
620 {
621 self.pool.spawner().spawn(f(self.shared.clone())).unwrap();
622 }
623
624 fn set_time(&mut self, time: u64) {
625 self.shared.limiter.clock().set_time(Nanoseconds(time));
626 self.pool.run_until_stalled();
627 }
628
629 fn set_speed_limit(&self, limit: f64) {
630 self.shared.limiter.set_speed_limit(limit);
631 }
632
633 fn total_bytes_consumed(&self) -> usize {
634 self.shared.limiter.total_bytes_consumed()
635 }
636 }
637
638 impl Drop for Fixture {
639 fn drop(&mut self) {
640 if !panicking() {
641 assert_eq!(self.shared.limiter.shared_count(), 1);
643 }
644 }
645 }
646
647 #[test]
648 fn under_limit_single_thread() {
649 let mut fx = Fixture::new();
650
651 fx.spawn(|sfx| async move {
652 sfx.consume(50).await;
653 assert_eq!(sfx.now(), 0);
654 sfx.consume(51).await;
655 assert_eq!(sfx.now(), 0);
656 sfx.consume(52).await;
657 assert_eq!(sfx.now(), 0);
658 sfx.consume(53).await;
659 assert_eq!(sfx.now(), 0);
660 sfx.consume(54).await;
661 assert_eq!(sfx.now(), 0);
662 sfx.consume(55).await;
663 assert_eq!(sfx.now(), 0);
664 });
665
666 fx.set_time(0);
667 assert_eq!(fx.total_bytes_consumed(), 315);
668 }
669
670 #[test]
671 fn over_limit_single_thread() {
672 let mut fx = Fixture::new();
673
674 fx.spawn(|sfx| {
675 async move {
676 sfx.consume(200).await;
677 assert_eq!(sfx.now(), 0);
678 sfx.consume(201).await;
679 assert_eq!(sfx.now(), 0);
680 sfx.consume(202).await;
681 assert_eq!(sfx.now(), 1_177_734_375);
682 sfx.consume(203).await;
685 assert_eq!(sfx.now(), 1_177_734_375);
686 sfx.consume(204).await;
687 assert_eq!(sfx.now(), 1_177_734_375);
688 sfx.consume(205).await;
689 assert_eq!(sfx.now(), 2_373_046_875);
690 }
691 });
692
693 fx.set_time(0);
694 assert_eq!(fx.total_bytes_consumed(), 603);
695 fx.set_time(1_177_734_374);
696 assert_eq!(fx.total_bytes_consumed(), 603);
697 fx.set_time(1_177_734_375);
698 assert_eq!(fx.total_bytes_consumed(), 1215);
699 fx.set_time(2_373_046_874);
700 assert_eq!(fx.total_bytes_consumed(), 1215);
701 fx.set_time(2_373_046_875);
702 assert_eq!(fx.total_bytes_consumed(), 1215);
703 }
704
705 #[test]
706 fn over_limit_single_thread_with_min_wait() {
707 let mut fx = Fixture::with_min_wait(Duration::from_millis(100));
708
709 fx.spawn(|sfx| {
710 async move {
711 sfx.consume(200).await;
712 assert_eq!(sfx.now(), 0);
713 sfx.consume(201).await;
714 assert_eq!(sfx.now(), 0);
715 sfx.consume(202).await;
716 assert_eq!(sfx.now(), 277_734_375);
717 sfx.consume(203).await;
720 assert_eq!(sfx.now(), 674_218_750);
721 sfx.consume(204).await;
722 assert_eq!(sfx.now(), 1_072_656_250);
723 sfx.consume(205).await;
724 assert_eq!(sfx.now(), 1_473_046_875);
725 }
726 });
727
728 fx.set_time(0);
729 assert_eq!(fx.total_bytes_consumed(), 603);
730 fx.set_time(277_734_374);
731 assert_eq!(fx.total_bytes_consumed(), 603);
732 fx.set_time(277_734_375);
733 assert_eq!(fx.total_bytes_consumed(), 806);
734 fx.set_time(674_218_750);
735 assert_eq!(fx.total_bytes_consumed(), 1010);
736 fx.set_time(1_072_656_250);
737 assert_eq!(fx.total_bytes_consumed(), 1215);
738 fx.set_time(1_473_046_875);
739 assert_eq!(fx.total_bytes_consumed(), 1215);
740 }
741
742 #[test]
743 fn over_limit_multi_thread() {
744 let mut fx = Fixture::new();
745
746 fx.spawn(|sfx| async move {
751 sfx.consume(200).await;
752 assert_eq!(sfx.now(), 0);
753 sfx.consume(202).await;
754 assert_eq!(sfx.now(), 0);
755 sfx.consume(204).await;
756 assert_eq!(sfx.now(), 1_183_593_750);
757 sfx.consume(206).await;
758 assert_eq!(sfx.now(), 1_183_593_750);
759 sfx.consume(208).await;
760 assert_eq!(sfx.now(), 2_384_765_625);
761 });
762 fx.spawn(|sfx| async move {
763 sfx.consume(201).await;
764 assert_eq!(sfx.now(), 1_576_171_875);
765 sfx.consume(203).await;
766 assert_eq!(sfx.now(), 2_781_250_000);
767 sfx.consume(205).await;
768 assert_eq!(sfx.now(), 2_781_250_000);
769 sfx.consume(207).await;
770 assert_eq!(sfx.now(), 2_781_250_000);
771 sfx.consume(209).await;
772 assert_eq!(sfx.now(), 3_994_140_625);
773 });
774
775 fx.set_time(0);
776 assert_eq!(fx.total_bytes_consumed(), 807);
777 fx.set_time(1_183_593_749);
778 assert_eq!(fx.total_bytes_consumed(), 807);
779 fx.set_time(1_183_593_750);
780 assert_eq!(fx.total_bytes_consumed(), 1221);
781 fx.set_time(1_576_171_874);
782 assert_eq!(fx.total_bytes_consumed(), 1221);
783 fx.set_time(1_576_171_875);
784 assert_eq!(fx.total_bytes_consumed(), 1424);
785 fx.set_time(2_384_765_624);
786 assert_eq!(fx.total_bytes_consumed(), 1424);
787 fx.set_time(2_384_765_625);
788 assert_eq!(fx.total_bytes_consumed(), 1424);
789 fx.set_time(2_781_249_999);
790 assert_eq!(fx.total_bytes_consumed(), 1424);
791 fx.set_time(2_781_250_000);
792 assert_eq!(fx.total_bytes_consumed(), 2045);
793 fx.set_time(3_994_140_624);
794 assert_eq!(fx.total_bytes_consumed(), 2045);
795 fx.set_time(3_994_140_625);
796 assert_eq!(fx.total_bytes_consumed(), 2045);
797 }
798
799 #[test]
800 fn over_limit_multi_thread_2() {
801 let mut fx = Fixture::new();
802
803 fx.spawn(|sfx| async move {
804 sfx.consume(300).await;
805 assert_eq!(sfx.now(), 0);
806 sfx.consume(301).await;
807 assert_eq!(sfx.now(), 1_173_828_125);
808 sfx.consume(302).await;
809 assert_eq!(sfx.now(), 1_173_828_125);
810 sfx.consume(303).await;
811 assert_eq!(sfx.now(), 2_550_781_250);
812 sfx.consume(304).await;
813 assert_eq!(sfx.now(), 2_550_781_250);
814 });
815 fx.spawn(|sfx| async move {
816 sfx.consume(100).await;
817 assert_eq!(sfx.now(), 1_369_140_625);
818 sfx.consume(101).await;
819 assert_eq!(sfx.now(), 2_748_046_875);
820 sfx.consume(102).await;
821 assert_eq!(sfx.now(), 2_748_046_875);
822 sfx.consume(103).await;
823 assert_eq!(sfx.now(), 2_748_046_875);
824 sfx.consume(104).await;
825 assert_eq!(sfx.now(), 3_945_312_500);
826 });
827
828 fx.set_time(0);
829 assert_eq!(fx.total_bytes_consumed(), 701);
830 fx.set_time(1_173_828_125);
831 assert_eq!(fx.total_bytes_consumed(), 1306);
832 fx.set_time(1_369_140_625);
833 assert_eq!(fx.total_bytes_consumed(), 1407);
834 fx.set_time(2_550_781_250);
835 assert_eq!(fx.total_bytes_consumed(), 1711);
836 fx.set_time(2_748_046_875);
837 assert_eq!(fx.total_bytes_consumed(), 2020);
838 fx.set_time(3_945_312_500);
839 assert_eq!(fx.total_bytes_consumed(), 2020);
840 }
841
842 #[test]
843 fn over_limit_multi_thread_yielded() {
844 let mut fx = Fixture::new();
845
846 fx.spawn(|sfx| async move {
851 sfx.consume(300).await;
852 assert_eq!(sfx.now(), 0);
853 sfx.sleep(1).await;
854 sfx.consume(301).await;
855 assert_eq!(sfx.now(), 1_369_140_625);
856 sfx.sleep(1).await;
857 sfx.consume(302).await;
858 assert_eq!(sfx.now(), 1_369_140_626);
859 sfx.sleep(1).await;
860 sfx.consume(303).await;
861 assert_eq!(sfx.now(), 2_748_046_875);
862 sfx.sleep(1).await;
863 sfx.consume(304).await;
864 assert_eq!(sfx.now(), 2_748_046_876);
865 });
866 fx.spawn(|sfx| async move {
867 sfx.consume(100).await;
868 assert_eq!(sfx.now(), 0);
869 sfx.sleep(1).await;
870 sfx.consume(101).await;
871 assert_eq!(sfx.now(), 1_566_406_250);
872 sfx.sleep(1).await;
873 sfx.consume(102).await;
874 assert_eq!(sfx.now(), 2_947_265_625);
875 sfx.sleep(1).await;
876 sfx.consume(103).await;
877 assert_eq!(sfx.now(), 2_947_265_626);
878 sfx.sleep(1).await;
879 sfx.consume(104).await;
880 assert_eq!(sfx.now(), 2_947_265_627);
881 });
882
883 fx.set_time(0);
884 assert_eq!(fx.total_bytes_consumed(), 400);
885 fx.set_time(1);
886 assert_eq!(fx.total_bytes_consumed(), 802);
887 fx.set_time(1_369_140_625);
888 assert_eq!(fx.total_bytes_consumed(), 802);
889 fx.set_time(1_369_140_626);
890 assert_eq!(fx.total_bytes_consumed(), 1104);
891 fx.set_time(1_566_406_250);
892 assert_eq!(fx.total_bytes_consumed(), 1407);
893 fx.set_time(1_566_406_251);
894 assert_eq!(fx.total_bytes_consumed(), 1509);
895 fx.set_time(2_748_046_875);
896 assert_eq!(fx.total_bytes_consumed(), 1509);
897 fx.set_time(2_748_046_876);
898 assert_eq!(fx.total_bytes_consumed(), 1813);
899 fx.set_time(2_947_265_625);
900 assert_eq!(fx.total_bytes_consumed(), 1813);
901 fx.set_time(2_947_265_626);
902 assert_eq!(fx.total_bytes_consumed(), 1916);
903 fx.set_time(2_947_265_627);
904 assert_eq!(fx.total_bytes_consumed(), 2020);
905 }
906
907 #[test]
908 fn unconsume() {
909 let mut fx = Fixture::new();
910
911 fx.spawn(|sfx| async move {
912 sfx.consume(200).await;
913 assert_eq!(sfx.now(), 0);
914 sfx.consume(201).await;
915 assert_eq!(sfx.now(), 0);
916 sfx.unconsume(200);
917 sfx.consume(202).await;
918 assert_eq!(sfx.now(), 0);
919 sfx.consume(200).await;
920 assert_eq!(sfx.now(), 1_177_734_375);
921
922 sfx.consume(203).await;
923 assert_eq!(sfx.now(), 1_177_734_375);
924 sfx.consume(204).await;
925 assert_eq!(sfx.now(), 1_177_734_375);
926 sfx.consume(205).await;
927 assert_eq!(sfx.now(), 2_373_046_875);
928 sfx.unconsume(2000);
929 });
930
931 fx.set_time(0);
932 assert_eq!(fx.total_bytes_consumed(), 603);
933 fx.set_time(1_177_734_374);
934 assert_eq!(fx.total_bytes_consumed(), 603);
935 fx.set_time(1_177_734_375);
936 assert_eq!(fx.total_bytes_consumed(), 1215);
937 fx.set_time(2_373_046_874);
938 assert_eq!(fx.total_bytes_consumed(), 1215);
939 fx.set_time(2_373_046_875);
940 assert_eq!(fx.total_bytes_consumed(), 0);
941 }
942
943 #[test]
946 fn hiatus() {
947 let mut fx = Fixture::new();
948
949 fx.spawn(|sfx| async move {
950 sfx.consume(400).await;
951 assert_eq!(sfx.now(), 0);
952 sfx.consume(401).await;
953 assert_eq!(sfx.now(), 1_564_453_125);
954
955 sfx.sleep(10_000_000_000).await;
956 assert_eq!(sfx.now(), 11_564_453_125);
957
958 sfx.consume(402).await;
959 assert_eq!(sfx.now(), 11_564_453_125);
960 sfx.consume(403).await;
961 assert_eq!(sfx.now(), 13_136_718_750);
962 });
963
964 fx.set_time(0);
965 assert_eq!(fx.total_bytes_consumed(), 801);
966 fx.set_time(1_564_453_125);
967 assert_eq!(fx.total_bytes_consumed(), 801);
968 fx.set_time(11_564_453_125);
969 assert_eq!(fx.total_bytes_consumed(), 1606);
970 fx.set_time(13_136_718_750);
971 assert_eq!(fx.total_bytes_consumed(), 1606);
972 }
973
974 #[test]
976 fn burst() {
977 let mut fx = Fixture::new();
978
979 fx.spawn(|sfx| async move {
980 sfx.consume(5000).await;
981 assert_eq!(sfx.now(), 9_765_625_000);
982 sfx.consume(5001).await;
983 assert_eq!(sfx.now(), 19_533_203_125);
984 sfx.consume(5002).await;
985 assert_eq!(sfx.now(), 29_302_734_375);
986 });
987
988 fx.set_time(0);
989 assert_eq!(fx.total_bytes_consumed(), 5000);
990 fx.set_time(9_765_625_000);
991 assert_eq!(fx.total_bytes_consumed(), 10001);
992 fx.set_time(19_533_203_125);
993 assert_eq!(fx.total_bytes_consumed(), 15003);
994 fx.set_time(29_302_734_375);
995 assert_eq!(fx.total_bytes_consumed(), 15003);
996 }
997
998 #[test]
999 fn change_speed_limit() {
1000 let mut fx = Fixture::new();
1001
1002 fx.spawn(|sfx| async move {
1004 for _ in 0..20 {
1005 sfx.consume(256).await;
1006 }
1007 });
1008
1009 fx.set_time(0);
1011 assert_eq!(fx.total_bytes_consumed(), 512);
1012 fx.set_time(500_000_000);
1013 assert_eq!(fx.total_bytes_consumed(), 512);
1014 fx.set_time(1_000_000_000);
1015 assert_eq!(fx.total_bytes_consumed(), 1024);
1016 fx.set_time(1_500_000_000);
1017 assert_eq!(fx.total_bytes_consumed(), 1024);
1018
1019 fx.set_speed_limit(256.0);
1021 fx.set_time(1_500_000_001);
1022 assert_eq!(fx.total_bytes_consumed(), 1024);
1023
1024 fx.set_time(2_000_000_000);
1025 assert_eq!(fx.total_bytes_consumed(), 1280);
1026 fx.set_time(2_500_000_000);
1027 assert_eq!(fx.total_bytes_consumed(), 1280);
1028 fx.set_time(3_000_000_000);
1029 assert_eq!(fx.total_bytes_consumed(), 1280);
1030 fx.set_time(3_500_000_000);
1031 assert_eq!(fx.total_bytes_consumed(), 1280);
1032 fx.set_time(4_000_000_000);
1033 assert_eq!(fx.total_bytes_consumed(), 1536);
1034 fx.set_time(4_500_000_000);
1035 assert_eq!(fx.total_bytes_consumed(), 1536);
1036
1037 fx.set_speed_limit(1024.0);
1039 fx.set_time(4_500_000_001);
1040 assert_eq!(fx.total_bytes_consumed(), 1536);
1041
1042 fx.set_time(5_000_000_000);
1043 assert_eq!(fx.total_bytes_consumed(), 2560);
1044 fx.set_time(5_500_000_000);
1045 assert_eq!(fx.total_bytes_consumed(), 2560);
1046 fx.set_time(6_000_000_000);
1047 assert_eq!(fx.total_bytes_consumed(), 3584);
1048 fx.set_time(6_500_000_000);
1049 assert_eq!(fx.total_bytes_consumed(), 3584);
1050 fx.set_time(7_000_000_000);
1051 assert_eq!(fx.total_bytes_consumed(), 4608);
1052 fx.set_time(7_500_000_000);
1053 assert_eq!(fx.total_bytes_consumed(), 4608);
1054 fx.set_time(8_000_000_000);
1055 assert_eq!(fx.total_bytes_consumed(), 5120);
1056 }
1057
1058 #[test]
1060 fn thousand_cuts() {
1061 let mut fx = Fixture::new();
1062
1063 fx.spawn(|sfx| async move {
1064 for _ in 0..64 {
1065 sfx.consume(16).await;
1066 }
1067 });
1068
1069 fx.spawn(|sfx| async move {
1070 sfx.consume(555).await;
1071 assert_eq!(sfx.now(), 2_083_984_375);
1072 sfx.consume(556).await;
1073 assert_eq!(sfx.now(), 3_201_171_875);
1074 });
1075
1076 fx.set_time(0);
1077 assert_eq!(fx.total_bytes_consumed(), 1067);
1078 fx.set_time(1_000_000_000);
1079 assert_eq!(fx.total_bytes_consumed(), 1083);
1080 fx.set_time(2_000_000_000);
1081 assert_eq!(fx.total_bytes_consumed(), 1083);
1082 fx.set_time(2_083_984_375);
1083 assert_eq!(fx.total_bytes_consumed(), 1639);
1084 fx.set_time(3_000_000_000);
1085 assert_eq!(fx.total_bytes_consumed(), 2055);
1086 fx.set_time(3_201_171_875);
1087 assert_eq!(fx.total_bytes_consumed(), 2055);
1088 fx.set_time(4_000_000_000);
1089 assert_eq!(fx.total_bytes_consumed(), 2055);
1090 fx.set_time(4_169_921_875);
1091 assert_eq!(fx.total_bytes_consumed(), 2135);
1092 }
1093
1094 #[test]
1095 fn set_infinite_speed_limit() {
1096 let mut fx = Fixture::new();
1097
1098 fx.spawn(|sfx| async move {
1099 for _ in 0..1000 {
1100 sfx.consume(512).await;
1101 }
1102 sfx.sleep(1).await;
1103 for _ in 0..1000 {
1104 sfx.consume(512).await;
1105 }
1106 sfx.sleep(1).await;
1107 sfx.consume(512).await;
1108 sfx.consume(512).await;
1109 });
1110
1111 fx.set_time(0);
1112 assert_eq!(fx.total_bytes_consumed(), 512);
1113 fx.set_time(1_000_000_000);
1114 assert_eq!(fx.total_bytes_consumed(), 1024);
1115
1116 fx.set_speed_limit(std::f64::INFINITY);
1118
1119 fx.set_time(1_500_000_000);
1121 assert_eq!(fx.total_bytes_consumed(), 1024);
1122
1123 fx.set_time(2_000_000_000);
1125 assert_eq!(fx.total_bytes_consumed(), 512_000);
1126
1127 fx.set_speed_limit(std::f64::INFINITY);
1129 fx.set_time(2_000_000_001);
1130 assert_eq!(fx.total_bytes_consumed(), 1_024_000);
1131
1132 fx.set_speed_limit(512.0);
1134 fx.set_time(2_000_000_002);
1135 assert_eq!(fx.total_bytes_consumed(), 1_024_512);
1136 fx.set_time(3_000_000_002);
1137 assert_eq!(fx.total_bytes_consumed(), 1_025_024);
1138 fx.set_time(4_000_000_002);
1139 assert_eq!(fx.total_bytes_consumed(), 1_025_024);
1140 }
1141}
1142
1143#[cfg(test)]
1144#[cfg(feature = "standard-clock")]
1145mod tests_with_standard_clock {
1146 use super::*;
1147 use futures_executor::LocalPool;
1148 use futures_util::{future::join_all, task::SpawnExt};
1149 use rand::{thread_rng, Rng};
1150 use std::time::Instant;
1151
1152 #[test]
1154 fn rate() {
1155 eprintln!("tests_with_standard_clock::rate() will run for 20 seconds, please be patient");
1156
1157 let mut pool = LocalPool::new();
1158 let sp = pool.spawner();
1159
1160 for &i in &[1, 2, 4, 8, 16] {
1161 let target = i * 10_240;
1162
1163 let limiter = <Limiter>::new(target as f64);
1164 for &speed_limit in &[target, target * 2] {
1165 limiter.reset_statistics();
1166 limiter.set_speed_limit(speed_limit as f64);
1167 let start = Instant::now();
1168
1169 let handles = (0..i).map(|_| {
1170 let limiter = limiter.clone();
1171 sp.spawn_with_handle(async move {
1172 let until = Instant::now() + Duration::from_secs(2);
1174 while Instant::now() < until {
1175 let size = thread_rng().gen_range(1..=target / 10);
1176 limiter.consume(size).await;
1177 }
1178 })
1179 .unwrap()
1180 });
1181
1182 pool.run_until(join_all(handles));
1183 assert_eq!(limiter.shared_count(), 1);
1184
1185 let elapsed = start.elapsed();
1186 let speed = limiter.total_bytes_consumed() as f64 / elapsed.as_secs_f64();
1187 let diff_ratio = speed / speed_limit as f64;
1188 eprintln!(
1189 "rate: {} threads, expected speed {} B/s, actual speed {:.0} B/s, elapsed {:?}",
1190 i, speed_limit, speed, elapsed
1191 );
1192 assert!((0.80..=1.25).contains(&diff_ratio));
1193 assert!(elapsed <= Duration::from_secs(4));
1194 }
1195 }
1196 }
1197
1198 #[test]
1199 fn block() {
1200 eprintln!("tests_with_standard_clock::block() will run for 20 seconds, please be patient");
1201
1202 for &i in &[1, 2, 4, 8, 16] {
1203 let target = i * 10_240;
1204
1205 let limiter = <Limiter>::new(target as f64);
1206 for &speed_limit in &[target, target * 2] {
1207 limiter.reset_statistics();
1208 limiter.set_speed_limit(speed_limit as f64);
1209 let start = Instant::now();
1210
1211 let handles = (0..i)
1212 .map(|_| {
1213 let limiter = limiter.clone();
1214 std::thread::spawn(move || {
1215 let until = Instant::now() + Duration::from_secs(2);
1217 while Instant::now() < until {
1218 let size = thread_rng().gen_range(1..=target / 10);
1219 limiter.blocking_consume(size);
1220 }
1221 })
1222 })
1223 .collect::<Vec<_>>();
1224
1225 for jh in handles {
1226 jh.join().unwrap();
1227 }
1228
1229 assert_eq!(limiter.shared_count(), 1);
1230
1231 let elapsed = start.elapsed();
1232 let speed = limiter.total_bytes_consumed() as f64 / elapsed.as_secs_f64();
1233 let diff_ratio = speed / speed_limit as f64;
1234 eprintln!(
1235 "block: {} threads, expected speed {} B/s, actual speed {:.0} B/s, elapsed {:?}",
1236 i, speed_limit, speed, elapsed
1237 );
1238 assert!((0.80..=1.25).contains(&diff_ratio));
1239 assert!(elapsed <= Duration::from_secs(4));
1240 }
1241 }
1242 }
1243}