1use std::{
24 collections::HashMap,
25 env,
26 num::NonZeroU32,
27 sync::{Arc, LazyLock, RwLock},
28 time::Duration,
29};
30
31use ahash::AHashMap;
32use anyhow::Context;
33use nautilus_core::{
34 UUID4, UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime,
35};
36use nautilus_model::{
37 data::{Bar, BarType},
38 enums::{
39 AccountType, BarAggregation, CurrencyType, OrderSide, OrderStatus, OrderType, TimeInForce,
40 TriggerType,
41 },
42 events::AccountState,
43 identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
44 instruments::{CurrencyPair, Instrument, InstrumentAny},
45 orders::{Order, OrderAny},
46 reports::{FillReport, OrderStatusReport, PositionStatusReport},
47 types::{AccountBalance, Currency, Money, Price, Quantity},
48};
49use nautilus_network::{
50 http::{HttpClient, HttpClientError, HttpResponse, Method, USER_AGENT},
51 ratelimiter::quota::Quota,
52};
53use rust_decimal::Decimal;
54use serde_json::Value;
55use ustr::Ustr;
56
57use crate::{
58 common::{
59 builder_fee::{resolve_builder_fee, resolve_builder_fee_batch},
60 consts::{HYPERLIQUID_VENUE, exchange_url, info_url},
61 credential::{Secrets, VaultAddress},
62 enums::{
63 HyperliquidBarInterval, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
64 HyperliquidProductType,
65 },
66 parse::{bar_type_to_interval, order_to_hyperliquid_request_with_asset},
67 },
68 http::{
69 error::{Error, Result},
70 models::{
71 Cloid, HyperliquidCandleSnapshot, HyperliquidExchangeRequest,
72 HyperliquidExchangeResponse, HyperliquidExecAction,
73 HyperliquidExecCancelByCloidRequest, HyperliquidExecCancelOrderRequest,
74 HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecOrderKind,
75 HyperliquidExecOrderResponseData, HyperliquidExecOrderStatus,
76 HyperliquidExecPlaceOrderRequest, HyperliquidExecTif, HyperliquidExecTpSl,
77 HyperliquidExecTriggerParams, HyperliquidFills, HyperliquidL2Book, HyperliquidMeta,
78 HyperliquidOrderStatus, PerpMeta, PerpMetaAndCtxs, RESPONSE_STATUS_OK, SpotMeta,
79 SpotMetaAndCtxs,
80 },
81 parse::{
82 HyperliquidInstrumentDef, instruments_from_defs_owned, parse_perp_instruments,
83 parse_spot_instruments,
84 },
85 query::{ExchangeAction, InfoRequest},
86 rate_limits::{
87 RateLimitSnapshot, WeightedLimiter, backoff_full_jitter, exchange_weight,
88 info_base_weight, info_extra_weight,
89 },
90 },
91 signing::{
92 HyperliquidActionType, HyperliquidEip712Signer, NonceManager, SignRequest, types::SignerId,
93 },
94};
95
96pub static HYPERLIQUID_REST_QUOTA: LazyLock<Quota> =
98 LazyLock::new(|| Quota::per_minute(NonZeroU32::new(1200).unwrap()));
99
100#[derive(Debug, Clone)]
105#[cfg_attr(
106 feature = "python",
107 pyo3::pyclass(
108 module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
109 from_py_object
110 )
111)]
112pub struct HyperliquidRawHttpClient {
113 client: HttpClient,
114 is_testnet: bool,
115 base_info: String,
116 base_exchange: String,
117 signer: Option<HyperliquidEip712Signer>,
118 nonce_manager: Option<Arc<NonceManager>>,
119 vault_address: Option<VaultAddress>,
120 rest_limiter: Arc<WeightedLimiter>,
121 rate_limit_backoff_base: Duration,
122 rate_limit_backoff_cap: Duration,
123 rate_limit_max_attempts_info: u32,
124}
125
126impl HyperliquidRawHttpClient {
127 pub fn new(
133 is_testnet: bool,
134 timeout_secs: Option<u64>,
135 proxy_url: Option<String>,
136 ) -> std::result::Result<Self, HttpClientError> {
137 Ok(Self {
138 client: HttpClient::new(
139 Self::default_headers(),
140 vec![],
141 vec![],
142 Some(*HYPERLIQUID_REST_QUOTA),
143 timeout_secs,
144 proxy_url,
145 )?,
146 is_testnet,
147 base_info: info_url(is_testnet).to_string(),
148 base_exchange: exchange_url(is_testnet).to_string(),
149 signer: None,
150 nonce_manager: None,
151 vault_address: None,
152 rest_limiter: Arc::new(WeightedLimiter::per_minute(1200)),
153 rate_limit_backoff_base: Duration::from_millis(125),
154 rate_limit_backoff_cap: Duration::from_secs(5),
155 rate_limit_max_attempts_info: 3,
156 })
157 }
158
159 pub fn with_credentials(
166 secrets: &Secrets,
167 timeout_secs: Option<u64>,
168 proxy_url: Option<String>,
169 ) -> std::result::Result<Self, HttpClientError> {
170 let signer = HyperliquidEip712Signer::new(secrets.private_key.clone());
171 let nonce_manager = Arc::new(NonceManager::new());
172
173 Ok(Self {
174 client: HttpClient::new(
175 Self::default_headers(),
176 vec![],
177 vec![],
178 Some(*HYPERLIQUID_REST_QUOTA),
179 timeout_secs,
180 proxy_url,
181 )?,
182 is_testnet: secrets.is_testnet,
183 base_info: info_url(secrets.is_testnet).to_string(),
184 base_exchange: exchange_url(secrets.is_testnet).to_string(),
185 signer: Some(signer),
186 nonce_manager: Some(nonce_manager),
187 vault_address: secrets.vault_address,
188 rest_limiter: Arc::new(WeightedLimiter::per_minute(1200)),
189 rate_limit_backoff_base: Duration::from_millis(125),
190 rate_limit_backoff_cap: Duration::from_secs(5),
191 rate_limit_max_attempts_info: 3,
192 })
193 }
194
195 pub fn set_base_info_url(&mut self, url: String) {
197 self.base_info = url;
198 }
199
200 pub fn set_base_exchange_url(&mut self, url: String) {
202 self.base_exchange = url;
203 }
204
205 pub fn from_env(is_testnet: bool) -> Result<Self> {
211 let secrets = Secrets::from_env(is_testnet)
212 .map_err(|e| Error::auth(format!("missing credentials in environment: {e}")))?;
213 Self::with_credentials(&secrets, None, None)
214 .map_err(|e| Error::auth(format!("Failed to create HTTP client: {e}")))
215 }
216
217 pub fn from_credentials(
223 private_key: &str,
224 vault_address: Option<&str>,
225 is_testnet: bool,
226 timeout_secs: Option<u64>,
227 proxy_url: Option<String>,
228 ) -> Result<Self> {
229 let secrets = Secrets::from_private_key(private_key, vault_address, is_testnet)
230 .map_err(|e| Error::auth(format!("invalid credentials: {e}")))?;
231 Self::with_credentials(&secrets, timeout_secs, proxy_url)
232 .map_err(|e| Error::auth(format!("Failed to create HTTP client: {e}")))
233 }
234
235 #[must_use]
237 pub fn with_rate_limits(mut self) -> Self {
238 self.rest_limiter = Arc::new(WeightedLimiter::per_minute(1200));
239 self.rate_limit_backoff_base = Duration::from_millis(125);
240 self.rate_limit_backoff_cap = Duration::from_secs(5);
241 self.rate_limit_max_attempts_info = 3;
242 self
243 }
244
245 #[must_use]
247 pub fn is_testnet(&self) -> bool {
248 self.is_testnet
249 }
250
251 pub fn get_user_address(&self) -> Result<String> {
257 self.signer
258 .as_ref()
259 .ok_or_else(|| Error::auth("No signer configured"))?
260 .address()
261 }
262
263 pub fn get_account_address(&self) -> Result<String> {
270 if let Some(vault) = &self.vault_address {
271 Ok(vault.to_hex())
272 } else {
273 self.get_user_address()
274 }
275 }
276
277 fn default_headers() -> HashMap<String, String> {
278 HashMap::from([
279 (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
280 ("Content-Type".to_string(), "application/json".to_string()),
281 ])
282 }
283
284 fn signer_id(&self) -> Result<SignerId> {
285 Ok(SignerId("hyperliquid:default".into()))
286 }
287
288 fn parse_retry_after_simple(&self, headers: &HashMap<String, String>) -> Option<u64> {
289 let retry_after = headers.get("retry-after")?;
290 retry_after.parse::<u64>().ok().map(|s| s * 1000) }
292
293 pub async fn info_meta(&self) -> Result<HyperliquidMeta> {
295 let request = InfoRequest::meta();
296 let response = self.send_info_request(&request).await?;
297 serde_json::from_value(response).map_err(Error::Serde)
298 }
299
300 pub async fn get_spot_meta(&self) -> Result<SpotMeta> {
302 let request = InfoRequest::spot_meta();
303 let response = self.send_info_request(&request).await?;
304 serde_json::from_value(response).map_err(Error::Serde)
305 }
306
307 pub async fn get_perp_meta_and_ctxs(&self) -> Result<PerpMetaAndCtxs> {
309 let request = InfoRequest::meta_and_asset_ctxs();
310 let response = self.send_info_request(&request).await?;
311 serde_json::from_value(response).map_err(Error::Serde)
312 }
313
314 pub async fn get_spot_meta_and_ctxs(&self) -> Result<SpotMetaAndCtxs> {
316 let request = InfoRequest::spot_meta_and_asset_ctxs();
317 let response = self.send_info_request(&request).await?;
318 serde_json::from_value(response).map_err(Error::Serde)
319 }
320
321 pub(crate) async fn load_perp_meta(&self) -> Result<PerpMeta> {
322 let request = InfoRequest::meta();
323 let response = self.send_info_request(&request).await?;
324 serde_json::from_value(response).map_err(Error::Serde)
325 }
326
327 pub async fn info_l2_book(&self, coin: &str) -> Result<HyperliquidL2Book> {
329 let request = InfoRequest::l2_book(coin);
330 let response = self.send_info_request(&request).await?;
331 serde_json::from_value(response).map_err(Error::Serde)
332 }
333
334 pub async fn info_user_fills(&self, user: &str) -> Result<HyperliquidFills> {
336 let request = InfoRequest::user_fills(user);
337 let response = self.send_info_request(&request).await?;
338 serde_json::from_value(response).map_err(Error::Serde)
339 }
340
341 pub async fn info_order_status(&self, user: &str, oid: u64) -> Result<HyperliquidOrderStatus> {
343 let request = InfoRequest::order_status(user, oid);
344 let response = self.send_info_request(&request).await?;
345 serde_json::from_value(response).map_err(Error::Serde)
346 }
347
348 pub async fn info_open_orders(&self, user: &str) -> Result<Value> {
350 let request = InfoRequest::open_orders(user);
351 self.send_info_request(&request).await
352 }
353
354 pub async fn info_frontend_open_orders(&self, user: &str) -> Result<Value> {
356 let request = InfoRequest::frontend_open_orders(user);
357 self.send_info_request(&request).await
358 }
359
360 pub async fn info_clearinghouse_state(&self, user: &str) -> Result<Value> {
362 let request = InfoRequest::clearinghouse_state(user);
363 self.send_info_request(&request).await
364 }
365
366 pub async fn info_candle_snapshot(
368 &self,
369 coin: &str,
370 interval: HyperliquidBarInterval,
371 start_time: u64,
372 end_time: u64,
373 ) -> Result<HyperliquidCandleSnapshot> {
374 let request = InfoRequest::candle_snapshot(coin, interval, start_time, end_time);
375 let response = self.send_info_request(&request).await?;
376
377 log::trace!(
378 "Candle snapshot raw response (len={}): {:?}",
379 response.as_array().map_or(0, |a| a.len()),
380 response
381 );
382
383 serde_json::from_value(response).map_err(Error::Serde)
384 }
385
386 pub async fn send_info_request_raw(&self, request: &InfoRequest) -> Result<Value> {
388 self.send_info_request(request).await
389 }
390
391 async fn send_info_request(&self, request: &InfoRequest) -> Result<Value> {
392 let base_w = info_base_weight(request);
393 self.rest_limiter.acquire(base_w).await;
394
395 let mut attempt = 0u32;
396 loop {
397 let response = self.http_roundtrip_info(request).await?;
398
399 if response.status.is_success() {
400 let val: Value = serde_json::from_slice(&response.body).map_err(Error::Serde)?;
402 let extra = info_extra_weight(request, &val);
403 if extra > 0 {
404 self.rest_limiter.debit_extra(extra).await;
405 log::debug!(
406 "Info debited extra weight: endpoint={request:?}, base_w={base_w}, extra={extra}"
407 );
408 }
409 return Ok(val);
410 }
411
412 if response.status.as_u16() == 429 {
414 if attempt >= self.rate_limit_max_attempts_info {
415 let ra = self.parse_retry_after_simple(&response.headers);
416 return Err(Error::rate_limit("info", base_w, ra));
417 }
418 let delay = self
419 .parse_retry_after_simple(&response.headers)
420 .map_or_else(
421 || {
422 backoff_full_jitter(
423 attempt,
424 self.rate_limit_backoff_base,
425 self.rate_limit_backoff_cap,
426 )
427 },
428 Duration::from_millis,
429 );
430 log::warn!(
431 "429 Too Many Requests; backing off: endpoint={request:?}, attempt={attempt}, wait_ms={:?}",
432 delay.as_millis()
433 );
434 attempt += 1;
435 tokio::time::sleep(delay).await;
436 self.rest_limiter.acquire(1).await;
438 continue;
439 }
440
441 if (response.status.is_server_error() || response.status.as_u16() == 408)
443 && attempt < self.rate_limit_max_attempts_info
444 {
445 let delay = backoff_full_jitter(
446 attempt,
447 self.rate_limit_backoff_base,
448 self.rate_limit_backoff_cap,
449 );
450 log::warn!(
451 "Transient error; retrying: endpoint={request:?}, attempt={attempt}, status={:?}, wait_ms={:?}",
452 response.status.as_u16(),
453 delay.as_millis()
454 );
455 attempt += 1;
456 tokio::time::sleep(delay).await;
457 continue;
458 }
459
460 let error_body = String::from_utf8_lossy(&response.body);
462 return Err(Error::http(
463 response.status.as_u16(),
464 error_body.to_string(),
465 ));
466 }
467 }
468
469 async fn http_roundtrip_info(&self, request: &InfoRequest) -> Result<HttpResponse> {
470 let url = &self.base_info;
471 let body = serde_json::to_value(request).map_err(Error::Serde)?;
472 let body_bytes = serde_json::to_string(&body)
473 .map_err(Error::Serde)?
474 .into_bytes();
475
476 self.client
477 .request(
478 Method::POST,
479 url.clone(),
480 None,
481 None,
482 Some(body_bytes),
483 None,
484 None,
485 )
486 .await
487 .map_err(Error::from_http_client)
488 }
489
490 pub async fn post_action(
492 &self,
493 action: &ExchangeAction,
494 ) -> Result<HyperliquidExchangeResponse> {
495 let w = exchange_weight(action);
496 self.rest_limiter.acquire(w).await;
497
498 let signer = self
499 .signer
500 .as_ref()
501 .ok_or_else(|| Error::auth("credentials required for exchange operations"))?;
502
503 let nonce_manager = self
504 .nonce_manager
505 .as_ref()
506 .ok_or_else(|| Error::auth("nonce manager missing"))?;
507
508 let signer_id = self.signer_id()?;
509 let time_nonce = nonce_manager.next(signer_id)?;
510
511 let action_value = serde_json::to_value(action)
512 .context("serialize exchange action")
513 .map_err(|e| Error::bad_request(e.to_string()))?;
514
515 let action_bytes = rmp_serde::to_vec_named(action)
517 .context("serialize action with MessagePack")
518 .map_err(|e| Error::bad_request(e.to_string()))?;
519
520 let sign_request = SignRequest {
521 action: action_value.clone(),
522 action_bytes: Some(action_bytes),
523 time_nonce,
524 action_type: HyperliquidActionType::L1,
525 is_testnet: self.is_testnet,
526 vault_address: self.vault_address.as_ref().map(|v| v.to_hex()),
527 };
528
529 let sig = signer.sign(&sign_request)?.signature;
530
531 let nonce_u64 = time_nonce.as_millis() as u64;
532
533 let request = if let Some(vault) = self.vault_address {
534 HyperliquidExchangeRequest::with_vault(
535 action.clone(),
536 nonce_u64,
537 sig,
538 vault.to_string(),
539 )
540 .map_err(|e| Error::bad_request(format!("Failed to create request: {e}")))?
541 } else {
542 HyperliquidExchangeRequest::new(action.clone(), nonce_u64, sig)
543 .map_err(|e| Error::bad_request(format!("Failed to create request: {e}")))?
544 };
545
546 let response = self.http_roundtrip_exchange(&request).await?;
547
548 if response.status.is_success() {
549 let parsed_response: HyperliquidExchangeResponse =
550 serde_json::from_slice(&response.body).map_err(Error::Serde)?;
551
552 match &parsed_response {
554 HyperliquidExchangeResponse::Status {
555 status,
556 response: response_data,
557 } if status == "err" => {
558 let error_msg = response_data
559 .as_str()
560 .map_or_else(|| response_data.to_string(), |s| s.to_string());
561 log::error!("Hyperliquid API returned error: {error_msg}");
562 Err(Error::bad_request(format!("API error: {error_msg}")))
563 }
564 HyperliquidExchangeResponse::Error { error } => {
565 log::error!("Hyperliquid API returned error: {error}");
566 Err(Error::bad_request(format!("API error: {error}")))
567 }
568 _ => Ok(parsed_response),
569 }
570 } else if response.status.as_u16() == 429 {
571 let ra = self.parse_retry_after_simple(&response.headers);
572 Err(Error::rate_limit("exchange", w, ra))
573 } else {
574 let error_body = String::from_utf8_lossy(&response.body);
575 log::error!(
576 "Exchange API error (status {}): {}",
577 response.status.as_u16(),
578 error_body
579 );
580 Err(Error::http(
581 response.status.as_u16(),
582 error_body.to_string(),
583 ))
584 }
585 }
586
587 pub async fn post_action_exec(
592 &self,
593 action: &HyperliquidExecAction,
594 ) -> Result<HyperliquidExchangeResponse> {
595 let w = match action {
596 HyperliquidExecAction::Order { orders, .. } => 1 + (orders.len() as u32 / 40),
597 HyperliquidExecAction::Cancel { cancels } => 1 + (cancels.len() as u32 / 40),
598 HyperliquidExecAction::CancelByCloid { cancels } => 1 + (cancels.len() as u32 / 40),
599 HyperliquidExecAction::BatchModify { modifies } => 1 + (modifies.len() as u32 / 40),
600 _ => 1,
601 };
602 self.rest_limiter.acquire(w).await;
603
604 let signer = self
605 .signer
606 .as_ref()
607 .ok_or_else(|| Error::auth("credentials required for exchange operations"))?;
608
609 let nonce_manager = self
610 .nonce_manager
611 .as_ref()
612 .ok_or_else(|| Error::auth("nonce manager missing"))?;
613
614 let signer_id = self.signer_id()?;
615 let time_nonce = nonce_manager.next(signer_id)?;
616 let action_value = serde_json::to_value(action)
619 .context("serialize exchange action")
620 .map_err(|e| Error::bad_request(e.to_string()))?;
621
622 let action_bytes = rmp_serde::to_vec_named(action)
624 .context("serialize action with MessagePack")
625 .map_err(|e| Error::bad_request(e.to_string()))?;
626
627 let sig = signer
628 .sign(&SignRequest {
629 action: action_value.clone(),
630 action_bytes: Some(action_bytes),
631 time_nonce,
632 action_type: HyperliquidActionType::L1,
633 is_testnet: self.is_testnet,
634 vault_address: self.vault_address.as_ref().map(|v| v.to_hex()),
635 })?
636 .signature;
637
638 let request = if let Some(vault) = self.vault_address {
639 HyperliquidExchangeRequest::with_vault(
640 action.clone(),
641 time_nonce.as_millis() as u64,
642 sig,
643 vault.to_string(),
644 )
645 .map_err(|e| Error::bad_request(format!("Failed to create request: {e}")))?
646 } else {
647 HyperliquidExchangeRequest::new(action.clone(), time_nonce.as_millis() as u64, sig)
648 .map_err(|e| Error::bad_request(format!("Failed to create request: {e}")))?
649 };
650
651 let response = self.http_roundtrip_exchange(&request).await?;
652
653 if response.status.is_success() {
654 let parsed_response: HyperliquidExchangeResponse =
655 serde_json::from_slice(&response.body).map_err(Error::Serde)?;
656
657 match &parsed_response {
659 HyperliquidExchangeResponse::Status {
660 status,
661 response: response_data,
662 } if status == "err" => {
663 let error_msg = response_data
664 .as_str()
665 .map_or_else(|| response_data.to_string(), |s| s.to_string());
666 log::error!("Hyperliquid API returned error: {error_msg}");
667 Err(Error::bad_request(format!("API error: {error_msg}")))
668 }
669 HyperliquidExchangeResponse::Error { error } => {
670 log::error!("Hyperliquid API returned error: {error}");
671 Err(Error::bad_request(format!("API error: {error}")))
672 }
673 _ => Ok(parsed_response),
674 }
675 } else if response.status.as_u16() == 429 {
676 let ra = self.parse_retry_after_simple(&response.headers);
677 Err(Error::rate_limit("exchange", w, ra))
678 } else {
679 let error_body = String::from_utf8_lossy(&response.body);
680 Err(Error::http(
681 response.status.as_u16(),
682 error_body.to_string(),
683 ))
684 }
685 }
686
687 pub async fn rest_limiter_snapshot(&self) -> RateLimitSnapshot {
690 self.rest_limiter.snapshot().await
691 }
692 async fn http_roundtrip_exchange<T>(
693 &self,
694 request: &HyperliquidExchangeRequest<T>,
695 ) -> Result<HttpResponse>
696 where
697 T: serde::Serialize,
698 {
699 let url = &self.base_exchange;
700 let body = serde_json::to_string(&request).map_err(Error::Serde)?;
701 let body_bytes = body.into_bytes();
702
703 let response = self
704 .client
705 .request(
706 Method::POST,
707 url.clone(),
708 None,
709 None,
710 Some(body_bytes),
711 None,
712 None,
713 )
714 .await
715 .map_err(Error::from_http_client)?;
716
717 Ok(response)
718 }
719}
720
721#[derive(Debug, Clone)]
727#[cfg_attr(
728 feature = "python",
729 pyo3::pyclass(
730 module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
731 from_py_object
732 )
733)]
734pub struct HyperliquidHttpClient {
735 pub(crate) inner: Arc<HyperliquidRawHttpClient>,
736 instruments: Arc<RwLock<AHashMap<Ustr, InstrumentAny>>>,
737 instruments_by_coin: Arc<RwLock<AHashMap<(Ustr, HyperliquidProductType), InstrumentAny>>>,
738 asset_indices: Arc<RwLock<AHashMap<Ustr, u32>>>,
740 spot_fill_coins: Arc<RwLock<AHashMap<Ustr, Ustr>>>,
742 account_id: Option<AccountId>,
743}
744
745impl Default for HyperliquidHttpClient {
746 fn default() -> Self {
747 Self::new(true, None, None).expect("Failed to create default Hyperliquid HTTP client")
748 }
749}
750
751impl HyperliquidHttpClient {
752 pub fn new(
758 is_testnet: bool,
759 timeout_secs: Option<u64>,
760 proxy_url: Option<String>,
761 ) -> std::result::Result<Self, HttpClientError> {
762 let raw_client = HyperliquidRawHttpClient::new(is_testnet, timeout_secs, proxy_url)?;
763 Ok(Self::from_raw(raw_client))
764 }
765
766 pub fn with_secrets(
772 secrets: &Secrets,
773 timeout_secs: Option<u64>,
774 proxy_url: Option<String>,
775 ) -> std::result::Result<Self, HttpClientError> {
776 let raw_client =
777 HyperliquidRawHttpClient::with_credentials(secrets, timeout_secs, proxy_url)?;
778 Ok(Self::from_raw(raw_client))
779 }
780
781 fn from_raw(raw_client: HyperliquidRawHttpClient) -> Self {
782 Self {
783 inner: Arc::new(raw_client),
784 instruments: Arc::new(RwLock::new(AHashMap::new())),
785 instruments_by_coin: Arc::new(RwLock::new(AHashMap::new())),
786 asset_indices: Arc::new(RwLock::new(AHashMap::new())),
787 spot_fill_coins: Arc::new(RwLock::new(AHashMap::new())),
788 account_id: None,
789 }
790 }
791
792 pub fn set_base_info_url(&mut self, url: String) {
798 Arc::get_mut(&mut self.inner)
799 .expect("cannot override URL: Arc has multiple references")
800 .set_base_info_url(url);
801 }
802
803 pub fn set_base_exchange_url(&mut self, url: String) {
809 Arc::get_mut(&mut self.inner)
810 .expect("cannot override URL: Arc has multiple references")
811 .set_base_exchange_url(url);
812 }
813
814 pub fn from_env(is_testnet: bool) -> Result<Self> {
820 let raw_client = HyperliquidRawHttpClient::from_env(is_testnet)?;
821 Ok(Self {
822 inner: Arc::new(raw_client),
823 instruments: Arc::new(RwLock::new(AHashMap::new())),
824 instruments_by_coin: Arc::new(RwLock::new(AHashMap::new())),
825 asset_indices: Arc::new(RwLock::new(AHashMap::new())),
826 spot_fill_coins: Arc::new(RwLock::new(AHashMap::new())),
827 account_id: None,
828 })
829 }
830
831 pub fn with_credentials(
844 private_key: Option<String>,
845 vault_address: Option<String>,
846 is_testnet: bool,
847 timeout_secs: Option<u64>,
848 proxy_url: Option<String>,
849 ) -> Result<Self> {
850 let pk_env_var = if is_testnet {
852 "HYPERLIQUID_TESTNET_PK"
853 } else {
854 "HYPERLIQUID_PK"
855 };
856 let vault_env_var = if is_testnet {
857 "HYPERLIQUID_TESTNET_VAULT"
858 } else {
859 "HYPERLIQUID_VAULT"
860 };
861
862 let resolved_pk = match private_key {
864 Some(pk) => Some(pk),
865 None => env::var(pk_env_var).ok(),
866 };
867
868 let resolved_vault = match vault_address {
870 Some(vault) => Some(vault),
871 None => env::var(vault_env_var).ok(),
872 };
873
874 match resolved_pk {
875 Some(pk) => {
876 let raw_client = HyperliquidRawHttpClient::from_credentials(
877 &pk,
878 resolved_vault.as_deref(),
879 is_testnet,
880 timeout_secs,
881 proxy_url,
882 )?;
883 Ok(Self {
884 inner: Arc::new(raw_client),
885 instruments: Arc::new(RwLock::new(AHashMap::new())),
886 instruments_by_coin: Arc::new(RwLock::new(AHashMap::new())),
887 asset_indices: Arc::new(RwLock::new(AHashMap::new())),
888 spot_fill_coins: Arc::new(RwLock::new(AHashMap::new())),
889 account_id: None,
890 })
891 }
892 None => {
893 Self::new(is_testnet, timeout_secs, proxy_url)
895 .map_err(|e| Error::auth(format!("Failed to create HTTP client: {e}")))
896 }
897 }
898 }
899
900 pub fn from_credentials(
906 private_key: &str,
907 vault_address: Option<&str>,
908 is_testnet: bool,
909 timeout_secs: Option<u64>,
910 proxy_url: Option<String>,
911 ) -> Result<Self> {
912 let raw_client = HyperliquidRawHttpClient::from_credentials(
913 private_key,
914 vault_address,
915 is_testnet,
916 timeout_secs,
917 proxy_url,
918 )?;
919 Ok(Self {
920 inner: Arc::new(raw_client),
921 instruments: Arc::new(RwLock::new(AHashMap::new())),
922 instruments_by_coin: Arc::new(RwLock::new(AHashMap::new())),
923 asset_indices: Arc::new(RwLock::new(AHashMap::new())),
924 spot_fill_coins: Arc::new(RwLock::new(AHashMap::new())),
925 account_id: None,
926 })
927 }
928
929 #[must_use]
931 pub fn is_testnet(&self) -> bool {
932 self.inner.is_testnet()
933 }
934
935 pub fn get_user_address(&self) -> Result<String> {
941 self.inner.get_user_address()
942 }
943
944 pub fn get_account_address(&self) -> Result<String> {
951 self.inner.get_account_address()
952 }
953
954 pub fn cache_instrument(&self, instrument: InstrumentAny) {
963 let full_symbol = instrument.symbol().inner();
964 let coin = instrument.raw_symbol().inner();
965
966 {
967 let mut instruments = self
968 .instruments
969 .write()
970 .expect("Failed to acquire write lock");
971
972 instruments.insert(full_symbol, instrument.clone());
973
974 instruments.insert(coin, instrument.clone());
976 }
977
978 if let Ok(product_type) = HyperliquidProductType::from_symbol(full_symbol.as_str()) {
980 let mut instruments_by_coin = self
981 .instruments_by_coin
982 .write()
983 .expect("Failed to acquire write lock");
984 instruments_by_coin.insert((coin, product_type), instrument.clone());
985
986 if coin.as_str().starts_with('@')
990 && let Some(base) = full_symbol.as_str().split('-').next()
991 {
992 let base_ustr = Ustr::from(base);
993 if base_ustr != coin {
994 instruments_by_coin.insert((base_ustr, product_type), instrument);
995 }
996 }
997 } else {
998 log::warn!("Unable to determine product type for symbol: {full_symbol}");
999 }
1000 }
1001
1002 fn get_or_create_instrument(
1003 &self,
1004 coin: &Ustr,
1005 product_type: Option<HyperliquidProductType>,
1006 ) -> Option<InstrumentAny> {
1007 if let Some(pt) = product_type {
1008 let instruments_by_coin = self
1009 .instruments_by_coin
1010 .read()
1011 .expect("Failed to acquire read lock");
1012
1013 if let Some(instrument) = instruments_by_coin.get(&(*coin, pt)) {
1014 return Some(instrument.clone());
1015 }
1016 }
1017
1018 if product_type.is_none() {
1020 let instruments_by_coin = self
1021 .instruments_by_coin
1022 .read()
1023 .expect("Failed to acquire read lock");
1024
1025 if let Some(instrument) =
1026 instruments_by_coin.get(&(*coin, HyperliquidProductType::Perp))
1027 {
1028 return Some(instrument.clone());
1029 }
1030 if let Some(instrument) =
1031 instruments_by_coin.get(&(*coin, HyperliquidProductType::Spot))
1032 {
1033 return Some(instrument.clone());
1034 }
1035 }
1036
1037 if coin.as_str().starts_with('@') {
1039 let spot_fill_coins = self
1040 .spot_fill_coins
1041 .read()
1042 .expect("Failed to acquire read lock");
1043 if let Some(symbol) = spot_fill_coins.get(coin) {
1044 let instruments = self
1047 .instruments
1048 .read()
1049 .expect("Failed to acquire read lock");
1050 if let Some(instrument) = instruments.get(symbol) {
1051 return Some(instrument.clone());
1052 }
1053 }
1054 }
1055
1056 if coin.as_str().starts_with("vntls:") {
1058 log::info!("Creating synthetic instrument for vault token: {coin}");
1059
1060 let clock = nautilus_core::time::get_atomic_clock_realtime();
1061 let ts_event = clock.get_time_ns();
1062
1063 let symbol_str = format!("{coin}-USDC-SPOT");
1065 let symbol = Symbol::new(&symbol_str);
1066 let venue = *HYPERLIQUID_VENUE;
1067 let instrument_id = InstrumentId::new(symbol, venue);
1068
1069 let base_currency = Currency::new(
1071 coin.as_str(),
1072 8, 0, coin.as_str(),
1075 CurrencyType::Crypto,
1076 );
1077
1078 let quote_currency = Currency::new(
1079 "USDC",
1080 6, 0,
1082 "USDC",
1083 CurrencyType::Crypto,
1084 );
1085
1086 let price_increment = Price::from("0.00000001");
1087 let size_increment = Quantity::from("0.00000001");
1088
1089 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1090 instrument_id,
1091 symbol,
1092 base_currency,
1093 quote_currency,
1094 8, 8, price_increment,
1097 size_increment,
1098 None, None, None, None, None, None, None, None, None, None, None, None, None, ts_event,
1112 ts_event,
1113 ));
1114
1115 self.cache_instrument(instrument.clone());
1116
1117 Some(instrument)
1118 } else {
1119 log::warn!("Instrument not found in cache: {coin}");
1121 None
1122 }
1123 }
1124
1125 pub fn set_account_id(&mut self, account_id: AccountId) {
1129 self.account_id = Some(account_id);
1130 }
1131
1132 #[allow(clippy::missing_panics_doc)]
1135 pub async fn request_instruments(&self) -> Result<Vec<InstrumentAny>> {
1136 let mut defs: Vec<HyperliquidInstrumentDef> = Vec::new();
1137
1138 match self.inner.load_perp_meta().await {
1139 Ok(perp_meta) => match parse_perp_instruments(&perp_meta) {
1140 Ok(perp_defs) => {
1141 log::debug!(
1142 "Loaded Hyperliquid perp definitions: count={}",
1143 perp_defs.len(),
1144 );
1145 defs.extend(perp_defs);
1146 }
1147 Err(e) => {
1148 log::warn!("Failed to parse Hyperliquid perp instruments: {e}");
1149 }
1150 },
1151 Err(e) => {
1152 log::warn!("Failed to load Hyperliquid perp metadata: {e}");
1153 }
1154 }
1155
1156 match self.inner.get_spot_meta().await {
1157 Ok(spot_meta) => match parse_spot_instruments(&spot_meta) {
1158 Ok(spot_defs) => {
1159 log::debug!(
1160 "Loaded Hyperliquid spot definitions: count={}",
1161 spot_defs.len(),
1162 );
1163 defs.extend(spot_defs);
1164 }
1165 Err(e) => {
1166 log::warn!("Failed to parse Hyperliquid spot instruments: {e}");
1167 }
1168 },
1169 Err(e) => {
1170 log::warn!("Failed to load Hyperliquid spot metadata: {e}");
1171 }
1172 }
1173
1174 {
1176 let mut asset_indices = self
1177 .asset_indices
1178 .write()
1179 .expect("Failed to acquire write lock");
1180 for def in &defs {
1181 asset_indices.insert(def.symbol, def.asset_index);
1182 }
1183 log::debug!(
1184 "Populated asset indices map (count={})",
1185 asset_indices.len()
1186 );
1187 }
1188
1189 Ok(instruments_from_defs_owned(defs))
1190 }
1191
1192 pub fn get_asset_index(&self, symbol: &str) -> Option<u32> {
1204 let asset_indices = self
1205 .asset_indices
1206 .read()
1207 .expect("Failed to acquire read lock");
1208 asset_indices.get(&Ustr::from(symbol)).copied()
1209 }
1210
1211 #[must_use]
1223 pub fn get_spot_fill_coin_mapping(&self) -> AHashMap<Ustr, Ustr> {
1224 const SPOT_INDEX_OFFSET: u32 = 10000;
1225
1226 let asset_indices = self
1227 .asset_indices
1228 .read()
1229 .expect("Failed to acquire read lock");
1230
1231 let mut mapping = AHashMap::new();
1232 for (symbol, &asset_index) in asset_indices.iter() {
1233 if asset_index >= SPOT_INDEX_OFFSET {
1235 let pair_index = asset_index - SPOT_INDEX_OFFSET;
1236 let fill_coin = Ustr::from(&format!("@{pair_index}"));
1237 mapping.insert(fill_coin, *symbol);
1238 }
1239 }
1240
1241 {
1243 let mut spot_fill_coins = self
1244 .spot_fill_coins
1245 .write()
1246 .expect("Failed to acquire write lock");
1247 *spot_fill_coins = mapping.clone();
1248 }
1249
1250 mapping
1251 }
1252
1253 #[allow(dead_code)]
1255 pub(crate) async fn load_perp_meta(&self) -> Result<PerpMeta> {
1256 self.inner.load_perp_meta().await
1257 }
1258
1259 #[allow(dead_code)]
1261 pub(crate) async fn get_spot_meta(&self) -> Result<SpotMeta> {
1262 self.inner.get_spot_meta().await
1263 }
1264
1265 pub async fn info_l2_book(&self, coin: &str) -> Result<HyperliquidL2Book> {
1267 self.inner.info_l2_book(coin).await
1268 }
1269
1270 pub async fn info_user_fills(&self, user: &str) -> Result<HyperliquidFills> {
1272 self.inner.info_user_fills(user).await
1273 }
1274
1275 pub async fn info_order_status(&self, user: &str, oid: u64) -> Result<HyperliquidOrderStatus> {
1277 self.inner.info_order_status(user, oid).await
1278 }
1279
1280 pub async fn info_open_orders(&self, user: &str) -> Result<Value> {
1282 self.inner.info_open_orders(user).await
1283 }
1284
1285 pub async fn info_frontend_open_orders(&self, user: &str) -> Result<Value> {
1287 self.inner.info_frontend_open_orders(user).await
1288 }
1289
1290 pub async fn info_clearinghouse_state(&self, user: &str) -> Result<Value> {
1292 self.inner.info_clearinghouse_state(user).await
1293 }
1294
1295 pub async fn info_candle_snapshot(
1297 &self,
1298 coin: &str,
1299 interval: HyperliquidBarInterval,
1300 start_time: u64,
1301 end_time: u64,
1302 ) -> Result<HyperliquidCandleSnapshot> {
1303 self.inner
1304 .info_candle_snapshot(coin, interval, start_time, end_time)
1305 .await
1306 }
1307
1308 pub async fn post_action(
1310 &self,
1311 action: &ExchangeAction,
1312 ) -> Result<HyperliquidExchangeResponse> {
1313 self.inner.post_action(action).await
1314 }
1315
1316 pub async fn post_action_exec(
1318 &self,
1319 action: &HyperliquidExecAction,
1320 ) -> Result<HyperliquidExchangeResponse> {
1321 self.inner.post_action_exec(action).await
1322 }
1323
1324 pub async fn info_meta(&self) -> Result<HyperliquidMeta> {
1326 self.inner.info_meta().await
1327 }
1328
1329 pub async fn cancel_order(
1339 &self,
1340 instrument_id: InstrumentId,
1341 client_order_id: Option<ClientOrderId>,
1342 venue_order_id: Option<VenueOrderId>,
1343 ) -> Result<()> {
1344 let symbol = instrument_id.symbol.as_str();
1346 let asset_id = self.get_asset_index(symbol).ok_or_else(|| {
1347 Error::bad_request(format!(
1348 "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
1349 ))
1350 })?;
1351
1352 let action = if let Some(cloid) = client_order_id {
1354 let cloid_hash = Cloid::from_client_order_id(cloid);
1356 let cancel_req = HyperliquidExecCancelByCloidRequest {
1357 asset: asset_id,
1358 cloid: cloid_hash,
1359 };
1360 HyperliquidExecAction::CancelByCloid {
1361 cancels: vec![cancel_req],
1362 }
1363 } else if let Some(oid) = venue_order_id {
1364 let oid_u64 = oid
1365 .as_str()
1366 .parse::<u64>()
1367 .map_err(|_| Error::bad_request("Invalid venue order ID format"))?;
1368 let cancel_req = HyperliquidExecCancelOrderRequest {
1369 asset: asset_id,
1370 oid: oid_u64,
1371 };
1372 HyperliquidExecAction::Cancel {
1373 cancels: vec![cancel_req],
1374 }
1375 } else {
1376 return Err(Error::bad_request(
1377 "Either client_order_id or venue_order_id must be provided",
1378 ));
1379 };
1380
1381 let response = self.inner.post_action_exec(&action).await?;
1383
1384 match response {
1386 ref r @ HyperliquidExchangeResponse::Status { .. } if r.is_ok() => Ok(()),
1387 HyperliquidExchangeResponse::Status {
1388 status,
1389 response: error_data,
1390 } => Err(Error::bad_request(format!(
1391 "Cancel order failed: status={status}, error={error_data}"
1392 ))),
1393 HyperliquidExchangeResponse::Error { error } => {
1394 Err(Error::bad_request(format!("Cancel order error: {error}")))
1395 }
1396 }
1397 }
1398
1399 #[allow(clippy::missing_panics_doc)]
1412 pub async fn request_order_status_reports(
1413 &self,
1414 user: &str,
1415 instrument_id: Option<InstrumentId>,
1416 ) -> Result<Vec<OrderStatusReport>> {
1417 let account_id = self
1418 .account_id
1419 .ok_or_else(|| Error::bad_request("Account ID not set"))?;
1420 let response = self.info_frontend_open_orders(user).await?;
1421
1422 let orders: Vec<serde_json::Value> = serde_json::from_value(response)
1424 .map_err(|e| Error::bad_request(format!("Failed to parse orders: {e}")))?;
1425
1426 let mut reports = Vec::new();
1427 let ts_init = get_atomic_clock_realtime().get_time_ns();
1428
1429 for order_value in orders {
1430 let order: crate::websocket::messages::WsBasicOrderData =
1432 match serde_json::from_value(order_value.clone()) {
1433 Ok(o) => o,
1434 Err(e) => {
1435 log::warn!("Failed to parse order: {e}");
1436 continue;
1437 }
1438 };
1439
1440 let instrument = match self.get_or_create_instrument(&order.coin, None) {
1442 Some(inst) => inst,
1443 None => continue, };
1445
1446 if let Some(filter_id) = instrument_id
1448 && instrument.id() != filter_id
1449 {
1450 continue;
1451 }
1452
1453 let status = HyperliquidOrderStatusEnum::Open;
1455
1456 match crate::http::parse::parse_order_status_report_from_basic(
1458 &order,
1459 &status,
1460 &instrument,
1461 account_id,
1462 ts_init,
1463 ) {
1464 Ok(report) => reports.push(report),
1465 Err(e) => log::error!("Failed to parse order status report: {e}"),
1466 }
1467 }
1468
1469 Ok(reports)
1470 }
1471
1472 pub async fn request_fill_reports(
1486 &self,
1487 user: &str,
1488 instrument_id: Option<InstrumentId>,
1489 ) -> Result<Vec<FillReport>> {
1490 let account_id = self
1491 .account_id
1492 .ok_or_else(|| Error::bad_request("Account ID not set"))?;
1493 let fills_response = self.info_user_fills(user).await?;
1494
1495 let mut reports = Vec::new();
1496 let ts_init = get_atomic_clock_realtime().get_time_ns();
1497
1498 for fill in fills_response {
1499 let instrument = match self.get_or_create_instrument(&fill.coin, None) {
1501 Some(inst) => inst,
1502 None => continue, };
1504
1505 if let Some(filter_id) = instrument_id
1507 && instrument.id() != filter_id
1508 {
1509 continue;
1510 }
1511
1512 match crate::http::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1514 Ok(report) => reports.push(report),
1515 Err(e) => log::error!("Failed to parse fill report: {e}"),
1516 }
1517 }
1518
1519 Ok(reports)
1520 }
1521
1522 pub async fn request_position_status_reports(
1536 &self,
1537 user: &str,
1538 instrument_id: Option<InstrumentId>,
1539 ) -> Result<Vec<PositionStatusReport>> {
1540 let account_id = self
1541 .account_id
1542 .ok_or_else(|| Error::bad_request("Account ID not set"))?;
1543 let state_response = self.info_clearinghouse_state(user).await?;
1544
1545 let asset_positions: Vec<serde_json::Value> = state_response
1547 .get("assetPositions")
1548 .and_then(|v| v.as_array())
1549 .ok_or_else(|| Error::bad_request("assetPositions not found in clearinghouse state"))?
1550 .clone();
1551
1552 let mut reports = Vec::new();
1553 let ts_init = get_atomic_clock_realtime().get_time_ns();
1554
1555 for position_value in asset_positions {
1556 let coin = position_value
1558 .get("position")
1559 .and_then(|p| p.get("coin"))
1560 .and_then(|c| c.as_str())
1561 .ok_or_else(|| Error::bad_request("coin not found in position"))?;
1562
1563 let coin_ustr = Ustr::from(coin);
1565 let instrument = match self.get_or_create_instrument(&coin_ustr, None) {
1566 Some(inst) => inst,
1567 None => continue, };
1569
1570 if let Some(filter_id) = instrument_id
1572 && instrument.id() != filter_id
1573 {
1574 continue;
1575 }
1576
1577 match crate::http::parse::parse_position_status_report(
1579 &position_value,
1580 &instrument,
1581 account_id,
1582 ts_init,
1583 ) {
1584 Ok(report) => reports.push(report),
1585 Err(e) => log::error!("Failed to parse position status report: {e}"),
1586 }
1587 }
1588
1589 Ok(reports)
1590 }
1591
1592 pub async fn request_account_state(&self, user: &str) -> Result<AccountState> {
1600 let account_id = self
1601 .account_id
1602 .ok_or_else(|| Error::bad_request("Account ID not set"))?;
1603 let state_response = self.info_clearinghouse_state(user).await?;
1604 let ts_init = get_atomic_clock_realtime().get_time_ns();
1605
1606 log::trace!("Clearinghouse state response: {state_response}");
1607
1608 let state: crate::http::models::ClearinghouseState =
1610 serde_json::from_value(state_response.clone()).map_err(|e| {
1611 log::error!("Failed to parse clearinghouse state: {e}");
1612 log::debug!("Raw response: {state_response}");
1613 Error::bad_request(format!("Failed to parse clearinghouse state: {e}"))
1614 })?;
1615
1616 let usdc = Currency::new("USDC", 6, 0, "0.000001", CurrencyType::Crypto);
1618
1619 let balances = if let Some(margin) = &state.cross_margin_summary {
1621 let mut total = margin.total_raw_usd.max(Decimal::ZERO);
1622 let free = state.withdrawable.unwrap_or(total).max(Decimal::ZERO);
1623
1624 if free > total {
1626 log::debug!("Adjusting total ({total}) to match withdrawable ({free})");
1627 total = free;
1628 }
1629
1630 let locked = (total - free).max(Decimal::ZERO);
1631
1632 vec![AccountBalance::new(
1633 Money::from_decimal(total, usdc).map_err(|e| Error::decode(e.to_string()))?,
1634 Money::from_decimal(locked, usdc).map_err(|e| Error::decode(e.to_string()))?,
1635 Money::from_decimal(free, usdc).map_err(|e| Error::decode(e.to_string()))?,
1636 )]
1637 } else {
1638 let free = state
1640 .withdrawable
1641 .unwrap_or(Decimal::ZERO)
1642 .max(Decimal::ZERO);
1643
1644 vec![AccountBalance::new(
1645 Money::from_decimal(free, usdc).map_err(|e| Error::decode(e.to_string()))?,
1646 Money::zero(usdc),
1647 Money::from_decimal(free, usdc).map_err(|e| Error::decode(e.to_string()))?,
1648 )]
1649 };
1650
1651 Ok(AccountState::new(
1652 account_id,
1653 AccountType::Margin,
1654 balances,
1655 vec![], true, UUID4::new(),
1658 ts_init,
1659 ts_init,
1660 None,
1661 ))
1662 }
1663
1664 pub async fn request_bars(
1681 &self,
1682 bar_type: BarType,
1683 start: Option<chrono::DateTime<chrono::Utc>>,
1684 end: Option<chrono::DateTime<chrono::Utc>>,
1685 limit: Option<u32>,
1686 ) -> Result<Vec<Bar>> {
1687 let instrument_id = bar_type.instrument_id();
1688 let symbol = instrument_id.symbol;
1689
1690 let product_type = HyperliquidProductType::from_symbol(symbol.as_str()).ok();
1691
1692 let base = Ustr::from(
1694 symbol
1695 .as_str()
1696 .split('-')
1697 .next()
1698 .ok_or_else(|| Error::bad_request("Invalid instrument symbol"))?,
1699 );
1700
1701 let instrument = self
1702 .get_or_create_instrument(&base, product_type)
1703 .ok_or_else(|| {
1704 Error::bad_request(format!("Instrument not found in cache: {instrument_id}"))
1705 })?;
1706
1707 let coin = instrument.raw_symbol().inner();
1712
1713 let price_precision = instrument.price_precision();
1714 let size_precision = instrument.size_precision();
1715
1716 let interval =
1717 bar_type_to_interval(&bar_type).map_err(|e| Error::bad_request(e.to_string()))?;
1718
1719 let now = chrono::Utc::now();
1721 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1722 let start_time = if let Some(start) = start {
1723 start.timestamp_millis() as u64
1724 } else {
1725 let spec = bar_type.spec();
1727 let step_ms = match spec.aggregation {
1728 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1729 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1730 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1731 BarAggregation::Week => spec.step.get() as u64 * 604_800_000,
1732 BarAggregation::Month => spec.step.get() as u64 * 2_592_000_000,
1733 _ => 60_000,
1734 };
1735 end_time.saturating_sub(1000 * step_ms)
1736 };
1737
1738 let candles = self
1739 .info_candle_snapshot(coin.as_str(), interval, start_time, end_time)
1740 .await?;
1741
1742 let now_ms = now.timestamp_millis() as u64;
1744
1745 let mut bars: Vec<Bar> = candles
1746 .iter()
1747 .filter(|candle| candle.end_timestamp < now_ms)
1748 .enumerate()
1749 .filter_map(|(i, candle)| {
1750 crate::data::candle_to_bar(candle, bar_type, price_precision, size_precision)
1751 .map_err(|e| {
1752 log::error!("Failed to convert candle {i} to bar: {candle:?} error: {e}");
1753 e
1754 })
1755 .ok()
1756 })
1757 .collect();
1758
1759 if let Some(limit) = limit
1761 && limit > 0
1762 && bars.len() > limit as usize
1763 {
1764 bars.truncate(limit as usize);
1765 }
1766
1767 log::debug!(
1768 "Received {} bars for {} (filtered {} incomplete)",
1769 bars.len(),
1770 bar_type,
1771 candles.len() - bars.len()
1772 );
1773 Ok(bars)
1774 }
1775 #[allow(clippy::too_many_arguments)]
1782 pub async fn submit_order(
1783 &self,
1784 instrument_id: InstrumentId,
1785 client_order_id: ClientOrderId,
1786 order_side: OrderSide,
1787 order_type: OrderType,
1788 quantity: Quantity,
1789 time_in_force: TimeInForce,
1790 price: Option<Price>,
1791 trigger_price: Option<Price>,
1792 post_only: bool,
1793 reduce_only: bool,
1794 ) -> Result<OrderStatusReport> {
1795 let symbol = instrument_id.symbol.as_str();
1796 let asset = self.get_asset_index(symbol).ok_or_else(|| {
1797 Error::bad_request(format!(
1798 "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
1799 ))
1800 })?;
1801
1802 let is_buy = matches!(order_side, OrderSide::Buy);
1803
1804 let price_decimal = match price {
1805 Some(px) => px.as_decimal().normalize(),
1806 None => {
1807 if matches!(
1808 order_type,
1809 OrderType::Market | OrderType::StopMarket | OrderType::MarketIfTouched
1810 ) {
1811 Decimal::ZERO
1812 } else {
1813 return Err(Error::bad_request("Limit orders require a price"));
1814 }
1815 }
1816 };
1817
1818 let size_decimal = quantity.as_decimal().normalize();
1819
1820 let kind = match order_type {
1821 OrderType::Market => HyperliquidExecOrderKind::Limit {
1822 limit: HyperliquidExecLimitParams {
1823 tif: HyperliquidExecTif::Ioc,
1824 },
1825 },
1826 OrderType::Limit => {
1827 let tif = if post_only {
1828 HyperliquidExecTif::Alo
1829 } else {
1830 match time_in_force {
1831 TimeInForce::Gtc => HyperliquidExecTif::Gtc,
1832 TimeInForce::Ioc => HyperliquidExecTif::Ioc,
1833 TimeInForce::Fok
1834 | TimeInForce::Day
1835 | TimeInForce::Gtd
1836 | TimeInForce::AtTheOpen
1837 | TimeInForce::AtTheClose => {
1838 return Err(Error::bad_request(format!(
1839 "Time in force {time_in_force:?} not supported"
1840 )));
1841 }
1842 }
1843 };
1844 HyperliquidExecOrderKind::Limit {
1845 limit: HyperliquidExecLimitParams { tif },
1846 }
1847 }
1848 OrderType::StopMarket
1849 | OrderType::StopLimit
1850 | OrderType::MarketIfTouched
1851 | OrderType::LimitIfTouched => {
1852 if let Some(trig_px) = trigger_price {
1853 let trigger_price_decimal = trig_px.as_decimal().normalize();
1854
1855 let tpsl = match order_type {
1859 OrderType::StopMarket | OrderType::StopLimit => HyperliquidExecTpSl::Sl,
1860 OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
1861 HyperliquidExecTpSl::Tp
1862 }
1863 _ => unreachable!(),
1864 };
1865
1866 let is_market = matches!(
1867 order_type,
1868 OrderType::StopMarket | OrderType::MarketIfTouched
1869 );
1870
1871 HyperliquidExecOrderKind::Trigger {
1872 trigger: HyperliquidExecTriggerParams {
1873 is_market,
1874 trigger_px: trigger_price_decimal,
1875 tpsl,
1876 },
1877 }
1878 } else {
1879 return Err(Error::bad_request("Trigger orders require a trigger price"));
1880 }
1881 }
1882 _ => {
1883 return Err(Error::bad_request(format!(
1884 "Order type {order_type:?} not supported"
1885 )));
1886 }
1887 };
1888
1889 let hyperliquid_order = HyperliquidExecPlaceOrderRequest {
1890 asset,
1891 is_buy,
1892 price: price_decimal,
1893 size: size_decimal,
1894 reduce_only,
1895 kind,
1896 cloid: Some(Cloid::from_client_order_id(client_order_id)),
1897 };
1898
1899 let action = HyperliquidExecAction::Order {
1900 orders: vec![hyperliquid_order],
1901 grouping: HyperliquidExecGrouping::Na,
1902 builder: resolve_builder_fee(symbol, post_only),
1903 };
1904
1905 let response = self.inner.post_action_exec(&action).await?;
1906
1907 match response {
1908 HyperliquidExchangeResponse::Status {
1909 status,
1910 response: response_data,
1911 } if status == RESPONSE_STATUS_OK => {
1912 let data_value = if let Some(data) = response_data.get("data") {
1913 data.clone()
1914 } else {
1915 response_data
1916 };
1917
1918 let order_response: HyperliquidExecOrderResponseData =
1919 serde_json::from_value(data_value).map_err(|e| {
1920 Error::bad_request(format!("Failed to parse order response: {e}"))
1921 })?;
1922
1923 let order_status = order_response
1924 .statuses
1925 .first()
1926 .ok_or_else(|| Error::bad_request("No order status in response"))?;
1927
1928 let symbol_str = instrument_id.symbol.as_str();
1929 let product_type = HyperliquidProductType::from_symbol(symbol_str).ok();
1930
1931 let asset_str = symbol_str.split('-').next().unwrap_or(symbol_str);
1933 let instrument = self
1934 .get_or_create_instrument(&Ustr::from(asset_str), product_type)
1935 .ok_or_else(|| {
1936 Error::bad_request(format!("Instrument not found for {asset_str}"))
1937 })?;
1938
1939 let account_id = self
1940 .account_id
1941 .ok_or_else(|| Error::bad_request("Account ID not set"))?;
1942 let ts_init = get_atomic_clock_realtime().get_time_ns();
1943
1944 match order_status {
1945 HyperliquidExecOrderStatus::Resting { resting } => self
1946 .create_order_status_report(
1947 instrument_id,
1948 Some(client_order_id),
1949 VenueOrderId::new(resting.oid.to_string()),
1950 order_side,
1951 order_type,
1952 quantity,
1953 time_in_force,
1954 price,
1955 trigger_price,
1956 OrderStatus::Accepted,
1957 Quantity::new(0.0, instrument.size_precision()),
1958 &instrument,
1959 account_id,
1960 ts_init,
1961 ),
1962 HyperliquidExecOrderStatus::Filled { filled } => {
1963 let filled_qty = Quantity::new(
1964 filled.total_sz.to_string().parse::<f64>().unwrap_or(0.0),
1965 instrument.size_precision(),
1966 );
1967 self.create_order_status_report(
1968 instrument_id,
1969 Some(client_order_id),
1970 VenueOrderId::new(filled.oid.to_string()),
1971 order_side,
1972 order_type,
1973 quantity,
1974 time_in_force,
1975 price,
1976 trigger_price,
1977 OrderStatus::Filled,
1978 filled_qty,
1979 &instrument,
1980 account_id,
1981 ts_init,
1982 )
1983 }
1984 HyperliquidExecOrderStatus::Error { error } => {
1985 Err(Error::bad_request(format!("Order rejected: {error}")))
1986 }
1987 }
1988 }
1989 HyperliquidExchangeResponse::Error { error } => Err(Error::bad_request(format!(
1990 "Order submission failed: {error}"
1991 ))),
1992 _ => Err(Error::bad_request("Unexpected response format")),
1993 }
1994 }
1995
1996 pub async fn submit_order_from_order_any(&self, order: &OrderAny) -> Result<OrderStatusReport> {
2000 self.submit_order(
2001 order.instrument_id(),
2002 order.client_order_id(),
2003 order.order_side(),
2004 order.order_type(),
2005 order.quantity(),
2006 order.time_in_force(),
2007 order.price(),
2008 order.trigger_price(),
2009 order.is_post_only(),
2010 order.is_reduce_only(),
2011 )
2012 .await
2013 }
2014
2015 #[allow(clippy::too_many_arguments)]
2016 fn create_order_status_report(
2017 &self,
2018 instrument_id: InstrumentId,
2019 client_order_id: Option<ClientOrderId>,
2020 venue_order_id: VenueOrderId,
2021 order_side: OrderSide,
2022 order_type: OrderType,
2023 quantity: Quantity,
2024 time_in_force: TimeInForce,
2025 price: Option<Price>,
2026 trigger_price: Option<Price>,
2027 order_status: OrderStatus,
2028 filled_qty: Quantity,
2029 _instrument: &InstrumentAny,
2030 account_id: AccountId,
2031 ts_init: UnixNanos,
2032 ) -> Result<OrderStatusReport> {
2033 let clock = get_atomic_clock_realtime();
2034 let ts_accepted = clock.get_time_ns();
2035 let ts_last = ts_accepted;
2036 let report_id = UUID4::new();
2037
2038 let mut report = OrderStatusReport::new(
2039 account_id,
2040 instrument_id,
2041 client_order_id,
2042 venue_order_id,
2043 order_side,
2044 order_type,
2045 time_in_force,
2046 order_status,
2047 quantity,
2048 filled_qty,
2049 ts_accepted,
2050 ts_last,
2051 ts_init,
2052 Some(report_id),
2053 );
2054
2055 if let Some(px) = price {
2056 report = report.with_price(px);
2057 }
2058
2059 if let Some(trig_px) = trigger_price {
2060 report = report
2061 .with_trigger_price(trig_px)
2062 .with_trigger_type(TriggerType::Default);
2063 }
2064
2065 Ok(report)
2066 }
2067
2068 pub async fn submit_orders(&self, orders: &[&OrderAny]) -> Result<Vec<OrderStatusReport>> {
2075 let mut hyperliquid_orders = Vec::with_capacity(orders.len());
2077
2078 for order in orders {
2079 let instrument_id = order.instrument_id();
2080 let symbol = instrument_id.symbol.as_str();
2081 let asset = self.get_asset_index(symbol).ok_or_else(|| {
2082 Error::bad_request(format!(
2083 "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
2084 ))
2085 })?;
2086 let request = order_to_hyperliquid_request_with_asset(order, asset)
2087 .map_err(|e| Error::bad_request(format!("Failed to convert order: {e}")))?;
2088 hyperliquid_orders.push(request);
2089 }
2090
2091 let order_props: Vec<(String, bool)> = orders
2092 .iter()
2093 .map(|o| (o.instrument_id().symbol.to_string(), o.is_post_only()))
2094 .collect();
2095 let batch_refs: Vec<(&str, bool)> =
2096 order_props.iter().map(|(s, p)| (s.as_str(), *p)).collect();
2097 let builder = resolve_builder_fee_batch(&batch_refs);
2098
2099 let action = HyperliquidExecAction::Order {
2100 orders: hyperliquid_orders,
2101 grouping: HyperliquidExecGrouping::Na,
2102 builder,
2103 };
2104
2105 let response = self.inner.post_action_exec(&action).await?;
2107
2108 match response {
2110 HyperliquidExchangeResponse::Status {
2111 status,
2112 response: response_data,
2113 } if status == RESPONSE_STATUS_OK => {
2114 let data_value = if let Some(data) = response_data.get("data") {
2117 data.clone()
2118 } else {
2119 response_data
2120 };
2121
2122 let order_response: HyperliquidExecOrderResponseData =
2124 serde_json::from_value(data_value).map_err(|e| {
2125 Error::bad_request(format!("Failed to parse order response: {e}"))
2126 })?;
2127
2128 let account_id = self
2129 .account_id
2130 .ok_or_else(|| Error::bad_request("Account ID not set"))?;
2131 let ts_init = get_atomic_clock_realtime().get_time_ns();
2132
2133 if order_response.statuses.len() != orders.len() {
2135 return Err(Error::bad_request(format!(
2136 "Mismatch between submitted orders ({}) and response statuses ({})",
2137 orders.len(),
2138 order_response.statuses.len()
2139 )));
2140 }
2141
2142 let mut reports = Vec::new();
2143
2144 for (order, order_status) in orders.iter().zip(order_response.statuses.iter()) {
2146 let instrument_id = order.instrument_id();
2148 let symbol = instrument_id.symbol.as_str();
2149 let product_type = HyperliquidProductType::from_symbol(symbol).ok();
2150
2151 let asset = symbol.split('-').next().unwrap_or(symbol);
2153 let instrument = self
2154 .get_or_create_instrument(&Ustr::from(asset), product_type)
2155 .ok_or_else(|| {
2156 Error::bad_request(format!("Instrument not found for {asset}"))
2157 })?;
2158
2159 let report = match order_status {
2161 HyperliquidExecOrderStatus::Resting { resting } => {
2162 self.create_order_status_report(
2164 order.instrument_id(),
2165 Some(order.client_order_id()),
2166 VenueOrderId::new(resting.oid.to_string()),
2167 order.order_side(),
2168 order.order_type(),
2169 order.quantity(),
2170 order.time_in_force(),
2171 order.price(),
2172 order.trigger_price(),
2173 OrderStatus::Accepted,
2174 Quantity::new(0.0, instrument.size_precision()),
2175 &instrument,
2176 account_id,
2177 ts_init,
2178 )?
2179 }
2180 HyperliquidExecOrderStatus::Filled { filled } => {
2181 let filled_qty = Quantity::new(
2183 filled.total_sz.to_string().parse::<f64>().unwrap_or(0.0),
2184 instrument.size_precision(),
2185 );
2186 self.create_order_status_report(
2187 order.instrument_id(),
2188 Some(order.client_order_id()),
2189 VenueOrderId::new(filled.oid.to_string()),
2190 order.order_side(),
2191 order.order_type(),
2192 order.quantity(),
2193 order.time_in_force(),
2194 order.price(),
2195 order.trigger_price(),
2196 OrderStatus::Filled,
2197 filled_qty,
2198 &instrument,
2199 account_id,
2200 ts_init,
2201 )?
2202 }
2203 HyperliquidExecOrderStatus::Error { error } => {
2204 return Err(Error::bad_request(format!(
2205 "Order {} rejected: {error}",
2206 order.client_order_id()
2207 )));
2208 }
2209 };
2210
2211 reports.push(report);
2212 }
2213
2214 Ok(reports)
2215 }
2216 HyperliquidExchangeResponse::Error { error } => Err(Error::bad_request(format!(
2217 "Order submission failed: {error}"
2218 ))),
2219 _ => Err(Error::bad_request("Unexpected response format")),
2220 }
2221 }
2222}
2223
#[cfg(test)]
mod tests {
    use nautilus_core::{MUTEX_POISONED, time::get_atomic_clock_realtime};
    use nautilus_model::{
        currencies::CURRENCY_MAP,
        enums::CurrencyType,
        identifiers::{InstrumentId, Symbol},
        instruments::{CurrencyPair, Instrument, InstrumentAny},
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::HyperliquidHttpClient;
    use crate::{common::enums::HyperliquidProductType, http::query::InfoRequest};

    /// A JSON value survives a serialize/parse round trip unchanged.
    #[rstest]
    fn stable_json_roundtrips() {
        let original = serde_json::json!({"type":"l2Book","coin":"BTC"});
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: serde_json::Value = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded["type"], "l2Book");
        assert_eq!(decoded["coin"], "BTC");
        assert_eq!(decoded, original);
    }

