use std::{error::Error, fmt, num::NonZeroU32};
use super::RateLimiter;
use crate::{
clock,
middleware::RateLimitingMiddleware,
state::{DirectStateStore, NotKeyed},
Jitter, NegativeMultiDecision, NotUntil,
};
use futures_timer::Delay;
/// An error that occurs when the number of cells required in `check_n`
/// exceeds the maximum capacity of the limiter.
#[derive(Debug, Clone)]
pub struct InsufficientCapacity(pub u32);
impl fmt::Display for InsufficientCapacity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"required number of cells {} exceeds bucket's capacity",
self.0
)
}
}
impl Error for InsufficientCapacity {}
#[cfg(feature = "std")]
/// # Direct rate limiters - `async`/`await`
impl<S, C, MW> RateLimiter<NotKeyed, S, C, MW>
where
S: DirectStateStore,
C: clock::ReasonablyRealtime,
MW: RateLimitingMiddleware<C::Instant, NegativeOutcome = NotUntil<C::Instant>>,
{
/// Asynchronously resolves as soon as the rate limiter allows it.
///
/// When polled, the returned future either resolves immediately (in the case where the rate
/// limiter allows it), or else triggers an asynchronous delay, after which the rate limiter
/// is polled again. This means that the future might resolve at some later time (depending
/// on what other measurements are made on the rate limiter).
///
/// If multiple futures are dispatched against the rate limiter, prefer
/// [`until_ready_with_jitter`](#method.until_ready_with_jitter) to avoid thundering herds.
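///
/// A minimal sketch of waiting on a direct rate limiter. It assumes the limiter is built
/// with `RateLimiter::direct` and that the `futures` crate is available to drive the
/// future (any executor should work); the quota is illustrative only.
///
/// ```rust
/// use governor::{Quota, RateLimiter};
/// use std::num::NonZeroU32;
///
/// // Assumed quota: a burst of up to 10 cells, replenished at 10 cells per second.
/// let limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(10).unwrap()));
///
/// futures::executor::block_on(async {
///     // The burst capacity is still unused, so each call resolves without waiting.
///     for _ in 0..3 {
///         limiter.until_ready().await;
///     }
/// });
/// ```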
pub async fn until_ready(&self) -> MW::PositiveOutcome {
self.until_ready_with_jitter(Jitter::NONE).await
}
/// Asynchronously resolves as soon as the rate limiter allows it, with a randomized wait
/// period.
///
/// When polled, the returned future either resolves immediately (in the case where the rate
/// limiter allows it), or else triggers an asynchronous delay, after which the rate limiter
/// is polled again. This means that the future might resolve at some later time (depending
/// on what other measurements are made on the rate limiter).
///
/// This method allows for a randomized additional delay between polls of the rate limiter,
/// which can help reduce the likelihood of thundering herd effects if multiple tasks try to
/// wait on the same rate limiter.
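///
/// A minimal sketch of adding jitter to the wait. It assumes `Jitter::up_to` to bound the
/// extra delay and the `futures` crate to drive the future; the 100 ms period and the
/// 50 ms jitter bound are illustrative only.
///
/// ```rust
/// use governor::{Jitter, Quota, RateLimiter};
/// use std::time::Duration;
///
/// // Assumed quota: a single cell, replenished every 100 ms.
/// let limiter = RateLimiter::direct(Quota::with_period(Duration::from_millis(100)).unwrap());
/// let jitter = Jitter::up_to(Duration::from_millis(50));
///
/// futures::executor::block_on(async {
///     // The first call takes the only available cell and resolves right away.
///     limiter.until_ready_with_jitter(jitter).await;
///     // The second call has to wait for a cell to replenish, plus a random extra
///     // delay below the 50 ms bound.
///     limiter.until_ready_with_jitter(jitter).await;
/// });
/// ```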
pub async fn until_ready_with_jitter(&self, jitter: Jitter) -> MW::PositiveOutcome {
loop {
match self.check() {
Ok(x) => {
return x;
}
Err(negative) => {
let delay = Delay::new(jitter + negative.wait_time_from(self.clock.now()));
delay.await;
}
}
}
}
/// Asynchronously resolves as soon as the rate limiter allows it.
///
/// This is similar to `until_ready`, except that it waits until `n` cells
/// are available.
///
/// Returns `InsufficientCapacity` if the `n` provided exceeds the maximum
/// capacity of the rate limiter.
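///
/// A minimal sketch of both outcomes. It assumes a limiter built with
/// `RateLimiter::direct` and the `futures` crate to drive the future; the
/// quota and the values of `n` are illustrative only.
///
/// ```rust
/// use governor::{Quota, RateLimiter};
/// use std::num::NonZeroU32;
///
/// // Assumed quota: a burst capacity of 10 cells.
/// let limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(10).unwrap()));
///
/// futures::executor::block_on(async {
///     // 5 cells fit into the capacity, so this resolves once they are available.
///     assert!(limiter.until_n_ready(NonZeroU32::new(5).unwrap()).await.is_ok());
///
///     // 11 cells can never be available at once, so this fails instead of waiting.
///     assert!(limiter.until_n_ready(NonZeroU32::new(11).unwrap()).await.is_err());
/// });
/// ```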
pub async fn until_n_ready(
&self,
n: NonZeroU32,
) -> Result<MW::PositiveOutcome, InsufficientCapacity> {
self.until_n_ready_with_jitter(n, Jitter::NONE).await
}
/// Asynchronously resolves as soon as the rate limiter allows it, with a
/// randomized wait period.
///
/// This is similar to `until_ready_with_jitter`, except that it waits until
/// `n` cells are available.
///
/// Returns `InsufficientCapacity` if the `n` provided exceeds the maximum
/// capacity of the rate limiter.
pub async fn until_n_ready_with_jitter(
&self,
n: NonZeroU32,
jitter: Jitter,
) -> Result<MW::PositiveOutcome, InsufficientCapacity> {
loop {
match self.check_n(n) {
Ok(x) => {
return Ok(x);
}
Err(NegativeMultiDecision::BatchNonConforming(_, negative)) => {
let delay = Delay::new(jitter + negative.wait_time_from(self.clock.now()));
delay.await;
}
Err(NegativeMultiDecision::InsufficientCapacity(cap)) => {
return Err(InsufficientCapacity(cap))
}
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn insufficient_capacity_impl_coverage() {
let i = InsufficientCapacity(1);
assert_eq!(i.0, i.clone().0);
assert!(!format!("{}", i).is_empty());
}
}