use crate::lib::*;
use crate::thread_safety::ThreadsafeWrapper;
use crate::{
algorithms::{Algorithm, RateLimitState},
clock, InconsistentCapacity, NegativeMultiDecision, NonConformance,
};
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct LeakyBucket<P: clock::Reference = <clock::DefaultClock as clock::Clock>::Instant> {
full: Duration,
token_interval: Duration,
point: PhantomData<P>,
}
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct State<P: clock::Reference>(ThreadsafeWrapper<BucketState<P>>);
impl<P: clock::Reference> Default for State<P> {
fn default() -> Self {
State(Default::default())
}
}
impl<P: clock::Reference> RateLimitState<LeakyBucket<P>, P> for State<P> {
fn last_touched(&self, _params: &LeakyBucket<P>) -> Option<P> {
let data = self.0.snapshot();
Some(data.last_update? + data.level)
}
}
#[cfg(feature = "std")]
mod std {
use crate::clock;
use evmap::ShallowCopy;
impl<P: clock::Reference> ShallowCopy for super::State<P> {
unsafe fn shallow_copy(&mut self) -> Self {
super::State(self.0.shallow_copy())
}
}
}
#[derive(Debug, PartialEq)]
pub struct TooEarly<P: clock::Reference>(P, Duration);
impl<P: clock::Reference> fmt::Display for TooEarly<P> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "rate-limited until {:?}", self.0 + self.1)
}
}
impl<P: clock::Reference> NonConformance<P> for TooEarly<P> {
#[inline]
fn earliest_possible(&self) -> P {
self.0 + self.1
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BucketState<P: clock::Reference> {
level: Duration,
last_update: Option<P>,
}
impl<P: clock::Reference> Default for BucketState<P> {
fn default() -> Self {
BucketState {
level: Duration::new(0, 0),
last_update: None,
}
}
}
impl<P: clock::Reference> Algorithm<P> for LeakyBucket<P> {
type BucketState = State<P>;
type NegativeDecision = TooEarly<P>;
fn construct(
capacity: NonZeroU32,
cell_weight: NonZeroU32,
per_time_unit: Duration,
) -> Result<Self, InconsistentCapacity> {
if capacity < cell_weight {
return Err(InconsistentCapacity::new(capacity, cell_weight));
}
let token_interval = (per_time_unit * cell_weight.get()) / capacity.get();
Ok(LeakyBucket {
full: per_time_unit,
token_interval,
point: PhantomData,
})
}
fn test_n_and_update(
&self,
state: &Self::BucketState,
n: u32,
t0: P,
) -> Result<(), NegativeMultiDecision<TooEarly<P>>> {
let full = self.full;
let weight = self.token_interval * n;
if weight > self.full {
return Err(NegativeMultiDecision::InsufficientCapacity(n));
}
state.0.measure_and_replace(|state| {
let mut new = BucketState {
last_update: Some(t0),
level: Duration::new(0, 0),
};
let last = state.last_update.unwrap_or(t0);
let t0 = cmp::max(t0, last);
new.level = state.level - cmp::min(t0.duration_since(last), state.level);
if weight + new.level <= full {
new.level += weight;
(Ok(()), Some(new))
} else {
let wait_period = (weight + new.level) - full;
(
Err(NegativeMultiDecision::BatchNonConforming(
n,
TooEarly(t0, wait_period),
)),
None,
)
}
})
}
}