use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use serde_json::Value;
use crate::{
common::enums::HyperliquidInfoRequestType,
http::query::{ExchangeAction, ExchangeActionParams, InfoRequest},
};
/// Token-bucket rate limiter where each request consumes a caller-supplied
/// weight. The bucket refills continuously at a fixed per-second rate and
/// never holds more than `capacity` tokens.
#[derive(Debug)]
pub struct WeightedLimiter {
    /// Maximum number of tokens the bucket can hold.
    capacity: f64,
    /// Tokens credited back per second of elapsed wall-clock time.
    refill_per_sec: f64,
    /// Mutable bucket state behind an async mutex, so waiters yield to the
    /// runtime instead of blocking the thread.
    state: tokio::sync::Mutex<State>,
}
/// Mutable interior of [`WeightedLimiter`]; only accessed with the lock held.
#[derive(Debug)]
struct State {
    // Current token balance; fractional because refill is continuous.
    tokens: f64,
    // Instant at which tokens were last credited.
    last_refill: Instant,
}
impl WeightedLimiter {
    /// Creates a limiter whose full `capacity` refills evenly over one minute.
    /// The bucket starts full, so an initial burst up to `capacity` passes
    /// without waiting.
    pub fn per_minute(capacity: u32) -> Self {
        let cap = capacity as f64;
        Self {
            capacity: cap,
            refill_per_sec: cap / 60.0,
            state: tokio::sync::Mutex::new(State {
                tokens: cap,
                last_refill: Instant::now(),
            }),
        }
    }

    /// Waits until `weight` tokens are available, then debits them.
    ///
    /// The balance can never exceed `capacity`, so a weight larger than the
    /// capacity would previously spin forever (and `capacity == 0` produced an
    /// infinite sleep estimate that panics in `Duration::from_secs_f64`). We
    /// therefore gate on the *reachable* threshold `min(weight, capacity)` and
    /// let the balance go negative for oversized weights, which keeps the
    /// long-run rate accounting intact.
    pub async fn acquire(&self, weight: u32) {
        let need = weight as f64;
        let threshold = need.min(self.capacity);
        loop {
            let mut st = self.state.lock().await;
            Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
            if st.tokens >= threshold {
                // Debit the full weight; this may drive the balance negative
                // when `weight > capacity`, delaying subsequent callers.
                st.tokens -= need;
                return;
            }
            // Not enough tokens yet: estimate how long the deficit takes to
            // refill and release the lock while sleeping so other tasks run.
            let deficit = threshold - st.tokens;
            let secs = deficit / self.refill_per_sec;
            drop(st);
            tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await;
        }
    }

    /// Debits `extra` tokens after the fact (e.g. a response revealed an
    /// additional cost). The balance is floored at zero.
    pub async fn debit_extra(&self, extra: u32) {
        if extra == 0 {
            return;
        }
        let mut st = self.state.lock().await;
        Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
        st.tokens = (st.tokens - extra as f64).max(0.0);
    }

    /// Returns the capacity and the refreshed token balance, truncated to
    /// whole tokens and floored at zero.
    pub async fn snapshot(&self) -> RateLimitSnapshot {
        let mut st = self.state.lock().await;
        Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
        RateLimitSnapshot {
            capacity: self.capacity as u32,
            tokens: st.tokens.max(0.0) as u32,
        }
    }

