chainrpc_core/policy/
rate_limiter.rs1use std::sync::Mutex;
8use std::time::{Duration, Instant};
9
10use crate::cu_tracker::CuCostTable;
11
/// Tunable parameters for a token-bucket rate limiter.
#[derive(Debug, Clone)]
pub struct RateLimiterConfig {
    /// Maximum number of tokens the bucket may hold; a new bucket starts full.
    pub capacity: f64,
    /// Number of tokens credited back to the bucket per elapsed second.
    pub refill_rate: f64,
}

impl Default for RateLimiterConfig {
    /// Returns a configuration with a 300-token capacity that refills at
    /// 300 tokens per second.
    fn default() -> Self {
        const DEFAULT_CAPACITY: f64 = 300.0;
        const DEFAULT_REFILL_RATE: f64 = 300.0;

        RateLimiterConfig {
            capacity: DEFAULT_CAPACITY,
            refill_rate: DEFAULT_REFILL_RATE,
        }
    }
}
29
30struct BucketState {
31 tokens: f64,
32 last_refill: Instant,
33}
34
35pub struct TokenBucket {
37 config: RateLimiterConfig,
38 state: Mutex<BucketState>,
39}
40
41impl TokenBucket {
42 pub fn new(config: RateLimiterConfig) -> Self {
43 Self {
44 state: Mutex::new(BucketState {
45 tokens: config.capacity,
46 last_refill: Instant::now(),
47 }),
48 config,
49 }
50 }
51
52 pub fn try_acquire(&self, cost: f64) -> bool {
57 let mut state = self.state.lock().unwrap();
58 self.refill(&mut state);
59
60 if state.tokens >= cost {
61 state.tokens -= cost;
62 true
63 } else {
64 false
65 }
66 }
67
68 pub fn wait_time(&self, cost: f64) -> Duration {
70 let state = self.state.lock().unwrap();
71 let deficit = cost - state.tokens;
72 if deficit <= 0.0 {
73 Duration::ZERO
74 } else {
75 Duration::from_secs_f64(deficit / self.config.refill_rate)
76 }
77 }
78
79 pub fn available(&self) -> f64 {
81 let mut state = self.state.lock().unwrap();
82 self.refill(&mut state);
83 state.tokens
84 }
85
86 fn refill(&self, state: &mut BucketState) {
87 let now = Instant::now();
88 let elapsed = now.duration_since(state.last_refill).as_secs_f64();
89 let new_tokens = elapsed * self.config.refill_rate;
90 state.tokens = (state.tokens + new_tokens).min(self.config.capacity);
91 state.last_refill = now;
92 }
93}
94
95pub struct RateLimiter {
97 bucket: TokenBucket,
98 pub default_cost: f64,
100}
101
102impl RateLimiter {
103 pub fn new(config: RateLimiterConfig) -> Self {
104 Self {
105 bucket: TokenBucket::new(config),
106 default_cost: 1.0,
107 }
108 }
109
110 pub fn try_acquire(&self) -> bool {
112 self.bucket.try_acquire(self.default_cost)
113 }
114
115 pub fn try_acquire_cost(&self, cost: f64) -> bool {
117 self.bucket.try_acquire(cost)
118 }
119
120 pub fn wait_time(&self) -> Duration {
122 self.bucket.wait_time(self.default_cost)
123 }
124}
125
126pub struct MethodAwareRateLimiter {
132 bucket: TokenBucket,
133 cost_table: CuCostTable,
134}
135
136impl MethodAwareRateLimiter {
137 pub fn new(config: RateLimiterConfig, cost_table: CuCostTable) -> Self {
139 Self {
140 bucket: TokenBucket::new(config),
141 cost_table,
142 }
143 }
144
145 pub fn try_acquire_method(&self, method: &str) -> bool {
150 let cost = self.cost_table.cost_for(method) as f64;
151 self.bucket.try_acquire(cost)
152 }
153
154 pub fn wait_time_for_method(&self, method: &str) -> Duration {
156 let cost = self.cost_table.cost_for(method) as f64;
157 self.bucket.wait_time(cost)
158 }
159
160 pub fn bucket(&self) -> &TokenBucket {
162 &self.bucket
163 }
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a limiter configuration.
    fn config(capacity: f64, refill_rate: f64) -> RateLimiterConfig {
        RateLimiterConfig {
            capacity,
            refill_rate,
        }
    }

    /// A full bucket serves exactly `capacity` default-cost requests.
    #[test]
    fn acquire_within_capacity() {
        let limiter = RateLimiter::new(config(10.0, 1.0));
        (0..10).for_each(|_| {
            assert!(limiter.try_acquire(), "should succeed within capacity");
        });
    }

    /// Once drained (with a negligible refill rate) the limiter rejects.
    #[test]
    fn reject_when_empty() {
        let limiter = RateLimiter::new(config(3.0, 0.0001));
        for _ in 0..3 {
            limiter.try_acquire();
        }
        assert!(!limiter.try_acquire(), "should be rate limited");
    }

    /// One missing token at 10 tokens/s should take roughly 100 ms.
    #[test]
    fn wait_time_when_empty() {
        let limiter = RateLimiter::new(config(1.0, 10.0));
        limiter.try_acquire();

        let wait = limiter.wait_time();
        let millis = wait.as_millis();
        assert!(
            (50..=200).contains(&millis),
            "unexpected wait time: {wait:?}"
        );
    }

    /// Expensive methods drain the bucket faster than cheap ones.
    #[test]
    fn method_aware_uses_cu_costs() {
        let table = CuCostTable::alchemy_defaults();

        let rl_expensive = MethodAwareRateLimiter::new(config(150.0, 0.0001), table.clone());
        for _ in 0..2 {
            assert!(rl_expensive.try_acquire_method("eth_getLogs"));
        }
        assert!(
            !rl_expensive.try_acquire_method("eth_getLogs"),
            "should be rate limited after 2 expensive calls"
        );

        let rl_cheap =
            MethodAwareRateLimiter::new(config(150.0, 0.0001), CuCostTable::alchemy_defaults());
        // Count successful acquisitions, giving up after 21 so a broken
        // limiter cannot loop forever.
        let count = (0..21)
            .take_while(|_| rl_cheap.try_acquire_method("eth_blockNumber"))
            .count();
        assert_eq!(
            count, 15,
            "cheap method (10 CU) should fit 15 times in 150 capacity"
        );
    }

    /// Wait times scale with the per-method cost.
    #[test]
    fn method_aware_wait_time() {
        let rl = MethodAwareRateLimiter::new(config(300.0, 100.0), CuCostTable::alchemy_defaults());

        // Drain the bucket in 100-token steps.
        while rl.bucket().try_acquire(100.0) {}

        let wait_cheap = rl.wait_time_for_method("eth_blockNumber");
        let wait_expensive = rl.wait_time_for_method("eth_getLogs");

        assert!(
            wait_expensive > wait_cheap,
            "expensive method should have longer wait: expensive={wait_expensive:?}, cheap={wait_cheap:?}"
        );

        let ratio = wait_expensive.as_secs_f64() / wait_cheap.as_secs_f64();
        assert!(
            5.0 < ratio && ratio < 10.0,
            "wait time ratio should be ~7.5, got {ratio:.2}"
        );
    }

    /// Methods missing from the table are charged the table's default cost.
    #[test]
    fn method_aware_unknown_method_uses_default() {
        let rl = MethodAwareRateLimiter::new(config(100.0, 0.0001), CuCostTable::alchemy_defaults());

        for method in ["some_unknown_rpc_method", "another_unknown_method"] {
            assert!(rl.try_acquire_method(method));
        }
        assert!(
            !rl.try_acquire_method("yet_another_unknown"),
            "unknown method should use default cost (50) and be rate limited"
        );
    }
}