ig_client/utils/
rate_limiter.rs1use crate::constants::{BASE_DELAY_MS, SAFETY_BUFFER_MS};
5use serde::{Deserialize, Serialize};
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Mutex;
10use tokio::time::sleep;
11use tracing::{debug, info, warn};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21pub enum RateLimitType {
22 NonTradingAccount,
24 TradingAccount,
26 NonTradingApp,
28 HistoricalPrice,
30 OnePerSecond,
32}
33
34impl RateLimitType {
35 pub fn request_limit(&self) -> usize {
37 match self {
38 Self::NonTradingAccount => 30, Self::TradingAccount => 100, Self::NonTradingApp => 60, Self::HistoricalPrice => 10000, Self::OnePerSecond => 1, }
44 }
45
46 pub fn time_window_ms(&self) -> u64 {
48 match self {
49 Self::NonTradingAccount => 60_000, Self::TradingAccount => 60_000, Self::NonTradingApp => 60_000, Self::HistoricalPrice => 604_800_000, Self::OnePerSecond => 1_000, }
55 }
56
57 pub fn description(&self) -> String {
59 match self {
60 Self::NonTradingAccount => "30 requests per minute (per account)".to_string(),
61 Self::TradingAccount => "100 requests per minute (per account)".to_string(),
62 Self::NonTradingApp => "60 requests per minute (per app)".to_string(),
63 Self::HistoricalPrice => "10,000 points per week".to_string(),
64 Self::OnePerSecond => "1 request per second".to_string(),
65 }
66 }
67}
68
69#[derive(Debug)]
71pub struct RateLimiter {
72 request_history: Mutex<VecDeque<Instant>>,
74 limit_type: RateLimitType,
76 safety_margin: f64,
78}
79
80impl RateLimiter {
81 pub fn new(limit_type: RateLimitType) -> Self {
83 RateLimiter {
84 request_history: Mutex::new(VecDeque::new()),
85 limit_type,
86 safety_margin: 1.0,
87 }
88 }
89
90 pub fn with_safety_margin(&mut self, safety_margin: f64) -> Self {
97 let safety_margin = safety_margin.clamp(0.1, 1.0);
98 Self {
99 request_history: Mutex::new(VecDeque::new()),
100 limit_type: self.limit_type,
101 safety_margin,
102 }
103 }
104
105 pub fn limit_type(&self) -> RateLimitType {
107 self.limit_type
108 }
109
110 pub fn effective_limit(&self) -> usize {
112 let raw_limit = self.limit_type.request_limit();
113 (raw_limit as f64 * self.safety_margin).floor() as usize
114 }
115
116 async fn cleanup_history(&self, now: Instant) {
118 let mut history = self.request_history.lock().await;
119 let window_duration = Duration::from_millis(self.limit_type.time_window_ms());
120
121 while let Some(oldest) = history.front() {
123 if now.duration_since(*oldest) >= window_duration {
124 history.pop_front();
125 } else {
126 break;
127 }
128 }
129 }
130
131 pub async fn current_request_count(&self) -> usize {
133 let history = self.request_history.lock().await;
134 history.len()
135 }
136
137 pub async fn time_until_next_request_ms(&self) -> u64 {
140 let now = Instant::now();
141 self.cleanup_history(now).await;
142
143 let history = self.request_history.lock().await;
145 let effective_limit = self.effective_limit();
146
147 let usage_threshold = effective_limit.saturating_sub(2);
150
151 if history.len() < usage_threshold {
152 return 0;
154 }
155
156 if history.len() < effective_limit {
159 let proximity_factor = (history.len() as f64) / (effective_limit as f64);
161 return (BASE_DELAY_MS as f64 * proximity_factor * proximity_factor).round() as u64;
162 }
163
164 if let Some(oldest) = history.front() {
166 let window_duration = Duration::from_millis(self.limit_type.time_window_ms());
167 let time_since_oldest = now.duration_since(*oldest);
168
169 if time_since_oldest < window_duration {
170 let wait_time = window_duration.saturating_sub(time_since_oldest);
172 return wait_time.as_millis() as u64 + SAFETY_BUFFER_MS;
174 }
175 }
176
177 0 }
179
180 async fn record_request(&self) {
182 let now = Instant::now();
183 let mut history = self.request_history.lock().await;
184 history.push_back(now);
185 }
186
187 pub async fn notify_rate_limit_exceeded(&self) {
190 let now = Instant::now();
192 let mut history = self.request_history.lock().await;
193
194 history.clear();
197
198 let limit = self.effective_limit();
200 for _ in 0..limit {
201 history.push_back(now);
202 }
203
204 warn!(
205 "Rate limit exceeded! Enforcing mandatory cooldown period for {:?}",
206 self.limit_type
207 );
208 }
209
210 pub async fn wait(&self) {
213 self.record_request().await;
216
217 let wait_time = self.time_until_next_request_ms().await;
219
220 if wait_time > 0 {
221 info!(
222 "Rate limiter ({:?}): waiting for {}ms ({}/{} requests used in window)",
223 self.limit_type,
224 wait_time,
225 self.current_request_count().await,
226 self.effective_limit()
227 );
228 sleep(Duration::from_millis(wait_time)).await;
229 } else {
230 debug!(
231 "Rate limiter ({:?}): no wait needed ({}/{} requests used)",
232 self.limit_type,
233 self.current_request_count().await,
234 self.effective_limit()
235 );
236 }
237 }
238
239 pub async fn get_stats(&self) -> RateLimiterStats {
241 let now = Instant::now();
242 self.cleanup_history(now).await;
243
244 let history = self.request_history.lock().await;
245 let count = history.len();
246 let limit = self.effective_limit();
247 let usage_percent = if limit > 0 {
248 (count as f64 / limit as f64) * 100.0
249 } else {
250 0.0
251 };
252
253 RateLimiterStats {
254 limit_type: self.limit_type,
255 request_count: count,
256 effective_limit: limit,
257 usage_percent,
258 }
259 }
260}
261
262#[derive(Debug)]
264pub struct RateLimiterStats {
265 pub limit_type: RateLimitType,
267 pub request_count: usize,
269 pub effective_limit: usize,
271 pub usage_percent: f64,
273}
274
275impl std::fmt::Display for RateLimiterStats {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 write!(
278 f,
279 "RateLimiter({:?}): {}/{} requests ({:.1}%), window: {}ms",
280 self.limit_type,
281 self.request_count,
282 self.effective_limit,
283 self.usage_percent,
284 self.limit_type.time_window_ms()
285 )
286 }
287}
288
289pub fn account_non_trading_limiter() -> Arc<RateLimiter> {
291 static INSTANCE: once_cell::sync::Lazy<Arc<RateLimiter>> = once_cell::sync::Lazy::new(|| {
292 let mut limiter = RateLimiter::new(RateLimitType::NonTradingAccount);
293 Arc::new(limiter.with_safety_margin(0.8))
294 });
295
296 INSTANCE.clone()
297}
298
299pub fn account_trading_limiter() -> Arc<RateLimiter> {
301 static INSTANCE: once_cell::sync::Lazy<Arc<RateLimiter>> = once_cell::sync::Lazy::new(|| {
302 let mut limiter = RateLimiter::new(RateLimitType::TradingAccount);
303 Arc::new(limiter.with_safety_margin(0.8))
304 });
305
306 INSTANCE.clone()
307}
308
309pub fn app_non_trading_limiter() -> Arc<RateLimiter> {
311 static INSTANCE: once_cell::sync::Lazy<Arc<RateLimiter>> = once_cell::sync::Lazy::new(|| {
312 let mut limiter = RateLimiter::new(RateLimitType::NonTradingApp);
313 Arc::new(limiter.with_safety_margin(0.8))
314 });
315
316 INSTANCE.clone()
317}
318
319pub fn historical_price_limiter() -> Arc<RateLimiter> {
321 static INSTANCE: once_cell::sync::Lazy<Arc<RateLimiter>> = once_cell::sync::Lazy::new(|| {
322 let mut limiter = RateLimiter::new(RateLimitType::HistoricalPrice);
323 Arc::new(limiter.with_safety_margin(0.8))
324 });
325
326 INSTANCE.clone()
327}
328
329pub fn create_rate_limiter(
331 limit_type: RateLimitType,
332 safety_margin: Option<f64>,
333) -> Arc<RateLimiter> {
334 let mut limiter = RateLimiter::new(limit_type);
335 match safety_margin {
336 Some(margin) => Arc::new(limiter.with_safety_margin(margin)),
337 None => Arc::new(limiter),
338 }
339}
340
341pub fn one_per_second_limiter() -> Arc<RateLimiter> {
343 static INSTANCE: once_cell::sync::Lazy<Arc<RateLimiter>> = once_cell::sync::Lazy::new(|| {
344 let limiter = RateLimiter::new(RateLimitType::OnePerSecond);
345 Arc::new(limiter)
346 });
347
348 INSTANCE.clone()
349}
350
351pub fn global_rate_limiter() -> Arc<RateLimiter> {
353 account_non_trading_limiter()
354}
355
/// Declares a zero-argument test function that is registered with the test
/// harness but marked `#[ignore]`, so it only runs when ignored tests are
/// explicitly requested.
///
/// Usage: `rate_limited_test! { fn my_test() { /* body */ } }`
#[macro_export]
macro_rules! rate_limited_test {
    (fn $name:ident() $body:block) => {
        #[test]
        #[ignore]
        fn $name() $body
    };
}
365
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::runtime::Runtime;

    /// Records `count` requests directly into the limiter's history.
    async fn record_requests(limiter: &RateLimiter, count: usize) {
        for _ in 0..count {
            limiter.record_request().await;
        }
    }

    /// The effective limit is the raw limit scaled by the safety margin.
    #[test]
    fn test_rate_limiter_effective_limit() {
        let full_budget = RateLimiter::new(RateLimitType::NonTradingAccount);
        assert_eq!(full_budget.effective_limit(), 30);

        let half_budget =
            RateLimiter::new(RateLimitType::NonTradingAccount).with_safety_margin(0.5);
        assert_eq!(half_budget.effective_limit(), 15);
    }

    /// Every recorded request shows up in the current request count.
    #[test]
    fn test_rate_limiter_history_tracking() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let limiter =
                RateLimiter::new(RateLimitType::NonTradingAccount).with_safety_margin(1.0);
            assert_eq!(limiter.current_request_count().await, 0);

            record_requests(&limiter, 5).await;

            assert_eq!(limiter.current_request_count().await, 5);
        });
    }

    /// Stats report the recorded count, the scaled limit and a non-zero usage.
    #[test]
    fn test_rate_limiter_stats() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let limiter =
                RateLimiter::new(RateLimitType::NonTradingAccount).with_safety_margin(0.8);

            record_requests(&limiter, 10).await;

            let stats = limiter.get_stats().await;
            assert_eq!(stats.request_count, 10);
            assert_eq!(stats.effective_limit, 24);
            assert!(stats.usage_percent > 0.0);
        });
    }
}