nautilus-hyperliquid 0.55.0

Hyperliquid integration adapter for the Nautilus trading engine
Documentation
// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{
    collections::hash_map::DefaultHasher,
    hash::{Hash, Hasher},
    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

use serde_json::Value;

use crate::{
    common::enums::HyperliquidInfoRequestType,
    http::query::{ExchangeAction, ExchangeActionParams, InfoRequest},
};

/// Token-bucket rate limiter in which every request consumes a caller-supplied weight.
///
/// The bucket starts full, is topped up based on elapsed time whenever it is
/// accessed, and never holds more than `capacity` tokens.
#[derive(Debug)]
pub struct WeightedLimiter {
    /// Maximum number of tokens in the bucket, which is also the per-minute budget (e.g., 1200).
    capacity: f64,
    /// Refill rate in tokens per second (`capacity / 60`).
    refill_per_sec: f64,
    /// Mutable bucket state, guarded by an async mutex.
    state: tokio::sync::Mutex<State>,
}

/// Mutable part of a [`WeightedLimiter`]: the current balance and when it was last topped up.
#[derive(Debug)]
struct State {
    /// Tokens currently available (fractional, since refill is continuous).
    tokens: f64,
    /// Instant at which `tokens` was last brought up to date.
    last_refill: Instant,
}

impl WeightedLimiter {
    /// Creates a limiter with a budget of `capacity` tokens per minute.
    ///
    /// The bucket starts full and refills continuously at `capacity / 60`
    /// tokens per second.
    pub fn per_minute(capacity: u32) -> Self {
        let cap = f64::from(capacity);
        Self {
            capacity: cap,
            refill_per_sec: cap / 60.0,
            state: tokio::sync::Mutex::new(State {
                tokens: cap,
                last_refill: Instant::now(),
            }),
        }
    }

    /// Acquire `weight` tokens, sleeping until available.
    ///
    /// A `weight` larger than the bucket capacity is clamped to the capacity:
    /// the call then waits for a full bucket and drains it. Without the clamp
    /// the request could never be satisfied (refill is capped at capacity) and
    /// the caller would wait forever; with a zero-capacity limiter the wait
    /// computation would also divide by a zero refill rate.
    pub async fn acquire(&self, weight: u32) {
        let need = f64::from(weight).min(self.capacity);
        loop {
            // Scope the guard so the lock is released before sleeping.
            let wait_secs = {
                let mut st = self.state.lock().await;
                Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);

                if st.tokens >= need {
                    st.tokens -= need;
                    return;
                }

                // Reaching this point implies `need > 0`, hence `capacity > 0`
                // and a strictly positive refill rate, so the quotient is finite.
                (need - st.tokens) / self.refill_per_sec
            };

            // Sleep at least 10ms to avoid a busy loop on tiny deficits.
            tokio::time::sleep(Duration::from_secs_f64(wait_secs.max(0.01))).await;
        }
    }

    /// Post-response debit for per-items adders (can temporarily clamp to 0).
    ///
    /// Never blocks on token availability: if fewer than `extra` tokens are
    /// available the balance is set to zero rather than going negative.
    pub async fn debit_extra(&self, extra: u32) {
        if extra == 0 {
            return;
        }
        let mut st = self.state.lock().await;
        Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
        st.tokens = (st.tokens - f64::from(extra)).max(0.0);
    }

    /// Returns the capacity and the currently available tokens (rounded down),
    /// after applying any refill that has accrued since the last access.
    pub async fn snapshot(&self) -> RateLimitSnapshot {
        let mut st = self.state.lock().await;
        Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
        RateLimitSnapshot {
            capacity: self.capacity as u32,
            tokens: st.tokens.max(0.0) as u32,
        }
    }

    /// Credits the tokens accrued since `last_refill`, capped at `cap`.
    ///
    /// Must be called with the state lock held. A single timestamp is used for
    /// both the elapsed-time measurement and the new `last_refill`, so no
    /// elapsed time is lost between the two.
    fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
        let now = Instant::now();
        let dt = now.duration_since(st.last_refill).as_secs_f64();
        if dt > 0.0 {
            st.tokens = (st.tokens + dt * per_sec).min(cap);
            st.last_refill = now;
        }
    }
}

/// Point-in-time view of a [`WeightedLimiter`], as returned by [`WeightedLimiter::snapshot`].
#[derive(Debug, Clone, Copy)]
pub struct RateLimitSnapshot {
    /// Bucket capacity in tokens.
    pub capacity: u32,
    /// Tokens available when the snapshot was taken, rounded down to a whole token.
    pub tokens: u32,
}