    /// The L2 book info request serializes with the expected `type` and `coin` fields.
    #[rstest]
    fn info_pretty_shape() {
        let request = InfoRequest::l2_book("BTC");
        let value = serde_json::to_value(&request).unwrap();
        let pretty = serde_json::to_string_pretty(&value).unwrap();

        assert!(pretty.contains("\"type\": \"l2Book\""));
        assert!(pretty.contains("\"coin\": \"BTC\""));
    }

    /// A cached spot instrument can be found by full symbol, by raw symbol, and by
    /// (coin, product type), with and without a product type hint.
    #[rstest]
    fn test_cache_instrument_by_raw_symbol() {
        let client = HyperliquidHttpClient::new(true, None, None).unwrap();

        let base_code = "vntls:vCURSOR";
        let quote_code = "USDC";
        let full_symbol = "vntls:vCURSOR-USDC-SPOT";

        // Register the base currency in the global map if it is not there yet.
        {
            let mut currency_map = CURRENCY_MAP.lock().expect(MUTEX_POISONED);
            if !currency_map.contains_key(base_code) {
                currency_map.insert(
                    base_code.to_string(),
                    Currency::new(base_code, 8, 0, base_code, CurrencyType::Crypto),
                );
            }
        }

        let base_currency = Currency::new(base_code, 8, 0, base_code, CurrencyType::Crypto);
        let quote_currency = Currency::new(quote_code, 6, 0, quote_code, CurrencyType::Crypto);

        let venue = *crate::common::consts::HYPERLIQUID_VENUE;
        let instrument_id = InstrumentId::new(Symbol::new(full_symbol), venue);

        // The raw symbol is the bare base code.
        let raw_symbol = Symbol::new(base_code);

        let ts = get_atomic_clock_realtime().get_time_ns();

        let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
            instrument_id,
            raw_symbol,
            base_currency,
            quote_currency,
            8,
            8,
            Price::from("0.00000001"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            ts,
            ts,
        ));
        let expected_id = instrument.id();

