guilder_client_hyperliquid/
rate_limiter.rs1use std::collections::VecDeque;
18use std::time::{Duration, Instant};
19use tokio::sync::Mutex;
20
// NOTE(review): the numeric values below presumably mirror the exchange's
// published REST and per-address limits — confirm against the API docs.

/// Length of the sliding window over which `RestRateLimiter` sums the weight
/// of admitted requests; entries at least this old are discarded.
const WINDOW: Duration = Duration::from_secs(60);
/// Default weight budget per window, used by [`RestRateLimiter::new`].
pub const MAX_WEIGHT: u32 = 1200;

/// Default starting request budget, used by [`AddressRateLimiter::new`].
pub const ADDR_INITIAL_BUFFER: u64 = 10_000;
/// Minimum spacing between the single requests that `AddressRateLimiter`
/// still admits once its budget is exhausted; also the retry delay reported
/// for an over-budget batch.
const ADDR_THROTTLE_INTERVAL: Duration = Duration::from_secs(10);
30
/// Error produced when a limiter refuses to admit a request right now.
#[derive(Debug, Clone)]
pub struct RateLimitError {
    /// How long the caller should wait before attempting the request again.
    pub retry_after: Duration,
}

// No underlying cause to expose, so the default `Error` methods are enough.
impl std::error::Error for RateLimitError {}

impl std::fmt::Display for RateLimitError {
    /// Renders the error as a short message that includes the retry delay in
    /// `Duration`'s debug notation (for example `3s` or `1.5s`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let wait = &self.retry_after;
        f.write_fmt(format_args!("rate limited, retry after {:?}", wait))
    }
}
45
46pub struct RestRateLimiter {
49 entries: Mutex<VecDeque<(Instant, u32)>>,
50 max_weight: u32,
51}
52
53impl Default for RestRateLimiter {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl RestRateLimiter {
60 pub fn new() -> Self {
61 Self::new_with_budget(MAX_WEIGHT)
62 }
63
64 pub fn new_with_budget(max_weight: u32) -> Self {
65 RestRateLimiter {
66 entries: Mutex::new(VecDeque::new()),
67 max_weight,
68 }
69 }
70
71 pub async fn acquire(&self, weight: u32) -> Result<(), RateLimitError> {
73 let mut entries = self.entries.lock().await;
74 let now = Instant::now();
75
76 while let Some(&(t, _)) = entries.front() {
77 if now.duration_since(t) >= WINDOW {
78 entries.pop_front();
79 } else {
80 break;
81 }
82 }
83
84 let used: u32 = entries.iter().map(|(_, w)| w).sum();
85 if used + weight <= self.max_weight {
86 entries.push_back((now, weight));
87 return Ok(());
88 }
89
90 let oldest = entries.front().unwrap().0;
91 let retry_after = WINDOW.saturating_sub(now.duration_since(oldest));
92 Err(RateLimitError { retry_after })
93 }
94
95 pub async fn acquire_blocking(&self, weight: u32) {
97 loop {
98 match self.acquire(weight).await {
99 Ok(()) => return,
100 Err(e) => {
101 tracing::warn!(
102 retry_after_ms = e.retry_after.as_millis(),
103 "REST rate limited"
104 );
105 tokio::time::sleep(e.retry_after).await;
106 }
107 }
108 }
109 }
110}
111
112pub struct AddressRateLimiter {
121 inner: Mutex<AddrState>,
122}
123
124struct AddrState {
125 budget: u64,
126 consumed: u64,
127 last_throttled: Option<Instant>,
128}
129
130impl Default for AddressRateLimiter {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136impl AddressRateLimiter {
137 pub fn new() -> Self {
138 Self::new_with_budget(ADDR_INITIAL_BUFFER)
139 }
140
141 pub fn new_with_budget(initial_budget: u64) -> Self {
142 AddressRateLimiter {
143 inner: Mutex::new(AddrState {
144 budget: initial_budget,
145 consumed: 0,
146 last_throttled: None,
147 }),
148 }
149 }
150
151 pub async fn record_fill(&self, usdc_volume: u64) {
153 let mut s = self.inner.lock().await;
154 s.budget = s.budget.saturating_add(usdc_volume);
155 }
156
157 fn cancel_limit(default: u64) -> u64 {
158 (default + 100_000).min(default * 2)
159 }
160
161 pub async fn acquire(&self, count: u64, is_cancel: bool) -> Result<(), RateLimitError> {
163 let mut s = self.inner.lock().await;
164 let effective_limit = if is_cancel {
165 Self::cancel_limit(s.budget)
166 } else {
167 s.budget
168 };
169
170 if s.consumed + count <= effective_limit {
171 s.consumed += count;
172 return Ok(());
173 }
174
175 let now = Instant::now();
177 let retry_after = match s.last_throttled {
178 Some(t) => ADDR_THROTTLE_INTERVAL.saturating_sub(now.duration_since(t)),
179 None => Duration::ZERO,
180 };
181
182 if retry_after.is_zero() {
183 if count > 1 {
184 return Err(RateLimitError {
185 retry_after: ADDR_THROTTLE_INTERVAL,
186 });
187 }
188 s.last_throttled = Some(now);
189 s.consumed += 1;
190 return Ok(());
191 }
192
193 Err(RateLimitError { retry_after })
194 }
195
196 pub async fn acquire_blocking(&self, count: u64, is_cancel: bool) {
198 loop {
199 match self.acquire(count, is_cancel).await {
200 Ok(()) => return,
201 Err(e) => {
202 tracing::warn!(
203 retry_after_ms = e.retry_after.as_millis(),
204 "address rate limited"
205 );
206 tokio::time::sleep(e.retry_after).await;
207 }
208 }
209 }
210 }
211}