/// Computes a retry delay using exponential backoff with full jitter.
///
/// The upper bound for the delay is `base * 2^attempt` (the exponent is
/// limited to 16), capped at `cap` but never lower than `base`. The returned
/// delay is a pseudo-random whole number of milliseconds below that bound,
/// with a floor of 1ms.
///
/// Randomness comes from hashing `attempt` together with the current wall-clock
/// time, so no RNG dependency is required; it is not cryptographically strong.
///
/// A `base` shorter than one millisecond (including zero) yields a 1ms delay
/// instead of panicking on a remainder-by-zero.
pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
    let mut hasher = DefaultHasher::new();
    attempt.hash(&mut hasher);
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos()
        .hash(&mut hasher);
    let entropy = u128::from(hasher.finish());

    // Work in u128 (the native width of `as_millis`) so nothing is truncated.
    let base_ms = base.as_millis();
    let ceiling_ms = base_ms
        .saturating_mul(1u128 << attempt.min(16))
        .min(cap.as_millis())
        .max(base_ms)
        // Guard the modulo below against a zero divisor.
        .max(1);

    // `entropy` originates from a u64, so the remainder always fits in u64.
    let jitter_ms = (entropy % ceiling_ms) as u64;

    // Floor at 1ms to prevent zero-duration backoff
    Duration::from_millis(jitter_ms.max(1))
}

/// Returns the base rate-limit weight charged for an Info request.
///
/// `UserRole` costs 60, a fixed set of lightweight lookups costs 2, and every
/// other request type costs 20.
pub fn info_base_weight(req: &InfoRequest) -> u32 {
    // The single most expensive request type is handled up front.
    if matches!(req.request_type, HyperliquidInfoRequestType::UserRole) {
        return 60;
    }

    let is_lightweight = matches!(
        req.request_type,
        HyperliquidInfoRequestType::L2Book
            | HyperliquidInfoRequestType::AllMids
            | HyperliquidInfoRequestType::ClearinghouseState
            | HyperliquidInfoRequestType::OrderStatus
            | HyperliquidInfoRequestType::SpotClearinghouseState
            | HyperliquidInfoRequestType::ExchangeStatus
            | HyperliquidInfoRequestType::UserFees
    );

    if is_lightweight { 2 } else { 20 }
}

/// Extra weight for heavy Info endpoints: +1 per 20 (most), +1 per 60 for candleSnapshot.
/// We count the largest array in the response (robust to schema variants).
///
/// Returns 0 for request types that carry no per-item adder, without
/// inspecting the response. For the others, the item count is the length of
/// `json` when it is an array, or the length of the longest array among the
/// direct values when it is an object; any other JSON shape counts as zero
/// items. The result saturates at `u32::MAX`.
pub fn info_extra_weight(req: &InfoRequest, json: &Value) -> u32 {
    // Resolve the per-item unit first so that endpoints without an adder
    // return immediately instead of walking the response.
    let unit: usize = match req.request_type {
        HyperliquidInfoRequestType::CandleSnapshot => 60,
        HyperliquidInfoRequestType::RecentTrades
        | HyperliquidInfoRequestType::HistoricalOrders
        | HyperliquidInfoRequestType::UserFills
        | HyperliquidInfoRequestType::UserFillsByTime
        | HyperliquidInfoRequestType::FundingHistory
        | HyperliquidInfoRequestType::UserFunding
        | HyperliquidInfoRequestType::NonUserFundingUpdates
        | HyperliquidInfoRequestType::TwapHistory
        | HyperliquidInfoRequestType::UserTwapSliceFills
        | HyperliquidInfoRequestType::UserTwapSliceFillsByTime
        | HyperliquidInfoRequestType::DelegatorHistory
        | HyperliquidInfoRequestType::DelegatorRewards
        | HyperliquidInfoRequestType::ValidatorStats => 20,
        _ => return 0,
    };

    let items = match json {
        Value::Array(entries) => entries.len(),
        Value::Object(fields) => fields
            .values()
            .filter_map(|v| v.as_array())
            .map(|a| a.len())
            .max()
            .unwrap_or(0),
        _ => 0,
    };

    // Saturate rather than silently wrap when narrowing to u32.
    (items / unit).min(u32::MAX as usize) as u32
}

/// Weight of an Exchange request: `1 + floor(batch_len / 40)`.
///
/// The batch length is the number of orders or cancels carried by the action.
/// A modify counts as a batch of one, and leverage / isolated-margin updates
/// count as a batch of zero.
pub fn exchange_weight(action: &ExchangeAction) -> u32 {
    const BATCH_UNIT: u32 = 40;

    let batch_len = match &action.params {
        ExchangeActionParams::Order(order) => order.orders.len(),
        ExchangeActionParams::Cancel(cancel) => cancel.cancels.len(),
        // Modify is for a single order
        ExchangeActionParams::Modify(_) => 1,
        // Account-level settings carry no batch at all.
        ExchangeActionParams::UpdateLeverage(_) | ExchangeActionParams::UpdateIsolatedMargin(_) => {
            0
        }
    };

    let batch_steps = batch_len as u32 / BATCH_UNIT;
    batch_steps + 1
}

