Skip to main content

guilder_client_hyperliquid/
rate_limiter.rs

//! Rate limiters for the Hyperliquid REST API.
//!
//! ## IP-based (`RestRateLimiter`)
//! Tracks an aggregated weight budget of **1,200 per minute** in a 60-second sliding window.
//!
//! ## Address-based (`AddressRateLimiter`)
//! Tracks the per-address action budget. Each address starts with 10,000 requests and accrues
//! 1 request per 1 USDC traded. When exhausted, 1 request is allowed every 10 seconds.
//! Cancels use a higher limit: `min(limit + 100_000, limit * 2)`.
//! A batch of n orders/cancels counts as n requests against this limit.
//!
//! ## Interface
//! Both limiters share the same interface:
//! - `acquire(...)` → `Result<(), RateLimitError>` — returns `Err` immediately if throttled.
//! - `acquire_blocking(...)` — retries on `Err`, logging each wait via `tracing::warn!`.
16
17use std::collections::VecDeque;
18use std::time::{Duration, Instant};
19use tokio::sync::Mutex;
20
// ── IP-based constants ────────────────────────────────────────────────────────

/// Length of the sliding window over which REST request weight is summed.
const WINDOW: Duration = Duration::from_secs(60);
/// Default aggregated weight budget per [`WINDOW`] for the IP-based limiter.
pub const MAX_WEIGHT: u32 = 1200;
25
// ── Address-based constants ───────────────────────────────────────────────────

/// Number of action requests every address may make before any volume is traded.
pub const ADDR_INITIAL_BUFFER: u64 = 10_000;
/// Minimum spacing between single requests once the address budget is exhausted.
const ADDR_THROTTLE_INTERVAL: Duration = Duration::from_secs(10);
30
// ── Error ─────────────────────────────────────────────────────────────────────

/// Returned by the non-blocking `acquire` methods when a request is currently throttled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitError {
    /// How long the caller should wait before trying again.
    pub retry_after: Duration,
}

impl std::fmt::Display for RateLimitError {
    /// Formats as `rate limited, retry after <duration>`, using the `Debug` form of the
    /// duration (e.g. `10s`, `250ms`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "rate limited, retry after {:?}", self.retry_after)
    }
}

impl std::error::Error for RateLimitError {}
45
46// ── IP-based limiter ──────────────────────────────────────────────────────────
47
48pub struct RestRateLimiter {
49    entries: Mutex<VecDeque<(Instant, u32)>>,
50    max_weight: u32,
51}
52
53impl Default for RestRateLimiter {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl RestRateLimiter {
60    pub fn new() -> Self {
61        Self::new_with_budget(MAX_WEIGHT)
62    }
63
64    pub fn new_with_budget(max_weight: u32) -> Self {
65        RestRateLimiter {
66            entries: Mutex::new(VecDeque::new()),
67            max_weight,
68        }
69    }
70
71    /// Returns `Err(RateLimitError)` immediately if the weight budget is exhausted.
72    pub async fn acquire(&self, weight: u32) -> Result<(), RateLimitError> {
73        let mut entries = self.entries.lock().await;
74        let now = Instant::now();
75
76        while let Some(&(t, _)) = entries.front() {
77            if now.duration_since(t) >= WINDOW {
78                entries.pop_front();
79            } else {
80                break;
81            }
82        }
83
84        let used: u32 = entries.iter().map(|(_, w)| w).sum();
85        if used + weight <= self.max_weight {
86            entries.push_back((now, weight));
87            return Ok(());
88        }
89
90        let oldest = entries.front().unwrap().0;
91        let retry_after = WINDOW.saturating_sub(now.duration_since(oldest));
92        Err(RateLimitError { retry_after })
93    }
94
95    /// Retries on `Err`, logging each wait. Resolves once the request is accepted.
96    pub async fn acquire_blocking(&self, weight: u32, call: &str) {
97        loop {
98            match self.acquire(weight).await {
99                Ok(()) => return,
100                Err(e) => {
101                    let used = {
102                        let entries = self.entries.lock().await;
103                        entries.iter().map(|(_, w)| w).sum::<u32>()
104                    };
105                    tracing::warn!(
106                        call,
107                        weight,
108                        used,
109                        budget = self.max_weight,
110                        retry_after_ms = e.retry_after.as_millis(),
111                        "REST rate limited"
112                    );
113                    tokio::time::sleep(e.retry_after).await;
114                }
115            }
116        }
117    }
118}
119
120// ── Address-based limiter ─────────────────────────────────────────────────────
121
122/// Tracks the per-address action budget.
123///
124/// Call [`record_fill`] when a fill is confirmed to grow the budget.
125/// Call [`acquire`] before every action (not info) request.
126/// Pass `is_cancel = true` for cancel actions to apply the higher cancel limit.
127/// Pass the actual batch size as `count` for batched requests.
128pub struct AddressRateLimiter {
129    inner: Mutex<AddrState>,
130}
131
132struct AddrState {
133    budget: u64,
134    consumed: u64,
135    last_throttled: Option<Instant>,
136}
137
138impl Default for AddressRateLimiter {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
144impl AddressRateLimiter {
145    pub fn new() -> Self {
146        Self::new_with_budget(ADDR_INITIAL_BUFFER)
147    }
148
149    pub fn new_with_budget(initial_budget: u64) -> Self {
150        AddressRateLimiter {
151            inner: Mutex::new(AddrState {
152                budget: initial_budget,
153                consumed: 0,
154                last_throttled: None,
155            }),
156        }
157    }
158
159    /// Increase the budget by `usdc_volume` requests (1 request per 1 USDC traded).
160    pub async fn record_fill(&self, usdc_volume: u64) {
161        let mut s = self.inner.lock().await;
162        s.budget = s.budget.saturating_add(usdc_volume);
163    }
164
165    fn cancel_limit(default: u64) -> u64 {
166        (default + 100_000).min(default * 2)
167    }
168
169    /// Returns `Err(RateLimitError)` immediately if the address budget is exhausted.
170    pub async fn acquire(&self, count: u64, is_cancel: bool) -> Result<(), RateLimitError> {
171        let mut s = self.inner.lock().await;
172        let effective_limit = if is_cancel {
173            Self::cancel_limit(s.budget)
174        } else {
175            s.budget
176        };
177
178        if s.consumed + count <= effective_limit {
179            s.consumed += count;
180            return Ok(());
181        }
182
183        // Budget exhausted — compute throttle wait.
184        let now = Instant::now();
185        let retry_after = match s.last_throttled {
186            Some(t) => ADDR_THROTTLE_INTERVAL.saturating_sub(now.duration_since(t)),
187            None => Duration::ZERO,
188        };
189
190        if retry_after.is_zero() {
191            if count > 1 {
192                return Err(RateLimitError {
193                    retry_after: ADDR_THROTTLE_INTERVAL,
194                });
195            }
196            s.last_throttled = Some(now);
197            s.consumed += 1;
198            return Ok(());
199        }
200
201        Err(RateLimitError { retry_after })
202    }
203
204    /// Retries on `Err`, logging each wait. Resolves once the request is accepted.
205    pub async fn acquire_blocking(&self, count: u64, is_cancel: bool) {
206        loop {
207            match self.acquire(count, is_cancel).await {
208                Ok(()) => return,
209                Err(e) => {
210                    tracing::warn!(
211                        retry_after_ms = e.retry_after.as_millis(),
212                        "address rate limited"
213                    );
214                    tokio::time::sleep(e.retry_after).await;
215                }
216            }
217        }
218    }
219}