titanium_http/
ratelimit.rs1use dashmap::DashMap;
6use parking_lot::Mutex;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::Semaphore;
10use tokio::time::sleep;
11
12pub struct RateLimiter {
14 buckets: DashMap<String, Arc<Bucket>>,
16 #[allow(dead_code)]
18 global: Arc<Semaphore>,
19 global_until: Mutex<Option<Instant>>,
21}
22
23struct Bucket {
25 remaining: Mutex<u32>,
27 reset_at: Mutex<Instant>,
29 semaphore: Semaphore,
31}
32
33impl RateLimiter {
34 pub fn new() -> Self {
36 Self {
37 buckets: DashMap::new(),
38 global: Arc::new(Semaphore::new(50)), global_until: Mutex::new(None),
40 }
41 }
42
43 pub async fn acquire(&self, route: &str) {
45 let until = { *self.global_until.lock() };
47 if let Some(until) = until {
48 if Instant::now() < until {
49 sleep(until - Instant::now()).await;
50 }
51 }
52
53 let bucket = self
55 .buckets
56 .entry(route.to_string())
57 .or_insert_with(|| {
58 Arc::new(Bucket {
59 remaining: Mutex::new(1),
60 reset_at: Mutex::new(Instant::now()),
61 semaphore: Semaphore::new(1),
62 })
63 })
64 .clone();
65
66 let _permit = bucket.semaphore.acquire().await.expect("semaphore closed");
68
69 let wait = {
71 let remaining = *bucket.remaining.lock();
72 if remaining == 0 {
73 let reset_at = *bucket.reset_at.lock();
74 if Instant::now() < reset_at {
75 Some(reset_at - Instant::now())
76 } else {
77 None
78 }
79 } else {
80 None
81 }
82 };
83
84 if let Some(duration) = wait {
85 sleep(duration).await;
86 }
87 }
88
89 pub fn update(&self, route: &str, remaining: u32, reset_after_ms: u64) {
91 if let Some(bucket) = self.buckets.get(route) {
92 *bucket.remaining.lock() = remaining;
93 *bucket.reset_at.lock() = Instant::now() + Duration::from_millis(reset_after_ms);
94 }
95 }
96
97 pub fn set_global(&self, retry_after_ms: u64) {
99 *self.global_until.lock() = Some(Instant::now() + Duration::from_millis(retry_after_ms));
100 }
101}
102
103impl Default for RateLimiter {
104 fn default() -> Self {
105 Self::new()
106 }
107}