use std::{
collections::HashMap,
env,
num::NonZeroU32,
sync::{Arc, LazyLock},
time::Duration,
};
use ahash::AHashMap;
use anyhow::Context;
use nautilus_core::{
AtomicMap, UUID4, UnixNanos,
consts::NAUTILUS_USER_AGENT,
time::{AtomicTime, get_atomic_clock_realtime},
};
use nautilus_model::{
data::{Bar, BarType},
enums::{
AccountType, BarAggregation, CurrencyType, OrderSide, OrderStatus, OrderType, TimeInForce,
TriggerType,
},
events::AccountState,
identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
instruments::{CurrencyPair, Instrument, InstrumentAny},
orders::{Order, OrderAny},
reports::{FillReport, OrderStatusReport, PositionStatusReport},
types::{AccountBalance, Currency, Money, Price, Quantity},
};
use nautilus_network::{
http::{HttpClient, HttpClientError, HttpResponse, Method, USER_AGENT},
ratelimiter::quota::Quota,
};
use rust_decimal::Decimal;
use serde_json::Value;
use ustr::Ustr;
use crate::{
common::{
consts::{HYPERLIQUID_VENUE, NAUTILUS_BUILDER_ADDRESS, exchange_url, info_url},
credential::{Secrets, VaultAddress},
enums::{
HyperliquidBarInterval, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
HyperliquidProductType,
},
parse::{
bar_type_to_interval, clamp_price_to_precision, derive_limit_from_trigger,
extract_inner_error, normalize_price, order_to_hyperliquid_request_with_asset,
round_to_sig_figs, time_in_force_to_hyperliquid_tif,
},
},
data::candle_to_bar,
http::{
error::{Error, Result},
models::{
ClearinghouseState, Cloid, HyperliquidCandleSnapshot, HyperliquidExchangeRequest,
HyperliquidExchangeResponse, HyperliquidExecAction, HyperliquidExecBuilderFee,
HyperliquidExecCancelByCloidRequest, HyperliquidExecCancelOrderRequest,
HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecModifyOrderRequest,
HyperliquidExecOrderKind, HyperliquidExecOrderResponseData, HyperliquidExecOrderStatus,
HyperliquidExecPlaceOrderRequest, HyperliquidExecTif, HyperliquidExecTpSl,
HyperliquidExecTriggerParams, HyperliquidFills, HyperliquidL2Book, HyperliquidMeta,
HyperliquidOrderStatus, PerpMeta, PerpMetaAndCtxs, RESPONSE_STATUS_OK, SpotMeta,
SpotMetaAndCtxs,
},
parse::{
HyperliquidInstrumentDef, instruments_from_defs_owned, parse_fill_report,
parse_order_status_report_from_basic, parse_perp_instruments,
parse_position_status_report, parse_spot_instruments,
},
query::{ExchangeAction, InfoRequest},
rate_limits::{
RateLimitSnapshot, WeightedLimiter, backoff_full_jitter, exchange_weight,
info_base_weight, info_extra_weight,
},
},
signing::{
HyperliquidActionType, HyperliquidEip712Signer, NonceManager, SignRequest, types::SignerId,
},
websocket::messages::WsBasicOrderData,
};
pub static HYPERLIQUID_REST_QUOTA: LazyLock<Quota> =
LazyLock::new(|| Quota::per_minute(NonZeroU32::new(1200).unwrap()));
#[derive(Debug, Clone)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(
module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
from_py_object
)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
)]
pub struct HyperliquidRawHttpClient {
client: HttpClient,
is_testnet: bool,
base_info: String,
base_exchange: String,
signer: Option<HyperliquidEip712Signer>,
nonce_manager: Option<Arc<NonceManager>>,
vault_address: Option<VaultAddress>,
rest_limiter: Arc<WeightedLimiter>,
rate_limit_backoff_base: Duration,
rate_limit_backoff_cap: Duration,
rate_limit_max_attempts_info: u32,
}
impl HyperliquidRawHttpClient {
pub fn new(
is_testnet: bool,
timeout_secs: u64,
proxy_url: Option<String>,
) -> std::result::Result<Self, HttpClientError> {
Ok(Self {
client: HttpClient::new(
Self::default_headers(),
vec![],
vec![],
Some(*HYPERLIQUID_REST_QUOTA),
Some(timeout_secs),
proxy_url,
)?,
is_testnet,
base_info: info_url(is_testnet).to_string(),
base_exchange: exchange_url(is_testnet).to_string(),
signer: None,
nonce_manager: None,
vault_address: None,
rest_limiter: Arc::new(WeightedLimiter::per_minute(1200)),
rate_limit_backoff_base: Duration::from_millis(125),
rate_limit_backoff_cap: Duration::from_secs(5),
rate_limit_max_attempts_info: 3,
})
}
pub fn with_credentials(
secrets: &Secrets,
timeout_secs: u64,
proxy_url: Option<String>,
) -> std::result::Result<Self, HttpClientError> {
let signer = HyperliquidEip712Signer::new(secrets.private_key.clone());
let nonce_manager = Arc::new(NonceManager::new());
Ok(Self {
client: HttpClient::new(
Self::default_headers(),
vec![],
vec![],
Some(*HYPERLIQUID_REST_QUOTA),
Some(timeout_secs),
proxy_url,
)?,
is_testnet: secrets.is_testnet,
base_info: info_url(secrets.is_testnet).to_string(),
base_exchange: exchange_url(secrets.is_testnet).to_string(),
signer: Some(signer),
nonce_manager: Some(nonce_manager),
vault_address: secrets.vault_address,
rest_limiter: Arc::new(WeightedLimiter::per_minute(1200)),
rate_limit_backoff_base: Duration::from_millis(125),
rate_limit_backoff_cap: Duration::from_secs(5),
rate_limit_max_attempts_info: 3,
})
}
pub fn set_base_info_url(&mut self, url: String) {
self.base_info = url;
}
pub fn set_base_exchange_url(&mut self, url: String) {
self.base_exchange = url;
}
pub fn from_env(is_testnet: bool) -> Result<Self> {
let secrets = Secrets::from_env(is_testnet)
.map_err(|e| Error::auth(format!("missing credentials in environment: {e}")))?;
Self::with_credentials(&secrets, 60, None)
.map_err(|e| Error::auth(format!("Failed to create HTTP client: {e}")))
}
pub fn from_credentials(
private_key: &str,
vault_address: Option<&str>,
is_testnet: bool,
timeout_secs: u64,
proxy_url: Option<String>,
) -> Result<Self> {
let secrets = Secrets::from_private_key(private_key, vault_address, is_testnet)
.map_err(|e| Error::auth(format!("invalid credentials: {e}")))?;
Self::with_credentials(&secrets, timeout_secs, proxy_url)
.map_err(|e| Error::auth(format!("Failed to create HTTP client: {e}")))
}
#[must_use]
pub fn with_rate_limits(mut self) -> Self {
self.rest_limiter = Arc::new(WeightedLimiter::per_minute(1200));
self.rate_limit_backoff_base = Duration::from_millis(125);
self.rate_limit_backoff_cap = Duration::from_secs(5);
self.rate_limit_max_attempts_info = 3;
self
}
#[must_use]
pub fn is_testnet(&self) -> bool {
self.is_testnet
}
pub fn get_user_address(&self) -> Result<String> {
self.signer
.as_ref()
.ok_or_else(|| Error::auth("No signer configured"))?
.address()
}
#[must_use]
pub fn has_vault_address(&self) -> bool {
self.vault_address.is_some()
}
pub fn get_account_address(&self) -> Result<String> {
if let Some(vault) = &self.vault_address {
Ok(vault.to_hex())
} else {
self.get_user_address()
}
}
fn default_headers() -> HashMap<String, String> {
HashMap::from([
(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
("Content-Type".to_string(), "application/json".to_string()),
])
}
fn signer_id(&self) -> SignerId {
SignerId("hyperliquid:default".into())
}
fn parse_retry_after_simple(&self, headers: &HashMap<String, String>) -> Option<u64> {
let retry_after = headers.get("retry-after")?;
retry_after.parse::<u64>().ok().map(|s| s * 1000) }
pub async fn info_meta(&self) -> Result<HyperliquidMeta> {
let request = InfoRequest::meta();
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub async fn get_spot_meta(&self) -> Result<SpotMeta> {
let request = InfoRequest::spot_meta();
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub async fn get_perp_meta_and_ctxs(&self) -> Result<PerpMetaAndCtxs> {
let request = InfoRequest::meta_and_asset_ctxs();
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub async fn get_spot_meta_and_ctxs(&self) -> Result<SpotMetaAndCtxs> {
let request = InfoRequest::spot_meta_and_asset_ctxs();
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub(crate) async fn load_perp_meta(&self) -> Result<PerpMeta> {
let request = InfoRequest::meta();
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub(crate) async fn load_all_perp_metas(&self) -> Result<Vec<PerpMeta>> {
let request = InfoRequest::all_perp_metas();
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub async fn info_l2_book(&self, coin: &str) -> Result<HyperliquidL2Book> {
let request = InfoRequest::l2_book(coin);
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub async fn info_user_fills(&self, user: &str) -> Result<HyperliquidFills> {
let request = InfoRequest::user_fills(user);
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub async fn info_order_status(&self, user: &str, oid: u64) -> Result<HyperliquidOrderStatus> {
let request = InfoRequest::order_status(user, oid);
let response = self.send_info_request(&request).await?;
serde_json::from_value(response).map_err(Error::Serde)
}
pub async fn info_open_orders(&self, user: &str) -> Result<Value> {
let request = InfoRequest::open_orders(user);
self.send_info_request(&request).await
}
pub async fn info_frontend_open_orders(&self, user: &str) -> Result<Value> {
let request = InfoRequest::frontend_open_orders(user);
self.send_info_request(&request).await
}
pub async fn info_clearinghouse_state(&self, user: &str) -> Result<Value> {
let request = InfoRequest::clearinghouse_state(user);
self.send_info_request(&request).await
}
pub async fn info_user_fees(&self, user: &str) -> Result<Value> {
let request = InfoRequest::user_fees(user);
self.send_info_request(&request).await
}
pub async fn info_candle_snapshot(
&self,
coin: &str,
interval: HyperliquidBarInterval,
start_time: u64,
end_time: u64,
) -> Result<HyperliquidCandleSnapshot> {
let request = InfoRequest::candle_snapshot(coin, interval, start_time, end_time);
let response = self.send_info_request(&request).await?;
log::trace!(
"Candle snapshot raw response (len={}): {:?}",
response.as_array().map_or(0, |a| a.len()),
response
);
serde_json::from_value(response).map_err(Error::Serde)
}
pub async fn send_info_request_raw(&self, request: &InfoRequest) -> Result<Value> {
self.send_info_request(request).await
}
async fn send_info_request(&self, request: &InfoRequest) -> Result<Value> {
let base_w = info_base_weight(request);
self.rest_limiter.acquire(base_w).await;
let mut attempt = 0u32;
loop {
let response = self.http_roundtrip_info(request).await?;
if response.status.is_success() {
let val: Value = serde_json::from_slice(&response.body).map_err(Error::Serde)?;
let extra = info_extra_weight(request, &val);
if extra > 0 {
self.rest_limiter.debit_extra(extra).await;
log::debug!(
"Info debited extra weight: endpoint={request:?}, base_w={base_w}, extra={extra}"
);
}
return Ok(val);
}
if response.status.as_u16() == 429 {
if attempt >= self.rate_limit_max_attempts_info {
let ra = self.parse_retry_after_simple(&response.headers);
return Err(Error::rate_limit("info", base_w, ra));
}
let delay = self
.parse_retry_after_simple(&response.headers)
.map_or_else(
|| {
backoff_full_jitter(
attempt,
self.rate_limit_backoff_base,
self.rate_limit_backoff_cap,
)
},
Duration::from_millis,
);
log::warn!(
"429 Too Many Requests; backing off: endpoint={request:?}, attempt={attempt}, wait_ms={:?}",
delay.as_millis()
);
attempt += 1;
tokio::time::sleep(delay).await;
self.rest_limiter.acquire(1).await;
continue;
}
if (response.status.is_server_error() || response.status.as_u16() == 408)
&& attempt < self.rate_limit_max_attempts_info
{
let delay = backoff_full_jitter(
attempt,
self.rate_limit_backoff_base,
self.rate_limit_backoff_cap,
);
log::warn!(
"Transient error; retrying: endpoint={request:?}, attempt={attempt}, status={:?}, wait_ms={:?}",
response.status.as_u16(),
delay.as_millis()
);
attempt += 1;
tokio::time::sleep(delay).await;
continue;
}
let error_body = String::from_utf8_lossy(&response.body);
return Err(Error::http(
response.status.as_u16(),
error_body.to_string(),
));
}
}
async fn http_roundtrip_info(&self, request: &InfoRequest) -> Result<HttpResponse> {
let url = &self.base_info;
let body = serde_json::to_value(request).map_err(Error::Serde)?;
let body_bytes = serde_json::to_string(&body)
.map_err(Error::Serde)?
.into_bytes();
self.client
.request(
Method::POST,
url.clone(),
None,
None,
Some(body_bytes),
None,
None,
)
.await
.map_err(Error::from_http_client)
}
pub async fn post_action(
&self,
action: &ExchangeAction,
) -> Result<HyperliquidExchangeResponse> {
let w = exchange_weight(action);
self.rest_limiter.acquire(w).await;
let signer = self
.signer
.as_ref()
.ok_or_else(|| Error::auth("credentials required for exchange operations"))?;
let nonce_manager = self
.nonce_manager
.as_ref()
.ok_or_else(|| Error::auth("nonce manager missing"))?;
let signer_id = self.signer_id();
let time_nonce = nonce_manager.next(signer_id)?;
let action_value = serde_json::to_value(action)
.context("serialize exchange action")
.map_err(|e| Error::bad_request(e.to_string()))?;
let action_bytes = rmp_serde::to_vec_named(action)
.context("serialize action with MessagePack")
.map_err(|e| Error::bad_request(e.to_string()))?;
let sign_request = SignRequest {
action: action_value.clone(),
action_bytes: Some(action_bytes),
time_nonce,
action_type: HyperliquidActionType::L1,
is_testnet: self.is_testnet,
vault_address: self.vault_address.as_ref().map(|v| v.to_hex()),
};
let sig = signer.sign(&sign_request)?.signature;
let nonce_u64 = time_nonce.as_millis() as u64;
let request = if let Some(vault) = self.vault_address {
HyperliquidExchangeRequest::with_vault(
action.clone(),
nonce_u64,
&sig,
vault.to_string(),
)
.map_err(|e| Error::bad_request(format!("Failed to create request: {e}")))?
} else {
HyperliquidExchangeRequest::new(action.clone(), nonce_u64, &sig)
.map_err(|e| Error::bad_request(format!("Failed to create request: {e}")))?
};
let response = self.http_roundtrip_exchange(&request).await?;
if response.status.is_success() {
let parsed_response: HyperliquidExchangeResponse =
serde_json::from_slice(&response.body).map_err(Error::Serde)?;
match &parsed_response {
HyperliquidExchangeResponse::Status {
status,
response: response_data,
} if status == "err" => {
let error_msg = response_data
.as_str()
.map_or_else(|| response_data.to_string(), |s| s.to_string());
log::error!("Hyperliquid API returned error: {error_msg}");
Err(Error::bad_request(format!("API error: {error_msg}")))
}
HyperliquidExchangeResponse::Error { error } => {
log::error!("Hyperliquid API returned error: {error}");
Err(Error::bad_request(format!("API error: {error}")))
}
_ => Ok(parsed_response),
}
} else if response.status.as_u16() == 429 {
let ra = self.parse_retry_after_simple(&response.headers);
Err(Error::rate_limit("exchange", w, ra))
} else {
let error_body = String::from_utf8_lossy(&response.body);
log::error!(
"Exchange API error (status {}): {}",
response.status.as_u16(),
error_body
);
Err(Error::http(
response.status.as_u16(),
error_body.to_string(),
))
}
}
pub async fn post_action_exec(
&self,
action: &HyperliquidExecAction,
) -> Result<HyperliquidExchangeResponse> {
let w = match action {
HyperliquidExecAction::Order { orders, .. } => 1 + (orders.len() as u32 / 40),
HyperliquidExecAction::Cancel { cancels } => 1 + (cancels.len() as u32 / 40),
HyperliquidExecAction::CancelByCloid { cancels } => 1 + (cancels.len() as u32 / 40),
HyperliquidExecAction::BatchModify { modifies } => 1 + (modifies.len() as u32 / 40),
_ => 1,
};
self.rest_limiter.acquire(w).await;
let signer = self
.signer
.as_ref()
.ok_or_else(|| Error::auth("credentials required for exchange operations"))?;
let nonce_manager = self
.nonce_manager
.as_ref()
.ok_or_else(|| Error::auth("nonce manager missing"))?;
let signer_id = self.signer_id();
let time_nonce = nonce_manager.next(signer_id)?;
let action_value = serde_json::to_value(action)
.context("serialize exchange action")
.map_err(|e| Error::bad_request(e.to_string()))?;
let action_bytes = rmp_serde::to_vec_named(action)
.context("serialize action with MessagePack")
.map_err(|e| Error::bad_request(e.to_string()))?;
let sig = signer
.sign(&SignRequest {
action: action_value.clone(),
action_bytes: Some(action_bytes),
time_nonce,
action_type: HyperliquidActionType::L1,
is_testnet: self.is_testnet,
vault_address: self.vault_address.as_ref().map(|v| v.to_hex()),
})?
.signature;
let request = if let Some(vault) = self.vault_address {
HyperliquidExchangeRequest::with_vault(
action.clone(),
time_nonce.as_millis() as u64,
&sig,
vault.to_string(),
)
.map_err(|e| Error::bad_request(format!("Failed to create request: {e}")))?
} else {
HyperliquidExchangeRequest::new(action.clone(), time_nonce.as_millis() as u64, &sig)
.map_err(|e| Error::bad_request(format!("Failed to create request: {e}")))?
};
let response = self.http_roundtrip_exchange(&request).await?;
if response.status.is_success() {
let parsed_response: HyperliquidExchangeResponse =
serde_json::from_slice(&response.body).map_err(Error::Serde)?;
match &parsed_response {
HyperliquidExchangeResponse::Status {
status,
response: response_data,
} if status == "err" => {
let error_msg = response_data
.as_str()
.map_or_else(|| response_data.to_string(), |s| s.to_string());
log::error!("Hyperliquid API returned error: {error_msg}");
Err(Error::bad_request(format!("API error: {error_msg}")))
}
HyperliquidExchangeResponse::Error { error } => {
log::error!("Hyperliquid API returned error: {error}");
Err(Error::bad_request(format!("API error: {error}")))
}
_ => Ok(parsed_response),
}
} else if response.status.as_u16() == 429 {
let ra = self.parse_retry_after_simple(&response.headers);
Err(Error::rate_limit("exchange", w, ra))
} else {
let error_body = String::from_utf8_lossy(&response.body);
Err(Error::http(
response.status.as_u16(),
error_body.to_string(),
))
}
}
pub async fn rest_limiter_snapshot(&self) -> RateLimitSnapshot {
self.rest_limiter.snapshot().await
}
async fn http_roundtrip_exchange<T>(
&self,
request: &HyperliquidExchangeRequest<T>,
) -> Result<HttpResponse>
where
T: serde::Serialize,
{
let url = &self.base_exchange;
let body = serde_json::to_string(&request).map_err(Error::Serde)?;
let body_bytes = body.into_bytes();
let response = self
.client
.request(
Method::POST,
url.clone(),
None,
None,
Some(body_bytes),
None,
None,
)
.await
.map_err(Error::from_http_client)?;
Ok(response)
}
}
#[derive(Debug, Clone)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(
module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
from_py_object
)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
)]
pub struct HyperliquidHttpClient {
pub(crate) inner: Arc<HyperliquidRawHttpClient>,
clock: &'static AtomicTime,
instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
instruments_by_coin: Arc<AtomicMap<(Ustr, HyperliquidProductType), InstrumentAny>>,
asset_indices: Arc<AtomicMap<Ustr, u32>>,
spot_fill_coins: Arc<AtomicMap<Ustr, Ustr>>,
account_id: Option<AccountId>,
account_address: Option<String>,
normalize_prices: bool,
}
impl Default for HyperliquidHttpClient {
fn default() -> Self {
Self::new(true, 60, None).expect("Failed to create default Hyperliquid HTTP client")
}
}
impl HyperliquidHttpClient {
pub fn new(
is_testnet: bool,
timeout_secs: u64,
proxy_url: Option<String>,
) -> std::result::Result<Self, HttpClientError> {
let raw_client = HyperliquidRawHttpClient::new(is_testnet, timeout_secs, proxy_url)?;
Ok(Self::from_raw(raw_client))
}
pub fn with_secrets(
secrets: &Secrets,
timeout_secs: u64,
proxy_url: Option<String>,
) -> std::result::Result<Self, HttpClientError> {
let raw_client =
HyperliquidRawHttpClient::with_credentials(secrets, timeout_secs, proxy_url)?;
Ok(Self::from_raw(raw_client))
}
fn from_raw(raw_client: HyperliquidRawHttpClient) -> Self {
Self {
inner: Arc::new(raw_client),
clock: get_atomic_clock_realtime(),
instruments: Arc::new(AtomicMap::new()),
instruments_by_coin: Arc::new(AtomicMap::new()),
asset_indices: Arc::new(AtomicMap::new()),
spot_fill_coins: Arc::new(AtomicMap::new()),
account_id: None,
account_address: None,
normalize_prices: true,
}
}
pub fn set_base_info_url(&mut self, url: String) {
Arc::get_mut(&mut self.inner)
.expect("cannot override URL: Arc has multiple references")
.set_base_info_url(url);
}
pub fn set_base_exchange_url(&mut self, url: String) {
Arc::get_mut(&mut self.inner)
.expect("cannot override URL: Arc has multiple references")
.set_base_exchange_url(url);
}
pub fn from_env(is_testnet: bool) -> Result<Self> {
let raw_client = HyperliquidRawHttpClient::from_env(is_testnet)?;
Ok(Self {
inner: Arc::new(raw_client),
clock: get_atomic_clock_realtime(),
instruments: Arc::new(AtomicMap::new()),
instruments_by_coin: Arc::new(AtomicMap::new()),
asset_indices: Arc::new(AtomicMap::new()),
spot_fill_coins: Arc::new(AtomicMap::new()),
account_id: None,
account_address: None,
normalize_prices: true,
})
}
pub fn with_credentials(
private_key: Option<String>,
vault_address: Option<String>,
account_address: Option<String>,
is_testnet: bool,
timeout_secs: u64,
proxy_url: Option<String>,
) -> Result<Self> {
let pk_env_var = if is_testnet {
"HYPERLIQUID_TESTNET_PK"
} else {
"HYPERLIQUID_PK"
};
let vault_env_var = if is_testnet {
"HYPERLIQUID_TESTNET_VAULT"
} else {
"HYPERLIQUID_VAULT"
};
let resolved_pk = match private_key {
Some(pk) => Some(pk),
None => env::var(pk_env_var).ok(),
};
let resolved_vault = match vault_address {
Some(vault) => Some(vault),
None => env::var(vault_env_var).ok(),
};
let resolved_account_address = match account_address {
Some(addr) => Some(addr),
None => env::var("HYPERLIQUID_ACCOUNT_ADDRESS").ok(),
};
match resolved_pk {
Some(pk) => {
let raw_client = HyperliquidRawHttpClient::from_credentials(
&pk,
resolved_vault.as_deref(),
is_testnet,
timeout_secs,
proxy_url,
)?;
Ok(Self {
inner: Arc::new(raw_client),
clock: get_atomic_clock_realtime(),
instruments: Arc::new(AtomicMap::new()),
instruments_by_coin: Arc::new(AtomicMap::new()),
asset_indices: Arc::new(AtomicMap::new()),
spot_fill_coins: Arc::new(AtomicMap::new()),
account_id: None,
account_address: resolved_account_address,
normalize_prices: true,
})
}
None => {
Self::new(is_testnet, timeout_secs, proxy_url)
.map_err(|e| Error::auth(format!("Failed to create HTTP client: {e}")))
}
}
}
pub fn from_credentials(
private_key: &str,
vault_address: Option<&str>,
is_testnet: bool,
timeout_secs: u64,
proxy_url: Option<String>,
) -> Result<Self> {
let raw_client = HyperliquidRawHttpClient::from_credentials(
private_key,
vault_address,
is_testnet,
timeout_secs,
proxy_url,
)?;
Ok(Self {
inner: Arc::new(raw_client),
clock: get_atomic_clock_realtime(),
instruments: Arc::new(AtomicMap::new()),
instruments_by_coin: Arc::new(AtomicMap::new()),
asset_indices: Arc::new(AtomicMap::new()),
spot_fill_coins: Arc::new(AtomicMap::new()),
account_id: None,
account_address: None,
normalize_prices: true,
})
}
#[must_use]
pub fn is_testnet(&self) -> bool {
self.inner.is_testnet()
}
#[must_use]
pub fn normalize_prices(&self) -> bool {
self.normalize_prices
}
pub fn set_normalize_prices(&mut self, value: bool) {
self.normalize_prices = value;
}
pub fn get_user_address(&self) -> Result<String> {
self.inner.get_user_address()
}
#[must_use]
pub fn has_vault_address(&self) -> bool {
self.inner.has_vault_address()
}
pub fn get_account_address(&self) -> Result<String> {
if let Some(addr) = &self.account_address {
return Ok(addr.clone());
}
self.inner.get_account_address()
}
pub fn set_account_address(&mut self, address: Option<String>) {
self.account_address = address;
}
pub fn cache_instrument(&self, instrument: &InstrumentAny) {
let full_symbol = instrument.symbol().inner();
let coin = instrument.raw_symbol().inner();
self.instruments.rcu(|m| {
m.insert(full_symbol, instrument.clone());
m.insert(coin, instrument.clone());
});
if let Ok(product_type) = HyperliquidProductType::from_symbol(full_symbol.as_str()) {
self.instruments_by_coin.rcu(|m| {
m.insert((coin, product_type), instrument.clone());
if coin.as_str().starts_with('@')
&& let Some(base) = full_symbol.as_str().split('-').next()
{
let base_ustr = Ustr::from(base);
if base_ustr != coin {
m.insert((base_ustr, product_type), instrument.clone());
}
}
});
} else {
log::warn!("Unable to determine product type for symbol: {full_symbol}");
}
}
fn get_or_create_instrument(
&self,
coin: &Ustr,
product_type: Option<HyperliquidProductType>,
) -> Option<InstrumentAny> {
if let Some(pt) = product_type
&& let Some(instrument) = self.instruments_by_coin.load().get(&(*coin, pt))
{
return Some(instrument.clone());
}
if product_type.is_none() {
let guard = self.instruments_by_coin.load();
if let Some(instrument) = guard.get(&(*coin, HyperliquidProductType::Perp)) {
return Some(instrument.clone());
}
if let Some(instrument) = guard.get(&(*coin, HyperliquidProductType::Spot)) {
return Some(instrument.clone());
}
}
if coin.as_str().starts_with('@')
&& let Some(symbol) = self.spot_fill_coins.load().get(coin)
{
if let Some(instrument) = self.instruments.load().get(symbol) {
return Some(instrument.clone());
}
}
if coin.as_str().starts_with("vntls:") {
log::info!("Creating synthetic instrument for vault token: {coin}");
let ts_event = self.clock.get_time_ns();
let symbol_str = format!("{coin}-USDC-SPOT");
let symbol = Symbol::new(&symbol_str);
let venue = *HYPERLIQUID_VENUE;
let instrument_id = InstrumentId::new(symbol, venue);
let base_currency = Currency::new(
coin.as_str(),
8, 0, coin.as_str(),
CurrencyType::Crypto,
);
let quote_currency = Currency::new(
"USDC",
6, 0,
"USDC",
CurrencyType::Crypto,
);
let price_increment = Price::from("0.00000001");
let size_increment = Quantity::from("0.00000001");
let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
instrument_id,
symbol,
base_currency,
quote_currency,
8, 8, price_increment,
size_increment,
None, None, None, None, None, None, None, None, None, None, None, None, None, ts_event,
ts_event,
));
self.cache_instrument(&instrument);
Some(instrument)
} else {
log::warn!("Instrument not found in cache: {coin}");
None
}
}
pub fn set_account_id(&mut self, account_id: AccountId) {
self.account_id = Some(account_id);
}
pub async fn request_instrument_defs(&self) -> Result<Vec<HyperliquidInstrumentDef>> {
let mut defs: Vec<HyperliquidInstrumentDef> = Vec::new();
match self.inner.load_all_perp_metas().await {
Ok(all_metas) => {
for (dex_index, meta) in all_metas.iter().enumerate() {
let base = perp_dex_asset_index_base(dex_index);
match parse_perp_instruments(meta, base) {
Ok(perp_defs) => {
log::debug!(
"Loaded Hyperliquid perp defs: dex_index={dex_index}, count={}",
perp_defs.len(),
);
defs.extend(perp_defs);
}
Err(e) => {
log::warn!("Failed to parse perp instruments for dex {dex_index}: {e}");
}
}
}
}
Err(e) => {
log::warn!("Failed to load allPerpMetas, falling back to meta: {e}");
match self.inner.load_perp_meta().await {
Ok(perp_meta) => match parse_perp_instruments(&perp_meta, 0) {
Ok(perp_defs) => {
log::debug!(
"Loaded Hyperliquid perp defs via fallback: count={}",
perp_defs.len(),
);
defs.extend(perp_defs);
}
Err(e) => {
log::warn!("Failed to parse perp instruments: {e}");
}
},
Err(e) => {
log::warn!("Failed to load Hyperliquid perp metadata: {e}");
}
}
}
}
match self.inner.get_spot_meta().await {
Ok(spot_meta) => match parse_spot_instruments(&spot_meta) {
Ok(spot_defs) => {
log::debug!(
"Loaded Hyperliquid spot definitions: count={}",
spot_defs.len(),
);
defs.extend(spot_defs);
}
Err(e) => {
log::warn!("Failed to parse Hyperliquid spot instruments: {e}");
}
},
Err(e) => {
log::warn!("Failed to load Hyperliquid spot metadata: {e}");
}
}
self.asset_indices.rcu(|m| {
for def in &defs {
m.insert(def.symbol, def.asset_index);
}
});
log::debug!(
"Populated asset indices map (count={})",
self.asset_indices.len()
);
Ok(defs)
}
pub fn convert_defs(&self, defs: Vec<HyperliquidInstrumentDef>) -> Vec<InstrumentAny> {
let ts_init = self.clock.get_time_ns();
instruments_from_defs_owned(defs, ts_init)
}
pub async fn request_instruments(&self) -> Result<Vec<InstrumentAny>> {
let defs = self.request_instrument_defs().await?;
Ok(self.convert_defs(defs))
}
pub fn get_asset_index(&self, symbol: &str) -> Option<u32> {
self.asset_indices.load().get(&Ustr::from(symbol)).copied()
}
pub fn get_price_precision(&self, symbol: &str) -> Option<u8> {
self.instruments
.load()
.get(&Ustr::from(symbol))
.map(|inst| inst.price_precision())
}
#[must_use]
pub fn get_spot_fill_coin_mapping(&self) -> AHashMap<Ustr, Ustr> {
const SPOT_INDEX_OFFSET: u32 = 10_000;
const BUILDER_PERP_OFFSET: u32 = 100_000;
let guard = self.asset_indices.load();
let mut mapping = AHashMap::new();
for (symbol, &asset_index) in guard.iter() {
if (SPOT_INDEX_OFFSET..BUILDER_PERP_OFFSET).contains(&asset_index) {
let pair_index = asset_index - SPOT_INDEX_OFFSET;
let fill_coin = Ustr::from(&format!("@{pair_index}"));
mapping.insert(fill_coin, *symbol);
}
}
self.spot_fill_coins.store(mapping.clone());
mapping
}
#[allow(dead_code)]
pub(crate) async fn load_perp_meta(&self) -> Result<PerpMeta> {
self.inner.load_perp_meta().await
}
#[allow(dead_code)]
pub(crate) async fn load_all_perp_metas(&self) -> Result<Vec<PerpMeta>> {
self.inner.load_all_perp_metas().await
}
#[allow(dead_code)]
pub(crate) async fn get_spot_meta(&self) -> Result<SpotMeta> {
self.inner.get_spot_meta().await
}
pub async fn info_l2_book(&self, coin: &str) -> Result<HyperliquidL2Book> {
self.inner.info_l2_book(coin).await
}
pub async fn info_user_fills(&self, user: &str) -> Result<HyperliquidFills> {
self.inner.info_user_fills(user).await
}
pub async fn info_order_status(&self, user: &str, oid: u64) -> Result<HyperliquidOrderStatus> {
self.inner.info_order_status(user, oid).await
}
pub async fn info_open_orders(&self, user: &str) -> Result<Value> {
self.inner.info_open_orders(user).await
}
pub async fn info_frontend_open_orders(&self, user: &str) -> Result<Value> {
self.inner.info_frontend_open_orders(user).await
}
pub async fn info_clearinghouse_state(&self, user: &str) -> Result<Value> {
self.inner.info_clearinghouse_state(user).await
}
pub async fn info_user_fees(&self, user: &str) -> Result<Value> {
self.inner.info_user_fees(user).await
}
pub async fn info_candle_snapshot(
&self,
coin: &str,
interval: HyperliquidBarInterval,
start_time: u64,
end_time: u64,
) -> Result<HyperliquidCandleSnapshot> {
self.inner
.info_candle_snapshot(coin, interval, start_time, end_time)
.await
}
pub async fn post_action(
&self,
action: &ExchangeAction,
) -> Result<HyperliquidExchangeResponse> {
self.inner.post_action(action).await
}
pub async fn post_action_exec(
&self,
action: &HyperliquidExecAction,
) -> Result<HyperliquidExchangeResponse> {
self.inner.post_action_exec(action).await
}
pub async fn info_meta(&self) -> Result<HyperliquidMeta> {
self.inner.info_meta().await
}
pub async fn cancel_order(
&self,
instrument_id: InstrumentId,
client_order_id: Option<ClientOrderId>,
venue_order_id: Option<VenueOrderId>,
) -> Result<()> {
let symbol = instrument_id.symbol.as_str();
let asset_id = self.get_asset_index(symbol).ok_or_else(|| {
Error::bad_request(format!(
"Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
))
})?;
let action = if let Some(cloid) = client_order_id {
let cloid_hash = Cloid::from_client_order_id(cloid);
let cancel_req = HyperliquidExecCancelByCloidRequest {
asset: asset_id,
cloid: cloid_hash,
};
HyperliquidExecAction::CancelByCloid {
cancels: vec![cancel_req],
}
} else if let Some(oid) = venue_order_id {
let oid_u64 = oid
.as_str()
.parse::<u64>()
.map_err(|_| Error::bad_request("Invalid venue order ID format"))?;
let cancel_req = HyperliquidExecCancelOrderRequest {
asset: asset_id,
oid: oid_u64,
};
HyperliquidExecAction::Cancel {
cancels: vec![cancel_req],
}
} else {
return Err(Error::bad_request(
"Either client_order_id or venue_order_id must be provided",
));
};
let response = self.inner.post_action_exec(&action).await?;
match response {
ref r @ HyperliquidExchangeResponse::Status { .. } if r.is_ok() => Ok(()),
HyperliquidExchangeResponse::Status {
status,
response: error_data,
} => Err(Error::bad_request(format!(
"Cancel order failed: status={status}, error={error_data}"
))),
HyperliquidExchangeResponse::Error { error } => {
Err(Error::bad_request(format!("Cancel order error: {error}")))
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn modify_order(
&self,
instrument_id: InstrumentId,
venue_order_id: VenueOrderId,
order_side: OrderSide,
order_type: OrderType,
price: Price,
quantity: Quantity,
trigger_price: Option<Price>,
reduce_only: bool,
post_only: bool,
time_in_force: TimeInForce,
client_order_id: Option<ClientOrderId>,
) -> Result<()> {
let symbol = instrument_id.symbol.as_str();
let asset_id = self.get_asset_index(symbol).ok_or_else(|| {
Error::bad_request(format!(
"Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
))
})?;
let oid: u64 = venue_order_id
.as_str()
.parse()
.map_err(|_| Error::bad_request("Invalid venue order ID format"))?;
let is_buy = matches!(order_side, OrderSide::Buy);
let decimals = self.get_price_precision(symbol).unwrap_or(2);
let normalized_price = if self.normalize_prices {
normalize_price(price.as_decimal(), decimals).normalize()
} else {
price.as_decimal().normalize()
};
let size = quantity.as_decimal().normalize();
let cloid = client_order_id.map(Cloid::from_client_order_id);
let kind = match order_type {
OrderType::Market => HyperliquidExecOrderKind::Limit {
limit: HyperliquidExecLimitParams {
tif: HyperliquidExecTif::Ioc,
},
},
OrderType::Limit => {
let tif = time_in_force_to_hyperliquid_tif(time_in_force, post_only)
.map_err(|e| Error::bad_request(format!("{e}")))?;
HyperliquidExecOrderKind::Limit {
limit: HyperliquidExecLimitParams { tif },
}
}
OrderType::StopMarket
| OrderType::StopLimit
| OrderType::MarketIfTouched
| OrderType::LimitIfTouched => {
if let Some(trig_px) = trigger_price {
let trigger_price_decimal = if self.normalize_prices {
normalize_price(trig_px.as_decimal(), decimals).normalize()
} else {
trig_px.as_decimal().normalize()
};
let tpsl = match order_type {
OrderType::StopMarket | OrderType::StopLimit => HyperliquidExecTpSl::Sl,
_ => HyperliquidExecTpSl::Tp,
};
let is_market = matches!(
order_type,
OrderType::StopMarket | OrderType::MarketIfTouched
);
HyperliquidExecOrderKind::Trigger {
trigger: HyperliquidExecTriggerParams {
is_market,
trigger_px: trigger_price_decimal,
tpsl,
},
}
} else {
return Err(Error::bad_request("Trigger orders require a trigger price"));
}
}
_ => {
return Err(Error::bad_request(format!(
"Order type {order_type:?} not supported for modify"
)));
}
};
let order = HyperliquidExecPlaceOrderRequest {
asset: asset_id,
is_buy,
price: normalized_price,
size,
reduce_only,
kind,
cloid,
};
let action = HyperliquidExecAction::Modify {
modify: HyperliquidExecModifyOrderRequest { oid, order },
};
let response = self.inner.post_action_exec(&action).await?;
match response {
ref r @ HyperliquidExchangeResponse::Status { .. } if r.is_ok() => {
if let Some(inner_error) = extract_inner_error(&response) {
Err(Error::bad_request(format!(
"Modify order rejected: {inner_error}",
)))
} else {
Ok(())
}
}
HyperliquidExchangeResponse::Status {
status,
response: error_data,
} => Err(Error::bad_request(format!(
"Modify order failed: status={status}, error={error_data}"
))),
HyperliquidExchangeResponse::Error { error } => {
Err(Error::bad_request(format!("Modify order error: {error}")))
}
}
}
pub async fn request_order_status_reports(
&self,
user: &str,
instrument_id: Option<InstrumentId>,
) -> Result<Vec<OrderStatusReport>> {
let account_id = self
.account_id
.ok_or_else(|| Error::bad_request("Account ID not set"))?;
let response = self.info_frontend_open_orders(user).await?;
let orders: Vec<serde_json::Value> = serde_json::from_value(response)
.map_err(|e| Error::bad_request(format!("Failed to parse orders: {e}")))?;
let mut reports = Vec::new();
let ts_init = self.clock.get_time_ns();
for order_value in orders {
let order: WsBasicOrderData = match serde_json::from_value(order_value.clone()) {
Ok(o) => o,
Err(e) => {
log::warn!("Failed to parse order: {e}");
continue;
}
};
let instrument = match self.get_or_create_instrument(&order.coin, None) {
Some(inst) => inst,
None => continue, };
if let Some(filter_id) = instrument_id
&& instrument.id() != filter_id
{
continue;
}
let status = HyperliquidOrderStatusEnum::Open;
match parse_order_status_report_from_basic(
&order,
&status,
&instrument,
account_id,
ts_init,
) {
Ok(report) => reports.push(report),
Err(e) => log::error!("Failed to parse order status report: {e}"),
}
}
Ok(reports)
}
pub async fn request_order_status_report(
&self,
user: &str,
oid: u64,
) -> Result<Option<OrderStatusReport>> {
let account_id = self
.account_id
.ok_or_else(|| Error::bad_request("Account ID not set"))?;
let ts_init = self.clock.get_time_ns();
let response = self.info_frontend_open_orders(user).await?;
let orders: Vec<WsBasicOrderData> = match serde_json::from_value(response) {
Ok(v) => v,
Err(e) => {
log::warn!("Failed to parse frontend open orders response: {e}");
Vec::new()
}
};
if let Some(order) = orders.into_iter().find(|o| o.oid == oid) {
let instrument = match self.get_or_create_instrument(&order.coin, None) {
Some(inst) => inst,
None => return Ok(None),
};
let status = if order.trigger_activated == Some(true) {
HyperliquidOrderStatusEnum::Triggered
} else {
HyperliquidOrderStatusEnum::Open
};
return match parse_order_status_report_from_basic(
&order,
&status,
&instrument,
account_id,
ts_init,
) {
Ok(report) => Ok(Some(report)),
Err(e) => {
log::error!("Failed to parse order status report for oid {oid}: {e}");
Ok(None)
}
};
}
let response = self.info_order_status(user, oid).await?;
let entry = match response.statuses.into_iter().next() {
Some(e) => e,
None => return Ok(None),
};
let instrument = match self.get_or_create_instrument(&entry.order.coin, None) {
Some(inst) => inst,
None => return Ok(None),
};
let basic = WsBasicOrderData {
coin: entry.order.coin,
side: entry.order.side,
limit_px: entry.order.limit_px,
sz: entry.order.sz,
oid: entry.order.oid,
timestamp: entry.order.timestamp,
orig_sz: entry.order.orig_sz,
cloid: None,
trigger_px: None,
is_market: None,
tpsl: None,
trigger_activated: None,
trailing_stop: None,
};
match parse_order_status_report_from_basic(
&basic,
&entry.status,
&instrument,
account_id,
ts_init,
) {
Ok(mut report) => {
if entry.status_timestamp > 0 {
report.ts_last = UnixNanos::from(entry.status_timestamp * 1_000_000);
}
Ok(Some(report))
}
Err(e) => {
log::error!("Failed to parse order status report for oid {oid}: {e}");
Ok(None)
}
}
}
pub async fn request_order_status_report_by_client_order_id(
&self,
user: &str,
client_order_id: &ClientOrderId,
) -> Result<Option<OrderStatusReport>> {
let account_id = self
.account_id
.ok_or_else(|| Error::bad_request("Account ID not set"))?;
let ts_init = self.clock.get_time_ns();
let cloid_hex = Cloid::from_client_order_id(*client_order_id).to_hex();
let response = self.info_frontend_open_orders(user).await?;
let orders: Vec<WsBasicOrderData> = match serde_json::from_value(response) {
Ok(v) => v,
Err(e) => {
log::warn!("Failed to parse frontend open orders response: {e}");
return Ok(None);
}
};
let order = match orders
.into_iter()
.find(|o| o.cloid.as_ref().is_some_and(|c| c == &cloid_hex))
{
Some(o) => o,
None => return Ok(None),
};
let instrument = match self.get_or_create_instrument(&order.coin, None) {
Some(inst) => inst,
None => return Ok(None),
};
let status = if order.trigger_activated == Some(true) {
HyperliquidOrderStatusEnum::Triggered
} else {
HyperliquidOrderStatusEnum::Open
};
match parse_order_status_report_from_basic(
&order,
&status,
&instrument,
account_id,
ts_init,
) {
Ok(mut report) => {
report.client_order_id = Some(*client_order_id);
Ok(Some(report))
}
Err(e) => {
log::error!("Failed to parse order status report for cloid {cloid_hex}: {e}");
Ok(None)
}
}
}
pub async fn request_fill_reports(
&self,
user: &str,
instrument_id: Option<InstrumentId>,
) -> Result<Vec<FillReport>> {
let account_id = self
.account_id
.ok_or_else(|| Error::bad_request("Account ID not set"))?;
let fills_response = self.info_user_fills(user).await?;
let mut reports = Vec::new();
let ts_init = self.clock.get_time_ns();
for fill in fills_response {
let instrument = match self.get_or_create_instrument(&fill.coin, None) {
Some(inst) => inst,
None => continue, };
if let Some(filter_id) = instrument_id
&& instrument.id() != filter_id
{
continue;
}
match parse_fill_report(&fill, &instrument, account_id, ts_init) {
Ok(report) => reports.push(report),
Err(e) => log::error!("Failed to parse fill report: {e}"),
}
}
Ok(reports)
}
pub async fn request_position_status_reports(
&self,
user: &str,
instrument_id: Option<InstrumentId>,
) -> Result<Vec<PositionStatusReport>> {
let account_id = self
.account_id
.ok_or_else(|| Error::bad_request("Account ID not set"))?;
let state_response = self.info_clearinghouse_state(user).await?;
let asset_positions: Vec<serde_json::Value> = state_response
.get("assetPositions")
.and_then(|v| v.as_array())
.ok_or_else(|| Error::bad_request("assetPositions not found in clearinghouse state"))?
.clone();
let mut reports = Vec::new();
let ts_init = self.clock.get_time_ns();
for position_value in asset_positions {
let coin = position_value
.get("position")
.and_then(|p| p.get("coin"))
.and_then(|c| c.as_str())
.ok_or_else(|| Error::bad_request("coin not found in position"))?;
let coin_ustr = Ustr::from(coin);
let instrument = match self.get_or_create_instrument(&coin_ustr, None) {
Some(inst) => inst,
None => continue, };
if let Some(filter_id) = instrument_id
&& instrument.id() != filter_id
{
continue;
}
match parse_position_status_report(&position_value, &instrument, account_id, ts_init) {
Ok(report) => reports.push(report),
Err(e) => log::error!("Failed to parse position status report: {e}"),
}
}
Ok(reports)
}
pub async fn request_account_state(&self, user: &str) -> Result<AccountState> {
let account_id = self
.account_id
.ok_or_else(|| Error::bad_request("Account ID not set"))?;
let state_response = self.info_clearinghouse_state(user).await?;
let ts_init = self.clock.get_time_ns();
log::trace!("Clearinghouse state response: {state_response}");
let state: ClearinghouseState =
serde_json::from_value(state_response.clone()).map_err(|e| {
log::error!("Failed to parse clearinghouse state: {e}");
log::debug!("Raw response: {state_response}");
Error::bad_request(format!("Failed to parse clearinghouse state: {e}"))
})?;
let usdc = Currency::new("USDC", 6, 0, "0.000001", CurrencyType::Crypto);
let balances = if let Some(margin) = &state.cross_margin_summary {
let mut total = margin.total_raw_usd.max(Decimal::ZERO);
let free = state.withdrawable.unwrap_or(total).max(Decimal::ZERO);
if free > total {
log::debug!("Adjusting total ({total}) to match withdrawable ({free})");
total = free;
}
let locked = (total - free).max(Decimal::ZERO);
vec![AccountBalance::new(
Money::from_decimal(total, usdc).map_err(|e| Error::decode(e.to_string()))?,
Money::from_decimal(locked, usdc).map_err(|e| Error::decode(e.to_string()))?,
Money::from_decimal(free, usdc).map_err(|e| Error::decode(e.to_string()))?,
)]
} else {
let free = state
.withdrawable
.unwrap_or(Decimal::ZERO)
.max(Decimal::ZERO);
vec![AccountBalance::new(
Money::from_decimal(free, usdc).map_err(|e| Error::decode(e.to_string()))?,
Money::zero(usdc),
Money::from_decimal(free, usdc).map_err(|e| Error::decode(e.to_string()))?,
)]
};
Ok(AccountState::new(
account_id,
AccountType::Margin,
balances,
vec![], true, UUID4::new(),
ts_init,
ts_init,
None,
))
}
pub async fn request_bars(
&self,
bar_type: BarType,
start: Option<chrono::DateTime<chrono::Utc>>,
end: Option<chrono::DateTime<chrono::Utc>>,
limit: Option<u32>,
) -> Result<Vec<Bar>> {
let instrument_id = bar_type.instrument_id();
let symbol = instrument_id.symbol;
let product_type = HyperliquidProductType::from_symbol(symbol.as_str()).ok();
let base = Ustr::from(
symbol
.as_str()
.split('-')
.next()
.ok_or_else(|| Error::bad_request("Invalid instrument symbol"))?,
);
let instrument = self
.get_or_create_instrument(&base, product_type)
.ok_or_else(|| {
Error::bad_request(format!("Instrument not found in cache: {instrument_id}"))
})?;
let coin = instrument.raw_symbol().inner();
let price_precision = instrument.price_precision();
let size_precision = instrument.size_precision();
let interval =
bar_type_to_interval(&bar_type).map_err(|e| Error::bad_request(e.to_string()))?;
let now = chrono::Utc::now();
let end_time = end.unwrap_or(now).timestamp_millis() as u64;
let start_time = if let Some(start) = start {
start.timestamp_millis() as u64
} else {
let spec = bar_type.spec();
let step_ms = match spec.aggregation {
BarAggregation::Minute => spec.step.get() as u64 * 60_000,
BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
BarAggregation::Week => spec.step.get() as u64 * 604_800_000,
BarAggregation::Month => spec.step.get() as u64 * 2_592_000_000,
_ => 60_000,
};
end_time.saturating_sub(1000 * step_ms)
};
let candles = self
.info_candle_snapshot(coin.as_str(), interval, start_time, end_time)
.await?;
let now_ms = now.timestamp_millis() as u64;
let mut bars: Vec<Bar> = candles
.iter()
.filter(|candle| candle.end_timestamp < now_ms)
.enumerate()
.filter_map(|(i, candle)| {
candle_to_bar(candle, bar_type, price_precision, size_precision)
.map_err(|e| {
log::error!("Failed to convert candle {i} to bar: {candle:?} error: {e}");
e
})
.ok()
})
.collect();
if let Some(limit) = limit
&& limit > 0
&& bars.len() > limit as usize
{
bars.truncate(limit as usize);
}
log::debug!(
"Received {} bars for {} (filtered {} incomplete)",
bars.len(),
bar_type,
candles.len() - bars.len()
);
Ok(bars)
}
#[allow(clippy::too_many_arguments)]
pub async fn submit_order(
&self,
instrument_id: InstrumentId,
client_order_id: ClientOrderId,
order_side: OrderSide,
order_type: OrderType,
quantity: Quantity,
time_in_force: TimeInForce,
price: Option<Price>,
trigger_price: Option<Price>,
post_only: bool,
reduce_only: bool,
) -> Result<OrderStatusReport> {
let symbol = instrument_id.symbol.as_str();
let asset = self.get_asset_index(symbol).ok_or_else(|| {
Error::bad_request(format!(
"Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
))
})?;
let is_buy = matches!(order_side, OrderSide::Buy);
let price_precision = self.get_price_precision(symbol).unwrap_or(2);
let price_decimal = match price {
Some(px) if self.normalize_prices => {
normalize_price(px.as_decimal(), price_precision).normalize()
}
Some(px) => px.as_decimal().normalize(),
None if matches!(order_type, OrderType::Market) => Decimal::ZERO,
None if matches!(
order_type,
OrderType::StopMarket | OrderType::MarketIfTouched
) =>
{
match trigger_price {
Some(tp) => {
let derived =
derive_limit_from_trigger(tp.as_decimal().normalize(), is_buy);
let sig_rounded = round_to_sig_figs(derived, 5);
clamp_price_to_precision(sig_rounded, price_precision, is_buy).normalize()
}
None => Decimal::ZERO,
}
}
None => return Err(Error::bad_request("Limit orders require a price")),
};
let size_decimal = quantity.as_decimal().normalize();
let kind = match order_type {
OrderType::Market => HyperliquidExecOrderKind::Limit {
limit: HyperliquidExecLimitParams {
tif: HyperliquidExecTif::Ioc,
},
},
OrderType::Limit => {
let tif = if post_only {
HyperliquidExecTif::Alo
} else {
match time_in_force {
TimeInForce::Gtc => HyperliquidExecTif::Gtc,
TimeInForce::Ioc => HyperliquidExecTif::Ioc,
TimeInForce::Fok
| TimeInForce::Day
| TimeInForce::Gtd
| TimeInForce::AtTheOpen
| TimeInForce::AtTheClose => {
return Err(Error::bad_request(format!(
"Time in force {time_in_force:?} not supported"
)));
}
}
};
HyperliquidExecOrderKind::Limit {
limit: HyperliquidExecLimitParams { tif },
}
}
OrderType::StopMarket
| OrderType::StopLimit
| OrderType::MarketIfTouched
| OrderType::LimitIfTouched => {
if let Some(trig_px) = trigger_price {
let trigger_price_decimal = if self.normalize_prices {
normalize_price(trig_px.as_decimal(), price_precision).normalize()
} else {
trig_px.as_decimal().normalize()
};
let tpsl = match order_type {
OrderType::StopMarket | OrderType::StopLimit => HyperliquidExecTpSl::Sl,
OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
HyperliquidExecTpSl::Tp
}
_ => unreachable!(),
};
let is_market = matches!(
order_type,
OrderType::StopMarket | OrderType::MarketIfTouched
);
HyperliquidExecOrderKind::Trigger {
trigger: HyperliquidExecTriggerParams {
is_market,
trigger_px: trigger_price_decimal,
tpsl,
},
}
} else {
return Err(Error::bad_request("Trigger orders require a trigger price"));
}
}
_ => {
return Err(Error::bad_request(format!(
"Order type {order_type:?} not supported"
)));
}
};
let hyperliquid_order = HyperliquidExecPlaceOrderRequest {
asset,
is_buy,
price: price_decimal,
size: size_decimal,
reduce_only,
kind,
cloid: Some(Cloid::from_client_order_id(client_order_id)),
};
let builder = if self.has_vault_address() {
None
} else {
Some(HyperliquidExecBuilderFee {
address: NAUTILUS_BUILDER_ADDRESS.to_string(),
fee_tenths_bp: 0,
})
};
let action = HyperliquidExecAction::Order {
orders: vec![hyperliquid_order],
grouping: HyperliquidExecGrouping::Na,
builder,
};
let response = self.inner.post_action_exec(&action).await?;
match response {
HyperliquidExchangeResponse::Status {
status,
response: response_data,
} if status == RESPONSE_STATUS_OK => {
let data_value = if let Some(data) = response_data.get("data") {
data.clone()
} else {
response_data
};
let order_response: HyperliquidExecOrderResponseData =
serde_json::from_value(data_value).map_err(|e| {
Error::bad_request(format!("Failed to parse order response: {e}"))
})?;
let order_status = order_response
.statuses
.first()
.ok_or_else(|| Error::bad_request("No order status in response"))?;
let symbol_str = instrument_id.symbol.as_str();
let product_type = HyperliquidProductType::from_symbol(symbol_str).ok();
let asset_str = symbol_str.split('-').next().unwrap_or(symbol_str);
let instrument = self
.get_or_create_instrument(&Ustr::from(asset_str), product_type)
.ok_or_else(|| {
Error::bad_request(format!("Instrument not found for {asset_str}"))
})?;
let account_id = self
.account_id
.ok_or_else(|| Error::bad_request("Account ID not set"))?;
let ts_init = self.clock.get_time_ns();
match order_status {
HyperliquidExecOrderStatus::Resting { resting } => Ok(self
.create_order_status_report(
instrument_id,
Some(client_order_id),
VenueOrderId::new(resting.oid.to_string()),
order_side,
order_type,
quantity,
time_in_force,
price,
trigger_price,
OrderStatus::Accepted,
Quantity::new(0.0, instrument.size_precision()),
&instrument,
account_id,
ts_init,
)),
HyperliquidExecOrderStatus::Filled { filled } => {
let filled_qty = Quantity::new(
filled.total_sz.to_string().parse::<f64>().unwrap_or(0.0),
instrument.size_precision(),
);
Ok(self.create_order_status_report(
instrument_id,
Some(client_order_id),
VenueOrderId::new(filled.oid.to_string()),
order_side,
order_type,
quantity,
time_in_force,
price,
trigger_price,
OrderStatus::Filled,
filled_qty,
&instrument,
account_id,
ts_init,
))
}
HyperliquidExecOrderStatus::Error { error } => {
Err(Error::bad_request(format!("Order rejected: {error}")))
}
}
}
HyperliquidExchangeResponse::Error { error } => Err(Error::bad_request(format!(
"Order submission failed: {error}"
))),
_ => Err(Error::bad_request("Unexpected response format")),
}
}
pub async fn submit_order_from_order_any(&self, order: &OrderAny) -> Result<OrderStatusReport> {
self.submit_order(
order.instrument_id(),
order.client_order_id(),
order.order_side(),
order.order_type(),
order.quantity(),
order.time_in_force(),
order.price(),
order.trigger_price(),
order.is_post_only(),
order.is_reduce_only(),
)
.await
}
#[allow(clippy::too_many_arguments)]
fn create_order_status_report(
&self,
instrument_id: InstrumentId,
client_order_id: Option<ClientOrderId>,
venue_order_id: VenueOrderId,
order_side: OrderSide,
order_type: OrderType,
quantity: Quantity,
time_in_force: TimeInForce,
price: Option<Price>,
trigger_price: Option<Price>,
order_status: OrderStatus,
filled_qty: Quantity,
_instrument: &InstrumentAny,
account_id: AccountId,
ts_init: UnixNanos,
) -> OrderStatusReport {
let ts_accepted = self.clock.get_time_ns();
let ts_last = ts_accepted;
let report_id = UUID4::new();
let mut report = OrderStatusReport::new(
account_id,
instrument_id,
client_order_id,
venue_order_id,
order_side,
order_type,
time_in_force,
order_status,
quantity,
filled_qty,
ts_accepted,
ts_last,
ts_init,
Some(report_id),
);
if let Some(px) = price {
report = report.with_price(px);
}
if let Some(trig_px) = trigger_price {
report = report
.with_trigger_price(trig_px)
.with_trigger_type(TriggerType::Default);
}
report
}
pub async fn submit_orders(&self, orders: &[&OrderAny]) -> Result<Vec<OrderStatusReport>> {
let mut hyperliquid_orders = Vec::with_capacity(orders.len());
for order in orders {
let instrument_id = order.instrument_id();
let symbol = instrument_id.symbol.as_str();
let asset = self.get_asset_index(symbol).ok_or_else(|| {
Error::bad_request(format!(
"Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
))
})?;
let price_decimals = self.get_price_precision(symbol).unwrap_or(2);
let request = order_to_hyperliquid_request_with_asset(
order,
asset,
price_decimals,
self.normalize_prices,
)
.map_err(|e| Error::bad_request(format!("Failed to convert order: {e}")))?;
hyperliquid_orders.push(request);
}
let builder = if self.has_vault_address() {
None
} else {
Some(HyperliquidExecBuilderFee {
address: NAUTILUS_BUILDER_ADDRESS.to_string(),
fee_tenths_bp: 0,
})
};
let action = HyperliquidExecAction::Order {
orders: hyperliquid_orders,
grouping: HyperliquidExecGrouping::Na,
builder,
};
let response = self.inner.post_action_exec(&action).await?;
match response {
HyperliquidExchangeResponse::Status {
status,
response: response_data,
} if status == RESPONSE_STATUS_OK => {
let data_value = if let Some(data) = response_data.get("data") {
data.clone()
} else {
response_data
};
let order_response: HyperliquidExecOrderResponseData =
serde_json::from_value(data_value).map_err(|e| {
Error::bad_request(format!("Failed to parse order response: {e}"))
})?;
let account_id = self
.account_id
.ok_or_else(|| Error::bad_request("Account ID not set"))?;
let ts_init = self.clock.get_time_ns();
if order_response.statuses.len() != orders.len() {
return Err(Error::bad_request(format!(
"Mismatch between submitted orders ({}) and response statuses ({})",
orders.len(),
order_response.statuses.len()
)));
}
let mut reports = Vec::new();
for (order, order_status) in orders.iter().zip(order_response.statuses.iter()) {
let instrument_id = order.instrument_id();
let symbol = instrument_id.symbol.as_str();
let product_type = HyperliquidProductType::from_symbol(symbol).ok();
let asset = symbol.split('-').next().unwrap_or(symbol);
let instrument = self
.get_or_create_instrument(&Ustr::from(asset), product_type)
.ok_or_else(|| {
Error::bad_request(format!("Instrument not found for {asset}"))
})?;
let report = match order_status {
HyperliquidExecOrderStatus::Resting { resting } => {
self.create_order_status_report(
order.instrument_id(),
Some(order.client_order_id()),
VenueOrderId::new(resting.oid.to_string()),
order.order_side(),
order.order_type(),
order.quantity(),
order.time_in_force(),
order.price(),
order.trigger_price(),
OrderStatus::Accepted,
Quantity::new(0.0, instrument.size_precision()),
&instrument,
account_id,
ts_init,
)
}
HyperliquidExecOrderStatus::Filled { filled } => {
let filled_qty = Quantity::new(
filled.total_sz.to_string().parse::<f64>().unwrap_or(0.0),
instrument.size_precision(),
);
self.create_order_status_report(
order.instrument_id(),
Some(order.client_order_id()),
VenueOrderId::new(filled.oid.to_string()),
order.order_side(),
order.order_type(),
order.quantity(),
order.time_in_force(),
order.price(),
order.trigger_price(),
OrderStatus::Filled,
filled_qty,
&instrument,
account_id,
ts_init,
)
}
HyperliquidExecOrderStatus::Error { error } => {
return Err(Error::bad_request(format!(
"Order {} rejected: {error}",
order.client_order_id()
)));
}
};
reports.push(report);
}
Ok(reports)
}
HyperliquidExchangeResponse::Error { error } => Err(Error::bad_request(format!(
"Order submission failed: {error}"
))),
_ => Err(Error::bad_request("Unexpected response format")),
}
}
}
fn perp_dex_asset_index_base(dex_index: usize) -> u32 {
if dex_index == 0 {
0
} else {
100_000 + dex_index as u32 * 10_000
}
}
#[cfg(test)]
mod tests {
use nautilus_core::{MUTEX_POISONED, time::get_atomic_clock_realtime};
use nautilus_model::{
currencies::CURRENCY_MAP,
enums::CurrencyType,
identifiers::{InstrumentId, Symbol},
instruments::{CurrencyPair, Instrument, InstrumentAny},
types::{Currency, Price, Quantity},
};
use rstest::rstest;
use ustr::Ustr;
use super::HyperliquidHttpClient;
use crate::{
common::{consts::HYPERLIQUID_VENUE, enums::HyperliquidProductType},
http::query::InfoRequest,
};
#[rstest]
fn stable_json_roundtrips() {
let v = serde_json::json!({"type":"l2Book","coin":"BTC"});
let s = serde_json::to_string(&v).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&s).unwrap();
assert_eq!(parsed["type"], "l2Book");
assert_eq!(parsed["coin"], "BTC");
assert_eq!(parsed, v);
}
#[rstest]
fn info_pretty_shape() {
let r = InfoRequest::l2_book("BTC");
let val = serde_json::to_value(&r).unwrap();
let pretty = serde_json::to_string_pretty(&val).unwrap();
assert!(pretty.contains("\"type\": \"l2Book\""));
assert!(pretty.contains("\"coin\": \"BTC\""));
}
#[rstest]
fn test_cache_instrument_by_raw_symbol() {
let client = HyperliquidHttpClient::new(true, 60, None).unwrap();
let base_code = "vntls:vCURSOR";
let quote_code = "USDC";
{
let mut currency_map = CURRENCY_MAP.lock().expect(MUTEX_POISONED);
if !currency_map.contains_key(base_code) {
currency_map.insert(
base_code.to_string(),
Currency::new(base_code, 8, 0, base_code, CurrencyType::Crypto),
);
}
}
let base_currency = Currency::new(base_code, 8, 0, base_code, CurrencyType::Crypto);
let quote_currency = Currency::new(quote_code, 6, 0, quote_code, CurrencyType::Crypto);
let symbol = Symbol::new("vntls:vCURSOR-USDC-SPOT");
let venue = *HYPERLIQUID_VENUE;
let instrument_id = InstrumentId::new(symbol, venue);
let raw_symbol = Symbol::new(base_code);
let clock = get_atomic_clock_realtime();
let ts = clock.get_time_ns();
let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
instrument_id,
raw_symbol,
base_currency,
quote_currency,
8,
8,
Price::from("0.00000001"),
Quantity::from("0.00000001"),
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None, None, ts,
ts,
));
client.cache_instrument(&instrument);
let instruments = client.instruments.load();
let by_full_symbol = instruments.get(&Ustr::from("vntls:vCURSOR-USDC-SPOT"));
assert!(
by_full_symbol.is_some(),
"Instrument should be accessible by full symbol"
);
assert_eq!(by_full_symbol.unwrap().id(), instrument.id());
let by_raw_symbol = instruments.get(&Ustr::from("vntls:vCURSOR"));
assert!(
by_raw_symbol.is_some(),
"Instrument should be accessible by raw_symbol (Hyperliquid coin identifier)"
);
assert_eq!(by_raw_symbol.unwrap().id(), instrument.id());
drop(instruments);
let instruments_by_coin = client.instruments_by_coin.load();
let by_coin =
instruments_by_coin.get(&(Ustr::from("vntls:vCURSOR"), HyperliquidProductType::Spot));
assert!(
by_coin.is_some(),
"Instrument should be accessible by coin and product type"
);
assert_eq!(by_coin.unwrap().id(), instrument.id());
drop(instruments_by_coin);
let retrieved_with_type = client.get_or_create_instrument(
&Ustr::from("vntls:vCURSOR"),
Some(HyperliquidProductType::Spot),
);
assert!(retrieved_with_type.is_some());
assert_eq!(retrieved_with_type.unwrap().id(), instrument.id());
let retrieved_without_type =
client.get_or_create_instrument(&Ustr::from("vntls:vCURSOR"), None);
assert!(retrieved_without_type.is_some());
assert_eq!(retrieved_without_type.unwrap().id(), instrument.id());
}
}