ractor/factory/
ratelim.rs1use std::collections::HashMap;
9
10use crate::concurrency::Duration;
11use crate::concurrency::Instant;
12use crate::factory::routing::RouteResult;
13use crate::factory::routing::Router;
14use crate::factory::Job;
15use crate::factory::JobKey;
16use crate::factory::WorkerId;
17use crate::factory::WorkerProperties;
18use crate::ActorProcessingErr;
19use crate::Message;
20use crate::State;
21
/// Largest token balance the leaky bucket will work with.
///
/// Used as the default `max` of [`LeakyBucketRateLimiter`] and as the ceiling
/// on the number of tokens a single refresh may add. Equal to `isize::MAX`
/// expressed as a `usize`.
pub const MAX_LB_BALANCE: usize = isize::MAX.unsigned_abs();
24
25pub trait RateLimiter: State {
27 fn check(&mut self) -> bool;
32
33 fn bump(&mut self);
36}
37
38#[derive(Debug, bon::Builder)]
41pub struct RateLimitedRouter<TRouter, TRateLimit> {
42 pub router: TRouter,
44 pub rate_limiter: TRateLimit,
46}
47
48impl<TKey, TMsg, TRouter, TRateLimit> Router<TKey, TMsg> for RateLimitedRouter<TRouter, TRateLimit>
49where
50 TKey: JobKey,
51 TMsg: Message,
52 TRouter: Router<TKey, TMsg>,
53 TRateLimit: RateLimiter,
54{
55 fn route_message(
56 &mut self,
57 job: Job<TKey, TMsg>,
58 pool_size: usize,
59 worker_hint: Option<WorkerId>,
60 worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
61 ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
62 if !self.rate_limiter.check() {
63 Ok(RouteResult::RateLimited(job))
64 } else {
65 let result = self
66 .router
67 .route_message(job, pool_size, worker_hint, worker_pool);
68 if matches!(result, Ok(RouteResult::Handled)) {
69 self.rate_limiter.bump();
71 }
72 result
73 }
74 }
75
76 fn choose_target_worker(
77 &mut self,
78 job: &Job<TKey, TMsg>,
79 pool_size: usize,
80 worker_hint: Option<WorkerId>,
81 worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
82 ) -> Option<WorkerId> {
83 self.router
84 .choose_target_worker(job, pool_size, worker_hint, worker_pool)
85 }
86
87 fn is_factory_queueing(&self) -> bool {
88 self.router.is_factory_queueing()
89 }
90}
91
/// A token-bucket style rate limiter: every `interval` the balance grows by
/// `refill` tokens (capped at `max`), and each routed message spends one
/// token.
#[derive(Debug)]
pub struct LeakyBucketRateLimiter {
    /// Number of tokens added for every elapsed `interval`.
    pub refill: usize,
    /// Length of one refill period.
    pub interval: Duration,
    /// Upper bound applied to `balance` whenever it is refilled.
    pub max: usize,
    /// Tokens currently available; routing is allowed while this is non-zero.
    pub balance: usize,
    /// Instant at or after which the next refill becomes due.
    deadline: Instant,
}
108
109#[bon::bon]
110impl LeakyBucketRateLimiter {
111 #[builder]
120 pub fn new(
121 refill: usize,
122 interval: Duration,
123 #[builder(default = MAX_LB_BALANCE)] max: usize,
124 initial: Option<usize>,
125 ) -> LeakyBucketRateLimiter {
126 LeakyBucketRateLimiter {
127 refill,
128 interval,
129 max,
130 balance: initial.unwrap_or(max),
131 deadline: Instant::now() + interval,
132 }
133 }
134
135 fn refresh(&mut self, now: Instant) {
136 if now < self.deadline {
137 return;
138 }
139
140 let millis = self.interval.as_millis();
142 let since = now.saturating_duration_since(self.deadline).as_millis();
143
144 let periods = usize::try_from(since / millis + 1).unwrap_or(usize::MAX);
145
146 let tokens = periods
147 .checked_mul(self.refill)
148 .unwrap_or(MAX_LB_BALANCE)
149 .min(MAX_LB_BALANCE);
150
151 let remaining_millis_until_next_deadline =
152 u64::try_from(since % millis).unwrap_or(u64::MAX);
153 self.deadline = now
154 + self
155 .interval
156 .saturating_sub(Duration::from_millis(remaining_millis_until_next_deadline));
157 self.balance = (self.balance + tokens).min(self.max);
158 }
159}
160
161impl RateLimiter for LeakyBucketRateLimiter {
162 fn check(&mut self) -> bool {
163 let now = Instant::now();
164 self.refresh(now);
165 self.balance > 0
166 }
167
168 fn bump(&mut self) {
169 if self.balance > 0 {
170 self.balance -= 1;
171 }
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use crate::concurrency::sleep;

    /// Asserts that the limiter currently allows a message, then spends one
    /// token.
    fn take_token(limiter: &mut LeakyBucketRateLimiter) {
        assert!(limiter.check());
        limiter.bump();
    }

    /// With the default `max`, tokens from several elapsed intervals
    /// accumulate.
    #[crate::concurrency::test]
    async fn test_basic_leaky_bucket() {
        let mut limiter = LeakyBucketRateLimiter::builder()
            .refill(1)
            .initial(1)
            .interval(Duration::from_millis(100))
            .build();

        // The single initial token is spent immediately.
        take_token(&mut limiter);
        assert!(!limiter.check());

        sleep(limiter.interval * 2).await;

        // Two intervals have passed, so two tokens are available.
        take_token(&mut limiter);
        take_token(&mut limiter);
        assert!(!limiter.check());
    }

    /// With `max = 1`, the balance never grows beyond a single token no
    /// matter how many intervals elapse.
    #[crate::concurrency::test]
    async fn test_leaky_bucket_max() {
        let mut limiter = LeakyBucketRateLimiter::builder()
            .refill(1)
            .initial(1)
            .max(1)
            .interval(Duration::from_millis(100))
            .build();

        take_token(&mut limiter);
        assert!(!limiter.check());

        sleep(limiter.interval * 2).await;

        // Two intervals have passed, but the cap leaves only one token.
        take_token(&mut limiter);
        assert!(!limiter.check());
    }
}