guilder_client_hyperliquid/
rate_limiter.rs1use std::collections::VecDeque;
18use std::time::{Duration, Instant};
19use tokio::sync::Mutex;
20
21const WINDOW: Duration = Duration::from_secs(60);
24pub const MAX_WEIGHT: u32 = 1200;
25
26pub const ADDR_INITIAL_BUFFER: u64 = 10_000;
29const ADDR_THROTTLE_INTERVAL: Duration = Duration::from_secs(10);
30
31#[derive(Debug, Clone)]
34pub struct RateLimitError {
35 pub retry_after: Duration,
36}
37
38impl std::fmt::Display for RateLimitError {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "rate limited, retry after {:?}", self.retry_after)
41 }
42}
43
44impl std::error::Error for RateLimitError {}
45
46pub struct RestRateLimiter {
49 entries: Mutex<VecDeque<(Instant, u32)>>,
50}
51
52impl Default for RestRateLimiter {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl RestRateLimiter {
59 pub fn new() -> Self {
60 RestRateLimiter {
61 entries: Mutex::new(VecDeque::new()),
62 }
63 }
64
65 pub async fn acquire(&self, weight: u32) -> Result<(), RateLimitError> {
67 let mut entries = self.entries.lock().await;
68 let now = Instant::now();
69
70 while let Some(&(t, _)) = entries.front() {
71 if now.duration_since(t) >= WINDOW {
72 entries.pop_front();
73 } else {
74 break;
75 }
76 }
77
78 let used: u32 = entries.iter().map(|(_, w)| w).sum();
79 if used + weight <= MAX_WEIGHT {
80 entries.push_back((now, weight));
81 return Ok(());
82 }
83
84 let oldest = entries.front().unwrap().0;
85 let retry_after = WINDOW.saturating_sub(now.duration_since(oldest));
86 Err(RateLimitError { retry_after })
87 }
88
89 pub async fn acquire_blocking(&self, weight: u32) {
91 loop {
92 match self.acquire(weight).await {
93 Ok(()) => return,
94 Err(e) => {
95 tracing::warn!(
96 retry_after_ms = e.retry_after.as_millis(),
97 "REST rate limited"
98 );
99 tokio::time::sleep(e.retry_after).await;
100 }
101 }
102 }
103 }
104}
105
106pub struct AddressRateLimiter {
115 inner: Mutex<AddrState>,
116}
117
118struct AddrState {
119 budget: u64,
120 consumed: u64,
121 last_throttled: Option<Instant>,
122}
123
124impl Default for AddressRateLimiter {
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130impl AddressRateLimiter {
131 pub fn new() -> Self {
132 AddressRateLimiter {
133 inner: Mutex::new(AddrState {
134 budget: ADDR_INITIAL_BUFFER,
135 consumed: 0,
136 last_throttled: None,
137 }),
138 }
139 }
140
141 pub async fn record_fill(&self, usdc_volume: u64) {
143 let mut s = self.inner.lock().await;
144 s.budget = s.budget.saturating_add(usdc_volume);
145 }
146
147 fn cancel_limit(default: u64) -> u64 {
148 (default + 100_000).min(default * 2)
149 }
150
151 pub async fn acquire(&self, count: u64, is_cancel: bool) -> Result<(), RateLimitError> {
153 let mut s = self.inner.lock().await;
154 let effective_limit = if is_cancel {
155 Self::cancel_limit(s.budget)
156 } else {
157 s.budget
158 };
159
160 if s.consumed + count <= effective_limit {
161 s.consumed += count;
162 return Ok(());
163 }
164
165 let now = Instant::now();
167 let retry_after = match s.last_throttled {
168 Some(t) => ADDR_THROTTLE_INTERVAL.saturating_sub(now.duration_since(t)),
169 None => Duration::ZERO,
170 };
171
172 if retry_after.is_zero() {
173 if count > 1 {
174 return Err(RateLimitError {
175 retry_after: ADDR_THROTTLE_INTERVAL,
176 });
177 }
178 s.last_throttled = Some(now);
179 s.consumed += 1;
180 return Ok(());
181 }
182
183 Err(RateLimitError { retry_after })
184 }
185
186 pub async fn acquire_blocking(&self, count: u64, is_cancel: bool) {
188 loop {
189 match self.acquire(count, is_cancel).await {
190 Ok(()) => return,
191 Err(e) => {
192 tracing::warn!(
193 retry_after_ms = e.retry_after.as_millis(),
194 "address rate limited"
195 );
196 tokio::time::sleep(e.retry_after).await;
197 }
198 }
199 }
200 }
201}