// Unit tests for exchange request weighting, the token-bucket limiter, and jittered backoff.
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use rust_decimal::Decimal;

    use super::{
        super::models::{
            Cloid, HyperliquidExecCancelByCloidRequest, HyperliquidExecGrouping,
            HyperliquidExecLimitParams, HyperliquidExecOrderKind, HyperliquidExecPlaceOrderRequest,
            HyperliquidExecTif,
        },
        *,
    };
    use crate::http::query::{
        CancelParams, ExchangeAction, ExchangeActionParams, ExchangeActionType, OrderParams,
        UpdateLeverageParams,
    };

    // Order batches: the weight is 1 below 40 orders and grows by one at each
    // multiple of 40 (both sides of the 40 and 80 boundaries are covered).
    #[rstest]
    #[case(1, 1)]
    #[case(39, 1)]
    #[case(40, 2)]
    #[case(79, 2)]
    #[case(80, 3)]
    fn test_exchange_weight_order_steps_every_40(
        #[case] array_len: usize,
        #[case] expected_weight: u32,
    ) {
        // Only the number of orders matters for the weight; the field values are arbitrary.
        let orders: Vec<HyperliquidExecPlaceOrderRequest> = (0..array_len)
            .map(|_| HyperliquidExecPlaceOrderRequest {
                asset: 0,
                is_buy: true,
                price: Decimal::new(50000, 0),
                size: Decimal::new(1, 0),
                reduce_only: false,
                kind: HyperliquidExecOrderKind::Limit {
                    limit: HyperliquidExecLimitParams {
                        tif: HyperliquidExecTif::Gtc,
                    },
                },
                cloid: Some(Cloid::from_hex("0x00000000000000000000000000000000").unwrap()),
            })
            .collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Order,
            params: ExchangeActionParams::Order(OrderParams {
                orders,
                grouping: HyperliquidExecGrouping::Na,
                builder: None,
            }),
        };
        assert_eq!(exchange_weight(&action), expected_weight);
    }

    // Cancel batches are weighted the same way: 40 cancels -> 1 + 40 / 40 = 2.
    #[rstest]
    fn test_exchange_weight_cancel() {
        let cancels: Vec<HyperliquidExecCancelByCloidRequest> = (0..40)
            .map(|_| HyperliquidExecCancelByCloidRequest {
                asset: 0,
                cloid: Cloid::from_hex("0x00000000000000000000000000000000").unwrap(),
            })
            .collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Cancel,
            params: ExchangeActionParams::Cancel(CancelParams { cancels }),
        };
        assert_eq!(exchange_weight(&action), 2);
    }

    // Actions without a batch (here: a leverage update) cost the minimum weight of 1.
    #[rstest]
    fn test_exchange_weight_non_batch_action() {
        let update_leverage = ExchangeAction {
            action_type: ExchangeActionType::UpdateLeverage,
            params: ExchangeActionParams::UpdateLeverage(UpdateLeverageParams {
                asset: 1,
                is_cross: true,
                leverage: 10,
            }),
        };
        assert_eq!(exchange_weight(&update_leverage), 1);
    }

    // Draining the bucket must force the next acquire to wait for a refill.
    // NOTE: this test sleeps in real time (roughly one second).
    #[tokio::test]
    async fn test_limiter_roughly_caps_to_capacity() {
        let limiter = WeightedLimiter::per_minute(1200);

        // Consume ~1200 in quick succession
        for _ in 0..60 {
            limiter.acquire(20).await; // 60 * 20 = 1200
        }

        // The next acquire should take time for tokens to refill
        // (refill rate is 1200 / 60 = 20 tokens per second, so 20 tokens take about 1s).
        let t0 = std::time::Instant::now();
        limiter.acquire(20).await;
        let elapsed = t0.elapsed();

        // Should take at least some time to refill (allow some jitter/timing variance)
        assert!(
            elapsed.as_millis() >= 500,
            "Expected significant delay, was {}ms",
            elapsed.as_millis()
        );
    }

    // Exercises acquire, debit_extra and snapshot together. The exact token
    // comparisons rely on snapshot rounding down and on the test running fast
    // enough that less than one whole token is refilled between calls.
    #[tokio::test]
    async fn test_limiter_debit_extra_works() {
        let limiter = WeightedLimiter::per_minute(100);

        // Start with full bucket
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.capacity, 100);
        assert_eq!(snapshot.tokens, 100);

        // Acquire some tokens
        limiter.acquire(30).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 70);

        // Debit extra
        limiter.debit_extra(20).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 50);

        // Debit more than available (should clamp to 0)
        limiter.debit_extra(100).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 0);
    }

    // With a 100ms base, the upper bound doubles per attempt: 100, 200, 400ms.
    // The delay itself is randomized, so only its bounds are checked.
    #[rstest]
    #[case(0, 100)]
    #[case(1, 200)]
    #[case(2, 400)]
    fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);

        let delay = backoff_full_jitter(attempt, base, cap);

        assert!(delay.as_millis() >= 1);
        assert!(delay.as_millis() <= max_expected_ms as u128);
    }

    // At attempt 10 the uncapped bound (100ms * 2^10) exceeds the 5s cap, so the cap must apply.
    #[rstest]
    fn test_backoff_full_jitter_respects_cap() {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);

        let delay_high = backoff_full_jitter(10, base, cap);
        assert!(delay_high.as_millis() <= cap.as_millis());
    }
}