guilder_client_hyperliquid/
rate_limiter.rs1use std::collections::VecDeque;
18use std::time::{Duration, Instant};
19use tokio::sync::Mutex;
20
21const WINDOW: Duration = Duration::from_secs(60);
24pub const MAX_WEIGHT: u32 = 1200;
25
26pub const ADDR_INITIAL_BUFFER: u64 = 10_000;
29const ADDR_THROTTLE_INTERVAL: Duration = Duration::from_secs(10);
30
31#[derive(Debug, Clone)]
34pub struct RateLimitError {
35 pub retry_after: Duration,
36}
37
38impl std::fmt::Display for RateLimitError {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "rate limited, retry after {:?}", self.retry_after)
41 }
42}
43
44impl std::error::Error for RateLimitError {}
45
46pub struct RestRateLimiter {
49 entries: Mutex<VecDeque<(Instant, u32)>>,
50 max_weight: u32,
51}
52
53impl Default for RestRateLimiter {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl RestRateLimiter {
60 pub fn new() -> Self {
61 Self::new_with_budget(MAX_WEIGHT)
62 }
63
64 pub fn new_with_budget(max_weight: u32) -> Self {
65 RestRateLimiter {
66 entries: Mutex::new(VecDeque::new()),
67 max_weight,
68 }
69 }
70
71 pub async fn acquire(&self, weight: u32) -> Result<(), RateLimitError> {
73 let mut entries = self.entries.lock().await;
74 let now = Instant::now();
75
76 while let Some(&(t, _)) = entries.front() {
77 if now.duration_since(t) >= WINDOW {
78 entries.pop_front();
79 } else {
80 break;
81 }
82 }
83
84 let used: u32 = entries.iter().map(|(_, w)| w).sum();
85 if used + weight <= self.max_weight {
86 entries.push_back((now, weight));
87 return Ok(());
88 }
89
90 let oldest = entries.front().unwrap().0;
91 let retry_after = WINDOW.saturating_sub(now.duration_since(oldest));
92 Err(RateLimitError { retry_after })
93 }
94
95 pub async fn acquire_blocking(&self, weight: u32, call: &str) {
97 loop {
98 match self.acquire(weight).await {
99 Ok(()) => return,
100 Err(e) => {
101 let used = {
102 let entries = self.entries.lock().await;
103 entries.iter().map(|(_, w)| w).sum::<u32>()
104 };
105 tracing::warn!(
106 call,
107 weight,
108 used,
109 budget = self.max_weight,
110 retry_after_ms = e.retry_after.as_millis(),
111 "REST rate limited"
112 );
113 tokio::time::sleep(e.retry_after).await;
114 }
115 }
116 }
117 }
118}
119
120pub struct AddressRateLimiter {
129 inner: Mutex<AddrState>,
130}
131
132struct AddrState {
133 budget: u64,
134 consumed: u64,
135 last_throttled: Option<Instant>,
136}
137
138impl Default for AddressRateLimiter {
139 fn default() -> Self {
140 Self::new()
141 }
142}
143
144impl AddressRateLimiter {
145 pub fn new() -> Self {
146 Self::new_with_budget(ADDR_INITIAL_BUFFER)
147 }
148
149 pub fn new_with_budget(initial_budget: u64) -> Self {
150 AddressRateLimiter {
151 inner: Mutex::new(AddrState {
152 budget: initial_budget,
153 consumed: 0,
154 last_throttled: None,
155 }),
156 }
157 }
158
159 pub async fn record_fill(&self, usdc_volume: u64) {
161 let mut s = self.inner.lock().await;
162 s.budget = s.budget.saturating_add(usdc_volume);
163 }
164
165 fn cancel_limit(default: u64) -> u64 {
166 (default + 100_000).min(default * 2)
167 }
168
169 pub async fn acquire(&self, count: u64, is_cancel: bool) -> Result<(), RateLimitError> {
171 let mut s = self.inner.lock().await;
172 let effective_limit = if is_cancel {
173 Self::cancel_limit(s.budget)
174 } else {
175 s.budget
176 };
177
178 if s.consumed + count <= effective_limit {
179 s.consumed += count;
180 return Ok(());
181 }
182
183 let now = Instant::now();
185 let retry_after = match s.last_throttled {
186 Some(t) => ADDR_THROTTLE_INTERVAL.saturating_sub(now.duration_since(t)),
187 None => Duration::ZERO,
188 };
189
190 if retry_after.is_zero() {
191 if count > 1 {
192 return Err(RateLimitError {
193 retry_after: ADDR_THROTTLE_INTERVAL,
194 });
195 }
196 s.last_throttled = Some(now);
197 s.consumed += 1;
198 return Ok(());
199 }
200
201 Err(RateLimitError { retry_after })
202 }
203
204 pub async fn acquire_blocking(&self, count: u64, is_cancel: bool) {
206 loop {
207 match self.acquire(count, is_cancel).await {
208 Ok(()) => return,
209 Err(e) => {
210 tracing::warn!(
211 retry_after_ms = e.retry_after.as_millis(),
212 "address rate limited"
213 );
214 tokio::time::sleep(e.retry_after).await;
215 }
216 }
217 }
218 }
219}