    /// Credits tokens for the time elapsed since `last_refill`, capped at
    /// `cap`. Must be called with the state lock held.
    fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
        // Capture `now` once: the previous version called `Instant::now()`
        // twice and silently dropped the time elapsed between the two calls.
        let now = Instant::now();
        let dt = now.duration_since(st.last_refill).as_secs_f64();
        if dt > 0.0 {
            st.tokens = (st.tokens + dt * per_sec).min(cap);
            st.last_refill = now;
        }
    }
}
/// Point-in-time view of a limiter's budget; fractional tokens are truncated.
#[derive(Debug, Clone, Copy)]
pub struct RateLimitSnapshot {
    /// Total bucket capacity in tokens.
    pub capacity: u32,
    /// Tokens currently available (floored at zero).
    pub tokens: u32,
}
/// Exponential backoff with full jitter: returns a pseudo-random delay in
/// `[1ms, base * 2^attempt]`, capped at `cap`.
///
/// Randomness comes from hashing the attempt number together with the current
/// wall-clock nanoseconds, so no RNG dependency is needed. The shift exponent
/// is clamped to 16 and the multiply saturates, so large `attempt` values
/// cannot overflow.
pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
    let mut hasher = DefaultHasher::new();
    attempt.hash(&mut hasher);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    nanos.hash(&mut hasher);
    let hash = hasher.finish();
    let max = (base.as_millis() as u64)
        .saturating_mul(1u64 << attempt.min(16))
        .min(cap.as_millis() as u64)
        .max(base.as_millis() as u64)
        // Guard: with `base == Duration::ZERO` the bound collapses to 0 and
        // `hash % max` would panic with a division by zero.
        .max(1);
    Duration::from_millis((hash % max).max(1))
}
/// Base request weight for an `info` endpoint, before any size-based surcharge.
pub fn info_base_weight(req: &InfoRequest) -> u32 {
    // Lightweight market/account reads cost 2, the expensive `userRole`
    // lookup costs 60, and every other request type defaults to 20.
    if matches!(
        req.request_type,
        HyperliquidInfoRequestType::L2Book
            | HyperliquidInfoRequestType::AllMids
            | HyperliquidInfoRequestType::ClearinghouseState
            | HyperliquidInfoRequestType::OrderStatus
            | HyperliquidInfoRequestType::SpotClearinghouseState
            | HyperliquidInfoRequestType::ExchangeStatus
            | HyperliquidInfoRequestType::UserFees
    ) {
        2
    } else if matches!(req.request_type, HyperliquidInfoRequestType::UserRole) {
        60
    } else {
        20
    }
}
/// Extra weight charged for large `info` responses: one additional unit per
/// `unit` items returned, applied only to the history/pagination endpoints.
pub fn info_extra_weight(req: &InfoRequest, json: &Value) -> u32 {
    // Item count of the response body: the array length, or for an object the
    // longest array among its values; any other JSON shape counts as zero.
    let item_count = match json {
        Value::Array(arr) => arr.len(),
        Value::Object(map) => map
            .values()
            .filter_map(Value::as_array)
            .map(|arr| arr.len())
            .max()
            .unwrap_or(0),
        _ => 0,
    };
    // Items per surcharge unit for volume-priced endpoints; `None` means the
    // request type carries no size-based surcharge at all.
    let per_unit = match req.request_type {
        HyperliquidInfoRequestType::CandleSnapshot => Some(60usize),
        HyperliquidInfoRequestType::RecentTrades
        | HyperliquidInfoRequestType::HistoricalOrders
        | HyperliquidInfoRequestType::UserFills
        | HyperliquidInfoRequestType::UserFillsByTime
        | HyperliquidInfoRequestType::FundingHistory
        | HyperliquidInfoRequestType::UserFunding
        | HyperliquidInfoRequestType::NonUserFundingUpdates
        | HyperliquidInfoRequestType::TwapHistory
        | HyperliquidInfoRequestType::UserTwapSliceFills
        | HyperliquidInfoRequestType::UserTwapSliceFillsByTime
        | HyperliquidInfoRequestType::DelegatorHistory
        | HyperliquidInfoRequestType::DelegatorRewards
        | HyperliquidInfoRequestType::ValidatorStats => Some(20usize),
        _ => None,
    };
    per_unit.map_or(0, |unit| (item_count / unit) as u32)
}
/// Weight of an exchange action: a base cost of 1 plus one extra unit per
/// full batch of 40 orders/cancels. Non-batch actions always cost 1.
pub fn exchange_weight(action: &ExchangeAction) -> u32 {
    // Effective batch size for the surcharge. Modify counts as a single item;
    // leverage/margin updates carry no batch at all.
    let batched_items: u32 = match &action.params {
        ExchangeActionParams::Order(p) => p.orders.len() as u32,
        ExchangeActionParams::Cancel(p) => p.cancels.len() as u32,
        ExchangeActionParams::Modify(_) => 1,
        ExchangeActionParams::UpdateLeverage(_)
        | ExchangeActionParams::UpdateIsolatedMargin(_) => 0,
    };
    1 + batched_items / 40
}
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use rust_decimal::Decimal;
    use super::{
        super::models::{
            Cloid, HyperliquidExecCancelByCloidRequest, HyperliquidExecGrouping,
            HyperliquidExecLimitParams, HyperliquidExecOrderKind, HyperliquidExecPlaceOrderRequest,
            HyperliquidExecTif,
        },
        *,
    };
    use crate::http::query::{
        CancelParams, ExchangeAction, ExchangeActionParams, ExchangeActionType, OrderParams,
        UpdateLeverageParams,
    };

    // exchange_weight charges 1 base unit plus 1 per full batch of 40 orders,
    // so the weight steps up exactly at 40, 80, ... items.
    #[rstest]
    #[case(1, 1)]
    #[case(39, 1)]
    #[case(40, 2)]
    #[case(79, 2)]
    #[case(80, 3)]
    fn test_exchange_weight_order_steps_every_40(
        #[case] array_len: usize,
        #[case] expected_weight: u32,
    ) {
        // Build `array_len` identical GTC limit orders; only the count matters.
        let orders: Vec<HyperliquidExecPlaceOrderRequest> = (0..array_len)
            .map(|_| HyperliquidExecPlaceOrderRequest {
                asset: 0,
                is_buy: true,
                price: Decimal::new(50000, 0),
                size: Decimal::new(1, 0),
                reduce_only: false,
                kind: HyperliquidExecOrderKind::Limit {
                    limit: HyperliquidExecLimitParams {
                        tif: HyperliquidExecTif::Gtc,
                    },
                },
                cloid: Some(Cloid::from_hex("0x00000000000000000000000000000000").unwrap()),
            })
            .collect();
        let action = ExchangeAction {
            action_type: ExchangeActionType::Order,
            params: ExchangeActionParams::Order(OrderParams {
                orders,
                grouping: HyperliquidExecGrouping::Na,
                builder: None,
            }),
        };
        assert_eq!(exchange_weight(&action), expected_weight);
    }

    // Cancel batches are surcharged like order batches: 40 cancels -> 1 + 1 = 2.
    #[rstest]
    fn test_exchange_weight_cancel() {
        let cancels: Vec<HyperliquidExecCancelByCloidRequest> = (0..40)
            .map(|_| HyperliquidExecCancelByCloidRequest {
                asset: 0,
                cloid: Cloid::from_hex("0x00000000000000000000000000000000").unwrap(),
            })
            .collect();
        let action = ExchangeAction {
            action_type: ExchangeActionType::Cancel,
            params: ExchangeActionParams::Cancel(CancelParams { cancels }),
        };
        assert_eq!(exchange_weight(&action), 2);
    }

    // Non-batch actions (e.g. leverage updates) always have a flat weight of 1.
    #[rstest]
    fn test_exchange_weight_non_batch_action() {
        let update_leverage = ExchangeAction {
            action_type: ExchangeActionType::UpdateLeverage,
            params: ExchangeActionParams::UpdateLeverage(UpdateLeverageParams {
                asset: 1,
                is_cross: true,
                leverage: 10,
            }),
        };
        assert_eq!(exchange_weight(&update_leverage), 1);
    }

    // Draining the whole 1200-token budget should force the next acquire to
    // block until enough tokens refill.
    // NOTE(review): uses real wall-clock sleeps, so this test takes >=0.5s.
    #[tokio::test]
    async fn test_limiter_roughly_caps_to_capacity() {
        let limiter = WeightedLimiter::per_minute(1200);
        // 60 * 20 = 1200 tokens: exactly exhausts the bucket.
        for _ in 0..60 {
            limiter.acquire(20).await;
        }
        let t0 = std::time::Instant::now();
        limiter.acquire(20).await;
        let elapsed = t0.elapsed();
        // 20 tokens at 20 tokens/sec is ~1s; allow slack for refill that
        // accrued while the loop above was running.
        assert!(
            elapsed.as_millis() >= 500,
            "Expected significant delay, was {}ms",
            elapsed.as_millis()
        );
    }

    // Bookkeeping: the balance drops by acquired weight plus extra debits,
    // and over-debiting clamps to zero instead of going negative.
    #[tokio::test]
    async fn test_limiter_debit_extra_works() {
        let limiter = WeightedLimiter::per_minute(100);
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.capacity, 100);
        assert_eq!(snapshot.tokens, 100);
        limiter.acquire(30).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 70);
        limiter.debit_extra(20).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 50);
        limiter.debit_extra(100).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 0);
    }

    // Full jitter: the delay is pseudo-random but bounded by base * 2^attempt,
    // so the upper bound doubles with each attempt.
    #[rstest]
    #[case(0, 100)]
    #[case(1, 200)]
    #[case(2, 400)]
    fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);
        let delay = backoff_full_jitter(attempt, base, cap);
        assert!(delay.as_millis() >= 1);
        assert!(delay.as_millis() <= max_expected_ms as u128);
    }

    // A large attempt count must never exceed the configured cap.
    #[rstest]
    fn test_backoff_full_jitter_respects_cap() {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);
        let delay_high = backoff_full_jitter(10, base, cap);
        assert!(delay_high.as_millis() <= cap.as_millis());
    }
}