        client.cache_instrument(instrument.clone());

        // Lookup through the symbol-keyed cache.
        {
            let instruments = client.instruments.read().unwrap();

            let by_full_symbol = instruments.get(&Ustr::from(full_symbol));
            assert!(
                by_full_symbol.is_some(),
                "Instrument should be accessible by full symbol"
            );
            assert_eq!(by_full_symbol.unwrap().id(), expected_id);

            let by_raw_symbol = instruments.get(&Ustr::from(base_code));
            assert!(
                by_raw_symbol.is_some(),
                "Instrument should be accessible by raw_symbol (Hyperliquid coin identifier)"
            );
            assert_eq!(by_raw_symbol.unwrap().id(), expected_id);
        }

        // Lookup through the (coin, product type) cache.
        {
            let instruments_by_coin = client.instruments_by_coin.read().unwrap();
            let coin_key = (Ustr::from(base_code), HyperliquidProductType::Spot);

            let by_coin = instruments_by_coin.get(&coin_key);
            assert!(
                by_coin.is_some(),
                "Instrument should be accessible by coin and product type"
            );
            assert_eq!(by_coin.unwrap().id(), expected_id);
        }

        // Lookup through the public accessor, with and without a product type.
        let retrieved_with_type = client
            .get_or_create_instrument(&Ustr::from(base_code), Some(HyperliquidProductType::Spot));
        assert!(retrieved_with_type.is_some());
        assert_eq!(retrieved_with_type.unwrap().id(), expected_id);

        let retrieved_without_type = client.get_or_create_instrument(&Ustr::from(base_code), None);
        assert!(retrieved_without_type.is_some());
        assert_eq!(retrieved_without_type.unwrap().id(), expected_id);
    }
}