1use std::{
19 str::FromStr,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use nautilus_common::{
27 cache::fifo::FifoCache,
28 clients::ExecutionClient,
29 live::{runner::get_exec_event_sender, runtime::get_runtime},
30 messages::execution::{
31 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34 },
35};
36use nautilus_core::{
37 MUTEX_POISONED, UUID4, UnixNanos,
38 time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
44 identifiers::{AccountId, ClientId, ClientOrderId, Venue},
45 orders::{Order, any::OrderAny},
46 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47 types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50use ustr::Ustr;
51
52use crate::{
53 common::{
54 consts::{HYPERLIQUID_VENUE, NAUTILUS_BUILDER_ADDRESS},
55 credential::Secrets,
56 parse::{
57 clamp_price_to_precision, client_order_id_to_cancel_request_with_asset,
58 derive_limit_from_trigger, derive_market_order_price, extract_error_message,
59 extract_inner_error, extract_inner_errors, normalize_price,
60 order_to_hyperliquid_request_with_asset, parse_account_balances_and_margins,
61 round_to_sig_figs,
62 },
63 },
64 config::HyperliquidExecClientConfig,
65 http::{
66 client::HyperliquidHttpClient,
67 models::{
68 ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecBuilderFee,
69 HyperliquidExecGrouping, HyperliquidExecModifyOrderRequest, HyperliquidExecOrderKind,
70 },
71 },
72 websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
73};
74
75#[derive(Debug)]
76pub struct HyperliquidExecutionClient {
77 core: ExecutionClientCore,
78 clock: &'static AtomicTime,
79 config: HyperliquidExecClientConfig,
80 emitter: ExecutionEventEmitter,
81 http_client: HyperliquidHttpClient,
82 ws_client: HyperliquidWebSocketClient,
83 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
84 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
85}
86
87impl HyperliquidExecutionClient {
88 pub fn config(&self) -> &HyperliquidExecClientConfig {
90 &self.config
91 }
92
93 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
94 let instrument_id = order.instrument_id();
97 let symbol = instrument_id.symbol.as_str();
98 if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
99 anyhow::bail!(
100 "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
101 );
102 }
103
104 match order.order_type() {
106 OrderType::Market
107 | OrderType::Limit
108 | OrderType::StopMarket
109 | OrderType::StopLimit
110 | OrderType::MarketIfTouched
111 | OrderType::LimitIfTouched => {}
112 _ => anyhow::bail!(
113 "Unsupported order type for Hyperliquid: {:?}",
114 order.order_type()
115 ),
116 }
117
118 if matches!(
120 order.order_type(),
121 OrderType::StopMarket
122 | OrderType::StopLimit
123 | OrderType::MarketIfTouched
124 | OrderType::LimitIfTouched
125 ) && order.trigger_price().is_none()
126 {
127 anyhow::bail!(
128 "Conditional orders require a trigger price for Hyperliquid: {:?}",
129 order.order_type()
130 );
131 }
132
133 if matches!(
135 order.order_type(),
136 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
137 ) && order.price().is_none()
138 {
139 anyhow::bail!(
140 "Limit orders require a limit price for Hyperliquid: {:?}",
141 order.order_type()
142 );
143 }
144
145 Ok(())
146 }
147
148 pub fn new(
154 core: ExecutionClientCore,
155 config: HyperliquidExecClientConfig,
156 ) -> anyhow::Result<Self> {
157 let secrets = Secrets::resolve(
158 config.private_key.as_deref(),
159 config.vault_address.as_deref(),
160 config.is_testnet,
161 )
162 .context("Hyperliquid execution client requires private key")?;
163
164 let mut http_client = HyperliquidHttpClient::with_secrets(
165 &secrets,
166 config.http_timeout_secs,
167 config.http_proxy_url.clone(),
168 )
169 .context("failed to create Hyperliquid HTTP client")?;
170
171 http_client.set_account_id(core.account_id);
172 http_client.set_account_address(config.account_address.clone());
173
174 if let Some(url) = &config.base_url_http {
176 http_client.set_base_info_url(url.clone());
177 }
178
179 if let Some(url) = &config.base_url_exchange {
180 http_client.set_base_exchange_url(url.clone());
181 }
182
183 let ws_url = config.base_url_ws.clone();
184 let ws_client =
185 HyperliquidWebSocketClient::new(ws_url, config.is_testnet, Some(core.account_id));
186
187 let clock = get_atomic_clock_realtime();
188 let emitter = ExecutionEventEmitter::new(
189 clock,
190 core.trader_id,
191 core.account_id,
192 AccountType::Margin,
193 None,
194 );
195
196 Ok(Self {
197 core,
198 clock,
199 config,
200 emitter,
201 http_client,
202 ws_client,
203 pending_tasks: Mutex::new(Vec::new()),
204 ws_stream_handle: Mutex::new(None),
205 })
206 }
207
208 async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
209 if self.core.instruments_initialized() {
210 return Ok(());
211 }
212
213 let instruments = self
214 .http_client
215 .request_instruments()
216 .await
217 .context("failed to request Hyperliquid instruments")?;
218
219 if instruments.is_empty() {
220 log::warn!(
221 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
222 );
223 } else {
224 log::info!("Initialized {} instruments", instruments.len());
225
226 for instrument in &instruments {
227 self.http_client.cache_instrument(instrument);
228 }
229 }
230
231 self.core.set_instruments_initialized();
232 Ok(())
233 }
234
235 async fn refresh_account_state(&self) -> anyhow::Result<()> {
236 let account_address = self.get_account_address()?;
237
238 let clearinghouse_state = self
239 .http_client
240 .info_clearinghouse_state(&account_address)
241 .await
242 .context("failed to fetch clearinghouse state")?;
243
244 let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
246 .context("failed to deserialize clearinghouse state")?;
247
248 log::debug!(
249 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
250 state.cross_margin_summary,
251 state.asset_positions.len()
252 );
253
254 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
256 let (balances, margins) = parse_account_balances_and_margins(cross_margin_summary)
257 .context("failed to parse account balances and margins")?;
258
259 let ts_event = self.clock.get_time_ns();
261 self.emitter
262 .emit_account_state(balances, margins, true, ts_event);
263
264 log::info!("Account state updated successfully");
265 } else {
266 log::warn!("No cross margin summary in clearinghouse state");
267 }
268
269 Ok(())
270 }
271
272 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
273 let account_id = self.core.account_id;
274
275 if self.core.cache().account(&account_id).is_some() {
276 log::info!("Account {account_id} registered");
277 return Ok(());
278 }
279
280 let start = Instant::now();
281 let timeout = Duration::from_secs_f64(timeout_secs);
282 let interval = Duration::from_millis(10);
283
284 loop {
285 tokio::time::sleep(interval).await;
286
287 if self.core.cache().account(&account_id).is_some() {
288 log::info!("Account {account_id} registered");
289 return Ok(());
290 }
291
292 if start.elapsed() >= timeout {
293 anyhow::bail!(
294 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
295 );
296 }
297 }
298 }
299
300 fn get_user_address(&self) -> anyhow::Result<String> {
301 self.http_client
302 .get_user_address()
303 .context("failed to get user address from HTTP client")
304 }
305
306 fn get_account_address(&self) -> anyhow::Result<String> {
307 if let Some(addr) = &self.config.account_address {
308 return Ok(addr.clone());
309 }
310 match &self.config.vault_address {
311 Some(vault) => Ok(vault.clone()),
312 None => self.get_user_address(),
313 }
314 }
315
316 fn spawn_task<F>(&self, description: &'static str, fut: F)
317 where
318 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
319 {
320 let runtime = get_runtime();
321 let handle = runtime.spawn(async move {
322 if let Err(e) = fut.await {
323 log::warn!("{description} failed: {e:?}");
324 }
325 });
326
327 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
328 tasks.retain(|handle| !handle.is_finished());
329 tasks.push(handle);
330 }
331
332 fn abort_pending_tasks(&self) {
333 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
334 for handle in tasks.drain(..) {
335 handle.abort();
336 }
337 }
338}
339
340#[async_trait(?Send)]
341impl ExecutionClient for HyperliquidExecutionClient {
342 fn is_connected(&self) -> bool {
343 self.core.is_connected()
344 }
345
346 fn client_id(&self) -> ClientId {
347 self.core.client_id
348 }
349
350 fn account_id(&self) -> AccountId {
351 self.core.account_id
352 }
353
354 fn venue(&self) -> Venue {
355 *HYPERLIQUID_VENUE
356 }
357
358 fn oms_type(&self) -> OmsType {
359 self.core.oms_type
360 }
361
362 fn get_account(&self) -> Option<AccountAny> {
363 self.core.cache().account(&self.core.account_id).cloned()
364 }
365
366 fn generate_account_state(
367 &self,
368 balances: Vec<AccountBalance>,
369 margins: Vec<MarginBalance>,
370 reported: bool,
371 ts_event: UnixNanos,
372 ) -> anyhow::Result<()> {
373 self.emitter
374 .emit_account_state(balances, margins, reported, ts_event);
375 Ok(())
376 }
377
378 fn start(&mut self) -> anyhow::Result<()> {
379 if self.core.is_started() {
380 return Ok(());
381 }
382
383 let sender = get_exec_event_sender();
384 self.emitter.set_sender(sender);
385 self.core.set_started();
386
387 log::info!(
388 "Started: client_id={}, account_id={}, is_testnet={}, vault_address={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
389 self.core.client_id,
390 self.core.account_id,
391 self.config.is_testnet,
392 self.config.vault_address,
393 self.config.http_proxy_url,
394 self.config.ws_proxy_url,
395 );
396
397 Ok(())
398 }
399
400 fn stop(&mut self) -> anyhow::Result<()> {
401 if self.core.is_stopped() {
402 return Ok(());
403 }
404
405 log::info!("Stopping Hyperliquid execution client");
406
407 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
408 handle.abort();
409 }
410
411 self.abort_pending_tasks();
412 self.ws_client.abort();
413
414 self.core.set_disconnected();
415 self.core.set_stopped();
416
417 log::info!("Hyperliquid execution client stopped");
418 Ok(())
419 }
420
421 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
422 let order = self
423 .core
424 .cache()
425 .order(&cmd.client_order_id)
426 .cloned()
427 .ok_or_else(|| {
428 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
429 })?;
430
431 if order.is_closed() {
432 log::warn!("Cannot submit closed order {}", order.client_order_id());
433 return Ok(());
434 }
435
436 if let Err(e) = self.validate_order_submission(&order) {
437 self.emitter
438 .emit_order_denied(&order, &format!("Validation failed: {e}"));
439 return Err(e);
440 }
441
442 let http_client = self.http_client.clone();
443 let symbol = order.instrument_id().symbol.to_string();
444
445 let asset = match http_client.get_asset_index(&symbol) {
447 Some(a) => a,
448 None => {
449 self.emitter
450 .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
451 return Ok(());
452 }
453 };
454
455 let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
457 let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset(
458 &order,
459 asset,
460 price_decimals,
461 self.config.normalize_prices,
462 ) {
463 Ok(req) => req,
464 Err(e) => {
465 self.emitter
466 .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
467 return Ok(());
468 }
469 };
470
471 if order.order_type() == OrderType::Market {
473 let instrument_id = order.instrument_id();
474 let cache = self.core.cache();
475 match cache.quote(&instrument_id) {
476 Some(quote) => {
477 let is_buy = order.order_side() == OrderSide::Buy;
478 hyperliquid_order.price =
479 derive_market_order_price(quote, is_buy, price_decimals);
480 }
481 None => {
482 self.emitter.emit_order_denied(
483 &order,
484 &format!(
485 "No cached quote for {instrument_id}: \
486 subscribe to quote data before submitting market orders"
487 ),
488 );
489 return Ok(());
490 }
491 }
492 }
493
494 log::info!(
495 "Submitting order: id={}, type={:?}, side={:?}, price={}, size={}, kind={:?}",
496 order.client_order_id(),
497 order.order_type(),
498 order.order_side(),
499 hyperliquid_order.price,
500 hyperliquid_order.size,
501 hyperliquid_order.kind,
502 );
503
504 let cloid = Cloid::from_client_order_id(order.client_order_id());
507 self.ws_client
508 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
509
510 self.emitter.emit_order_submitted(&order);
511
512 let emitter = self.emitter.clone();
513 let clock = self.clock;
514 let ws_client = self.ws_client.clone();
515 let cloid_hex = Ustr::from(&cloid.to_hex());
516
517 let builder = if self.http_client.has_vault_address() {
520 None
521 } else {
522 Some(HyperliquidExecBuilderFee {
523 address: NAUTILUS_BUILDER_ADDRESS.to_string(),
524 fee_tenths_bp: 0,
525 })
526 };
527
528 self.spawn_task("submit_order", async move {
529 let action = HyperliquidExecAction::Order {
530 orders: vec![hyperliquid_order],
531 grouping: HyperliquidExecGrouping::Na,
532 builder,
533 };
534
535 match http_client.post_action_exec(&action).await {
536 Ok(response) => {
537 if response.is_ok() {
538 if let Some(inner_error) = extract_inner_error(&response) {
539 log::warn!("Order submission rejected by exchange: {inner_error}");
540 let ts = clock.get_time_ns();
541 emitter.emit_order_rejected(&order, &inner_error, ts, false);
542 ws_client.remove_cloid_mapping(&cloid_hex);
543 } else {
544 log::info!("Order submitted successfully: {response:?}");
545 }
546 } else {
547 let error_msg = extract_error_message(&response);
548 log::warn!("Order submission rejected by exchange: {error_msg}");
549 let ts = clock.get_time_ns();
550 emitter.emit_order_rejected(&order, &error_msg, ts, false);
551 ws_client.remove_cloid_mapping(&cloid_hex);
552 }
553 }
554 Err(e) => {
555 log::error!("Order submission HTTP request failed: {e}");
559 }
560 }
561
562 Ok(())
563 });
564
565 Ok(())
566 }
567
568 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
569 log::debug!(
570 "Submitting order list with {} orders",
571 cmd.order_list.client_order_ids.len()
572 );
573
574 let http_client = self.http_client.clone();
575
576 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
577
578 let mut valid_orders = Vec::new();
580 let mut hyperliquid_orders = Vec::new();
581
582 for order in &orders {
583 let symbol = order.instrument_id().symbol.to_string();
584 let asset = match http_client.get_asset_index(&symbol) {
585 Some(a) => a,
586 None => {
587 self.emitter
588 .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
589 continue;
590 }
591 };
592
593 let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
594
595 match order_to_hyperliquid_request_with_asset(
596 order,
597 asset,
598 price_decimals,
599 self.config.normalize_prices,
600 ) {
601 Ok(req) => {
602 hyperliquid_orders.push(req);
603 valid_orders.push(order.clone());
604 }
605 Err(e) => {
606 self.emitter
607 .emit_order_denied(order, &format!("Order conversion failed: {e}"));
608 }
609 }
610 }
611
612 if valid_orders.is_empty() {
613 log::warn!("No valid orders to submit in order list");
614 return Ok(());
615 }
616
617 for order in &valid_orders {
618 let cloid = Cloid::from_client_order_id(order.client_order_id());
619 self.ws_client
620 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
621 self.emitter.emit_order_submitted(order);
622 }
623
624 let emitter = self.emitter.clone();
625 let clock = self.clock;
626 let ws_client = self.ws_client.clone();
627 let cloid_hexes: Vec<Ustr> = valid_orders
628 .iter()
629 .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
630 .collect();
631
632 let builder = if self.http_client.has_vault_address() {
633 None
634 } else {
635 Some(HyperliquidExecBuilderFee {
636 address: NAUTILUS_BUILDER_ADDRESS.to_string(),
637 fee_tenths_bp: 0,
638 })
639 };
640
641 self.spawn_task("submit_order_list", async move {
642 let action = HyperliquidExecAction::Order {
643 orders: hyperliquid_orders,
644 grouping: HyperliquidExecGrouping::Na,
645 builder,
646 };
647 match http_client.post_action_exec(&action).await {
648 Ok(response) => {
649 if response.is_ok() {
650 let inner_errors = extract_inner_errors(&response);
651 if inner_errors.iter().any(|e| e.is_some()) {
652 let ts = clock.get_time_ns();
653 for (i, error) in inner_errors.iter().enumerate() {
654 if let Some(error_msg) = error {
655 if let Some(order) = valid_orders.get(i) {
656 log::warn!(
657 "Order {} rejected by exchange: {error_msg}",
658 order.client_order_id(),
659 );
660 emitter.emit_order_rejected(order, error_msg, ts, false);
661 }
662
663 if let Some(cloid_hex) = cloid_hexes.get(i) {
664 ws_client.remove_cloid_mapping(cloid_hex);
665 }
666 }
667 }
668 } else {
669 log::info!("Order list submitted successfully: {response:?}");
670 }
671 } else {
672 let error_msg = extract_error_message(&response);
673 log::warn!("Order list submission rejected by exchange: {error_msg}");
674 let ts = clock.get_time_ns();
675 for order in &valid_orders {
676 emitter.emit_order_rejected(order, &error_msg, ts, false);
677 }
678 for cloid_hex in &cloid_hexes {
679 ws_client.remove_cloid_mapping(cloid_hex);
680 }
681 }
682 }
683 Err(e) => {
684 log::error!("Order list submission HTTP request failed: {e}");
688 }
689 }
690
691 Ok(())
692 });
693
694 Ok(())
695 }
696
697 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
698 log::debug!("Modifying order: {cmd:?}");
699
700 let venue_order_id = match cmd.venue_order_id {
701 Some(id) => id,
702 None => {
703 let reason = "venue_order_id is required for modify";
704 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
705 self.emitter.emit_order_modify_rejected_event(
706 cmd.strategy_id,
707 cmd.instrument_id,
708 cmd.client_order_id,
709 None,
710 reason,
711 self.clock.get_time_ns(),
712 );
713 return Ok(());
714 }
715 };
716
717 let oid: u64 = match venue_order_id.as_str().parse() {
718 Ok(id) => id,
719 Err(e) => {
720 let reason = format!("Failed to parse venue_order_id '{venue_order_id}': {e}");
721 log::warn!("{reason}");
722 self.emitter.emit_order_modify_rejected_event(
723 cmd.strategy_id,
724 cmd.instrument_id,
725 cmd.client_order_id,
726 Some(venue_order_id),
727 &reason,
728 self.clock.get_time_ns(),
729 );
730 return Ok(());
731 }
732 };
733
734 let order = match self.core.cache().order(&cmd.client_order_id).cloned() {
736 Some(o) => o,
737 None => {
738 let reason = "order not found in cache";
739 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
740 self.emitter.emit_order_modify_rejected_event(
741 cmd.strategy_id,
742 cmd.instrument_id,
743 cmd.client_order_id,
744 Some(venue_order_id),
745 reason,
746 self.clock.get_time_ns(),
747 );
748 return Ok(());
749 }
750 };
751
752 let http_client = self.http_client.clone();
753 let symbol = cmd.instrument_id.symbol.to_string();
754 let should_normalize = self.config.normalize_prices;
755
756 let quantity = cmd.quantity.unwrap_or(order.leaves_qty());
757 let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
758 let asset = match http_client.get_asset_index(&symbol) {
759 Some(a) => a,
760 None => {
761 log::warn!(
762 "Asset index not found for symbol {symbol}, ensure instruments are loaded",
763 );
764 return Ok(());
765 }
766 };
767
768 let hyperliquid_order = match order_to_hyperliquid_request_with_asset(
771 &order,
772 asset,
773 price_decimals,
774 should_normalize,
775 ) {
776 Ok(mut req) => {
777 if let Some(p) = cmd.price.or(order.price()) {
779 let price_dec = p.as_decimal();
780 req.price = if should_normalize {
781 normalize_price(price_dec, price_decimals).normalize()
782 } else {
783 price_dec.normalize()
784 };
785 } else if let Some(tp) = cmd.trigger_price {
786 let is_buy = order.order_side() == OrderSide::Buy;
789 let base = tp.as_decimal().normalize();
790 let derived = derive_limit_from_trigger(base, is_buy);
791 let sig_rounded = round_to_sig_figs(derived, 5);
792 req.price =
793 clamp_price_to_precision(sig_rounded, price_decimals, is_buy).normalize();
794 }
795 req.size = quantity.as_decimal().normalize();
798
799 if let (Some(tp), HyperliquidExecOrderKind::Trigger { trigger }) =
801 (cmd.trigger_price, &mut req.kind)
802 {
803 let tp_dec = tp.as_decimal();
804 trigger.trigger_px = if should_normalize {
805 normalize_price(tp_dec, price_decimals).normalize()
806 } else {
807 tp_dec.normalize()
808 };
809 }
810
811 req
812 }
813 Err(e) => {
814 log::warn!("Order conversion failed for modify: {e}");
815 return Ok(());
816 }
817 };
818
819 self.spawn_task("modify_order", async move {
820 let action = HyperliquidExecAction::Modify {
821 modify: HyperliquidExecModifyOrderRequest {
822 oid,
823 order: hyperliquid_order,
824 },
825 };
826
827 match http_client.post_action_exec(&action).await {
828 Ok(response) => {
829 if response.is_ok() {
830 if let Some(inner_error) = extract_inner_error(&response) {
831 log::warn!("Order modification rejected by exchange: {inner_error}");
832 } else {
833 log::info!("Order modified successfully: {response:?}");
834 }
835 } else {
836 let error_msg = extract_error_message(&response);
837 log::warn!("Order modification rejected by exchange: {error_msg}");
838 }
839 }
840 Err(e) => {
841 log::warn!("Order modification HTTP request failed: {e}");
842 }
843 }
844
845 Ok(())
846 });
847
848 Ok(())
849 }
850
851 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
852 log::debug!("Cancelling order: {cmd:?}");
853
854 let http_client = self.http_client.clone();
855 let emitter = self.emitter.clone();
856 let clock = self.clock;
857 let client_order_id = cmd.client_order_id;
858 let client_order_id_str = cmd.client_order_id.to_string();
859 let strategy_id = cmd.strategy_id;
860 let instrument_id = cmd.instrument_id;
861 let venue_order_id = cmd.venue_order_id;
862 let symbol = cmd.instrument_id.symbol.to_string();
863
864 self.spawn_task("cancel_order", async move {
865 let asset = match http_client.get_asset_index(&symbol) {
866 Some(a) => a,
867 None => {
868 emitter.emit_order_cancel_rejected_event(
869 strategy_id,
870 instrument_id,
871 client_order_id,
872 venue_order_id,
873 &format!("Asset index not found for symbol {symbol}"),
874 clock.get_time_ns(),
875 );
876 return Ok(());
877 }
878 };
879
880 let cancel_request =
881 client_order_id_to_cancel_request_with_asset(&client_order_id_str, asset);
882 let action = HyperliquidExecAction::CancelByCloid {
883 cancels: vec![cancel_request],
884 };
885
886 match http_client.post_action_exec(&action).await {
887 Ok(response) => {
888 if response.is_ok() {
889 if let Some(inner_error) = extract_inner_error(&response) {
890 emitter.emit_order_cancel_rejected_event(
891 strategy_id,
892 instrument_id,
893 client_order_id,
894 venue_order_id,
895 &inner_error,
896 clock.get_time_ns(),
897 );
898 } else {
899 log::info!("Order cancelled successfully: {response:?}");
900 }
901 } else {
902 emitter.emit_order_cancel_rejected_event(
903 strategy_id,
904 instrument_id,
905 client_order_id,
906 venue_order_id,
907 &extract_error_message(&response),
908 clock.get_time_ns(),
909 );
910 }
911 }
912 Err(e) => {
913 emitter.emit_order_cancel_rejected_event(
914 strategy_id,
915 instrument_id,
916 client_order_id,
917 venue_order_id,
918 &format!("Cancel HTTP request failed: {e}"),
919 clock.get_time_ns(),
920 );
921 }
922 }
923
924 Ok(())
925 });
926
927 Ok(())
928 }
929
930 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
931 log::debug!("Cancelling all orders: {cmd:?}");
932
933 let cache = self.core.cache();
934 let open_orders = cache.orders_open(
935 Some(&self.core.venue),
936 Some(&cmd.instrument_id),
937 None,
938 None,
939 Some(cmd.order_side),
940 );
941
942 if open_orders.is_empty() {
943 log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
944 return Ok(());
945 }
946
947 let symbol = cmd.instrument_id.symbol.to_string();
948 let client_order_ids: Vec<String> = open_orders
949 .iter()
950 .map(|o| o.client_order_id().to_string())
951 .collect();
952
953 let http_client = self.http_client.clone();
954
955 self.spawn_task("cancel_all_orders", async move {
956 let asset = match http_client.get_asset_index(&symbol) {
957 Some(a) => a,
958 None => {
959 log::warn!("Asset index not found for symbol {symbol}");
960 return Ok(());
961 }
962 };
963
964 let cancel_requests: Vec<_> = client_order_ids
965 .iter()
966 .map(|id| client_order_id_to_cancel_request_with_asset(id, asset))
967 .collect();
968
969 if cancel_requests.is_empty() {
970 return Ok(());
971 }
972
973 let action = HyperliquidExecAction::CancelByCloid {
974 cancels: cancel_requests,
975 };
976
977 if let Err(e) = http_client.post_action_exec(&action).await {
978 log::warn!("Cancel all orders request failed: {e}");
979 }
980
981 Ok(())
982 });
983
984 Ok(())
985 }
986
987 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
988 log::debug!("Batch cancelling orders: {cmd:?}");
989
990 if cmd.cancels.is_empty() {
991 log::debug!("No orders to cancel in batch");
992 return Ok(());
993 }
994
995 let cancel_info: Vec<(String, String)> = cmd
996 .cancels
997 .iter()
998 .map(|c| {
999 (
1000 c.client_order_id.to_string(),
1001 c.instrument_id.symbol.to_string(),
1002 )
1003 })
1004 .collect();
1005
1006 let http_client = self.http_client.clone();
1007
1008 self.spawn_task("batch_cancel_orders", async move {
1009 let mut cancel_requests = Vec::new();
1010
1011 for (client_order_id, symbol) in &cancel_info {
1012 let asset = match http_client.get_asset_index(symbol) {
1013 Some(a) => a,
1014 None => {
1015 log::warn!("Asset index not found for symbol {symbol}, skipping cancel");
1016 continue;
1017 }
1018 };
1019 cancel_requests.push(client_order_id_to_cancel_request_with_asset(
1020 client_order_id,
1021 asset,
1022 ));
1023 }
1024
1025 if cancel_requests.is_empty() {
1026 log::warn!("No valid cancel requests in batch");
1027 return Ok(());
1028 }
1029
1030 let action = HyperliquidExecAction::CancelByCloid {
1031 cancels: cancel_requests,
1032 };
1033
1034 if let Err(e) = http_client.post_action_exec(&action).await {
1035 log::warn!("Batch cancel request failed: {e}");
1036 }
1037
1038 Ok(())
1039 });
1040
1041 Ok(())
1042 }
1043
1044 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
1045 let http_client = self.http_client.clone();
1046 let account_address = self.get_account_address()?;
1047 let emitter = self.emitter.clone();
1048 let clock = self.clock;
1049
1050 self.spawn_task("query_account", async move {
1051 let clearinghouse_state = http_client
1052 .info_clearinghouse_state(&account_address)
1053 .await
1054 .context("failed to fetch clearinghouse state")?;
1055
1056 let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
1057 .context("failed to deserialize clearinghouse state")?;
1058
1059 if let Some(ref cross_margin_summary) = state.cross_margin_summary {
1060 let (balances, margins) = parse_account_balances_and_margins(cross_margin_summary)
1061 .context("failed to parse account balances and margins")?;
1062 let ts_event = clock.get_time_ns();
1063 emitter.emit_account_state(balances, margins, true, ts_event);
1064 } else {
1065 log::warn!("No cross margin summary in clearinghouse state");
1066 }
1067
1068 Ok(())
1069 });
1070
1071 Ok(())
1072 }
1073
1074 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
1075 log::debug!("Querying order: {cmd:?}");
1076
1077 let cache = self.core.cache();
1078 let venue_order_id = cache.venue_order_id(&cmd.client_order_id);
1079
1080 let venue_order_id = match venue_order_id {
1081 Some(oid) => *oid,
1082 None => {
1083 log::warn!(
1084 "No venue order ID found for client order {}",
1085 cmd.client_order_id
1086 );
1087 return Ok(());
1088 }
1089 };
1090 drop(cache);
1091
1092 let oid = match u64::from_str(venue_order_id.as_ref()) {
1093 Ok(id) => id,
1094 Err(e) => {
1095 log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
1096 return Ok(());
1097 }
1098 };
1099
1100 let account_address = self.get_account_address()?;
1101
1102 let http_client = self.http_client.clone();
1106 let runtime = get_runtime();
1107 runtime.spawn(async move {
1108 match http_client.info_order_status(&account_address, oid).await {
1109 Ok(status) => {
1110 log::debug!("Order status for oid {oid}: {status:?}");
1111 }
1112 Err(e) => {
1113 log::warn!("Failed to query order status for oid {oid}: {e}");
1114 }
1115 }
1116 });
1117
1118 Ok(())
1119 }
1120
1121 async fn connect(&mut self) -> anyhow::Result<()> {
1122 if self.core.is_connected() {
1123 return Ok(());
1124 }
1125
1126 log::info!("Connecting Hyperliquid execution client");
1127
1128 self.ensure_instruments_initialized_async().await?;
1130
1131 self.start_ws_stream().await?;
1133
1134 let post_ws = async {
1136 self.refresh_account_state().await?;
1137 self.await_account_registered(30.0).await?;
1138
1139 Ok::<(), anyhow::Error>(())
1140 };
1141
1142 if let Err(e) = post_ws.await {
1143 log::warn!("Connect failed after WS started, tearing down: {e}");
1144 let _ = self.ws_client.disconnect().await;
1145 self.abort_pending_tasks();
1146 return Err(e);
1147 }
1148
1149 self.core.set_connected();
1150
1151 log::info!("Connected: client_id={}", self.core.client_id);
1152 Ok(())
1153 }
1154
1155 async fn disconnect(&mut self) -> anyhow::Result<()> {
1156 if self.core.is_disconnected() {
1157 return Ok(());
1158 }
1159
1160 log::info!("Disconnecting Hyperliquid execution client");
1161
1162 self.ws_client.disconnect().await?;
1164
1165 self.abort_pending_tasks();
1167
1168 self.core.set_disconnected();
1169
1170 log::info!("Disconnected: client_id={}", self.core.client_id);
1171 Ok(())
1172 }
1173
1174 async fn generate_order_status_report(
1175 &self,
1176 cmd: &GenerateOrderStatusReport,
1177 ) -> anyhow::Result<Option<OrderStatusReport>> {
1178 let account_address = self.get_account_address()?;
1179
1180 if let Some(venue_order_id) = &cmd.venue_order_id {
1181 let oid: u64 = venue_order_id
1182 .as_str()
1183 .parse()
1184 .context("failed to parse venue_order_id as oid")?;
1185
1186 let report = self
1187 .http_client
1188 .request_order_status_report(&account_address, oid)
1189 .await
1190 .context("failed to generate order status report")?;
1191
1192 if let Some(mut report) = report {
1193 if let Some(coid) = &cmd.client_order_id {
1194 report.client_order_id = Some(*coid);
1195 }
1196 log::info!("Generated order status report for oid {oid}");
1197 return Ok(Some(report));
1198 }
1199
1200 log::info!("No order status report found for oid {oid}");
1201 return Ok(None);
1202 }
1203
1204 if let Some(client_order_id) = &cmd.client_order_id {
1205 let cached_oid: Option<u64> = self
1208 .core
1209 .cache()
1210 .venue_order_id(client_order_id)
1211 .and_then(|v| v.as_str().parse::<u64>().ok());
1212
1213 if let Some(oid) = cached_oid {
1215 let report = self
1216 .http_client
1217 .request_order_status_report(&account_address, oid)
1218 .await
1219 .context("failed to generate order status report by cached venue_order_id")?;
1220
1221 if let Some(mut report) = report {
1222 report.client_order_id = Some(*client_order_id);
1223 log::info!(
1224 "Generated order status report for {client_order_id} via cached oid {oid}"
1225 );
1226 return Ok(Some(report));
1227 }
1228 }
1229
1230 let report = self
1232 .http_client
1233 .request_order_status_report_by_client_order_id(&account_address, client_order_id)
1234 .await
1235 .context("failed to generate order status report by client_order_id")?;
1236
1237 if report.is_some() {
1238 log::info!("Generated order status report for {client_order_id}");
1239 } else {
1240 log::info!("No order status report found for {client_order_id}");
1241 }
1242 return Ok(report);
1243 }
1244
1245 log::warn!("Cannot generate order status report without venue_order_id or client_order_id");
1246 Ok(None)
1247 }
1248
1249 async fn generate_order_status_reports(
1250 &self,
1251 cmd: &GenerateOrderStatusReports,
1252 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1253 let account_address = self.get_account_address()?;
1254
1255 let reports = self
1256 .http_client
1257 .request_order_status_reports(&account_address, cmd.instrument_id)
1258 .await
1259 .context("failed to generate order status reports")?;
1260
1261 let reports = if cmd.open_only {
1263 reports
1264 .into_iter()
1265 .filter(|r| r.order_status.is_open())
1266 .collect()
1267 } else {
1268 reports
1269 };
1270
1271 let reports = match (cmd.start, cmd.end) {
1273 (Some(start), Some(end)) => reports
1274 .into_iter()
1275 .filter(|r| r.ts_last >= start && r.ts_last <= end)
1276 .collect(),
1277 (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
1278 (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
1279 (None, None) => reports,
1280 };
1281
1282 log::info!("Generated {} order status reports", reports.len());
1283 Ok(reports)
1284 }
1285
1286 async fn generate_fill_reports(
1287 &self,
1288 cmd: GenerateFillReports,
1289 ) -> anyhow::Result<Vec<FillReport>> {
1290 let account_address = self.get_account_address()?;
1291
1292 let reports = self
1293 .http_client
1294 .request_fill_reports(&account_address, cmd.instrument_id)
1295 .await
1296 .context("failed to generate fill reports")?;
1297
1298 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1300 reports
1301 .into_iter()
1302 .filter(|r| r.ts_event >= start && r.ts_event <= end)
1303 .collect()
1304 } else if let Some(start) = cmd.start {
1305 reports
1306 .into_iter()
1307 .filter(|r| r.ts_event >= start)
1308 .collect()
1309 } else if let Some(end) = cmd.end {
1310 reports.into_iter().filter(|r| r.ts_event <= end).collect()
1311 } else {
1312 reports
1313 };
1314
1315 log::info!("Generated {} fill reports", reports.len());
1316 Ok(reports)
1317 }
1318
1319 async fn generate_position_status_reports(
1320 &self,
1321 cmd: &GeneratePositionStatusReports,
1322 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1323 let account_address = self.get_account_address()?;
1324
1325 let reports = self
1326 .http_client
1327 .request_position_status_reports(&account_address, cmd.instrument_id)
1328 .await
1329 .context("failed to generate position status reports")?;
1330
1331 log::info!("Generated {} position status reports", reports.len());
1332 Ok(reports)
1333 }
1334
1335 async fn generate_mass_status(
1336 &self,
1337 lookback_mins: Option<u64>,
1338 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1339 let ts_init = self.clock.get_time_ns();
1340
1341 let order_cmd = GenerateOrderStatusReports::new(
1342 UUID4::new(),
1343 ts_init,
1344 true, None,
1346 None,
1347 None,
1348 None,
1349 None,
1350 );
1351 let fill_cmd =
1352 GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1353 let position_cmd =
1354 GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1355
1356 let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1357 let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1358 let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1359
1360 if let Some(mins) = lookback_mins {
1363 let cutoff_ns = ts_init
1364 .as_u64()
1365 .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1366 let cutoff = UnixNanos::from(cutoff_ns);
1367
1368 fill_reports.retain(|r| r.ts_event >= cutoff);
1369 }
1370
1371 let mut mass_status = ExecutionMassStatus::new(
1372 self.core.client_id,
1373 self.core.account_id,
1374 self.core.venue,
1375 ts_init,
1376 None,
1377 );
1378 mass_status.add_order_reports(order_reports);
1379 mass_status.add_fill_reports(fill_reports);
1380 mass_status.add_position_reports(position_reports);
1381
1382 log::info!(
1383 "Generated mass status: {} orders, {} fills, {} positions",
1384 mass_status.order_reports().len(),
1385 mass_status.fill_reports().len(),
1386 mass_status.position_reports().len(),
1387 );
1388
1389 Ok(Some(mass_status))
1390 }
1391}
1392
1393impl HyperliquidExecutionClient {
1394 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1395 {
1396 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1397 if handle_guard.is_some() {
1398 return Ok(());
1399 }
1400 }
1401
1402 let user_address = self.get_user_address()?;
1403
1404 let subscription_address = self
1407 .config
1408 .account_address
1409 .as_ref()
1410 .or(self.config.vault_address.as_ref())
1411 .unwrap_or(&user_address)
1412 .clone();
1413
1414 let mut ws_client = self.ws_client.clone();
1415
1416 let instruments = self
1417 .http_client
1418 .request_instruments()
1419 .await
1420 .unwrap_or_default();
1421
1422 for instrument in instruments {
1423 ws_client.cache_instrument(instrument);
1424 }
1425
1426 ws_client.connect().await?;
1428 ws_client
1429 .subscribe_order_updates(&subscription_address)
1430 .await?;
1431 ws_client
1432 .subscribe_user_events(&subscription_address)
1433 .await?;
1434 log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1435
1436 if let Some(handle) = ws_client.take_task_handle() {
1438 self.ws_client.set_task_handle(handle);
1439 }
1440
1441 let emitter = self.emitter.clone();
1442 let runtime = get_runtime();
1443 let handle = runtime.spawn(async move {
1444 let mut pending_filled: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1449
1450 loop {
1451 let event = ws_client.next_event().await;
1452
1453 match event {
1454 Some(msg) => {
1455 match msg {
1456 NautilusWsMessage::ExecutionReports(reports) => {
1457 let mut immediate_cleanup: Vec<ClientOrderId> = Vec::new();
1458
1459 for report in &reports {
1460 if let ExecutionReport::Order(order_report) = report
1461 && let Some(id) = order_report.client_order_id
1462 && !order_report.order_status.is_open()
1463 {
1464 if order_report.order_status == OrderStatus::Filled {
1465 pending_filled.add(id);
1466 } else {
1467 immediate_cleanup.push(id);
1468 }
1469 }
1470 }
1471
1472 for report in &reports {
1473 if let ExecutionReport::Fill(fill_report) = report
1474 && let Some(id) = fill_report.client_order_id
1475 && pending_filled.contains(&id)
1476 {
1477 pending_filled.remove(&id);
1478 immediate_cleanup.push(id);
1479 }
1480 }
1481
1482 for report in reports {
1483 match report {
1484 ExecutionReport::Order(r) => {
1485 emitter.send_order_status_report(r);
1486 }
1487 ExecutionReport::Fill(r) => {
1488 emitter.send_fill_report(r);
1489 }
1490 }
1491 }
1492
1493 for id in immediate_cleanup {
1494 let cloid = Cloid::from_client_order_id(id);
1495 ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
1496 }
1497 }
1498 NautilusWsMessage::Reconnected => {}
1501 NautilusWsMessage::Error(e) => {
1502 log::error!("WebSocket error: {e}");
1503 }
1504 NautilusWsMessage::Trades(_)
1506 | NautilusWsMessage::Quote(_)
1507 | NautilusWsMessage::Deltas(_)
1508 | NautilusWsMessage::Candle(_)
1509 | NautilusWsMessage::MarkPrice(_)
1510 | NautilusWsMessage::IndexPrice(_)
1511 | NautilusWsMessage::FundingRate(_) => {}
1512 }
1513 }
1514 None => {
1515 log::debug!("WebSocket next_event returned None, stream closed");
1516 break;
1517 }
1518 }
1519 }
1520 });
1521
1522 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1523 log::info!("Hyperliquid WebSocket execution stream started");
1524 Ok(())
1525 }
1526}