1use std::{
19 str::FromStr,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use nautilus_common::{
27 cache::fifo::FifoCache,
28 clients::ExecutionClient,
29 live::{runner::get_exec_event_sender, runtime::get_runtime},
30 messages::execution::{
31 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34 },
35};
36use nautilus_core::{
37 MUTEX_POISONED, UUID4, UnixNanos,
38 time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::{AccountType, OmsType, OrderStatus, OrderType},
44 identifiers::{AccountId, ClientId, ClientOrderId, Venue},
45 orders::{Order, any::OrderAny},
46 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47 types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50use ustr::Ustr;
51
52use crate::{
53 common::{
54 builder_fee::{resolve_builder_fee, resolve_builder_fee_batch},
55 consts::HYPERLIQUID_VENUE,
56 credential::Secrets,
57 parse::{
58 client_order_id_to_cancel_request_with_asset, extract_error_message,
59 order_to_hyperliquid_request_with_asset, parse_account_balances_and_margins,
60 },
61 },
62 config::HyperliquidExecClientConfig,
63 http::{
64 client::HyperliquidHttpClient,
65 models::{
66 ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecGrouping,
67 HyperliquidExecModifyOrderRequest,
68 },
69 },
70 websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
71};
72
73#[derive(Debug)]
74pub struct HyperliquidExecutionClient {
75 core: ExecutionClientCore,
76 clock: &'static AtomicTime,
77 config: HyperliquidExecClientConfig,
78 emitter: ExecutionEventEmitter,
79 http_client: HyperliquidHttpClient,
80 ws_client: HyperliquidWebSocketClient,
81 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
82 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
83}
84
85impl HyperliquidExecutionClient {
86 pub fn config(&self) -> &HyperliquidExecClientConfig {
88 &self.config
89 }
90
91 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
92 let instrument_id = order.instrument_id();
95 let symbol = instrument_id.symbol.as_str();
96 if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
97 anyhow::bail!(
98 "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
99 );
100 }
101
102 match order.order_type() {
104 OrderType::Market
105 | OrderType::Limit
106 | OrderType::StopMarket
107 | OrderType::StopLimit
108 | OrderType::MarketIfTouched
109 | OrderType::LimitIfTouched => {}
110 _ => anyhow::bail!(
111 "Unsupported order type for Hyperliquid: {:?}",
112 order.order_type()
113 ),
114 }
115
116 if matches!(
118 order.order_type(),
119 OrderType::StopMarket
120 | OrderType::StopLimit
121 | OrderType::MarketIfTouched
122 | OrderType::LimitIfTouched
123 ) && order.trigger_price().is_none()
124 {
125 anyhow::bail!(
126 "Conditional orders require a trigger price for Hyperliquid: {:?}",
127 order.order_type()
128 );
129 }
130
131 if matches!(
133 order.order_type(),
134 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
135 ) && order.price().is_none()
136 {
137 anyhow::bail!(
138 "Limit orders require a limit price for Hyperliquid: {:?}",
139 order.order_type()
140 );
141 }
142
143 Ok(())
144 }
145
146 pub fn new(
152 core: ExecutionClientCore,
153 config: HyperliquidExecClientConfig,
154 ) -> anyhow::Result<Self> {
155 if !config.has_credentials() {
156 anyhow::bail!("Hyperliquid execution client requires private key");
157 }
158
159 let secrets = Secrets::from_private_key(
160 &config.private_key,
161 config.vault_address.as_deref(),
162 config.is_testnet,
163 )
164 .context("failed to create secrets from private key")?;
165
166 let mut http_client = HyperliquidHttpClient::with_secrets(
167 &secrets,
168 Some(config.http_timeout_secs),
169 config.http_proxy_url.clone(),
170 )
171 .context("failed to create Hyperliquid HTTP client")?;
172
173 http_client.set_account_id(core.account_id);
174
175 if let Some(url) = &config.base_url_http {
177 http_client.set_base_info_url(url.clone());
178 }
179 if let Some(url) = &config.base_url_exchange {
180 http_client.set_base_exchange_url(url.clone());
181 }
182
183 let ws_url = config.base_url_ws.clone();
184 let ws_client =
185 HyperliquidWebSocketClient::new(ws_url, config.is_testnet, Some(core.account_id));
186
187 let clock = get_atomic_clock_realtime();
188 let emitter = ExecutionEventEmitter::new(
189 clock,
190 core.trader_id,
191 core.account_id,
192 AccountType::Margin,
193 None,
194 );
195
196 Ok(Self {
197 core,
198 clock,
199 config,
200 emitter,
201 http_client,
202 ws_client,
203 pending_tasks: Mutex::new(Vec::new()),
204 ws_stream_handle: Mutex::new(None),
205 })
206 }
207
208 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
209 if self.core.instruments_initialized() {
210 return Ok(());
211 }
212
213 let instruments = self
214 .http_client
215 .request_instruments()
216 .await
217 .context("failed to request Hyperliquid instruments")?;
218
219 if instruments.is_empty() {
220 log::warn!(
221 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
222 );
223 } else {
224 log::info!("Initialized {} instruments", instruments.len());
225
226 for instrument in &instruments {
227 self.http_client.cache_instrument(instrument.clone());
228 }
229 }
230
231 self.core.set_instruments_initialized();
232 Ok(())
233 }
234
235 async fn refresh_account_state(&self) -> anyhow::Result<()> {
236 let account_address = self.get_account_address()?;
237
238 let clearinghouse_state = self
239 .http_client
240 .info_clearinghouse_state(&account_address)
241 .await
242 .context("failed to fetch clearinghouse state")?;
243
244 let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
246 .context("failed to deserialize clearinghouse state")?;
247
248 log::debug!(
249 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
250 state.cross_margin_summary,
251 state.asset_positions.len()
252 );
253
254 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
256 let (balances, margins) = parse_account_balances_and_margins(cross_margin_summary)
257 .context("failed to parse account balances and margins")?;
258
259 let ts_event = self.clock.get_time_ns();
261 self.emitter
262 .emit_account_state(balances, margins, true, ts_event);
263
264 log::info!("Account state updated successfully");
265 } else {
266 log::warn!("No cross margin summary in clearinghouse state");
267 }
268
269 Ok(())
270 }
271
272 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
273 let account_id = self.core.account_id;
274
275 if self.core.cache().account(&account_id).is_some() {
276 log::info!("Account {account_id} registered");
277 return Ok(());
278 }
279
280 let start = Instant::now();
281 let timeout = Duration::from_secs_f64(timeout_secs);
282 let interval = Duration::from_millis(10);
283
284 loop {
285 tokio::time::sleep(interval).await;
286
287 if self.core.cache().account(&account_id).is_some() {
288 log::info!("Account {account_id} registered");
289 return Ok(());
290 }
291
292 if start.elapsed() >= timeout {
293 anyhow::bail!(
294 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
295 );
296 }
297 }
298 }
299
300 fn get_user_address(&self) -> anyhow::Result<String> {
301 self.http_client
302 .get_user_address()
303 .context("failed to get user address from HTTP client")
304 }
305
306 fn get_account_address(&self) -> anyhow::Result<String> {
307 match &self.config.vault_address {
308 Some(vault) => Ok(vault.clone()),
309 None => self.get_user_address(),
310 }
311 }
312
313 fn spawn_task<F>(&self, description: &'static str, fut: F)
314 where
315 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
316 {
317 let runtime = get_runtime();
318 let handle = runtime.spawn(async move {
319 if let Err(e) = fut.await {
320 log::warn!("{description} failed: {e:?}");
321 }
322 });
323
324 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
325 tasks.retain(|handle| !handle.is_finished());
326 tasks.push(handle);
327 }
328
329 fn abort_pending_tasks(&self) {
330 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
331 for handle in tasks.drain(..) {
332 handle.abort();
333 }
334 }
335}
336
337#[async_trait(?Send)]
338impl ExecutionClient for HyperliquidExecutionClient {
339 fn is_connected(&self) -> bool {
340 self.core.is_connected()
341 }
342
343 fn client_id(&self) -> ClientId {
344 self.core.client_id
345 }
346
347 fn account_id(&self) -> AccountId {
348 self.core.account_id
349 }
350
351 fn venue(&self) -> Venue {
352 *HYPERLIQUID_VENUE
353 }
354
355 fn oms_type(&self) -> OmsType {
356 self.core.oms_type
357 }
358
359 fn get_account(&self) -> Option<AccountAny> {
360 self.core.cache().account(&self.core.account_id).cloned()
361 }
362
363 fn generate_account_state(
364 &self,
365 balances: Vec<AccountBalance>,
366 margins: Vec<MarginBalance>,
367 reported: bool,
368 ts_event: UnixNanos,
369 ) -> anyhow::Result<()> {
370 self.emitter
371 .emit_account_state(balances, margins, reported, ts_event);
372 Ok(())
373 }
374
375 fn start(&mut self) -> anyhow::Result<()> {
376 if self.core.is_started() {
377 return Ok(());
378 }
379
380 let sender = get_exec_event_sender();
381 self.emitter.set_sender(sender);
382 self.core.set_started();
383
384 log::info!(
385 "Started: client_id={}, account_id={}, is_testnet={}, vault_address={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
386 self.core.client_id,
387 self.core.account_id,
388 self.config.is_testnet,
389 self.config.vault_address,
390 self.config.http_proxy_url,
391 self.config.ws_proxy_url,
392 );
393
394 Ok(())
395 }
396
397 fn stop(&mut self) -> anyhow::Result<()> {
398 if self.core.is_stopped() {
399 return Ok(());
400 }
401
402 log::info!("Stopping Hyperliquid execution client");
403
404 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
406 handle.abort();
407 }
408
409 self.abort_pending_tasks();
411
412 if self.core.is_connected() {
414 let runtime = get_runtime();
415 runtime.block_on(async {
416 if let Err(e) = self.ws_client.disconnect().await {
417 log::warn!("Error disconnecting WebSocket client: {e}");
418 }
419 });
420 }
421
422 self.core.set_disconnected();
423 self.core.set_stopped();
424
425 log::info!("Hyperliquid execution client stopped");
426 Ok(())
427 }
428
429 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
430 let order = self
431 .core
432 .cache()
433 .order(&cmd.client_order_id)
434 .cloned()
435 .ok_or_else(|| {
436 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
437 })?;
438
439 if order.is_closed() {
440 log::warn!("Cannot submit closed order {}", order.client_order_id());
441 return Ok(());
442 }
443
444 if let Err(e) = self.validate_order_submission(&order) {
445 self.emitter
446 .emit_order_denied(&order, &format!("Validation failed: {e}"));
447 return Err(e);
448 }
449
450 let http_client = self.http_client.clone();
451 let symbol = order.instrument_id().symbol.to_string();
452
453 let asset = match http_client.get_asset_index(&symbol) {
455 Some(a) => a,
456 None => {
457 self.emitter
458 .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
459 return Ok(());
460 }
461 };
462
463 let hyperliquid_order = match order_to_hyperliquid_request_with_asset(&order, asset) {
465 Ok(req) => req,
466 Err(e) => {
467 self.emitter
468 .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
469 return Ok(());
470 }
471 };
472
473 let cloid = Cloid::from_client_order_id(order.client_order_id());
476 self.ws_client
477 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
478
479 self.emitter.emit_order_submitted(&order);
480
481 let builder = resolve_builder_fee(&symbol, order.is_post_only());
482
483 let emitter = self.emitter.clone();
484 let clock = self.clock;
485 let ws_client = self.ws_client.clone();
486 let cloid_hex = Ustr::from(&cloid.to_hex());
487
488 self.spawn_task("submit_order", async move {
489 let action = HyperliquidExecAction::Order {
490 orders: vec![hyperliquid_order],
491 grouping: HyperliquidExecGrouping::Na,
492 builder,
493 };
494
495 match http_client.post_action_exec(&action).await {
496 Ok(response) => {
497 if response.is_ok() {
498 log::info!("Order submitted successfully: {response:?}");
499 } else {
500 let error_msg = extract_error_message(&response);
501 log::warn!("Order submission rejected by exchange: {error_msg}");
502 let ts = clock.get_time_ns();
503 emitter.emit_order_rejected(&order, &error_msg, ts, false);
504 ws_client.remove_cloid_mapping(&cloid_hex);
505 }
506 }
507 Err(e) => {
508 log::error!("Order submission HTTP request failed: {e}");
512 }
513 }
514
515 Ok(())
516 });
517
518 Ok(())
519 }
520
521 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
522 log::debug!(
523 "Submitting order list with {} orders",
524 cmd.order_list.client_order_ids.len()
525 );
526
527 let http_client = self.http_client.clone();
528
529 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
530
531 let mut valid_orders = Vec::new();
533 let mut hyperliquid_orders = Vec::new();
534
535 for order in &orders {
536 let symbol = order.instrument_id().symbol.to_string();
537 let asset = match http_client.get_asset_index(&symbol) {
538 Some(a) => a,
539 None => {
540 self.emitter
541 .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
542 continue;
543 }
544 };
545
546 match order_to_hyperliquid_request_with_asset(order, asset) {
547 Ok(req) => {
548 hyperliquid_orders.push(req);
549 valid_orders.push(order.clone());
550 }
551 Err(e) => {
552 self.emitter
553 .emit_order_denied(order, &format!("Order conversion failed: {e}"));
554 }
555 }
556 }
557
558 if valid_orders.is_empty() {
559 log::warn!("No valid orders to submit in order list");
560 return Ok(());
561 }
562
563 for order in &valid_orders {
564 let cloid = Cloid::from_client_order_id(order.client_order_id());
565 self.ws_client
566 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
567 self.emitter.emit_order_submitted(order);
568 }
569
570 let order_props: Vec<(String, bool)> = valid_orders
571 .iter()
572 .map(|o| (o.instrument_id().symbol.to_string(), o.is_post_only()))
573 .collect();
574 let batch_refs: Vec<(&str, bool)> =
575 order_props.iter().map(|(s, p)| (s.as_str(), *p)).collect();
576 let builder = resolve_builder_fee_batch(&batch_refs);
577
578 let emitter = self.emitter.clone();
579 let clock = self.clock;
580 let ws_client = self.ws_client.clone();
581 let cloid_hexes: Vec<Ustr> = valid_orders
582 .iter()
583 .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
584 .collect();
585
586 self.spawn_task("submit_order_list", async move {
587 let action = HyperliquidExecAction::Order {
588 orders: hyperliquid_orders,
589 grouping: HyperliquidExecGrouping::Na,
590 builder,
591 };
592 match http_client.post_action_exec(&action).await {
593 Ok(response) => {
594 if response.is_ok() {
595 log::info!("Order list submitted successfully: {response:?}");
596 } else {
597 let error_msg = extract_error_message(&response);
599 log::warn!("Order list submission rejected by exchange: {error_msg}");
600 let ts = clock.get_time_ns();
601 for order in &valid_orders {
602 emitter.emit_order_rejected(order, &error_msg, ts, false);
603 }
604 for cloid_hex in &cloid_hexes {
605 ws_client.remove_cloid_mapping(cloid_hex);
606 }
607 }
608 }
609 Err(e) => {
610 log::error!("Order list submission HTTP request failed: {e}");
614 }
615 }
616
617 Ok(())
618 });
619
620 Ok(())
621 }
622
623 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
624 log::debug!("Modifying order: {cmd:?}");
625
626 let venue_order_id = match cmd.venue_order_id {
628 Some(id) => id,
629 None => {
630 log::warn!("Cannot modify order: venue_order_id is None");
631 return Ok(());
632 }
633 };
634
635 let oid: u64 = match venue_order_id.as_str().parse() {
636 Ok(id) => id,
637 Err(e) => {
638 log::warn!("Failed to parse venue_order_id '{venue_order_id}' as u64: {e}");
639 return Ok(());
640 }
641 };
642
643 let http_client = self.http_client.clone();
644 let price = cmd.price;
645 let quantity = cmd.quantity;
646 let symbol = cmd.instrument_id.symbol.to_string();
647
648 self.spawn_task("modify_order", async move {
649 let asset = match http_client.get_asset_index(&symbol) {
650 Some(a) => a,
651 None => {
652 log::warn!(
653 "Asset index not found for symbol {symbol}, ensure instruments are loaded"
654 );
655 return Ok(());
656 }
657 };
658
659 let modify_request = HyperliquidExecModifyOrderRequest {
661 asset,
662 oid,
663 price: price.map(|p| (*p).into()),
664 size: quantity.map(|q| (*q).into()),
665 reduce_only: None,
666 kind: None,
667 };
668
669 let action = HyperliquidExecAction::Modify {
670 modify: modify_request,
671 };
672
673 match http_client.post_action_exec(&action).await {
674 Ok(response) => {
675 if response.is_ok() {
676 log::info!("Order modified successfully: {response:?}");
677 } else {
679 let error_msg = extract_error_message(&response);
680 log::warn!("Order modification rejected by exchange: {error_msg}");
681 }
683 }
684 Err(e) => {
685 log::warn!("Order modification HTTP request failed: {e}");
686 }
688 }
689
690 Ok(())
691 });
692
693 Ok(())
694 }
695
696 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
697 log::debug!("Cancelling order: {cmd:?}");
698
699 let http_client = self.http_client.clone();
700 let client_order_id = cmd.client_order_id.to_string();
701 let symbol = cmd.instrument_id.symbol.to_string();
702
703 self.spawn_task("cancel_order", async move {
704 let asset = match http_client.get_asset_index(&symbol) {
705 Some(a) => a,
706 None => {
707 log::warn!(
708 "Asset index not found for symbol {symbol}, ensure instruments are loaded"
709 );
710 return Ok(());
711 }
712 };
713
714 let cancel_request =
715 client_order_id_to_cancel_request_with_asset(&client_order_id, asset);
716 let action = HyperliquidExecAction::CancelByCloid {
717 cancels: vec![cancel_request],
718 };
719
720 match http_client.post_action_exec(&action).await {
721 Ok(response) => {
722 if response.is_ok() {
723 log::info!("Order cancelled successfully: {response:?}");
724 } else {
725 let error_msg = extract_error_message(&response);
726 log::warn!("Order cancellation rejected by exchange: {error_msg}");
727 }
728 }
729 Err(e) => {
730 log::warn!("Order cancellation HTTP request failed: {e}");
731 }
732 }
733
734 Ok(())
735 });
736
737 Ok(())
738 }
739
740 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
741 log::debug!("Cancelling all orders: {cmd:?}");
742
743 let cache = self.core.cache();
744 let open_orders = cache.orders_open(
745 Some(&self.core.venue),
746 Some(&cmd.instrument_id),
747 None,
748 None,
749 Some(cmd.order_side),
750 );
751
752 if open_orders.is_empty() {
753 log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
754 return Ok(());
755 }
756
757 let symbol = cmd.instrument_id.symbol.to_string();
758 let client_order_ids: Vec<String> = open_orders
759 .iter()
760 .map(|o| o.client_order_id().to_string())
761 .collect();
762
763 let http_client = self.http_client.clone();
764
765 self.spawn_task("cancel_all_orders", async move {
766 let asset = match http_client.get_asset_index(&symbol) {
767 Some(a) => a,
768 None => {
769 log::warn!(
770 "Asset index not found for symbol {symbol}, ensure instruments are loaded"
771 );
772 return Ok(());
773 }
774 };
775
776 let cancel_requests: Vec<_> = client_order_ids
777 .iter()
778 .map(|id| client_order_id_to_cancel_request_with_asset(id, asset))
779 .collect();
780
781 if cancel_requests.is_empty() {
782 log::debug!("No valid cancel requests to send");
783 return Ok(());
784 }
785
786 let action = HyperliquidExecAction::CancelByCloid {
787 cancels: cancel_requests,
788 };
789 if let Err(e) = http_client.post_action_exec(&action).await {
790 log::warn!("Failed to send cancel all orders request: {e}");
791 }
792
793 Ok(())
794 });
795
796 Ok(())
797 }
798
799 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
800 log::debug!("Batch cancelling orders: {cmd:?}");
801
802 if cmd.cancels.is_empty() {
803 log::debug!("No orders to cancel in batch");
804 return Ok(());
805 }
806
807 let cancel_info: Vec<(String, String)> = cmd
808 .cancels
809 .iter()
810 .map(|c| {
811 (
812 c.client_order_id.to_string(),
813 c.instrument_id.symbol.to_string(),
814 )
815 })
816 .collect();
817
818 let http_client = self.http_client.clone();
819
820 self.spawn_task("batch_cancel_orders", async move {
821 let mut cancel_requests = Vec::new();
822
823 for (client_order_id, symbol) in &cancel_info {
824 let asset = match http_client.get_asset_index(symbol) {
825 Some(a) => a,
826 None => {
827 log::warn!("Asset index not found for symbol {symbol}, skipping cancel");
828 continue;
829 }
830 };
831 cancel_requests.push(client_order_id_to_cancel_request_with_asset(
832 client_order_id,
833 asset,
834 ));
835 }
836
837 if cancel_requests.is_empty() {
838 log::warn!("No valid cancel requests in batch");
839 return Ok(());
840 }
841
842 let action = HyperliquidExecAction::CancelByCloid {
843 cancels: cancel_requests,
844 };
845 if let Err(e) = http_client.post_action_exec(&action).await {
846 log::warn!("Failed to send batch cancel orders request: {e}");
847 }
848
849 Ok(())
850 });
851
852 Ok(())
853 }
854
855 fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
856 log::debug!("Querying account: {cmd:?}");
857
858 let runtime = get_runtime();
859 runtime.block_on(async {
860 if let Err(e) = self.refresh_account_state().await {
861 log::warn!("Failed to query account state: {e}");
862 }
863 });
864
865 Ok(())
866 }
867
868 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
869 log::debug!("Querying order: {cmd:?}");
870
871 let cache = self.core.cache();
872 let venue_order_id = cache.venue_order_id(&cmd.client_order_id);
873
874 let venue_order_id = match venue_order_id {
875 Some(oid) => *oid,
876 None => {
877 log::warn!(
878 "No venue order ID found for client order {}",
879 cmd.client_order_id
880 );
881 return Ok(());
882 }
883 };
884 drop(cache);
885
886 let oid = match u64::from_str(venue_order_id.as_ref()) {
887 Ok(id) => id,
888 Err(e) => {
889 log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
890 return Ok(());
891 }
892 };
893
894 let account_address = self.get_account_address()?;
895
896 let http_client = self.http_client.clone();
900 let runtime = get_runtime();
901 runtime.spawn(async move {
902 match http_client.info_order_status(&account_address, oid).await {
903 Ok(status) => {
904 log::debug!("Order status for oid {oid}: {status:?}");
905 }
906 Err(e) => {
907 log::warn!("Failed to query order status for oid {oid}: {e}");
908 }
909 }
910 });
911
912 Ok(())
913 }
914
915 async fn connect(&mut self) -> anyhow::Result<()> {
916 if self.core.is_connected() {
917 return Ok(());
918 }
919
920 log::info!("Connecting Hyperliquid execution client");
921
922 self.ensure_instruments_initialized_async().await?;
924
925 self.start_ws_stream().await?;
927
928 self.refresh_account_state().await?;
930 self.await_account_registered(30.0).await?;
931
932 self.core.set_connected();
933
934 log::info!("Connected: client_id={}", self.core.client_id);
935 Ok(())
936 }
937
938 async fn disconnect(&mut self) -> anyhow::Result<()> {
939 if self.core.is_disconnected() {
940 return Ok(());
941 }
942
943 log::info!("Disconnecting Hyperliquid execution client");
944
945 self.ws_client.disconnect().await?;
947
948 self.abort_pending_tasks();
950
951 self.core.set_disconnected();
952
953 log::info!("Disconnected: client_id={}", self.core.client_id);
954 Ok(())
955 }
956
957 async fn generate_order_status_report(
958 &self,
959 _cmd: &GenerateOrderStatusReport,
960 ) -> anyhow::Result<Option<OrderStatusReport>> {
961 log::warn!("generate_order_status_report not yet fully implemented");
965 Ok(None)
966 }
967
968 async fn generate_order_status_reports(
969 &self,
970 cmd: &GenerateOrderStatusReports,
971 ) -> anyhow::Result<Vec<OrderStatusReport>> {
972 let account_address = self.get_account_address()?;
973
974 let reports = self
975 .http_client
976 .request_order_status_reports(&account_address, cmd.instrument_id)
977 .await
978 .context("failed to generate order status reports")?;
979
980 let reports = if cmd.open_only {
982 reports
983 .into_iter()
984 .filter(|r| r.order_status.is_open())
985 .collect()
986 } else {
987 reports
988 };
989
990 let reports = match (cmd.start, cmd.end) {
992 (Some(start), Some(end)) => reports
993 .into_iter()
994 .filter(|r| r.ts_last >= start && r.ts_last <= end)
995 .collect(),
996 (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
997 (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
998 (None, None) => reports,
999 };
1000
1001 log::info!("Generated {} order status reports", reports.len());
1002 Ok(reports)
1003 }
1004
1005 async fn generate_fill_reports(
1006 &self,
1007 cmd: GenerateFillReports,
1008 ) -> anyhow::Result<Vec<FillReport>> {
1009 let account_address = self.get_account_address()?;
1010
1011 let reports = self
1012 .http_client
1013 .request_fill_reports(&account_address, cmd.instrument_id)
1014 .await
1015 .context("failed to generate fill reports")?;
1016
1017 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1019 reports
1020 .into_iter()
1021 .filter(|r| r.ts_event >= start && r.ts_event <= end)
1022 .collect()
1023 } else if let Some(start) = cmd.start {
1024 reports
1025 .into_iter()
1026 .filter(|r| r.ts_event >= start)
1027 .collect()
1028 } else if let Some(end) = cmd.end {
1029 reports.into_iter().filter(|r| r.ts_event <= end).collect()
1030 } else {
1031 reports
1032 };
1033
1034 log::info!("Generated {} fill reports", reports.len());
1035 Ok(reports)
1036 }
1037
1038 async fn generate_position_status_reports(
1039 &self,
1040 cmd: &GeneratePositionStatusReports,
1041 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1042 let account_address = self.get_account_address()?;
1043
1044 let reports = self
1045 .http_client
1046 .request_position_status_reports(&account_address, cmd.instrument_id)
1047 .await
1048 .context("failed to generate position status reports")?;
1049
1050 log::info!("Generated {} position status reports", reports.len());
1051 Ok(reports)
1052 }
1053
1054 async fn generate_mass_status(
1055 &self,
1056 lookback_mins: Option<u64>,
1057 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1058 let ts_init = self.clock.get_time_ns();
1059
1060 let order_cmd = GenerateOrderStatusReports::new(
1061 UUID4::new(),
1062 ts_init,
1063 true, None,
1065 None,
1066 None,
1067 None,
1068 None,
1069 );
1070 let fill_cmd =
1071 GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1072 let position_cmd =
1073 GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1074
1075 let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1076 let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1077 let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1078
1079 if let Some(mins) = lookback_mins {
1082 let cutoff_ns = ts_init
1083 .as_u64()
1084 .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1085 let cutoff = UnixNanos::from(cutoff_ns);
1086
1087 fill_reports.retain(|r| r.ts_event >= cutoff);
1088 }
1089
1090 let mut mass_status = ExecutionMassStatus::new(
1091 self.core.client_id,
1092 self.core.account_id,
1093 self.core.venue,
1094 ts_init,
1095 None,
1096 );
1097 mass_status.add_order_reports(order_reports);
1098 mass_status.add_fill_reports(fill_reports);
1099 mass_status.add_position_reports(position_reports);
1100
1101 log::info!(
1102 "Generated mass status: {} orders, {} fills, {} positions",
1103 mass_status.order_reports().len(),
1104 mass_status.fill_reports().len(),
1105 mass_status.position_reports().len(),
1106 );
1107
1108 Ok(Some(mass_status))
1109 }
1110}
1111
1112impl HyperliquidExecutionClient {
1113 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1114 {
1115 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1116 if handle_guard.is_some() {
1117 return Ok(());
1118 }
1119 }
1120
1121 let user_address = self.get_user_address()?;
1122
1123 let subscription_address = self
1126 .config
1127 .vault_address
1128 .as_ref()
1129 .unwrap_or(&user_address)
1130 .clone();
1131
1132 let mut ws_client = self.ws_client.clone();
1133
1134 let instruments = self
1135 .http_client
1136 .request_instruments()
1137 .await
1138 .unwrap_or_default();
1139
1140 for instrument in instruments {
1141 ws_client.cache_instrument(instrument);
1142 }
1143
1144 ws_client.connect().await?;
1146 ws_client
1147 .subscribe_order_updates(&subscription_address)
1148 .await?;
1149 ws_client
1150 .subscribe_user_events(&subscription_address)
1151 .await?;
1152 log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1153
1154 if let Some(handle) = ws_client.take_task_handle() {
1156 self.ws_client.set_task_handle(handle);
1157 }
1158
1159 let emitter = self.emitter.clone();
1160 let runtime = get_runtime();
1161 let handle = runtime.spawn(async move {
1162 let mut pending_filled: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1167
1168 loop {
1169 let event = ws_client.next_event().await;
1170
1171 match event {
1172 Some(msg) => {
1173 match msg {
1174 NautilusWsMessage::ExecutionReports(reports) => {
1175 let mut immediate_cleanup: Vec<ClientOrderId> = Vec::new();
1176
1177 for report in &reports {
1178 if let ExecutionReport::Order(order_report) = report
1179 && let Some(id) = order_report.client_order_id
1180 && !order_report.order_status.is_open()
1181 {
1182 if order_report.order_status == OrderStatus::Filled {
1183 pending_filled.add(id);
1184 } else {
1185 immediate_cleanup.push(id);
1186 }
1187 }
1188 }
1189
1190 for report in &reports {
1191 if let ExecutionReport::Fill(fill_report) = report
1192 && let Some(id) = fill_report.client_order_id
1193 && pending_filled.contains(&id)
1194 {
1195 pending_filled.remove(&id);
1196 immediate_cleanup.push(id);
1197 }
1198 }
1199
1200 for report in reports {
1201 match report {
1202 ExecutionReport::Order(r) => {
1203 emitter.send_order_status_report(r);
1204 }
1205 ExecutionReport::Fill(r) => {
1206 emitter.send_fill_report(r);
1207 }
1208 }
1209 }
1210
1211 for id in immediate_cleanup {
1212 let cloid = Cloid::from_client_order_id(id);
1213 ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
1214 }
1215 }
1216 NautilusWsMessage::Reconnected => {}
1219 NautilusWsMessage::Error(e) => {
1220 log::error!("WebSocket error: {e}");
1221 }
1222 NautilusWsMessage::Trades(_)
1224 | NautilusWsMessage::Quote(_)
1225 | NautilusWsMessage::Deltas(_)
1226 | NautilusWsMessage::Candle(_)
1227 | NautilusWsMessage::MarkPrice(_)
1228 | NautilusWsMessage::IndexPrice(_)
1229 | NautilusWsMessage::FundingRate(_) => {}
1230 }
1231 }
1232 None => {
1233 log::warn!("WebSocket next_event returned None");
1234 break;
1235 }
1236 }
1237 }
1238 });
1239
1240 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1241 log::info!("Hyperliquid WebSocket execution stream started");
1242 Ok(())
1243 }
1244}