use std::collections::HashMap;
use crate::concurrency::Duration;
use crate::concurrency::Instant;
use crate::factory::routing::RouteResult;
use crate::factory::routing::Router;
use crate::factory::Job;
use crate::factory::JobKey;
use crate::factory::WorkerId;
use crate::factory::WorkerProperties;
use crate::ActorProcessingErr;
use crate::Message;
use crate::State;
/// Upper bound on leaky-bucket token counts, equal to `isize::MAX`.
///
/// Used as the default `max` of [`LeakyBucketRateLimiter`] and as the cap on
/// the number of tokens credited by a single refill.
pub const MAX_LB_BALANCE: usize = isize::MAX as usize;
/// A rate limiter consulted by [`RateLimitedRouter`] before a job is routed.
pub trait RateLimiter: State {
/// Returns `true` when a job may be routed right now.
///
/// [`RateLimitedRouter`] hands the job back as rate limited when this
/// returns `false`. Takes `&mut self`, so implementations may update
/// internal state while checking.
fn check(&mut self) -> bool;
/// Records that one job was routed.
///
/// [`RateLimitedRouter`] calls this only after the wrapped router reports
/// the job as handled.
fn bump(&mut self);
}
/// A router that asks a rate limiter for permission before delegating to an
/// inner router.
#[derive(Debug, bon::Builder)]
pub struct RateLimitedRouter<TRouter, TRateLimit> {
/// The wrapped router that performs the actual routing.
pub router: TRouter,
/// The rate limiter checked before each routing attempt.
pub rate_limiter: TRateLimit,
}
impl<TKey, TMsg, TRouter, TRateLimit> Router<TKey, TMsg> for RateLimitedRouter<TRouter, TRateLimit>
where
    TKey: JobKey,
    TMsg: Message,
    TRouter: Router<TKey, TMsg>,
    TRateLimit: RateLimiter,
{
    /// Routes `job` through the inner router unless the rate limiter refuses.
    ///
    /// When the limiter's `check` fails, the job is returned untouched inside
    /// `RouteResult::RateLimited` and the inner router is never invoked.
    /// Otherwise the inner router's result is returned as-is, and the limiter
    /// is bumped only when that result is `Ok(RouteResult::Handled)`.
    fn route_message(
        &mut self,
        job: Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
        // Guard: no budget left, hand the job straight back to the caller.
        if !self.rate_limiter.check() {
            return Ok(RouteResult::RateLimited(job));
        }

        let outcome = self
            .router
            .route_message(job, pool_size, worker_hint, worker_pool);

        // Only a job that was actually handled consumes rate-limit budget.
        if let Ok(RouteResult::Handled) = &outcome {
            self.rate_limiter.bump();
        }
        outcome
    }

    /// Delegates worker selection to the inner router; the rate limiter is
    /// not consulted here.
    fn choose_target_worker(
        &mut self,
        job: &Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Option<WorkerId> {
        let inner = &mut self.router;
        inner.choose_target_worker(job, pool_size, worker_hint, worker_pool)
    }

    /// Reports whatever the inner router reports.
    fn is_factory_queueing(&self) -> bool {
        let inner = &self.router;
        inner.is_factory_queueing()
    }

    /// Forwards the availability change to the inner router.
    fn on_worker_availability_change(&mut self, wid: WorkerId, available: bool) {
        let inner = &mut self.router;
        inner.on_worker_availability_change(wid, available);
    }
}
/// A token-based rate limiter: each handled job consumes one token, and
/// tokens are credited back once per elapsed `interval`.
#[derive(Debug)]
pub struct LeakyBucketRateLimiter {
/// Number of tokens credited for each elapsed `interval`.
pub refill: usize,
/// Length of one refill period.
pub interval: Duration,
/// Upper bound applied to `balance` whenever tokens are credited.
pub max: usize,
/// Tokens currently available; a job is allowed while this is above zero.
pub balance: usize,
// Instant at which the next refill becomes due.
deadline: Instant,
}
#[bon::bon]
impl LeakyBucketRateLimiter {
    /// Creates a new leaky-bucket rate limiter.
    ///
    /// * `refill` - tokens credited for every elapsed `interval`.
    /// * `interval` - length of one refill period.
    /// * `max` - upper bound applied to the balance on every refill;
    ///   defaults to [`MAX_LB_BALANCE`].
    /// * `initial` - starting balance; defaults to `max` when not supplied.
    ///   It is stored as given and is only clamped to `max` on the next refill.
    ///
    /// The first refill becomes due one `interval` after construction.
    #[builder]
    pub fn new(
        refill: usize,
        interval: Duration,
        #[builder(default = MAX_LB_BALANCE)] max: usize,
        initial: Option<usize>,
    ) -> LeakyBucketRateLimiter {
        LeakyBucketRateLimiter {
            refill,
            interval,
            max,
            balance: initial.unwrap_or(max),
            deadline: Instant::now() + interval,
        }
    }

    /// Credits the tokens for every refill period that has completed by
    /// `now` and schedules the next deadline.
    ///
    /// Does nothing while `now` is still before the current deadline.
    /// Period counting works at millisecond granularity.
    fn refresh(&mut self, now: Instant) {
        if now < self.deadline {
            return;
        }

        // An interval shorter than 1 ms truncates to 0 here; clamp it so the
        // division and remainder below cannot panic with a zero divisor.
        let period_millis = self.interval.as_millis().max(1);
        let overdue_millis = now.saturating_duration_since(self.deadline).as_millis();

        // `+ 1` accounts for the period that ended exactly at `deadline`.
        let periods = usize::try_from(overdue_millis / period_millis + 1).unwrap_or(usize::MAX);
        let tokens = periods.saturating_mul(self.refill).min(MAX_LB_BALANCE);

        // Time already spent inside the period that is currently running; the
        // next deadline is one interval minus that head start away from `now`.
        let elapsed_in_current_period =
            u64::try_from(overdue_millis % period_millis).unwrap_or(u64::MAX);
        self.deadline = now
            + self
                .interval
                .saturating_sub(Duration::from_millis(elapsed_in_current_period));

        // `balance` and `max` are public and may exceed `MAX_LB_BALANCE`, so
        // the sum must saturate instead of overflowing.
        self.balance = self.balance.saturating_add(tokens).min(self.max);
    }
}
impl RateLimiter for LeakyBucketRateLimiter {
    /// Refreshes the bucket against the current time, then reports whether
    /// at least one token is available.
    fn check(&mut self) -> bool {
        self.refresh(Instant::now());
        self.balance > 0
    }

    /// Consumes one token; an already-empty bucket stays at zero.
    fn bump(&mut self) {
        self.balance = self.balance.saturating_sub(1);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::concurrency::sleep;

    // One starting token and the default cap: the bucket drains after a
    // single job, then refills after waiting two intervals.
    #[crate::concurrency::test]
    async fn test_basic_leaky_bucket() {
        let interval = Duration::from_millis(100);
        let mut bucket = LeakyBucketRateLimiter::builder()
            .refill(1)
            .initial(1)
            .interval(interval)
            .build();

        // The single initial token admits exactly one job.
        assert!(bucket.check());
        bucket.bump();
        assert!(!bucket.check());

        sleep(interval * 2).await;

        // Two tokens are expected after the wait: two jobs pass, a third does not.
        assert!(bucket.check());
        bucket.bump();
        assert!(bucket.check());
        bucket.bump();
        assert!(!bucket.check());
    }

    // Same scenario with `max = 1`: the cap limits the refill to one token.
    #[crate::concurrency::test]
    async fn test_leaky_bucket_max() {
        let interval = Duration::from_millis(100);
        let mut bucket = LeakyBucketRateLimiter::builder()
            .refill(1)
            .initial(1)
            .max(1)
            .interval(interval)
            .build();

        // The single initial token admits exactly one job.
        assert!(bucket.check());
        bucket.bump();
        assert!(!bucket.check());

        sleep(interval * 2).await;

        // Only one token is available after the wait because of the cap.
        assert!(bucket.check());
        bucket.bump();
        assert!(!bucket.check());
    }
}